共計 2310 個字符,預(yù)計需要花費 6 分鐘才能閱讀完成。
本篇內(nèi)容介紹了“Exactly once 事務(wù)的處理方法是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
1,Exactly once 事務(wù)
什么事 Exactly once 事務(wù)?
數(shù)據(jù)僅處理一次并且僅輸出一次,這樣才是完整的事務(wù)處理。
Spark 在運行出錯時不能保證輸出也是事務(wù)級別的。在 Task 執(zhí)行一半的時候出錯了,雖然在語義上做了事務(wù)處理,數(shù)據(jù)僅被處理一次,但是如果是輸出到數(shù)據(jù)庫中,那有空能將結(jié)果多次保存到數(shù)據(jù)庫中。Spark 在任務(wù)失敗時會進(jìn)行重試,這樣會導(dǎo)致結(jié)果多次保存到數(shù)據(jù)庫中。
如下圖,當(dāng)運行在 Executor 上的 Receiver 接收到數(shù)據(jù)通過 BlockManager 寫入內(nèi)存和磁盤,或者通過 WAL 機(jī)制寫記錄日志,然后把 metedata 信息匯報給 Driver。在 Driver 端定期進(jìn)行 checkpoint 操作。Job 的執(zhí)行還是基于 Spark Core 的調(diào)度模式在 Executor 上執(zhí)行 Task。
Exactly once 事務(wù)的處理:
1,數(shù)據(jù)零丟失:必須有可靠的數(shù)據(jù)來源和可靠的 Receiver,且整個應(yīng)用程序的 metadata 必須進(jìn)行 checkpoint,且通過 WAL 來保證數(shù)據(jù)安全。
我們以數(shù)據(jù)來自 Kafka 為例,運行在 Executor 上的 Receiver 在接收到來自 Kafka 的數(shù)據(jù)時會向 Kafka 發(fā)送 ACK 確認(rèn)收到信息并讀取下一條信息,kafka 會 updateOffset 來記錄 Receiver 接收到的偏移,這種方式保證了在 Executor 數(shù)據(jù)零丟失。
在 Driver 端,定期進(jìn)行 checkpoint 操作,出錯時從 Checkpoint 的文件系統(tǒng)中把數(shù)據(jù)讀取進(jìn)來進(jìn)行恢復(fù),內(nèi)部會重新構(gòu)建 StreamingContext(也就是構(gòu)建 SparkContext)并啟動,恢復(fù)出元數(shù)據(jù) metedata,再次產(chǎn)生 RDD,恢復(fù)的是上次的 Job,然后再次提交到集群執(zhí)行。
那么數(shù)據(jù)可能丟失的地方有哪些呢和相應(yīng)的解決方式?
在 Receiver 收到數(shù)據(jù)且通過 Driver 的調(diào)度 Executor 開始計算數(shù)據(jù)的時候,如果 Driver 突然奔潰,則此時 Executor 會被殺死,那么 Executor 中的數(shù)據(jù)就會丟失(如果沒有進(jìn)行 WAL 的操作)。
解決方式:此時就必須通過例如 WAL 的方式,讓所有的數(shù)據(jù)都通過例如 HDFS 的方式首先進(jìn)行安全性容錯處理。此時如果 Executor 中的數(shù)據(jù)丟失的話,就可以通過 WAL 恢復(fù)回來。
這種方式的弊端是通過 WAL 的方式會極大額損傷 SparkStreaming 中 Receivers 接收數(shù)據(jù)的性能。
數(shù)據(jù)重復(fù)讀取的情況:
在 Receiver 收到數(shù)據(jù)保存到 HDFS 等持久化引擎但是沒有來得及進(jìn)行 updateOffsets(以 Kafka 為例),此時 Receiver 崩潰后重新啟動就會通過管理 Kafka 的 Zookeeper 中元數(shù)據(jù)再次重復(fù)讀取數(shù)據(jù),但是此時 SparkStreaming 認(rèn)為是成功的,但是 kafka 認(rèn)為是失敗的(因為沒有更新 offset 到 ZooKeeper 中),此時就會導(dǎo)致數(shù)據(jù)重新消費的情況。
解決方式:以 Receiver 基于 ZooKeeper 的方式,當(dāng)讀取數(shù)據(jù)時去訪問 Kafka 的元數(shù)據(jù)信息,在處理代碼中例如 foreachRDD 或 transform 時,將信息寫入到內(nèi)存數(shù)據(jù)庫中(memorySet),在計算時讀取內(nèi)存數(shù)據(jù)庫信息,判斷是否已處理過,如果以處理過則跳過計算。這些元數(shù)據(jù)信息可以保存到內(nèi)存數(shù)據(jù)結(jié)構(gòu)或者 memsql,sqllite 中。
如果通過 Kafka 作為數(shù)據(jù)來源的話,Kafka 中有數(shù)據(jù),然后 Receiver 接收的時候又會有數(shù)據(jù)副本,這個時候其實是存儲資源的浪費。
Spark 在 1.3 的時候為了避免 WAL 的性能損失和實現(xiàn) Exactly Once 而提供了 Kafka Direct API,把 Kafka 作為文件存儲系統(tǒng)。此時兼具有流的優(yōu)勢和文件系統(tǒng)的優(yōu)勢,至此 Spark Streaming+Kafka 就構(gòu)建了完美的流處理世界(1,數(shù)據(jù)不需要拷貝副本;2,不需要 WAL 對性能的損耗;3,Kafka 使用 ZeroCopy 比 HDFS 更高效)。所有的 Executors 通過 Kafka API 直接消息數(shù)據(jù),直接管理 Offset,所以也不會重復(fù)消費數(shù)據(jù)。
2,輸出不重復(fù)
關(guān)于 Spark Streaming 數(shù)據(jù)輸出多次重寫及其解決方案:
1,為什么會有這個問題,因為 Spark Streaming 在計算的時候基于 Spark Core 天生會做以下事情導(dǎo)致 Spark Streaming 的結(jié)果 (部分) 重復(fù)輸出。Task 重試,慢任務(wù)推測,Stage 重試,Job 重試。
2,具體解決方案:
設(shè)置 spark.task.maxFailures 次數(shù)為 1,這樣就不會有 Task 重試了。設(shè)置 spark.speculation 為關(guān)閉狀態(tài),就不會有慢任務(wù)推測了,因為慢任務(wù)推測非常消耗性能,所以關(guān)閉后可以顯著提高 Spark Streaming 處理性能。
Spark Streaming On Kafka 的話,Job 失敗后可以設(shè)置 Kafka 的參數(shù) auto.offset.reset 為 largest 方式。
最后再次強(qiáng)調(diào)可以通過 transform 和 foreachRDD 基于業(yè)務(wù)邏輯代碼進(jìn)行邏輯控制來實現(xiàn)數(shù)據(jù)不重復(fù)消費和輸出不重復(fù)。這兩個方法類似于 Spark Streaming 的后門,可以做任意想象的控制操作。
“Exactly once 事務(wù)的處理方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注丸趣 TV 網(wǎng)站,丸趣 TV 小編將為大家輸出更多高質(zhì)量的實用文章!