共計 1107 個字符,預計需要花費 3 分鐘才能閱讀完成。
本篇文章給大家分享的是有關如何進行 JobScheduler 內幕實現,丸趣 TV 小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著丸趣 TV 小編一起來看看吧。
在 spark stream 程序中的一條關鍵的語句就是:ssc.start()
1,跟蹤進入 StreamingContext 的 start 方法,有一句非常關鍵的語句 scheduler.start(),是個 JobScheduler(spark stream 用來 job 調度的)
進行 job 調度的入口!
2,計入 JobScheduler 的 start 方法。
在這個方法中幾個關鍵的點是:
eventLoop.start() 一個事件循環器,用于響應其它組件發來的事件 ( 包括 job 的啟動,完成,以及錯誤報告)。
receiverTracker.start() 控制了整個 receiver 的生成,與數據的接受
jobGenerator.start() 真正開始進行 job 的生成
在這個方法中也維護了一個事件處理的循環器 eventLoop,用于處理各種事件
其中最為關鍵的事件是 GenerateJobs(time),這個事件是進行生成 job 的事件!!
跟蹤計入 generateJobs(time)
jobScheduler.receiverTracker.allocateBlocksToBatch(time) 為當前的 bath 分發收到的數據 Blocks。
graph.generateJobs(time):根據當前編寫的程序的 output 動作生成相應的 job 并封裝進入集合中。
最終通過
提交作業到 executor
在回去看看 jobGenerator.start() 中的 startFirstTime()
private def startFirstTime() { val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo(Started JobGenerator at + startTime)
}
第一次啟動會啟動一個定時器,該定時器會根基 duration bath 不斷的的給 jobGenerator 中的消息循環體!
在 jobGenerator 中的消息循環體就會不斷的去除消息進行處理
以上就是如何進行 JobScheduler 內幕實現,丸趣 TV 小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注丸趣 TV 行業資訊頻道。