久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark結構化流處理機制之容錯機制的示例分析

156次閱讀
沒有評論

共計 3792 個字符,預計需要花費 10 分鐘才能閱讀完成。

這篇文章給大家分享的是有關 Spark 結構化流處理機制之容錯機制的示例分析的內容。丸趣 TV 小編覺得挺實用的,因此分享給大家做個參考,一起跟隨丸趣 TV 小編過來看看吧。

容錯機制

端到端的有且僅有一次保證, 是結構化流設計的關鍵目標之一.

結構化流設計了  Structured Streaming sources,sinks 等等, 來跟蹤確切的處理進度, 并讓其重啟或重運行來處理任何故障

streaming source 是類似 kafka 的偏移量 (offsets) 來跟蹤流的讀取位置. 執行引擎使用檢查點 (checkpoint) 和預寫日志 (write ahead logs) 來記錄每個執行其的偏移范圍值

streaming sinks 是設計用來保證處理的冪等性

這樣, 依靠可回放的數據源 (streaming source) 和處理冪等(streaming sinks), 結構流來做到任何故障下的端到端的有且僅有一次保證

val lines = spark.readStream
 .format(socket)
 .option(host ,  localhost)
 .option(port , 9999)
 .load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(  ))
// Generate running word count
val wordCounts = words.groupBy(value).count()

其中,spark 是 SparkSession,lines 是 DataFrame,DataFrame 就是 Dataset[Row]。

DataSet

看看 Dataset 的觸發因子的代碼實現,比如 foreach 操作:

def foreach(f: T =  Unit): Unit = withNewRDDExecutionId { rdd.foreach(f)
 }

 private def withNewRDDExecutionId[U](body: =  U): U = { SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {  rddQueryExecution.executedPlan.foreach { plan =  plan.resetMetrics()  }  body  }  }

接著看:

 def withNewExecutionId[T](
 sparkSession: SparkSession,
 queryExecution: QueryExecution,
 name: Option[String] = None)(body: =  T): T = {
 val sc = sparkSession.sparkContext
 val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
 val executionId = SQLExecution.nextExecutionId
 sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
 executionIdToQueryExecution.put(executionId, queryExecution)
 try { 
 withSQLConfPropagated(sparkSession) { 
 try { 
 body
 } catch { 
 } finally { 
 }
 }
 } finally { executionIdToQueryExecution.remove(executionId)
 sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
 }
 }

執行的真正代碼就是 queryExecution: QueryExecution。 

@transient private lazy val rddQueryExecution: QueryExecution = { val deserialized = CatalystSerde.deserialize[T](logicalPlan)
 sparkSession.sessionState.executePlan(deserialized)
 }

看到了看到了,是 sessionState.executePlan 執行 logicalPlan 而得到了 QueryExecution

這里的 sessionState.executePlan 其實就是創建了一個 QueryExecution 對象。然后執行 QueryExecution 的 executedPlan 方法得到 SparkPlan 這個物理計劃。怎么生成的呢?

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) { SparkSession.setActiveSession(sparkSession) 
 planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
 }

通過 planner.plan 方法生成。

planner 是 SparkPlanner。在 BaseSessionStateBuilder 類中定義。

protected def planner: SparkPlanner = { new SparkPlanner(session.sparkContext, conf, experimentalMethods) { override def extraPlanningStrategies: Seq[Strategy] =
 super.extraPlanningStrategies ++ customPlanningStrategies
 }
 }

SparkPlanner 類

SparkPlanner 對 LogicalPlan 執行各種策略,返回對應的 SparkPlan。比如對于流應用來說,有這樣的策略:DataSourceV2Strategy。

典型的幾個邏輯計劃到物理計劃的映射關系如下:

StreamingDataSourceV2Relation-》ContinuousScanExec

StreamingDataSourceV2Relation-》MicroBatchScanExec

前一種對應與 Offset 沒有 endOffset 的情況,后一種對應于有 endOffset 的情況。前一種是沒有結束的連續流,后一種是有區間的微批處理流。

前一種的時延可以達到 1ms,后一種的時延只能達到 100ms。

【代碼】:

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined   r.endOffset.isDefined = 
 val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
 val scanExec = MicroBatchScanExec( r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
 val withProjection = if (scanExec.supportsColumnar) {
 scanExec
 } else {
 // Add a Project here to make sure we produce unsafe rows.
 ProjectExec(r.output, scanExec)
 }
 withProjection :: Nil
 case r: StreamingDataSourceV2Relation if r.startOffset.isDefined   r.endOffset.isEmpty = 
 val continuousStream = r.stream.asInstanceOf[ContinuousStream]
 val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
 val withProjection = if (scanExec.supportsColumnar) {
 scanExec
 } else {
 // Add a Project here to make sure we produce unsafe rows.
 ProjectExec(r.output, scanExec)
 }
 withProjection :: Nil

感謝各位的閱讀!關于“Spark 結構化流處理機制之容錯機制的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計3792字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 格尔木市| 龙游县| 建湖县| 洞头县| 安阳县| 清河县| 汝州市| 濉溪县| 江源县| 七台河市| 乌海市| 南城县| 桃源县| 柘荣县| 息烽县| 富源县| 彩票| 庄浪县| 营口市| 揭东县| 思茅市| 渝中区| 仙居县| 宜兴市| 龙岩市| 福安市| 阳信县| 镇赉县| 石河子市| 雷州市| 文登市| 江孜县| 庆安县| 九江市| 南投市| 长治县| 海林市| 清水县| 南皮县| 阿拉尔市| 西乡县|