久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何理解Storm的并行度、Grouping策略以及消息可靠處理機制

200次閱讀
沒有評論

共計 4346 個字符,預計需要花費 11 分鐘才能閱讀完成。

如何理解 Storm 的并行度、Grouping 策略以及消息可靠處理機制,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

概念:

Workers (JVMs): 在一個節點上可以運行一個或多個獨立的 JVM 進程。一個 Topology 可以包含一個或多個 worker(并行的跑在不同的 machine 上), 所以 worker process 就是執行一個 topology 的子集, 并且 worker 只能對應于一個 topology

Executors (threads): 在一個 worker JVM 進程中運行著多個 Java 線程。一個 executor 線程可以執行一個或多個 tasks。但一般默認每個 executor 只執行一個 task。一個 worker 可以包含一個或多個 executor, 每個 component (spout 或 bolt) 至少對應于一個 executor, 所以可以說 executor 執行一個 compenent 的子集, 同時一個 executor 只能對應于一個 component。

Tasks(bolt/spout instances):Task 就是具體的處理邏輯對象,每一個 Spout 和 Bolt 會被當作很多 task 在整個集群里面執行。每一個 task 對應到一個線程,而 stream grouping 則是定義怎么從一堆 task 發射 tuple 到另外一堆 task。你可以調用 TopologyBuilder.setSpout 和 TopBuilder.setBolt 來設置并行度 — 也就是有多少個 task。

 

配置并行度

對于并發度的配置, 在 storm 里面可以在多個地方進行配置, 優先級為:defaults.yaml storm.yaml topology-specific configuration internal component-specific configuration external component-specific configuration

worker processes 的數目, 可以通過配置文件和代碼中配置, worker 就是執行進程, 所以考慮并發的效果, 數目至少應該大亍 machines 的數目

executor 的數目, component 的并發線程數,只能在代碼中配置 (通過 setBolt 和 setSpout 的參數), 例如, setBolt(green-bolt , new GreenBolt(), 2)

tasks 的數目, 可以不配置, 默認和 executor1:1, 也可以通過 setNumTasks() 配置

