共計 3165 個字符,預計需要花費 8 分鐘才能閱讀完成。
這篇文章主要介紹“Spark Streaming 怎么使用”,在日常操作中,相信很多人在 Spark Streaming 怎么使用問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark Streaming 怎么使用”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
DStream 是邏輯級別的,而 RDD 是物理級別的。DStream 是隨著時間的流動內部將集合封裝 RDD。對 DStream 的操作,轉過來對其內部的 RDD 操作。
縱軸為空間維度:代表的是 RDD 的依賴關系構成的具體的處理邏輯的步驟,是用 DStream 來表示的。
橫軸為時間維度:按照特定的時間間隔不斷地生成 job 對象,并在集群上運行。
隨著時間的推移,基于 DStream Graph 不斷生成 RDD Graph , 也即 DAG 的方式生成 job, 并通過 Job Scheduler 的線程池的方式提交給 spark cluster 不斷的執行。
由上可知,RDD 與 DStream 的關系如下
RDD 是物理級別的,而 DStream 是邏輯級別的
DStream 是 RDD 的封裝類,是 RDD 進一步的抽象
DStream 是 RDD 的模板。DStream 要依賴 RDD 進行具體的數據計算
注意:縱軸維度需要 RDD,DAG 的生成模板,需要 TimeLine 的 job 控制器
橫軸維度(時間維度)包含 batch interval, 窗口長度,窗口滑動時間等。
3,Spark Streaming 源碼解析
StreamingContext 方法中調用 JobScheduler 的 start 方法
JobGenerator 的 start 方法中,調用 startFirstTime 方法,來開啟定時生成 Job 的定時器
startFirstTime 方法,首先調用 DStreamGraph 的 start 方法,然后再調用 RecurringTimer 的 start 方法。
timer 對象為一個定時器,根據 batchInterval 時間間隔定期向 EventLoop 發送 GenerateJobs 的消息。
接收到 GenerateJobs 消息后,會回調 generateJobs 方法。
generateJobs 方法再調用 DStreamGraph 的 generateJobs 方法生成 Job
DStreamGraph 的 generateJobs 方法
DStreamGraph 的實例化是在 StreamingContext 中的
DStreamGraph 類中保存了輸入流和輸出流信息
回到 JobGenerator 的 start 方法中 receiverTracker.start()
其中 ReceiverTrackerEndpoint 對象為一個消息循環體
launchReceivers 方法中發送 StartAllReceivers 消息
接收到 StartAllReceivers 消息后,進行如下處理
StartReceiverFunc 方法如下,實例化 Receiver 監控者,開啟并等待退出
supervisor 的 start 方法中調用 startReceiver 方法
我們以 socketTextStream 為例,其啟動的是 SocketReceiver,內部開啟一個線程,來接收數據。
內部調用 supervisor 的 pushSingle 方法,將數據聚集后存放在內存中
supervisor 的 pushSingle 方法如下,將數據放入到 defaultBlockGenerator 中,defaultBlockGenerator 為 BlockGenerator,保存 Socket 接收到的數據
BlockGenerator 對象中有一個定時器,來更新當前的 Buffer
BlockGenerator 對象中有一個線程,來從阻塞隊列中取出數據
調用 ReceiverSupervisorImpl 類中的繼承 BlockGeneratorListener 的匿名類中的 onPushBlock 方法。
receivedBlockHandler 對象如下
這里我們講解 BlockManagerBasedBlockHandler 的方式
trackerEndpoint 如下
其實是發送給 ReceiverTrackerEndpoint 類,
InputInfoTracker 類的 reportInfo 方法只是對數據進行記錄統計
其 generateJob 方法是被 DStreamGraph 調用
DStreamGraph 的 generateJobs 方法是被 JobGenerator 類的 generateJobs 方法調用。
JobGenerator 類中有一個定時器,batchInterval 發送 GenerateJobs 消息
總結:
1,當調用 StreamingContext 的 start 方法時,啟動了 JobScheduler
2,當 JobScheduler 啟動后會先后啟動 ReceiverTracker 和 JobGenerator
3,ReceiverTracker 啟動后會創建 ReceiverTrackerEndpoint 這個消息循環體,來接收運行在 Executor 上的 Receiver 發送過來的消息
4,ReceiverTracker 在啟動時會給自己發送 StartAllReceivers 消息,自己接收到消息后,向 Spark 提交 startReceiverFunc 的 Job
5,startReceiverFunc 方法中在 Executor 上啟動 Receiver,并實例化 ReceiverSupervisorImpl 對象,來監控 Receiver 的運行
6,ReceiverSupervisorImpl 對象會調用 Receiver 的 onStart 方法,我們以 SocketReceiver 為例,啟動一個線程,連接 Server,讀取網絡數據先調用 ReceiverSupervisorImpl 的 pushSingle 方法,
保存在 BlockGenerator 對象中,該對象內部有個定時器,放到阻塞隊列 blocksForPushing,等待內部線程取出數據放到 BlockManager 中,并發 AddBlock 消息給 ReceiverTrackerEndpoint。
ReceiverTrackerEndpoint 為 ReceiverTracker 的內部類,在接收到 addBlock 消息后將 streamId 對應的數據阻塞隊列 streamIdToUnallocatedBlockQueues 中
7,JobGenerator 啟動后會啟動以 batchInterval 時間間隔發送 GenerateJobs 消息的定時器
8,接收到 GenerateJobs 消息會先后觸發 ReceiverTracker 的 allocateBlocksToBatch 方法和 DStreamGraph 的 generateJobs 方法
9,ReceiverTracker 的 allocateBlocksToBatch 方法會調用 getReceivedBlockQueue 方法從阻塞隊列 streamIdToUnallocatedBlockQueues 中根據 streamId 獲取數據
10,DStreamGraph 的 generateJobs 方法,繼而調用變量名為 outputStreams 的 DStream 集合的 generateJob 方法
11,繼而調用 DStream 的 getOrCompute 來調用具體的 DStream 的 compute 方法,我們以 ReceiverInputDStream 為例,compute 方法是從 ReceiverTracker 中獲取數據
到此,關于“Spark Streaming 怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!