共計 5998 個字符,預計需要花費 15 分鐘才能閱讀完成。
本篇內容介紹了“storm 消息的可靠處理方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
4.1 簡介
storm 可以確保 spout 發送出來的每個消息都會被完整的處理。本章將會描述 storm 體系是如何達到這個目標的,并將會詳述開發者應該如何使用 storm 的這些機制來實現數據的可靠處理。
4.2 理解消息被完整處理
一個消息 (tuple) 從 spout 發送出來,可能會導致成百上千的消息基于此消息被創建。
我們來思考一下流式的“單詞統計”的例子:
storm 任務從數據源(Kestrel queue)每次讀取一個完整的英文句子;將這個句子分解為獨立的單詞,最后,實時的輸出每個單詞以及它出現過的次數。
本例中,每個從 spout 發送出來的消息(每個英文句子)都會觸發很多的消息被創建,那些從句子中分隔出來的單詞就是被創建出來的新消息。
這些消息構成一個樹狀結構,我們稱之為“tuple tree”,看起來如圖 1 所示:
圖 1 示例 tuple tree
在什么條件下,Storm 才會認為一個從 spout 發送出來的消息被完整處理呢?答案就是下面的條件同時被滿足:
tuple tree 不再生長
樹中的任何消息被標識為“已處理”
如果在指定的時間內,一個消息衍生出來的 tuple tree 未被完全處理成功,則認為此消息未被完整處理。這個超時值可以通過任務級參數 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 進行配置,默認超時值為 30 秒。
4.3 消息的生命周期
如果消息被完整處理或者未被完整處理,Storm 會如何進行接下來的操作呢?為了弄清這個問題,我們來研究一下從 spout 發出來的消息的生命周期。這里列出了 spout 應該實現的接口:
首先,Storm 使用 spout 實例的 nextTuple()方法從 spout 請求一個消息(tuple)。收到請求以后,spout 使用 open 方法中提供的 SpoutOutputCollector 向它的輸出流發送一個或多個消息。每發送一個消息,Spout 會給這個消息提供一個 message ID,它將會被用來標識這個消息。
假設我們從 kestrel 隊列中讀取消息,Spout 會將 kestrel 隊列為這個消息設置的 ID 作為此消息的 message ID。向 SpoutOutputCollector 中發送消息格式如下:
接來下,這些消息會被發送到后續業務處理的 bolts,并且 Storm 會跟蹤由此消息產生出來的新消息。當檢測到一個消息衍生出來的 tuple tree 被完整處理后,Storm 會調用 Spout 中的 ack 方法,并將此消息的 messageID 作為參數傳入。同理,如果某消息處理超時,則此消息對應的 Spout 的 fail 方法會被調用,調用時此消息的 messageID 會被作為參數傳入。
注意:一個消息只會由發送它的那個 spout 任務來調用 ack 或 fail。如果系統中某個 spout 由多個任務運行,消息也只會由創建它的 spout 任務來應答(ack 或 fail),決不會由其他的 spout 任務來應答。
我們繼續使用從 kestrel 隊列中讀取消息的例子來闡述高可靠性下 spout 需要做些什么(假設這個 spout 的名字是 KestrelSpout)。
我們先簡述一下 kestrel 消息隊列:
當 KestrelSpout 從 kestrel 隊列中讀取一個消息,表示它“打開”了隊列中某個消息。這意味著,此消息并未從隊列中真正的刪除,而是將此消息設置為“pending”狀態,它等待來自客戶端的應答,被應答以后,此消息才會被真正的從隊列中刪除。處于“pending”狀態的消息不會被其他的客戶端看到。另外,如果一個客戶端意外的斷開連接,則由此客戶端“打開”的所有消息都會被重新加入到隊列中。當消息被“打開”的時候,kestrel 隊列同時會為這個消息提供一個唯一的標識。
KestrelSpout 就是使用這個唯一的標識作為這個 tuple 的 messageID 的。稍后當 ack 或 fail 被調用的時候,KestrelSpout 會把 ack 或者 fail 連同 messageID 一起發送給 kestrel 隊列,kestrel 會將消息從隊列中真正刪除或者將它重新放回隊列中。
4.4 可靠相關的 API
為了使用 Storm 提供的可靠處理特性,我們需要做兩件事情:
無論何時在 tuple tree 中創建了一個新的節點,我們需要明確的通知 Storm;
當處理完一個單獨的消息時,我們需要告訴 Storm 這棵 tuple tree 的變化狀態。
通過上面的兩步,storm 就可以檢測到一個 tuple tree 何時被完全處理了,并且會調用相關的 ack 或 fail 方法。Storm 提供了簡單明了的方法來完成上述兩步。
為 tuple tree 中指定的節點增加一個新的節點,我們稱之為錨定(anchoring)。錨定是在我們發送消息的同時進行的。為了更容易說明問題,我們使用下面代碼作為例子。本示例的 bolt 將包含整句話的消息分解為一系列的子消息,每個子消息包含一個單詞。
每個消息都通過這種方式被錨定:把輸入消息作為 emit 方法的第一個參數。因為 word 消息被錨定在了輸入消息上,這個輸入消息是 spout 發送過來的 tuple tree 的根節點,如果任意一個 word 消息處理失敗,派生這個 tuple tree 那個 spout 消息將會被重新發送。
與此相反,我們來看看使用下面的方式 emit 消息時,Storm 會如何處理:
如果以這種方式發送消息,將會導致這個消息不會被錨定。如果此 tuple tree 中的消息處理失敗,派生此 tuple tree 的根消息不會被重新發送。根據任務的容錯級別,有時候很適合發送一個非錨定的消息。
一個輸出消息可以被錨定在一個或者多個輸入消息上,這在做 join 或聚合的時候是很有用的。一個被多重錨定的消息處理失敗,會導致與之關聯的多個 spout 消息被重新發送。多重錨定通過在 emit 方法中指定多個輸入消息來實現:
多重錨定會將被錨定的消息加到多棵 tuple tree 上。
注意:多重綁定可能會破壞傳統的樹形結構,從而構成一個 DAGs(有向無環圖),如圖 2 所示:
圖 2 多重錨定構成的鉆石型結構
Storm 的實現可以像處理樹那樣來處理 DAGs。
錨定表明了如何將一個消息加入到指定的 tuple tree 中,高可靠處理 API 的接下來部分將向您描述當處理完 tuple tree 中一個單獨的消息時我們該做些什么。這些是通過 OutputCollector 的 ack 和 fail 方法來實現的?;仡^看一下例子 SplitSentence,可以發現當所有的 word 消息被發送完成后,輸入的表示句子的消息會被應答(acked)。
每個被處理的消息必須表明成功或失?。╝cked 或者 failed)。Storm 是使用內存來跟蹤每個消息的處理情況的,如果被處理的消息沒有應答的話,遲早內存會被耗盡!
很多 bolt 遵循特定的處理流程:讀取一個消息、發送它派生出來的子消息、在 execute 結尾處應答此消息。一般的過濾器(filter)或者是簡單的處理功能都是這類的應用。Storm 有一個 BasicBolt 接口封裝了上述的流程。示例 SplitSentence 可以使用 BasicBolt 來重寫:
使用這種方式,代碼比之前稍微簡單了一些,但是實現的功能是一樣的。發送到 BasicOutputCollector 的消息會被自動的錨定到輸入消息,并且,當 execute 執行完畢的時候,會自動的應答輸入消息。
很多情況下,一個消息需要延遲應答,例如聚合或者是 join。只有根據一組輸入消息得到一個結果之后,才會應答之前所有的輸入消息。并且聚合和 join 大部分時候對輸出消息都是多重錨定。然而,這些特性不是 IBasicBolt 所能處理的。
4.5 高效的實現 tuple tree
Storm 系統中有一組叫做“acker”的特殊的任務,它們負責跟蹤 DAG(有向無環圖)中的每個消息。每當發現一個 DAG 被完全處理,它就向創建這個根消息的 spout 任務發送一個信號。拓撲中 acker 任務的并行度可以通過配置參數 Config.TOPOLOGY_ACKERS 來設置。默認的 acker 任務并行度為 1,當系統中有大量的消息時,應該適當提高 acker 任務的并發度。
為了理解 Storm 可靠性處理機制,我們從研究一個消息的生命周期和 tuple tree 的管理入手。當一個消息被創建的時候(無論是在 spout 還是 bolt 中),系統都為該消息分配一個 64bit 的隨機值作為 id。這些隨機的 id 是 acker 用來跟蹤由 spout 消息派生出來的 tuple tree 的。
每個消息都知道它所在的 tuple tree 對應的根消息的 id。每當 bolt 新生成一個消息,對應 tuple tree 中的根消息的 messageId 就拷貝到這個消息中。當這個消息被應答的時候,它就把關于 tuple tree 變化的信息發送給跟蹤這棵樹的 acker。例如,他會告訴 acker:本消息已經處理完畢,但是我派生出了一些新的消息,幫忙跟蹤一下吧。
舉個例子,假設消息 D 和 E 是由消息 C 派生出來的,這里演示了消息 C 被應答時,tuple tree 是如何變化的。
因為在 C 被從樹中移除的同時 D 和 E 會被加入到 tuple tree 中,因此 tuple tree 不會被過早的認為已完全處理。
關于 Storm 如何跟蹤 tuple tree,我們再深入的探討一下。前面說過系統中可以有任意個數的 acker,那么,每當一個消息被創建或應答的時候,它怎么知道應該通知哪個 acker 呢?
系統使用一種哈希算法來根據 spout 消息的 messageId 確定由哪個 acker 跟蹤此消息派生出來的 tuple tree。因為每個消息都知道與之對應的根消息的 messageId,因此它知道應該與哪個 acker 通信。
當 spout 發送一個消息的時候,它就通知對應的 acker 一個新的根消息產生了,這時 acker 就會創建一個新的 tuple tree。當 acker 發現這棵樹被完全處理之后,他就會通知對應的 spout 任務。
tuple 是如何被跟蹤的呢?系統中有成千上萬的消息,如果為每個 spout 發送的消息都構建一棵樹的話,很快內存就會耗盡。所以,必須采用不同的策略來跟蹤每個消息。由于使用了新的跟蹤算法,Storm 只需要固定的內存(大約 20 字節)就可以跟蹤一棵樹。這個算法是 storm 正確運行的核心,也是 storm 最大的突破。
acker 任務保存了 spout 消息 id 到一對值的映射。第一個值就是 spout 的任務 id,通過這個 id,acker 就知道消息處理完成時該通知哪個 spout 任務。第二個值是一個 64bit 的數字,我們稱之為“ack val”,它是樹中所有消息的隨機 id 的異或結果。ack val 表示了整棵樹的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當消息被創建和被應答的時候都會有相同的消息 id 發送過來做異或。
每當 acker 發現一棵樹的 ack val 值為 0 的時候,它就知道這棵樹已經被完全處理了。因為消息的隨機 ID 是一個 64bit 的值,因此 ack val 在樹處理完之前被置為 0 的概率非常小。假設你每秒鐘發送一萬個消息,從概率上說,至少需要 50,000,000 年才會有機會發生一次錯誤。即使如此,也只有在這個消息確實處理失敗的情況下才會有數據的丟失!
4.6 選擇合適的可靠性級別
Acker 任務是輕量級的,所以在拓撲中并不需要太多的 acker 存在。可以通過 Storm UI 來觀察 acker 任務的吞吐量,如果看上去吞吐量不夠的話,說明需要添加額外的 acker。
如果你并不要求每個消息必須被處理(你允許在處理過程中丟失一些信息),那么可以關閉消息的可靠處理機制,從而可以獲取較好的性能。關閉消息的可靠處理機制意味著系統中的消息數會減半(每個消息不需要應答了)。另外,關閉消息的可靠處理可以減少消息的大?。ú恍枰總€ tuple 記錄它的根 id 了),從而節省帶寬。
有三種方法可以關系消息的可靠處理機制:
將參數 Config.TOPOLOGY_ACKERS 設置為 0,通過此方法,當 Spout 發送一個消息的時候,它的 ack 方法將立刻被調用;
第二個方法是 Spout 發送一個消息時,不指定此消息的 messageID。當需要關閉特定消息可靠性的時候,可以使用此方法;
最后,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在 emit 方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何 tuple tree 中,因此他們的失敗不會引起任何 spout 重新發送消息。
4.7 集群的各級容錯
到現在為止,大家已經理解了 Storm 的可靠性機制,并且知道了如何選擇不同的可靠性級別來滿足需求。接下來我們研究一下 Storm 如何保證在各種情況下確保數據不丟失。
3.7.1 任務級失敗
因為 bolt 任務 crash 引起的消息未被應答。此時,acker 中所有與此 bolt 任務關聯的消息都會因為超時而失敗,對應 spout 的 fail 方法將被調用。
acker 任務失敗。如果 acker 任務本身失敗了,它在失敗之前持有的所有消息都將會因為超時而失敗。Spout 的 fail 方法將被調用。
Spout 任務失敗。這種情況下,Spout 任務對接的外部設備(如 MQ)負責消息的完整性。例如當客戶端異常的情況下,kestrel 隊列會將處于 pending 狀態的所有的消息重新放回到隊列中。
4.7.2 任務槽(slot) 故障
worker 失敗。每個 worker 中包含數個 bolt(或 spout)任務。supervisor 負責監控這些任務,當 worker 失敗后,supervisor 會嘗試在本機重啟它。
supervisor 失敗。supervisor 是無狀態的,因此 supervisor 的失敗不會影響當前正在運行的任務,只要及時的將它重新啟動即可。supervisor 不是自舉的,需要外部監控來及時重啟。
nimbus 失敗。nimbus 是無狀態的,因此 nimbus 的失敗不會影響當前正在運行的任務(nimbus 失敗時,無法提交新的任務),只要及時的將它重新啟動即可。nimbus 不是自舉的,需要外部監控來及時重啟。
4.7.3. 集群節點(機器)故障
storm 集群中的節點故障。此時 nimbus 會將此機器上所有正在運行的任務轉移到其他可用的機器上運行。
zookeeper 集群中的節點故障。zookeeper 保證少于半數的機器宕機仍可正常運行,及時修復故障機器即可。
“storm 消息的可靠處理方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!