久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark性能優化的方法是什么

190次閱讀
沒有評論

共計 6763 個字符,預計需要花費 17 分鐘才能閱讀完成。

本篇內容介紹了“Spark 性能優化的方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

#ShuffleManager 發展概述
在 Spark 的源碼中,負責 shuffle 過程的執行、計算和處理的組件主要就是 ShuffleManager,也即 shuffle 管理器。而隨著 Spark 的版本的發展,ShuffleManager 也在不斷迭代,變得越來越先進。

在 Spark 1.2 以前,默認的 shuffle 計算引擎是 HashShuffleManager。該 ShuffleManager 而 HashShuffleManager 有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤 IO 操作影響了性能。

因此在 Spark 1.2 以后的版本中,默認的 ShuffleManager 改成了 SortShuffleManager。SortShuffleManager 相較于 HashShuffleManager 來說,有了一定的改進。主要就在于,每個 Task 在進行 shuffle 操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 stage 的 shuffle read task 拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。

下面我們詳細分析一下 HashShuffleManager 和 SortShuffleManager 的原理。

#HashShuffleManager 運行原理

## 未經優化的 HashShuffleManager

下圖說明了未經優化的 HashShuffleManager 的原理。這里我們先明確一個假設前提:每個 Executor 只有 1 個 CPU core,也就是說,無論這個 Executor 上分配多少個 task 線程,同一時間都只能執行一個 task 線程。

我們先從 shuffle write 開始說起。shuffle write 階段,主要就是在一個 stage 結束計算之后,為了下一個 stage 可以執行 shuffle 類的算子(比如 reduceByKey),而將每個 task 處理的數據按 key 進行“分類”。所謂“分類”,就是對相同的 key 執行 hash 算法,從而將相同 key 都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游 stage 的一個 task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。

那么每個執行 shuffle write 的 task,要為下一個 stage 創建多少個磁盤文件呢?很簡單,下一個 stage 的 task 有多少個,當前 stage 的每個 task 就要創建多少份磁盤文件。比如下一個 stage 總共有 100 個 task,那么當前 stage 的每個 task 都要創建 100 份磁盤文件。如果當前 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 Task,那么每個 Executor 上總共就要創建 500 個磁盤文件,所有 Executor 上會創建 5000 個磁盤文件。由此可見,未經優化的 shuffle write 操作所產生的磁盤文件的數量是極其驚人的。

接著我們來說說 shuffle read。shuffle read,通常就是一個 stage 剛開始時要做的事情。此時該 stage 的每一個 task 就需要將上一個 stage 的計算結果中的所有相同 key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行 key 的聚合或連接等操作。由于 shuffle write 的過程中,task 給下游 stage 的每個 task 都創建了一個磁盤文件,因此 shuffle read 的過程中,每個 task 只要從上游 stage 的所有 task 所在節點上,拉取屬于自己的那一個磁盤文件即可。

shuffle read 的拉取過程是一邊拉取一邊進行聚合的。每個 shuffle read task 都會有一個自己的 buffer 緩沖,每次都只能拉取與 buffer 緩沖相同大小的數據,然后通過內存中的一個 Map 進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并放到 buffer 緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,并得到最終的結果。

## 優化后的 HashShuffleManager

下圖說明了優化后的 HashShuffleManager 的原理。這里說的優化,是指我們可以設置一個參數,spark.shuffle.consolidateFiles。該參數默認值為 false,將其設置為 true 即可開啟優化機制。通常來說,如果我們使用 HashShuffleManager,那么都建議開啟這個選項。

開啟 consolidate 機制之后,在 shuffle write 過程中,task 就不是為下游 stage 的每個 task 創建一個磁盤文件了。此時會出現 shuffleFileGroup 的概念,每個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量與下游 stage 的 task 數量是相同的。一個 Executor 上有多少個 CPU core,就可以并行執行多少個 task。而第一批并行執行的每個 task 都會創建一個 shuffleFileGroup,并將數據寫入對應的磁盤文件內。

當 Executor 的 CPU core 執行完一批 task,接著執行下一批 task 時,下一批 task 就會復用之前已有的 shuffleFileGroup,包括其中的磁盤文件。也就是說,此時 task 會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate 機制允許不同的 task 復用同一批磁盤文件,這樣就可以有效將多個 task 的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數量,進而提升 shuffle write 的性能。

假設第二個 stage 有 100 個 task,第一個 stage 有 50 個 task,總共還是有 10 個 Executor,每個 Executor 執行 5 個 task。那么原本使用未經優化的 HashShuffleManager 時,每個 Executor 會產生 500 個磁盤文件,所有 Executor 會產生 5000 個磁盤文件的。但是此時經過優化之后,每個 Executor 創建的磁盤文件的數量的計算公式為:CPU core 的數量 * 下一個 stage 的 task 數量。也就是說,每個 Executor 此時只會創建 100 個磁盤文件,所有 Executor 只會創建 1000 個磁盤文件。

#SortShuffleManager 運行原理

SortShuffleManager 的運行機制主要分成兩種,一種是普通運行機制,另一種是 bypass 運行機制。當 shuffle read task 的數量小于等于 spark.shuffle.sort.bypassMergeThreshold 參數的值時(默認為 200),就會啟用 bypass 機制。

## 普通運行機制

下圖說明了普通的 SortShuffleManager 的原理。在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的 shuffle 算子,可能選用不同的數據結構。如果是 reduceByKey 這種聚合類的 shuffle 算子,那么會選用 Map 數據結構,一邊通過 Map 進行聚合,一邊寫入內存;如果是 join 這種普通的 shuffle 算子,那么會選用 Array 數據結構,直接寫入內存。接著,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。

