共計 5267 個字符,預計需要花費 14 分鐘才能閱讀完成。
這篇文章主要介紹“spark shuffle 調優的方法是什么”,在日常操作中,相信很多人在 spark shuffle 調優的方法是什么問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”spark shuffle 調優的方法是什么”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
什么情況下會發生 shuffle,然后 shuffle 的原理是什么?
在 spark 中,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。
groupByKey,要把分布在集群各個節點上的數據中的同一個 key,對應的 values,都給集中到一塊兒,集中到集群中同一個節點上,更嚴密一點說,就是集中到一個節點的一個 executor 的一個 task 中。然后呢,集中一個 key 對應的 values 之后,才能交給我們來進行處理,key, Iterable value;reduceByKey,算子函數去對 values 集合進行 reduce 操作,最后變成一個 value;countByKey,需要在一個 task 中,獲取到一個 key 對應的所有的 value,然后進行計數,統計總共有多少個 value;join,RDD key, value,RDD key, value,只要是兩個 RDD 中,key 相同對應的 2 個 value,都能到一個節點的 executor 的 task 中,給我們進行處理。
問題在于,同一個單詞,比如說(hello, 1),可能散落在不同的節點上;對每個單詞進行累加計數,就必須讓所有單詞都跑到同一個節點的一個 task 中,給一個 task 來進行處理;
每一個 shuffle 的前半部分 stage 的 task,每個 task 都會創建下一個 stage 的 task 數量相同的文件,比如下一個 stage 會有 100 個 task,那么當前 stage 每個 task 都會創建 100 份文件;會將同一個 key 對應的 values,一定是寫入同一個文件中的;
shuffle 的后半部分 stage 的 task,每個 task 都會從各個節點上的 task 寫的屬于自己的那一份文件中,拉取 key, value 對;然后 task 會有一個內存緩沖區,然后會用 HashMap,進行 key, values 的匯聚;(key ,values);
task 會用我們自己定義的聚合函數,比如 reduceByKey(_+_),把所有 values 進行一對一的累加,聚合出來最終的值。就完成了 shuffle;
shuffle,一定是分為兩個 stage 來完成的。因為這其實是個逆向的過程,不是 stage 決定 shuffle,是 shuffle 決定 stage。
reduceByKey(_+_),在某個 action 觸發 job 的時候,DAGScheduler,會負責劃分 job 為多個 stage。劃分的依據,就是,如果發現有會觸發 shuffle 操作的算子,比如 reduceByKey,就將這個操作的前半部分,以及之前所有的 RDD 和 transformation 操作,劃分為一個 stage;shuffle 操作的后半部分,以及后面的,直到 action 為止的 RDD 和 transformation 操作,劃分為另外一個 stage;
shuffle 前半部分的 task 在寫入數據到磁盤文件之前,都會先寫入一個一個的內存緩沖,內存緩沖滿溢之后,再 spill 溢寫到磁盤文件中。
如果不合并 map 端輸出文件的話,會怎么樣?
減少網絡傳輸、disk io、減少 reduce 端內存緩沖
實際生產環境的條件:
100 個節點(每個節點一個 executor):100 個 executor,每個 executor:2 個 cpu core,總共 1000 個 task:每個 executor 平均 10 個 task,上游 1000 個 task,下游 1000 個 task,每個節點,10 個 task,每個節點或者說每一個 executor 會輸出多少份 map 端文件?10 * 1000= 1 萬個文件(M*R)
總共有多少份 map 端輸出文件?100 * 10000 = 100 萬。
問題來了:默認的這種 shuffle 行為,對性能有什么樣的惡劣影響呢?
shuffle 中的寫磁盤的操作,基本上就是 shuffle 中性能消耗最為嚴重的部分。
通過上面的分析,一個普通的生產環境的 spark job 的一個 shuffle 環節,會寫入磁盤 100 萬個文件。
磁盤 IO 對性能和 spark 作業執行速度的影響,是極其驚人和嚇人的。
基本上,spark 作業的性能,都消耗在 shuffle 中了,雖然不只是 shuffle 的 map 端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點。
new SparkConf().set( spark.shuffle.consolidateFiles , true)
開啟 shuffle map 端輸出文件合并的機制;默認情況下,是不開啟的,就是會發生如上所述的大量 map 端輸出文件的操作,嚴重影響性能。
開啟了 map 端輸出文件的合并機制之后:
第一個 stage,同時就運行 cpu core 個 task,比如 cpu core 是 2 個,并行運行 2 個 task;
每個 task 都創建下一個 stage 的 task 數量個文件;
第一個 stage,并行運行的 2 個 task 執行完以后,就會執行另外兩個 task;
另外 2 個 task 不會再重新創建輸出文件;而是復用之前的 task 創建的 map 端輸出文件,將數據寫入上一批 task 的輸出文件中;
第二個 stage,task 在拉取數據的時候,就不會去拉取上一個 stage 每一個 task 為自己創建的那份輸出文件了;
提醒一下(map 端輸出文件合并):
只有并行執行的 task 會去創建新的輸出文件;
下一批并行執行的 task,就會去復用之前已有的輸出文件;
但是有一個例外,比如 2 個 task 并行在執行,但是此時又啟動要執行 2 個 task(不是同一批次);
那么這個時候的話,就無法去復用剛才的 2 個 task 創建的輸出文件了;
而是還是只能去創建新的輸出文件。
要實現輸出文件的合并的效果,必須是一批 task 先執行,然后下一批 task 再執行,
才能復用之前的輸出文件;負責多批 task 同時起來執行,還是做不到復用的。
開啟了 map 端輸出文件合并機制之后,生產環境上的例子,會有什么樣的變化?
實際生產環境的條件:
100 個節點(每個節點一個 executor):100 個 executor
每個 executor:2 個 cpu core
總共 1000 個 task:每個 executor 平均 10 個 task
上游 1000 個 task,下游 1000 個 task
每個節點,2 個 cpu core,有多少份輸出文件呢?2 * 1000 = 2000 個(C*R)
總共 100 個節點,總共創建多少份輸出文件呢?100 * 2000 = 20 萬個文件
相比較開啟合并機制之前的情況,100 萬個
map 端輸出文件,在生產環境中,立減 5 倍!
合并 map 端輸出文件,對咱們的 spark 的性能有哪些方面的影響呢?
map task 寫入磁盤文件的 IO,減少:100 萬文件 – 20 萬文件
第二個 stage,原本要拉取第一個 stage 的 task 數量份文件,1000 個 task,第二個 stage 的每個 task,都要拉取 1000 份文件,走網絡傳輸;合并以后,100 個節點,每個節點 2 個 cpu core,第二個 stage 的每個 task,主要拉取 1000 * 2 = 2000 個文件即可;網絡傳輸的性能消耗是不是也大大減少分享一下,實際在生產環境中,使用了 spark.shuffle.consolidateFiles 機制以后,實際的性能調優的效果:對于上述的這種生產環境的配置,性能的提升,還是相當的客觀的。
spark 作業,5 個小時 – 2~3 個小時。
大家不要小看這個 map 端輸出文件合并機制。實際上,在數據量比較大,你自己本身做了前面的性能調優,
executor 上去 - cpu core 上去 - 并行度(task 數量)上去,shuffle 沒調優,shuffle 就很糟糕了;
大量的 map 端輸出文件的產生。對性能有比較惡劣的影響。
這個時候,去開啟這個機制,可以很有效的提升性能。
spark.shuffle.manager hash M*R 個小文件
spark.shuffle.manager sort C*R 個小文件 (默認的 shuffle 管理機制)
spark.shuffle.file.buffer,默認 32k
spark.shuffle.memoryFraction,0.2
默認情況下,shuffle 的 map task,輸出到磁盤文件的時候,統一都會先寫入每個 task 自己關聯的一個內存緩沖區。這個緩沖區大小,默認是 32kb。每一次,當內存緩沖區滿溢之后,才會進行 spill 操作,溢寫操作,溢寫到磁盤文件中去 reduce 端 task,在拉取到數據之后,會用 hashmap 的數據格式,來對各個 key 對應的 values 進行匯聚。針對每個 key 對應的 values,執行我們自定義的聚合函數的代碼,比如_ + _(把所有 values 累加起來)reduce task,在進行匯聚、聚合等操作的時候,實際上,使用的就是自己對應的 executor 的內存,executor(jvm 進程,堆),默認 executor 內存中劃分給 reduce task 進行聚合的比例,是 0.2。問題來了,因為比例是 0.2,所以,理論上,很有可能會出現,拉取過來的數據很多,那么在內存中,放不下;這個時候,默認的行為,就是說,將在內存放不下的數據,都 spill(溢寫)到磁盤文件中去。
原理說完之后,來看一下,默認情況下,不調優,可能會出現什么樣的問題?
默認,map 端內存緩沖是每個 task,32kb。
默認,reduce 端聚合內存比例,是 0.2,也就是 20%。
如果 map 端的 task,處理的數據量比較大,但是呢,你的內存緩沖大小是固定的。
可能會出現什么樣的情況?
每個 task 就處理 320kb,32kb,總共會向磁盤溢寫 320 / 32 = 10 次。
每個 task 處理 32000kb,32kb,總共會向磁盤溢寫 32000 / 32 = 1000 次。
在 map task 處理的數據量比較大的情況下,而你的 task 的內存緩沖默認是比較小的,32kb。可能會造成多次的 map 端往磁盤文件的 spill 溢寫操作,發生大量的磁盤 IO,從而降低性能。
reduce 端聚合內存,占比。默認是 0.2。如果數據量比較大,reduce task 拉取過來的數據很多,那么就會頻繁發生 reduce 端聚合內存不夠用,頻繁發生 spill 操作,溢寫到磁盤上去。而且最要命的是,磁盤上溢寫的數據量越大,后面在進行聚合操作的時候,很可能會多次讀取磁盤中的數據,進行聚合。
默認不調優,在數據量比較大的情況下,可能頻繁地發生 reduce 端的磁盤文件的讀寫。
這兩個點之所以放在一起講,是因為他們倆是有關聯的。數據量變大,map 端肯定會出點問題;
reduce 端肯定也會出點問題;出的問題是一樣的,都是磁盤 IO 頻繁,變多,影響性能。
調優:
調節 map task 內存緩沖:spark.shuffle.file.buffer,默認 32k(spark 1.3.x 不是這個參數,
后面還有一個后綴,kb;spark 1.5.x 以后,變了,就是現在這個參數)
調節 reduce 端聚合內存占比:spark.shuffle.memoryFraction,0.2
在實際生產環境中,我們在什么時候來調節兩個參數?
看 Spark UI,如果你的公司是決定采用 standalone 模式,那么很簡單,你的 spark 跑起來,會顯示一個 Spark UI 的地址,4040 的端口,進去看,依次點擊進去,可以看到,你的每個 stage 的詳情,有哪些 executor,有哪些 task,每個 task 的 shuffle write 和 shuffle read 的量,shuffle 的磁盤和內存,讀寫的數據量;如果是用的 yarn 模式來提交,課程最前面,從 yarn 的界面進去,點擊對應的 application,進入 Spark UI,查看詳情。
如果發現 shuffle 磁盤的 write 和 read,很大,可以調節這兩個參數
調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高 0.1,看看效果。不能調節的太大,太大了以后過猶不及,因為內存資源是有限的,你這里調節的太大了,其他環節的內存使用就會有問題了。
調節了以后,效果?map task 內存緩沖變大了,減少 spill 到磁盤文件的次數;reduce 端聚合內存變大了,
減少 spill 到磁盤的次數,而且減少了后面聚合讀取磁盤文件的數量。
到此,關于“spark shuffle 調優的方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!