共計 4670 個字符,預計需要花費 12 分鐘才能閱讀完成。
今天就跟大家聊聊有關如何理解 Receiver 啟動以及啟動源碼分析,可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
為什么要 Receiver?
Receiver 不斷持續接收外部數據源的數據,并把數據匯報給 Driver 端,這樣我們每隔 BatchDuration 會把匯報數據生成不同的 Job,來執行 RDD 的操作。
Receiver 是隨著應用程序的啟動而啟動的。
Receiver 和 InputDStream 是一一對應的。
RDD[Receiver] 只有一個 Partition,一個 Receiver 實例。
Spark Core 并不知道 RDD[Receiver] 的特殊性,依然按照普通 RDD 對應的 Job 進行調度,就有可能在同樣一個 Executor 上啟動多個 Receiver,會導致負載不均衡,會導致 Receiver 啟動失敗。
Receiver 在 Executor 啟動的方案:
1,啟動不同 Receiver 采用 RDD 中不同 Partiton 的方式,不同的 Partiton 代表不同的 Receiver,在執行層面就是不同的 Task,在每個 Task 啟動時就啟動 Receiver。
這種方式實現簡單巧妙,但是存在弊端啟動可能失敗,運行過程中 Receiver 失敗,會導致 TaskRetry,如果 3 次失敗就會導致 Job 失敗,會導致整個 Spark 應用程序失敗。因為 Receiver 的故障,導致 Job 失敗,不能容錯。
2. 第二種方式就是 Spark Streaming 采用的方式。
在 ReceiverTacker 的 start 方法中,先實例化 Rpc 消息通信體 ReceiverTrackerEndpoint,再調用
launchReceivers 方法。
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException(ReceiverTracker already started)
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
ReceiverTracker , new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo(ReceiverTracker started)
trackerState = Started
}
}
在 launchReceivers 方法中,先對每一個 ReceiverInputStream 獲取到對應的一個 Receiver,然后發送 StartAllReceivers 消息。Receiver 對應一個數據來源。
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map(nis = {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
runDummySparkJob()
logInfo(Starting + receivers.length + receivers)
endpoint.send(StartAllReceivers(receivers))
}
ReceiverTrackerEndpoint 接收到 StartAllReceivers 消息后,先找到 Receiver 運行在哪些 Executor 上,然后調用 startReceiver 方法。
override def receive: PartialFunction[Any, Unit] = {
// Local messages
case StartAllReceivers(receivers) =
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver – receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
startReceiver 方法在 Driver 層面自己指定了 TaskLocation,而不用 Spark Core 來幫我們選擇 TaskLocation。其有以下特點: 終止 Receiver 不需要重啟 Spark Job;第一次啟動 Receiver,不會執行第二次;為了啟動 Receiver 而啟動了一個 Spark 作業,一個 Spark 作業啟動一個 Receiver。每個 Receiver 啟動觸發一個 Spark 作業,而不是每個 Receiver 是在一個 Spark 作業的一個 Task 來啟動。當提交啟動 Receiver 的作業失敗時發送 RestartReceiver 消息,來重啟 Receiver。
/**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It s okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] = Unit =
(iterator: Iterator[Receiver[_]]) = {
if (!iterator.hasNext) {
throw new SparkException(
Could not start receiver as object not found. )
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It s restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver – preferredLocations))
}
receiverRDD.setName(s Receiver $receiverId)
ssc.sparkContext.setJobDescription(s Streaming job running receiver $receiverId)
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) = Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s Restarting Receiver $receiverId)
self.send(RestartReceiver(receiver))
}
case Failure(e) =
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError(Receiver has been stopped. Try to restart it. , e)
logInfo(s Restarting Receiver $receiverId)
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s Receiver ${receiver.streamId} started )
}
看完上述內容,你們對如何理解 Receiver 啟動以及啟動源碼分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注丸趣 TV 行業資訊頻道,感謝大家的支持。