久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark Streaming怎么使用

168次閱讀
沒有評論

共計 3165 個字符,預計需要花費 8 分鐘才能閱讀完成。

這篇文章主要介紹“Spark Streaming 怎么使用”,在日常操作中,相信很多人在 Spark Streaming 怎么使用問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark Streaming 怎么使用”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

DStream 是邏輯級別的,而 RDD 是物理級別的。DStream 是隨著時間的流動內部將集合封裝 RDD。對 DStream 的操作,轉過來對其內部的 RDD 操作。

縱軸為空間維度:代表的是 RDD 的依賴關系構成的具體的處理邏輯的步驟,是用 DStream 來表示的。

橫軸為時間維度:按照特定的時間間隔不斷地生成 job 對象,并在集群上運行。

隨著時間的推移,基于 DStream Graph 不斷生成 RDD Graph , 也即 DAG 的方式生成 job, 并通過 Job Scheduler 的線程池的方式提交給 spark cluster 不斷的執行。

由上可知,RDD     與  DStream 的關系如下

RDD 是物理級別的,而 DStream 是邏輯級別的

DStream 是 RDD 的封裝類,是 RDD 進一步的抽象

DStream 是 RDD 的模板。DStream 要依賴 RDD 進行具體的數據計算

注意:縱軸維度需要 RDD,DAG 的生成模板,需要 TimeLine 的 job 控制器

橫軸維度(時間維度)包含 batch interval, 窗口長度,窗口滑動時間等。

3,Spark Streaming 源碼解析

StreamingContext 方法中調用 JobScheduler 的 start 方法

JobGenerator 的 start 方法中,調用 startFirstTime 方法,來開啟定時生成 Job 的定時器

startFirstTime 方法,首先調用 DStreamGraph 的 start 方法,然后再調用 RecurringTimer 的 start 方法。

timer 對象為一個定時器,根據 batchInterval 時間間隔定期向 EventLoop 發送 GenerateJobs 的消息。

接收到 GenerateJobs 消息后,會回調 generateJobs 方法。

generateJobs 方法再調用 DStreamGraph 的 generateJobs 方法生成 Job

DStreamGraph 的 generateJobs 方法

DStreamGraph 的實例化是在 StreamingContext 中的

DStreamGraph 類中保存了輸入流和輸出流信息

Spark Streaming 怎么使用

Spark Streaming 怎么使用

回到 JobGenerator 的 start 方法中 receiverTracker.start()

Spark Streaming 怎么使用

Spark Streaming 怎么使用

其中 ReceiverTrackerEndpoint 對象為一個消息循環體

Spark Streaming 怎么使用

launchReceivers 方法中發送 StartAllReceivers 消息

Spark Streaming 怎么使用

接收到 StartAllReceivers 消息后,進行如下處理

Spark Streaming 怎么使用

Spark Streaming 怎么使用

StartReceiverFunc 方法如下,實例化 Receiver 監控者,開啟并等待退出

Spark Streaming 怎么使用

supervisor 的 start 方法中調用 startReceiver 方法

Spark Streaming 怎么使用

Spark Streaming 怎么使用

我們以 socketTextStream 為例,其啟動的是 SocketReceiver,內部開啟一個線程,來接收數據。

Spark Streaming 怎么使用

Spark Streaming 怎么使用

內部調用 supervisor 的 pushSingle 方法,將數據聚集后存放在內存中

Spark Streaming 怎么使用

supervisor 的 pushSingle 方法如下,將數據放入到 defaultBlockGenerator 中,defaultBlockGenerator 為 BlockGenerator,保存 Socket 接收到的數據

Spark Streaming 怎么使用

Spark Streaming 怎么使用

BlockGenerator 對象中有一個定時器,來更新當前的 Buffer

Spark Streaming 怎么使用

Spark Streaming 怎么使用

BlockGenerator 對象中有一個線程,來從阻塞隊列中取出數據

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

