SparkStreaming: تجنب فحص نقاط التحقق

2

أنا أكتب مكتبة لدمج Apache Spark مع بيئة مخصصة. أقوم بتطبيق كل من مصادر البث المخصصة وكتاب البث.

بعض المصادر التي أقوم بتطويرها غير قابلة للاسترداد ، على الأقل بعد تعطل التطبيق. إذا تم إعادة تشغيل التطبيق ، فإنه يحتاج إلى إعادة تحميل جميع البيانات. لذلك نود أن نتجنب أن يضطر المستخدمون إلى تعيين خيار "checkpointLocation" بشكل صريح. ولكن إذا لم يتم توفير الخيار ، فسترى الخطأ التالي:

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

ومع ذلك ، إذا كنت تستخدم إخراج دفق وحدة التحكم ، كل شيء يعمل بشكل جيد.

هل هناك طريقة للحصول على نفس السلوك؟

ملاحظة: نحن نستخدم واجهات Spark v2 لقراء / كتاب البث.


سجل شرارة:

18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
    ...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook

هذه هي الطريقة التي أبدأ بها مهمة البث:

spark.readStream().format("mysource").load()
  .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();

كل شيء يعمل بشكل جيد ، بدلاً من ذلك ، إذا قمت بتشغيل:

spark.readStream().format("mysource").load()
  .writeStream().format("console").outputMode(OutputMode.Append()).start();

لا يمكنني مشاركة الرمز الكامل لكاتب البيانات. على أي حال ، لقد فعلت شيئًا كهذا:

class MySourceProvider extends DataSourceRegister with StreamWriteSupport {
  def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = {
    new MyStreamWriter(...)
  }
  def shortName(): String = {
    "mywriter"
  }
}

class MyStreamWriter(...) extends StreamWriter { 
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  def createWriterFactory(): DataWriterFactory[Row] = {
    new MyDataWriterFactory()
  }
}

1 إجابة

1
افضل جواب

تحتاج إلى إضافة checkpointLocation في التعليمات البرمجية الخاصة بك

option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory

مثال:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

فيما يتعلق بسؤالك لديه ثلاثة خيارات:

.option("startingOffsets", "latest") // read data from the end of the stream

  • أقرب - ابدأ القراءة في بداية الدفق. ويستثنى من ذلك البيانات التي تم حذفها بالفعل من كافكا لأنها كانت أقدم من فترة الاحتفاظ (بيانات "انتهت مدة صلاحيتها").

  • الأحدث - ابدأ الآن ، معالجة البيانات الجديدة التي تصل بعد بدء الاستعلام.

  • تخصيص لكل قسم - حدد الإزاحة الدقيقة للبدء من كل قسم ، مما يسمح بالتحكم الدقيق في المكان الذي يجب أن تبدأ فيه المعالجة بالضبط. على سبيل المثال ، إذا أردنا تحديد المكان الذي توقف فيه نظام أو استعلام آخر بالضبط ، فيمكن الاستفادة من هذا الخيار.

إذا تعذر العثور على اسم الدليل لموقع نقطة التحقق ، فإن createQuery يبلغ عن AnalysisException.

checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)

فيما يلي كود شرارة اباتشي:

  private def createQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      extraOptions: Map[String, String],
      sink: BaseStreamingSink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean,
      recoverFromCheckpointLocation: Boolean,
      trigger: Trigger,
      triggerClock: Clock): StreamingQueryWrapper = {
    var deleteCheckpointOnStop = false
    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
      new Path(userSpecified).toUri.toString
    }.orElse {
      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
      }
    }.getOrElse {
      if (useTempCheckpointLocation) {
        // Delete the temp checkpoint when a query is being stopped without errors.
        deleteCheckpointOnStop = true
        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
      } else {
        throw new AnalysisException(
          "checkpointLocation must be specified either " +
            """through option("checkpointLocation", ...) or """ +
            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
      }
    }
:مؤلف
فوق
قائمة طعام