共計 5039 個字符,預計需要花費 13 分鐘才能閱讀完成。
這篇文章主要講解了“spark 性能調優的方法是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“spark 性能調優的方法是什么”吧!
分配哪些資源?executor、cpu per executor、memory per executor、driver memory
在哪里分配這些資源?在我們在生產環境中,提交 spark 作業時,用的 spark-submit shell 腳本,里面調整對應的參數
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置 executor 的數量
--driver-memory 100m \ 配置 driver 的內存(影響很大)--executor-memory 100m \ 配置每個 executor 的內存大小
--executor-cores 3 \ 配置每個 executor 的 cpu core 數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
調節到多大,算是最大呢?
第一種,Spark Standalone,公司集群上,搭建了一套 Spark 集群,你心里應該清楚每臺機器還能夠給你使用的,大概有多少內存,多少 cpu core;那么,設置的時候,就根據這個實際的情況,去調節每個 spark 作業的資源分配。比如說你的每臺機器能夠給你使用 4G 內存,2 個 cpu core;20 臺機器;executor,20;平均每個 executor:4G 內存,2 個 cpu core。
第二種,Yarn。資源隊列。資源調度。應該去查看,你的 spark 作業,要提交到的資源隊列,大概有多少資源?500G 內存,100 個 cpu core;executor,50;平均每個 executor:10G 內存,2 個 cpu core。
設置隊列名稱:spark.yarn.queue default
一個原則,你能使用的資源有多大,就盡量去調節到最大的大?。╡xecutor 的數量,幾十個到上百個不等;executor 內存;executor cpu core)
為什么調節了資源以后,性能可以提升?
增加 executor:
如果 executor 數量比較少,那么,能夠并行執行的 task 數量就比較少,就意味著,我們的 Application 的并行執行的能力就很弱。比如有 3 個 executor,每個 executor 有 2 個 cpu core,那么同時能夠并行執行的 task,就是 6 個。6 個執行完以后,再換下一批 6 個 task。增加了 executor 數量以后,那么,就意味著,能夠并行執行的 task 數量,也就變多了。比如原先是 6 個,現在可能可以并行執行 10 個,甚至 20 個,100 個。那么并行能力就比之前提升了數倍,數十倍。相應的,性能(執行的速度),也能提升數倍~ 數十倍。
有時候數據量比較少,增加大量的 task 反而性能會降低,為什么?(想想就明白了,你用多了,別人用的就少了。。。。)
增加每個 executor 的 cpu core:
也是增加了執行的并行能力。原本 20 個 executor,每個才 2 個 cpu core。能夠并行執行的 task 數量,就是 40 個 task?,F在每個 executor 的 cpu core,增加到了 5 個。能夠并行執行的 task 數量,就是 100 個 task。執行的速度,提升了 2.5 倍。
SparkContext,DAGScheduler,TaskScheduler,會將我們的算子,切割成大量的 task,
提交到 Application 的 executor 上面去執行。
增加每個 executor 的內存量:
增加了內存量以后,對性能的提升,有三點:
1、如果需要對 RDD 進行 cache,那么更多的內存,就可以緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減少了磁盤 IO。
2、對于 shuffle 操作,reduce 端,會需要內存來存放拉取的數據并進行聚合。如果內存不夠,也會寫入磁盤。如果給 executor 分配更多內存以后,就有更少的數據,需要寫入磁盤,
甚至不需要寫入磁盤。減少了磁盤 IO,提升了性能。
3、對于 task 的執行,可能會創建很多對象。如果內存比較小,可能會頻繁導致 JVM 堆內存滿了,然后頻繁 GC,垃圾回收,minor GC 和 full GC。(速度很慢)。內存加大以后,帶來更少的 GC,垃圾回收,避免了速度變慢,速度變快了。
Spark 并行度指的是什么?
Spark 作業,Application,Jobs,action(collect)觸發一個 job,1 個 job;每個 job 拆成多個 stage,
發生 shuffle 的時候,會拆分出一個 stage,reduceByKey。
stage0
val lines = sc.textFile(hdfs://)
val words = lines.flatMap(_.split( ))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)
stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()
reduceByKey,stage0 的 task,在最后,執行到 reduceByKey 的時候,會為每個 stage1 的 task,都創建一份文件(也可能是合并在少量的文件里面);每個 stage1 的 task,會去各個節點上的各個 task 創建的屬于自己的那一份文件里面,拉取數據;每個 stage1 的 task,拉取到的數據,一定是相同 key 對應的數據。對相同的 key,對應的 values,才能去執行我們自定義的 function 操作(_ + _)
并行度:其實就是指的是,Spark 作業中,各個 stage 的 task 數量,也就代表了 Spark 作業的在各個階段 (stage) 的并行度。
如果不調節并行度,導致并行度過低,會怎么樣?
task 沒有設置,或者設置的很少,比如就設置了,100 個 task。50 個 executor,每個 executor 有 3 個 cpu core,也就是說,你的 Application 任何一個 stage 運行的時候,都有總數在 150 個 cpu core,可以并行運行。但是你現在,只有 100 個 task,平均分配一下,每個 executor 分配到 2 個 task,ok,那么同時在運行的 task,只有 100 個,每個 executor 只會并行運行 2 個 task。每個 executor 剩下的一個 cpu core,就浪費掉了。
你的資源雖然分配足夠了,但是問題是,并行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。合理的并行度的設置,應該是要設置的足夠大,大到可以完全合理的利用你的集群資源;比如上面的例子,總共集群有 150 個 cpu core,可以并行運行 150 個 task。那么就應該將你的 Application 的并行度,至少設置成 150,才能完全有效的利用你的集群資源,讓 150 個 task,并行執行;而且 task 增加到 150 個以后,即可以同時并行運行,還可以讓每個 task 要處理的數據量變少;比如總共 150G 的數據要處理,如果是 100 個 task,每個 task 計算 1.5G 的數據;現在增加到 150 個 task,可以并行運行,而且每個 task 主要處理 1G 的數據就可以。
很簡單的道理,只要合理設置并行度,就可以完全充分利用你的集群計算資源,并且減少每個 task 要處理的數據量,最終,就是提升你的整個 Spark 作業的性能和運行速度。
task 數量,至少設置成與 Spark application 的總 cpu core 數量相同(最理想情況,比如總共 150 個 cpu core,分配了 150 個 task,一起運行,差不多同一時間運行完畢)
官方是推薦,task 數量,設置成 spark application 總 cpu core 數量的 2~3 倍,比如 150 個 cpu core,基本要設置 task 數量為 300~500;實際情況,與理想情況不同的,有些 task 會運行的快一點,比如 50s 就完了,有些 task,可能會慢一點,要 1 分半才運行完,所以如果你的 task 數量,剛好設置的跟 cpu core 數量相同,可能還是會導致資源的浪費,因為,比如 150 個 task,10 個先運行完了,剩余 140 個還在運行,但是這個時候,有 10 個 cpu core 就空閑出來了,就導致了浪費。那如果 task 數量設置成 cpu core 總數的 2~3 倍,那么一個 task 運行完了以后,另一個 task 馬上可以補上來,就盡量讓 cpu core 不要空閑,同時也是盡量提升 spark 作業運行的效率和速度,提升性能。
如何設置一個 Spark Application 的并行度?
spark.default.parallelism
SparkConf conf = new SparkConf().set( spark.default.parallelism , 500)
默認情況下,多次對一個 RDD 執行算子,去獲取不同的 RDD;都會對這個 RDD 以及之前的父 RDD,全部重新計算一次;讀取 HDFS- RDD1- RDD2-RDD4 這種情況,是絕對絕對,一定要避免的,一旦出現一個 RDD 重復計算的情況,就會導致性能急劇降低。比如,HDFS- RDD1-RDD2 的時間是 15 分鐘,那么此時就要走兩遍,變成 30 分鐘
RDD 架構重構與優化盡量去復用 RDD,差不多的 RDD,可以抽取稱為一個共同的 RDD,供后面的 RDD 計算時,反復使用。
公共 RDD 一定要實現持久化。就好比北方吃餃子,現包現煮。你人來了,要點一盤餃子。餡料 + 餃子皮 + 水 - 包好的餃子,對包好的餃子去煮,煮開了以后,才有你需要的熟的,熱騰騰的餃子?,F實生活中,餃子現包現煮,當然是最好的了。但是 Spark 中,RDD 要去“現包現煮”,那就是一場致命的災難。對于要多次計算和使用的公共 RDD,一定要進行持久化。持久化,也就是說,將 RDD 的數據緩存到內存中 / 磁盤中,(BlockManager),以后無論對這個 RDD 做多少次計算,那么都是直接取這個 RDD 的持久化的數據,比如從內存中或者磁盤中,直接提取一份數據。
持久化,是可以進行序列化的如果正常將數據持久化在內存中,那么可能會導致內存的占用過大,這樣的話,也許,會導致 OOM 內存溢出。當純內存無法支撐公共 RDD 數據完全存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將 RDD 的每個 partition 的數據,序列化成一個大的字節數組,就一個對象;序列化后,大大減少內存的空間占用。序列化的方式,唯一的缺點就是,在獲取數據的時候,需要反序列化。如果序列化純內存方式,還是導致 OOM,內存溢出;就只能考慮磁盤的方式,內存 + 磁盤的普通方式(無序列化)。內存 + 磁盤,序列化。
為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;持久化的每個數據單元,存儲一份副本,放在其他節點上面;從而進行容錯;一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內存資源極度充足.
持久化,很簡單,就是對 RDD 調用 persist()方法,并傳入一個持久化級別
如果是 persist(StorageLevel.MEMORY_ONLY()),純內存,無序列化,那么就可以用 cache()方法來替代
StorageLevel.MEMORY_ONLY_SER(),第二選擇
StorageLevel.MEMORY_AND_DISK(),第三選擇
StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
StorageLevel.DISK_ONLY(),第五選擇
如果內存充足,要使用雙副本高可靠機制, 選擇后綴帶_2 的策略
StorageLevel.MEMORY_ONLY_2()
感謝各位的閱讀,以上就是“spark 性能調優的方法是什么”的內容了,經過本文的學習后,相信大家對 spark 性能調優的方法是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!