شرارة: جرت محاولة استخدام البث بعد إتلافه

0

يعمل الكود التالي

 @throws(classOf[IKodaMLException])
 def soMergeTarget1( oldTargetIdx: Double, newTargetIdx: Double): RDDLabeledPoint =
 {
   try
   {
    logger.trace("\n\n--sparseOperationRenameTargetsInNumeriOrder--\n\n")
    val oldTargetIdxb=spark.sparkContext.broadcast(oldTargetIdx)
    val newTargetIdxb=spark.sparkContext.broadcast(newTargetIdx)

    val newdata:RDD[(LabeledPoint,Int,String)] = sparseData.map
    {
      r =>

        val currentLabel: Double = r._1.label
        currentLabel match
        {
          case x if x == oldTargetIdxb.value =>
          val newtrgt=newTargetIdxb.value
          (new LabeledPoint(newtrgt, r._1.features), r._2, r._3)
          case _ => r
        }
    }
  val newtargetmap=ilp.targetMap.filter(e=> !(e._2 == oldTargetIdx))
  oldTargetIdxb.destroy
  newTargetIdxb.destroy
  new RDDLabeledPoint(newdata,copyColumnMap,newtargetmap,ilp.name)
}

ولكن بعد أن دمرت متغيرات البث في نهاية الطريقة ، newtrgt يتم تدمير المتغير في RDD أيضا. المشكلة هي أنه بمجرد إرجاع RDD من هذه الطريقة ، يمكن استخدامه من قبل أي محلل في أي رمز. لذا ، يبدو أنني فقدت كل التحكم في متغيرات البث.

الأسئلة:

إذا لم أقم بتدمير المتغيرات ، فهل ستقوم الشرارة بتدميرها عندما تختفي الإشارة إلى RDD؟

(ربما سؤال ساذج ولكن ....) جربت القليل من الاختراق val newtrgt=oldTargetIdxb.value + 1 -1 التفكير قد يؤدي إلى إنشاء مرجع جديد مختلف عن متغير البث. لم تنجح. يجب أن أعترف أن فاجأني. هل يمكن لأي شخص أن يشرح سبب عدم نجاح الاختراق (أنا لا أقترح أنها فكرة جيدة ، لكنني فضولي).

1 إجابة

1
افضل جواب

لقد وجدت إجابة هنا

ليس جوابي ولكن يستحق المشاركة على SO ... ولماذا لا يمكنني رؤية ذلك في وثائق Spark. من المهم:

شون أوين:

you want to actively unpersist() or destroy() broadcast variables when they're no longer needed. They can eventually be removed when the reference on the driver is garbage collected, but you usually would not want to rely on that.

سؤال المتابعة:

Thank you for the response. The only problem is that actively managing broadcast variables require to return the broadcast variables to the caller if the function that creates the broadcast variables does not contain any action. That is the scope that uses the broadcast variables cannot destroy the broadcast variables in many cases. For example:

==============

def perfromTransformation(rdd: RDD[int]) = {
   val sharedMap = sc.broadcast(map)
   rdd.map{id => 
      val localMap = sharedMap.vlaue
      (id, localMap(id))
   }
}

def main = {
    ....
    performTransformation(rdd).toDF("id", "i").write.parquet("dummy_example")
}

==============

In this example above, we cannot destroy the sharedMap before the write.parquet is executed because RDD is evaluated lazily. We will get a exception

شون أوين:

Yes, although there's a difference between unpersist and destroy, you'll hit the same type of question either way. You do indeed have to reason about when you know the broadcast variable is no longer needed in the face of lazy evaluation, and that's hard.

Sometimes it's obvious and you can take advantage of this to proactively free resources. You may have to consider restructuring the computation to allow for more resources to be freed, if this is important to scale.

Keep in mind that things that are computed and cached may be lost and recomputed even after their parent RDDs were definitely already computed and don't seem to be needed. This is why unpersist is often the better thing to call because it allows for variables to be rebroadcast if needed in this case. Destroy permanently closes the broadcast.

:مؤلف
فوق
قائمة طعام