久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

spark性能調優的方法是什么

160次閱讀
沒有評論

共計 5039 個字符,預計需要花費 13 分鐘才能閱讀完成。

這篇文章主要講解了“spark 性能調優的方法是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“spark 性能調優的方法是什么”吧!

分配哪些資源?executor、cpu per executor、memory per executor、driver memory

在哪里分配這些資源?在我們在生產環境中,提交 spark 作業時,用的 spark-submit shell 腳本,里面調整對應的參數

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置 executor 的數量
--driver-memory 100m \  配置 driver 的內存(影響很大)--executor-memory 100m \  配置每個 executor 的內存大小
--executor-cores 3 \  配置每個 executor 的 cpu core 數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

 

調節到多大,算是最大呢?

第一種,Spark Standalone,公司集群上,搭建了一套 Spark 集群,你心里應該清楚每臺機器還能夠給你使用的,大概有多少內存,多少 cpu core;那么,設置的時候,就根據這個實際的情況,去調節每個 spark 作業的資源分配。比如說你的每臺機器能夠給你使用 4G 內存,2 個 cpu core;20 臺機器;executor,20;平均每個 executor:4G 內存,2 個 cpu core。

第二種,Yarn。資源隊列。資源調度。應該去查看,你的 spark 作業,要提交到的資源隊列,大概有多少資源?500G 內存,100 個 cpu core;executor,50;平均每個 executor:10G 內存,2 個 cpu core。

設置隊列名稱:spark.yarn.queue default

一個原則,你能使用的資源有多大,就盡量去調節到最大的大?。╡xecutor 的數量,幾十個到上百個不等;executor 內存;executor cpu core)

為什么調節了資源以后,性能可以提升?

增加 executor:

如果 executor 數量比較少,那么,能夠并行執行的 task 數量就比較少,就意味著,我們的 Application 的并行執行的能力就很弱。比如有 3 個 executor,每個 executor 有 2 個 cpu core,那么同時能夠并行執行的 task,就是 6 個。6 個執行完以后,再換下一批 6 個 task。增加了 executor 數量以后,那么,就意味著,能夠并行執行的 task 數量,也就變多了。比如原先是 6 個,現在可能可以并行執行 10 個,甚至 20 個,100 個。那么并行能力就比之前提升了數倍,數十倍。相應的,性能(執行的速度),也能提升數倍~ 數十倍。

有時候數據量比較少,增加大量的 task 反而性能會降低,為什么?(想想就明白了,你用多了,別人用的就少了。。。。)

增加每個 executor 的 cpu core:

也是增加了執行的并行能力。原本 20 個 executor,每個才 2 個 cpu core。能夠并行執行的 task 數量,就是 40 個 task?,F在每個 executor 的 cpu core,增加到了 5 個。能夠并行執行的 task 數量,就是 100 個 task。執行的速度,提升了 2.5 倍。

SparkContext,DAGScheduler,TaskScheduler,會將我們的算子,切割成大量的 task,
提交到 Application 的 executor 上面去執行。

增加每個 executor 的內存量:

增加了內存量以后,對性能的提升,有三點:
1、如果需要對 RDD 進行 cache,那么更多的內存,就可以緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減少了磁盤 IO。
2、對于 shuffle 操作,reduce 端,會需要內存來存放拉取的數據并進行聚合。如果內存不夠,也會寫入磁盤。如果給 executor 分配更多內存以后,就有更少的數據,需要寫入磁盤,
甚至不需要寫入磁盤。減少了磁盤 IO,提升了性能。
3、對于 task 的執行,可能會創建很多對象。如果內存比較小,可能會頻繁導致 JVM 堆內存滿了,然后頻繁 GC,垃圾回收,minor GC 和 full GC。(速度很慢)。內存加大以后,帶來更少的 GC,垃圾回收,避免了速度變慢,速度變快了。

Spark 并行度指的是什么?

Spark 作業,Application,Jobs,action(collect)觸發一個 job,1 個 job;每個 job 拆成多個 stage,
發生 shuffle 的時候,會拆分出一個 stage,reduceByKey。

stage0
val lines = sc.textFile(hdfs://)
val words = lines.flatMap(_.split(  ))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)
stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()

reduceByKey,stage0 的 task,在最后,執行到 reduceByKey 的時候,會為每個 stage1 的 task,都創建一份文件(也可能是合并在少量的文件里面);每個 stage1 的 task,會去各個節點上的各個 task 創建的屬于自己的那一份文件里面,拉取數據;每個 stage1 的 task,拉取到的數據,一定是相同 key 對應的數據。對相同的 key,對應的 values,才能去執行我們自定義的 function 操作(_ + _)

并行度:其實就是指的是,Spark 作業中,各個 stage 的 task 數量,也就代表了 Spark 作業的在各個階段 (stage) 的并行度。

如果不調節并行度,導致并行度過低,會怎么樣?

task 沒有設置,或者設置的很少,比如就設置了,100 個 task。50 個 executor,每個 executor 有 3 個 cpu core,也就是說,你的 Application 任何一個 stage 運行的時候,都有總數在 150 個 cpu core,可以并行運行。但是你現在,只有 100 個 task,平均分配一下,每個 executor 分配到 2 個 task,ok,那么同時在運行的 task,只有 100 個,每個 executor 只會并行運行 2 個 task。每個 executor 剩下的一個 cpu core,就浪費掉了。

