
Understanding Receiver Startup: A Source Code Analysis


Today let's talk about how to understand Receiver startup, together with an analysis of the startup source code. Many readers may not be very familiar with it, so I have summarized the following; I hope you get something out of this article.

Why do we need a Receiver?

A Receiver continuously receives data from an external data source and reports it to the Driver. Every BatchDuration, the reported data is used to generate a new Job that executes the RDD operations.

Receivers are started when the application starts.

Receivers and InputDStreams correspond one to one.

An RDD[Receiver] has only one Partition, holding a single Receiver instance.

Spark Core does not know that an RDD[Receiver] is special; it schedules the corresponding Job like that of any ordinary RDD. As a result, multiple Receivers could be launched on the same Executor, leading to load imbalance and possibly to Receiver startup failures.

Options for starting Receivers on Executors:

1. Start different Receivers as different Partitions of an RDD: each Partition represents one Receiver, which at the execution level means a separate Task, and the Receiver is started when its Task starts.

This approach is simple and clever, but it has drawbacks: startup may fail, and if a Receiver fails while running, its Task is retried; after 3 failed retries the Job fails, which brings down the entire Spark application. A Receiver fault thus kills the Job — there is no fault tolerance.
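To see why this is fragile, here is a toy model in plain Scala (not Spark code; the helper names and the retry budget of 3 are illustrative) of task-retry semantics: a transient fault survives the retries, but a permanently failing receiver exhausts the budget and the whole job fails.

```scala
// Toy model of task-retry semantics (not Spark code): a task is retried
// up to maxFailures times; if every attempt fails, the job -- and with it
// the streaming application -- fails.
def runReceiverTask(attempt: Int, recoversAfter: Int): Boolean =
  attempt >= recoversAfter // the task only succeeds once the fault clears

def runJob(maxFailures: Int, recoversAfter: Int): Boolean =
  (0 until maxFailures).exists(attempt => runReceiverTask(attempt, recoversAfter))

val transientFault = runJob(3, 2)   // recovers on the 3rd attempt: job survives
val permanentFault = runJob(3, 99)  // never recovers: job (and app) fails
```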

2. The second approach is the one Spark Streaming actually uses.

In ReceiverTracker's start method, the RPC message endpoint ReceiverTrackerEndpoint is instantiated first, and then the launchReceivers method is called.

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

In the launchReceivers method, a Receiver is first obtained for each ReceiverInputDStream, and then a StartAllReceivers message is sent. Each Receiver corresponds to one data source.

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

When ReceiverTrackerEndpoint receives the StartAllReceivers message, it first determines which Executors each Receiver should run on, and then calls the startReceiver method.

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  // ... other cases elided
}

The startReceiver method specifies the TaskLocation at the Driver level itself, instead of letting Spark Core choose it. It has the following characteristics: terminating a Receiver does not require restarting the Spark Job; a Receiver started on the first attempt is never started a second time; a dedicated Spark job is launched just to start the Receiver, one job per Receiver — each Receiver startup triggers its own Spark job, rather than each Receiver being one Task inside a single Spark job. When the job that starts a Receiver completes or fails, a RestartReceiver message is sent to restart the Receiver.

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
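The onComplete handler implements a supervision loop: whether the receiver job finishes normally or fails, it is resubmitted until the tracker stops. Stripped of Spark, RPC, and asynchrony, the control flow is roughly the following (a synchronous toy model with invented names, not the real implementation):

```scala
// Toy model of the resubmit loop (synchronous, no Spark): every completion
// of the receiver job triggers another submission unless the tracker
// has been stopped in the meantime.
var trackerStopped = false
var submissions = 0

def receiverJobCompleted(): Unit =
  if (submissions == 3) trackerStopped = true // simulate a shutdown signal

def superviseReceiver(): Unit =
  while (!trackerStopped) {
    submissions += 1        // stands in for submitJob(receiverRDD, ...)
    receiverJobCompleted()  // the job ends, by success or failure
  }

superviseReceiver()
// submissions == 3: the job was resubmitted until the tracker stopped
```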

Having read the above, do you now have a better understanding of how the Receiver is started and of its startup source code? If you would like to learn more, follow the 丸趣 TV industry news channel. Thanks for your support.

丸趣
Copyright notice: an original article from this site, published by 丸趣 on 2023-08-17.