調用 ReceiverSupervisorImpl 類中的繼承 BlockGeneratorListener 的匿名類中的 onPushBlock 方法。

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

receivedBlockHandler 對象如下

Spark Streaming 怎么使用

這里我們講解 BlockManagerBasedBlockHandler 的方式

Spark Streaming 怎么使用

trackerEndpoint 如下

Spark Streaming 怎么使用

Spark Streaming 怎么使用

其實是發送給 ReceiverTrackerEndpoint 類,

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

InputInfoTracker 類的 reportInfo 方法只是對數據進行記錄統計

Spark Streaming 怎么使用

Spark Streaming 怎么使用

Spark Streaming 怎么使用

其 generateJob 方法是被 DStreamGraph 調用

Spark Streaming 怎么使用

DStreamGraph 的 generateJobs 方法是被 JobGenerator 類的 generateJobs 方法調用。

Spark Streaming 怎么使用

JobGenerator 類中有一個定時器,batchInterval 發送 GenerateJobs 消息

Spark Streaming 怎么使用

總結:

1,當調用 StreamingContext 的 start 方法時,啟動了 JobScheduler

2,當 JobScheduler 啟動后會先后啟動 ReceiverTracker 和 JobGenerator

3,ReceiverTracker 啟動后會創建 ReceiverTrackerEndpoint 這個消息循環體,來接收運行在 Executor 上的 Receiver 發送過來的消息

4,ReceiverTracker 在啟動時會給自己發送 StartAllReceivers 消息,自己接收到消息后,向 Spark 提交 startReceiverFunc 的 Job

5,startReceiverFunc 方法中在 Executor 上啟動 Receiver,并實例化 ReceiverSupervisorImpl 對象,來監控 Receiver 的運行

6,ReceiverSupervisorImpl 對象會調用 Receiver 的 onStart 方法,我們以 SocketReceiver 為例,啟動一個線程,連接 Server,讀取網絡數據先調用 ReceiverSupervisorImpl 的 pushSingle 方法,

保存在 BlockGenerator 對象中,該對象內部有個定時器,放到阻塞隊列 blocksForPushing,等待內部線程取出數據放到 BlockManager 中,并發 AddBlock 消息給 ReceiverTrackerEndpoint。

ReceiverTrackerEndpoint 為 ReceiverTracker 的內部類,在接收到 addBlock 消息后將 streamId 對應的數據阻塞隊列 streamIdToUnallocatedBlockQueues 中

7,JobGenerator 啟動后會啟動以 batchInterval 時間間隔發送 GenerateJobs 消息的定時器

8,接收到 GenerateJobs 消息會先后觸發 ReceiverTracker 的 allocateBlocksToBatch 方法和 DStreamGraph 的 generateJobs 方法

9,ReceiverTracker 的 allocateBlocksToBatch 方法會調用 getReceivedBlockQueue 方法從阻塞隊列 streamIdToUnallocatedBlockQueues 中根據 streamId 獲取數據

10,DStreamGraph 的 generateJobs 方法,繼而調用變量名為 outputStreams 的 DStream 集合的 generateJob 方法

11,繼而調用 DStream 的 getOrCompute 來調用具體的 DStream 的 compute 方法,我們以 ReceiverInputDStream 為例,compute 方法是從 ReceiverTracker 中獲取數據

到此,關于“Spark Streaming 怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計3165字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 高淳县| 灵寿县| 五家渠市| 华亭县| 岢岚县| 武平县| 唐山市| 新野县| 桃源县| 稷山县| 进贤县| 内乡县| 英德市| 黔西| 永平县| 和硕县| 开平市| 达孜县| 铜川市| 天祝| 荥经县| 平南县| 女性| 商水县| 茂名市| 台南市| 长丰县| 开平市| 罗山县| 珲春市| 五峰| 陕西省| 盐城市| 玛沁县| 株洲县| 朝阳县| 从江县| 藁城市| 阳信县| 随州市| 孟州市|