你的資源雖然分配足夠了,但是問題是,并行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。合理的并行度的設置,應該是要設置的足夠大,大到可以完全合理的利用你的集群資源;比如上面的例子,總共集群有 150 個 cpu core,可以并行運行 150 個 task。那么就應該將你的 Application 的并行度,至少設置成 150,才能完全有效的利用你的集群資源,讓 150 個 task,并行執行;而且 task 增加到 150 個以后,即可以同時并行運行,還可以讓每個 task 要處理的數據量變少;比如總共 150G 的數據要處理,如果是 100 個 task,每個 task 計算 1.5G 的數據;現在增加到 150 個 task,可以并行運行,而且每個 task 主要處理 1G 的數據就可以。

很簡單的道理,只要合理設置并行度,就可以完全充分利用你的集群計算資源,并且減少每個 task 要處理的數據量,最終,就是提升你的整個 Spark 作業的性能和運行速度。

task 數量,至少設置成與 Spark application 的總 cpu core 數量相同(最理想情況,比如總共 150 個 cpu core,分配了 150 個 task,一起運行,差不多同一時間運行完畢)

官方是推薦,task 數量,設置成 spark application 總 cpu core 數量的 2~3 倍,比如 150 個 cpu core,基本要設置 task 數量為 300~500;實際情況,與理想情況不同的,有些 task 會運行的快一點,比如 50s 就完了,有些 task,可能會慢一點,要 1 分半才運行完,所以如果你的 task 數量,剛好設置的跟 cpu core 數量相同,可能還是會導致資源的浪費,因為,比如 150 個 task,10 個先運行完了,剩余 140 個還在運行,但是這個時候,有 10 個 cpu core 就空閑出來了,就導致了浪費。那如果 task 數量設置成 cpu core 總數的 2~3 倍,那么一個 task 運行完了以后,另一個 task 馬上可以補上來,就盡量讓 cpu core 不要空閑,同時也是盡量提升 spark 作業運行的效率和速度,提升性能。

如何設置一個 Spark Application 的并行度?

spark.default.parallelism 
SparkConf conf = new SparkConf().set( spark.default.parallelism ,  500)

 

默認情況下,多次對一個 RDD 執行算子,去獲取不同的 RDD;都會對這個 RDD 以及之前的父 RDD,全部重新計算一次;讀取 HDFS- RDD1- RDD2-RDD4 這種情況,是絕對絕對,一定要避免的,一旦出現一個 RDD 重復計算的情況,就會導致性能急劇降低。比如,HDFS- RDD1-RDD2 的時間是 15 分鐘,那么此時就要走兩遍,變成 30 分鐘

RDD 架構重構與優化盡量去復用 RDD,差不多的 RDD,可以抽取稱為一個共同的 RDD,供后面的 RDD 計算時,反復使用。

公共 RDD 一定要實現持久化。就好比北方吃餃子,現包現煮。你人來了,要點一盤餃子。餡料 + 餃子皮 + 水 - 包好的餃子,對包好的餃子去煮,煮開了以后,才有你需要的熟的,熱騰騰的餃子?,F實生活中,餃子現包現煮,當然是最好的了。但是 Spark 中,RDD 要去“現包現煮”,那就是一場致命的災難。對于要多次計算和使用的公共 RDD,一定要進行持久化。持久化,也就是說,將 RDD 的數據緩存到內存中 / 磁盤中,(BlockManager),以后無論對這個 RDD 做多少次計算,那么都是直接取這個 RDD 的持久化的數據,比如從內存中或者磁盤中,直接提取一份數據。

持久化,是可以進行序列化的如果正常將數據持久化在內存中,那么可能會導致內存的占用過大,這樣的話,也許,會導致 OOM 內存溢出。當純內存無法支撐公共 RDD 數據完全存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將 RDD 的每個 partition 的數據,序列化成一個大的字節數組,就一個對象;序列化后,大大減少內存的空間占用。序列化的方式,唯一的缺點就是,在獲取數據的時候,需要反序列化。如果序列化純內存方式,還是導致 OOM,內存溢出;就只能考慮磁盤的方式,內存 + 磁盤的普通方式(無序列化)。內存 + 磁盤,序列化。

為了數據的高可靠性,而且內存充足,可以使用雙副本機制,進行持久化持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;持久化的每個數據單元,存儲一份副本,放在其他節點上面;從而進行容錯;一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內存資源極度充足.

持久化,很簡單,就是對 RDD 調用 persist()方法,并傳入一個持久化級別

如果是 persist(StorageLevel.MEMORY_ONLY()),純內存,無序列化,那么就可以用 cache()方法來替代

StorageLevel.MEMORY_ONLY_SER(),第二選擇

StorageLevel.MEMORY_AND_DISK(),第三選擇

StorageLevel.MEMORY_AND_DISK_SER(),第四選擇

StorageLevel.DISK_ONLY(),第五選擇

如果內存充足,要使用雙副本高可靠機制, 選擇后綴帶_2 的策略

StorageLevel.MEMORY_ONLY_2()

感謝各位的閱讀,以上就是“spark 性能調優的方法是什么”的內容了,經過本文的學習后,相信大家對 spark 性能調優的方法是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計5039字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 沅江市| 沧州市| 柳林县| 长子县| 明溪县| 高邮市| 庆城县| 咸阳市| 石城县| 鹤岗市| 中西区| 镇宁| 新郑市| 普安县| 松江区| 靖远县| 郓城县| 嘉禾县| 年辖:市辖区| 教育| 宁化县| 孟州市| 简阳市| 崇礼县| 灌南县| 靖宇县| 蚌埠市| 岱山县| 岳阳县| 潼南县| 贵阳市| 郑州市| 如东县| 布尔津县| 抚顺县| 神池县| 呼图壁县| 博兴县| 武义县| 蓬溪县| 寿宁县|