Topology 的 worker 數通過 config 設置,即執行該 topology 的 worker(java)進程數。它可以通過 storm rebalance 命令任意調整。

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout(blue-spout , new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt(green-bolt , new GreenBolt(), 2).setNumTasks(4).shuffleGrouping( blue-spout  //set tasks number to 4
topologyBuilder.setBolt(yellow-bolt , new YellowBolt(), 6).shuffleGrouping( green-bolt 
StormSubmitter.submitTopology(mytopology , conf, topologyBuilder.createTopology());

如何理解 Storm 的并行度、Grouping 策略以及消息可靠處理機制  

動態的改變并行度

Storm 支持在不 restart topology 的情況下, 動態的改變 (增減) worker processes 的數目和 executors 的數目, 稱為 rebalancing. 通過 Storm web UI,或者通過 storm rebalance 命令實現:

storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

流分組策略 —-Stream Grouping

Stream Grouping,告訴 topology 如何在兩個組件之間發送 tuple

定義一個 topology 的其中一步是定義每個 bolt 接收什么樣的流作為輸入。stream grouping 就是用來定義一個 stream 應該如果分配數據給 bolts 上面的多個 tasks

Storm 里面有 7 種類型的 stream grouping,你也可以通過實現 CustomStreamGrouping 接口來實現自定義流分組

1. Shuffle Grouping

隨機分組,隨機派發 stream 里面的 tuple,保證每個 bolt task 接收到的 tuple 數目大致相同。

2. Fields Grouping

按字段分組,比如,按 user-id 這個字段來分組,那么具有同樣 user-id 的 tuple 會被分到相同的 Bolt 里的一個 task,而不同的 user-id 則可能會被分配到不同的 task。

3. All Grouping

廣播發送,對亍每一個 tuple,所有的 bolts 都會收到

4. Global Grouping

全局分組,整個 stream 被分配到 storm 中的一個 bolt 的其中一個 task。再具體一點就是分配給 id 值最低的那個 task。

5. None Grouping

不分組,這個分組的意思是說 stream 不關心到底怎樣分組。目前這種分組和 Shuffle grouping 是一樣的效果,有一點不同的是 storm 會把使用 none grouping 的這個 bolt 放到這個 bolt 的訂閱者同一個線程里面去執行(如果可能的話)。

6. Direct Grouping

指向型分組,這是一種比較特別的分組方法,用這種分組意味著消息(tuple)的發送者指定由消息接收者的哪個 task 處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息 tuple 必須使用 emitDirect 方法來發射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的 task 的 id (OutputCollector.emit 方法也會返回 task 的 id) 

7. Local or shuffle grouping

本地或隨機分組。如果目標 bolt 有一個或者多個 task 與源 bolt 的 task 在同一個工作進程中,tuple 將會被隨機發送給這些同進程中的 tasks。否則,和普通的 Shuffle Grouping 行為一致。

 

消息的可靠處理機制

  在 storm 中,可靠的信息處理機制是從 spout 開始的。一個提供了可靠的處理機制的 spout 需要記錄他發射出去的 tuple,當下游 bolt 處理 tuple 或者子 tuple 失敗時 spout 能夠重新發射。

 Storm 通過調用 Spout 的 nextTuple() 發送一個 tuple。為實現可靠的消息處理,首先要給每個發出的 tuple 帶上唯一的 ID,并且將 ID 作為參數傳遞給 SoputOutputCollector 的 emit() 方法:collector.emit(new Values( value1 , value2), msgId); 給 tuple 指定 ID 告訴 Storm 系統,無論處理成功還是失敗,spout 都要接收 tuple 樹上所有節點返回的通知。如果處理成功,spout 的 ack() 方法將會對編號是 msgId 的消息應答確認;如果處理失敗或者超時,會調用 fail() 方法。

 bolt 要實現可靠的信息處理機制包含兩個步驟:1. 當發射衍生的 tuple 時,需要錨定讀入的 tuple;2. 當處理消息成功或失敗時分別確認應答或者報錯。

  錨定一個 tuple 的意思是,建立讀入 tuple 和衍生出的 tuple 之間的對應關系,這樣下游的 bolt 就可以通過應答確認、報錯或超時來加入到 tuple 樹結構中??梢酝ㄟ^調用 OutputCollector 的 emit() 的一個重載函數錨定一個或一組 tuple:collector.emit(tuple, new Values(word))

  非錨定(collector.emit(new Values(word));)的 tuple 不會對數據流的可靠性起作用。如果一個非錨定的 tuple 在下游處理失敗,原始的根 tuple 不會重新發送。

 

  超時時間可以通過任務級參數 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進行配置,默認超時值為 30 秒。

 Storm 系統中有一組叫做 acker 的特殊的任務,它們負責跟蹤 DAG(有向無環圖)中的每個消息。acker 任務保存了 spout 消息 id 到一對值的映射。第一個值就是 spout 的任務 id,通過這個 id,acker 就知道消息處理完成時該通知哪個 spout 任務。第二個值是一個 64bit 的數字,我們稱之為 ack val,它是樹中所有消息的隨機 id 的異或計算結果。ack val 表示了整棵樹的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當消息被創建和被應答的時候都會有相同的消息 id 發送過來做異或。

  每當 acker 發現一棵樹的 ack val 值為 0 的時候,它就知道這棵樹已經被完全處理了。因為消息的隨機 ID 是一個 64bit 的值,因此 ack val 在樹處理完之前被置為 0 的概率非常小。假設你每秒鐘發送一萬個消息,從概率上說,至少需要 50,000,000 年才會有機會發生一次錯誤。即使如此,也只有在這個消息確實處理失敗的情況下才會有數據的丟失!

  有三種方法可以去掉消息的可靠性:

1、將參數 Config.TOPOLOGY_ACKERS 設置為 0,通過此方法,當 Spout 發送一個消息的時候,它的 ack 方法將立刻被調用;

2、Spout 發送一個消息時,不指定此消息的 messageID。當需要關閉特定消息可靠性的時候,可以使用此方法;

3、最后,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在 emit 方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何 tuple tree 中,因此他們的失敗不會引起任何 spout 重新發送消息。 

看完上述內容,你們掌握如何理解 Storm 的并行度、Grouping 策略以及消息可靠處理機制的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注丸趣 TV 行業資訊頻道,感謝各位的閱讀!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計4346字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 巴东县| 塘沽区| 通许县| 措美县| 石棉县| 扬州市| 孟津县| 城固县| 博爱县| 衡山县| 广饶县| 大厂| 专栏| 响水县| 嘉峪关市| 林芝县| 怀安县| 乌恰县| 安岳县| 瑞昌市| 鲁山县| 汝州市| 横峰县| 宜昌市| 滦平县| 盈江县| 吉木萨尔县| 淮南市| 澳门| 平和县| 云龙县| 潮安县| 中江县| 神农架林区| 蓝山县| 望城县| 龙州县| 高要市| 彭阳县| 永康市| 九龙城区|