共計 3792 個字符,預計需要花費 10 分鐘才能閱讀完成。
這篇文章給大家分享的是有關 Spark 結構化流處理機制之容錯機制的示例分析的內容。丸趣 TV 小編覺得挺實用的,因此分享給大家做個參考,一起跟隨丸趣 TV 小編過來看看吧。
容錯機制
端到端的有且僅有一次保證, 是結構化流設計的關鍵目標之一.
結構化流設計了 Structured Streaming sources,sinks 等等, 來跟蹤確切的處理進度, 并讓其重啟或重運行來處理任何故障
streaming source 是類似 kafka 的偏移量 (offsets) 來跟蹤流的讀取位置. 執行引擎使用檢查點 (checkpoint) 和預寫日志 (write ahead logs) 來記錄每個執行其的偏移范圍值
streaming sinks 是設計用來保證處理的冪等性
這樣, 依靠可回放的數據源 (streaming source) 和處理冪等(streaming sinks), 結構流來做到任何故障下的端到端的有且僅有一次保證
val lines = spark.readStream
.format(socket)
.option(host , localhost)
.option(port , 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split( ))
// Generate running word count
val wordCounts = words.groupBy(value).count()
其中,spark 是 SparkSession,lines 是 DataFrame,DataFrame 就是 Dataset[Row]。
DataSet
看看 Dataset 的觸發因子的代碼實現,比如 foreach 操作:
def foreach(f: T = Unit): Unit = withNewRDDExecutionId { rdd.foreach(f)
}
private def withNewRDDExecutionId[U](body: = U): U = { SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
rddQueryExecution.executedPlan.foreach { plan =
plan.resetMetrics()
}
body
}
}
接著看:
def withNewExecutionId[T](
sparkSession: SparkSession,
queryExecution: QueryExecution,
name: Option[String] = None)(body: = T): T = {
val sc = sparkSession.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
executionIdToQueryExecution.put(executionId, queryExecution)
try {
withSQLConfPropagated(sparkSession) {
try {
body
} catch {
} finally {
}
}
} finally { executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
}
}
執行的真正代碼就是 queryExecution: QueryExecution。
@transient private lazy val rddQueryExecution: QueryExecution = { val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized)
}
看到了看到了,是 sessionState.executePlan 執行 logicalPlan 而得到了 QueryExecution
這里的 sessionState.executePlan 其實就是創建了一個 QueryExecution 對象。然后執行 QueryExecution 的 executedPlan 方法得到 SparkPlan 這個物理計劃。怎么生成的呢?
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) { SparkSession.setActiveSession(sparkSession)
planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}
通過 planner.plan 方法生成。
planner 是 SparkPlanner。在 BaseSessionStateBuilder 類中定義。
protected def planner: SparkPlanner = { new SparkPlanner(session.sparkContext, conf, experimentalMethods) { override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies
}
}
SparkPlanner 類
SparkPlanner 對 LogicalPlan 執行各種策略,返回對應的 SparkPlan。比如對于流應用來說,有這樣的策略:DataSourceV2Strategy。
典型的幾個邏輯計劃到物理計劃的映射關系如下:
StreamingDataSourceV2Relation-》ContinuousScanExec
StreamingDataSourceV2Relation-》MicroBatchScanExec
前一種對應與 Offset 沒有 endOffset 的情況,后一種對應于有 endOffset 的情況。前一種是沒有結束的連續流,后一種是有區間的微批處理流。
前一種的時延可以達到 1ms,后一種的時延只能達到 100ms。
【代碼】:
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined r.endOffset.isDefined =
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
val scanExec = MicroBatchScanExec( r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined r.endOffset.isEmpty =
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
感謝各位的閱讀!關于“Spark 結構化流處理機制之容錯機制的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!