在溢寫到磁盤文件之前,會先根據 key 對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的 batch 數量是 10000 條,也就是說,排序好的數據,會以每批 1 萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過 Java 的 BufferedOutputStream 實現的。BufferedOutputStream 是 Java 的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數,提升性能。

一個 task 將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是 merge 過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個 task 就只對應一個磁盤文件,也就意味著該 task 為下游 stage 的 task 準備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個 task 的數據在文件中的 start offset 與 end offset。

SortShuffleManager 由于有一個磁盤文件 merge 的過程,因此大大減少了文件數量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,而第二個 stage 有 100 個 task。由于每個 task 最終只有一個磁盤文件,因此此時每個 Executor 上只有 5 個磁盤文件,所有 Executor 只有 50 個磁盤文件。

##bypass 運行機制 下圖說明了 bypass SortShuffleManager 的原理。bypass 運行機制的觸發條件如下:

shuffle map task 數量小于 spark.shuffle.sort.bypassMergeThreshold 參數的值。

不是聚合類的 shuffle 算子(比如 reduceByKey)。

此時 task 會為每個下游 task 都創建一個臨時磁盤文件,并將數據按 key 進行 hash 然后根據 key 的 hash 值,將 key 寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的 HashShuffleManager 來說,shuffle read 的性能會更好。

而該機制與普通 SortShuffleManager 運行機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write 過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

#shuffle 相關參數調優
以下是 Shffule 過程中的一些主要參數,這里詳細講解了各個參數的功能、默認值以及基于實踐經驗給出的調優建議。
spark.shuffle.file.buffer

默認值:32k

參數說明:該參數用于設置 shuffle write task 的 BufferedOutputStream 的 buffer 緩沖大小。將數據寫到磁盤文件之前,會先寫入 buffer 緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。

調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如 64k),從而減少 shuffle write 過程中溢寫磁盤文件的次數,也就可以減少磁盤 IO 次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提升。

spark.reducer.maxSizeInFlight

默認值:48m

參數說明:該參數用于設置 shuffle read task 的 buffer 緩沖大小,而這個 buffer 緩沖決定了每次能夠拉取多少數據。

調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如 96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提升。

spark.shuffle.io.maxRetries

默認值:3

參數說明:shuffle read task 從 shuffle write task 所在節點拉取屬于自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。

調優建議:對于那些包含了特別耗時的 shuffle 操作的作業,建議增加重試最大次數(比如 60 次),以避免由于 JVM 的 full gc 或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對于針對超大數據量(數十億~ 上百億)的 shuffle 過程,調節該參數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

默認值:5s

參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是 5s。

調優建議:建議加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性。

spark.shuffle.memoryFraction

默認值:0.2

參數說明:該參數代表了 Executor 內存中,分配給 shuffle read task 進行聚合操作的內存比例,默認是 20%。

調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給 shuffle read 的聚合操作更多內存,以避免由于內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升 10% 左右。

spark.shuffle.manager

默認值:sort

參數說明:該參數用于設置 ShuffleManager 的類型。Spark 1.5 以后,有三個可選項:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默認選項,但是 Spark 1.2 以及之后的版本默認都是 SortShuffleManager 了。tungsten-sort 與 sort 類似,但是使用了 tungsten 計劃中的堆外內存管理機制,內存使用效率更高。

調優建議:由于 SortShuffleManager 默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的 SortShuffleManager 就可以;而如果你的業務邏輯不需要對數據進行排序,那么建議參考后面的幾個參數調優,通過 bypass 機制或優化的 HashShuffleManager 來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort 要慎用,因為之前發現了一些相應的 bug。

spark.shuffle.sort.bypassMergeThreshold

默認值:200

參數說明:當 ShuffleManager 為 SortShuffleManager 時,如果 shuffle read task 的數量小于這個閾值(默認是 200),則 shuffle write 過程中不會進行排序操作,而是直接按照未經優化的 HashShuffleManager 的方式去寫數據,但是最后會將每個 task 產生的所有臨時磁盤文件都合并成一個文件,并會創建單獨的索引文件。

調優建議:當你使用 SortShuffleManager 時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于 shuffle read task 的數量。那么此時就會自動啟用 bypass 機制,map-side 就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此 shuffle write 性能有待提高。

spark.shuffle.consolidateFiles

默認值:false

參數說明:如果使用 HashShuffleManager,該參數有效。如果設置為 true,那么就會開啟 consolidate 機制,會大幅度合并 shuffle write 的輸出文件,對于 shuffle read task 數量特別多的情況下,這種方法可以極大地減少磁盤 IO 開銷,提升性能。

調優建議:如果的確不需要 SortShuffleManager 的排序機制,那么除了使用 bypass 機制,還可以嘗試將 spark.shffle.manager 參數手動指定為 hash,使用 HashShuffleManager,同時開啟 consolidate 機制。在實踐中嘗試過,發現其性能比開啟了 bypass 機制的 SortShuffleManager 要高出 10%~30%。

“Spark 性能優化的方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計6763字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 同德县| 霍邱县| 修文县| 鹿邑县| 吴桥县| 朝阳县| 西藏| 搜索| 始兴县| 石楼县| 夏邑县| 拉孜县| 祁门县| 丽江市| 辽宁省| 东安县| 临洮县| 扎赉特旗| 弋阳县| 平南县| 仁寿县| 托里县| 精河县| 洛川县| 彰化县| 长治市| 内黄县| 临沭县| 漳州市| 建湖县| 海安县| 靖州| 淮滨县| 东辽县| 景宁| 金乡县| 徐水县| 平远县| 海盐县| 紫金县| 宁陕县|