لماذا لا يمكنني استخدام foreach في mapPartitions في Spark

1

لقد أنشأت مصفوفة باستخدام SparkContext باستخدام قسمين ، أحاول أيضًا استخدامه mapPartition للتعامل مع العناصر ، ولكني حصلت على خطأ غريب جدًا ، عندما أكود على هذا النحو:

val masterURL = "local[*]"

val conf = new SparkConf().setAppName("KMeans Test").setMaster(masterURL)
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")

val data = sc.textFile("file:/d:/data/kmeans_data.txt")
val parsedData = data.mapPartitions(partition => parseData(partition)).cache()

parsedData.mapPartitions(points =>
  points.map(point =>
    println(point)
  )
)

لا توجد أخطاء ، ومع ذلك ، عندما أقوم باستبدال الخريطة بالبحث ، فإن ذلك يشير إلى خطأ:

parsedData.mapPartitions(points =>
  points.foreach(point =>
   println(point)
  )
)

الخطأ كما يلي:

Type mismatch, expected: (Iterator[Vector]) => Iterator[NotInferedU], actual: (Iterator[Vector]) => Unit Expression of type Unit doesn't conform to expected type Iterator[U_]

بالإضافة إلى ذلك ، لا يقوم قصاصة الرمز الأول أيضًا بطباعة أي شيء في لوحة وحدة التحكم ، لماذا؟

2 الاجابة

3
افضل جواب

أنت تحصل على هذا الخطأ لأن foreach يعود الأسلوب Unit نوع. ال mapPartitions لا يمكن للطريقة إرجاع هذا النوع. حاول إرجاع نفس المكرر الذي تتلقاه:

parsedData.mapPartitions(points =>
  points.foreach(point =>
   println(point)
  )
  points
)

يجب أن تعمل.

In addition, the first code snip also do not print anything in console panel, why?

لأن وظيفة الخريطة لا تنفذ أي شيء حتى يتم استدعاء الإجراء ، (على سبيل المثال ، collect أو foreach ).

:مؤلف
-1
افضل جواب

تتوقع mapPartitions دالة تقوم بإرجاع مكرر جديد للأقسام ( Iterator[Vector] => Iterator[NotInferedU] ) ، يقوم بتعيين مكرر إلى مكرر آخر. باستخدام foreach تقوم بإرجاع الفراغ (الوحدة في سكالا) الذي يختلف عن نوع الإرجاع المتوقع.

لطباعة محتوى RDD ، يمكنك استخدامه foreachPartition بدلا من mapPartitions :

parsedData.foreachPartition(points =>
  points.foreach(point =>
    println(point)
  )
)
:مؤلف

أسئلة ذات صلة

فوق
قائمة طعام