久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行Spark Shuffle實現

145次閱讀
沒有評論

共計 3016 個字符,預計需要花費 8 分鐘才能閱讀完成。

本篇文章為大家展示了如何進行 Spark Shuffle 實現,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

對于大數據計算框架而言,Shuffle 階段的設計優劣是決定性能好壞的關鍵因素之一。丸趣 TV 小編將介紹目前 Spark 的 shuffle 實現,并將之與 MapReduce 進行簡單對比。

(1) shuffle 基本概念與常見實現方式

shuffle,是一個算子,表達的是多對多的依賴關系,在類 MapReduce 計算框架中,是連接 Map 階段和 Reduce 階段的紐帶,即每個 Reduce Task 從每個 Map Task 產生數的據中讀取一片數據,極限情況下可能觸發 M * R 個數據拷貝通道(M 是 Map Task 數目,R 是 Reduce Task 數目)。通常 shuffle 分為兩部分:Map 階段的數據準備和 Reduce 階段的數據拷貝。首先,Map 階段需根據 Reduce 階段的 Task 數量決定每個 Map Task 輸出的數據分片數目,有多種方式存放這些數據分片:

1)保存在內存中或者磁盤上(Spark 和 MapReduce 都存放在磁盤上);

2)每個分片一個文件(現在 Spark 采用的方式,若干年前 MapReduce 采用的方式),或者所有分片放到一個數據文件中,外加一個索引文件記錄每個分片在數據文件中的偏移量(現在 MapReduce 采用的方式)。

在 Map 端,不同的數據存放方式各有優缺點和適用場景。一般而言,shuffle 在 Map 端的數據要存儲到磁盤上,以防止容錯觸發重算帶來的龐大開銷(如果保存到 Reduce 端內存中,一旦 Reduce Task 掛掉了,所有 Map Task 需要重算)。但數據在磁盤上存放方式有多種可選方案,在 MapReduce 前期設計中,采用了現在 Spark 的方案(目前一直在改進),每個 Map Task 為每個 Reduce Task 產生一個文件,該文件只保存特定 Reduce Task 需處理的數據,這樣會產生 M * R 個文件,如果 M 和 R 非常龐大,比如均為 1000,則會產生 100w 個文件,產生和讀取這些文件會產生大量的隨機 IO,效率非常低下。解決這個問題的一種直觀方法是減少文件數目,常用的方法有:

        1) 將一個節點上所有 Map 產生的文件合并成一個大文件(MapReduce 現在采用的方案),

        2) 每個節點產生 {(slot 數目)*R} 個文件(Spark 優化后的方案)。對后面這種方案簡單解釋一下:不管是 MapReduce 1.0 還是 Spark,每個節點的資源會被抽象成若干個 slot,由于一個 Task 占用一個 slot,因此 slot 數目可看成是最多同時運行的 Task 數目。如果一個 Job 的 Task 數目非常多,限于 slot 數目有限,可能需要運行若干輪。這樣,只需要由第一輪產生 {(slot 數目)*R} 個文件,后續幾輪產生的數據追加到這些文件末尾即可。

        因此,后一種方案可減少大作業產生的文件數目。

        在 Reduce 端,各個 Task 會并發啟動多個線程同時從多個 Map Task 端拉取數據。由于 Reduce 階段的主要任務是對數據進行按組規約。

        也就是說,需要將數據分成若干組,以便以組為單位進行處理。大家知道,分組的方式非常多,常見的有:Map/HashTable(key 相同的,放到同一個 value list 中)和 Sort(按 key 進行排序,key 相同的一組,經排序后會挨在一起),這兩種方式各有優缺點,第一種復雜度低,效率高,但是需要將數據全部放到內存中,第二種方案復雜度高,但能夠借助磁盤(外部排序)處理龐大的數據集。Spark 前期采用了第一種方案,而在最新的版本中加入了第二種方案,MapReduce 則從一開始就選用了基于 sort 的方案。

(2)MapReduce Shuffle 發展史

【階段 1】:MapReduce Shuffle 的發展也并不是一馬平川的,剛開始(0.10.0 版本之前)采用了“每個 Map Task 產生 R 個文件”的方案,前面提到,該方案會產生大量的隨機讀寫 IO,對于大數據處理而言,非常不利。

【階段 2】:為了避免 Map Task 產生大量文件,HADOOP-331 嘗試對該方案進行優化,優化方法:為每個 Map Task 提供一個環形 buffer,一旦 buffer 滿了后,則將內存數據 spill 到磁盤上(外加一個索引文件,保存每個 partition 的偏移量),最終合并產生的這些 spill 文件,同時創建一個索引文件,保存每個 partition 的偏移量。

(階段 2):這個階段并沒有對 shuffle 架構做調成,只是對 shuffle 的環形 buffer 進行了優化。在 Hadoop 2.0 版本之前,對 MapReduce 作業進行參數調優時,Map 階段的 buffer 調優非常復雜的,涉及到多個參數,這是由于 buffer 被切分成兩部分使用:一部分保存索引(比如 parition、key 和 value 偏移量和長度),一部分保存實際的數據,這兩段 buffer 均會影響 spill 文件數目,因此,需要根據數據特點對多個參數進行調優,非常繁瑣。而 MAPREDUCE-64 則解決了該問題,該方案讓索引和數據共享一個環形緩沖區,不再將其分成兩部分獨立使用,這樣只需設置一個參數控制 spill 頻率。

【階段 3(進行中)】:目前 shuffle 被當做一個子階段被嵌到 Reduce 階段中的。由于 MapReduce 模型中,Map Task 和 Reduce Task 可以同時運行,因此一個作業前期啟動的 Reduce Task 將一直處于 shuffle 階段,直到所有 Map Task 運行完成,而在這個過程中,Reduce Task 占用著資源,但這部分資源利用率非常低,基本上只使用了 IO 資源。為了提高資源利用率,一種非常好的方法是將 shuffle 從 Reduce 階段中獨立處理,變成一個獨立的階段 / 服務,由專門的 shuffler service 負責數據拷貝,目前百度已經實現了該功能(準備開源?),且收益明顯,具體參考:MAPREDUCE-2354。

(3)Spark Shuffle 發展史

目前看來,Spark Shuffle 的發展史與 MapReduce 發展史非常類似。初期 Spark 在 Map 階段采用了“每個 Map Task 產生 R 個文件”的方法,在 Reduce 階段采用了 map 分組方法,但隨 Spark 變得流行,用戶逐漸發現這種方案在處理大數據時存在嚴重瓶頸問題,因此嘗試對 Spark 進行優化和改進,相關鏈接有:External Sorting for Aggregator and CoGroupedRDDs,“Optimizing Shuffle Performance in Spark”,“Consolidating Shuffle Files in Spark”,優化動機和思路與 MapReduce 非常類似。

Spark 在前期設計中過多依賴于內存,使得一些運行在 MapReduce 之上的大作業難以直接運行在 Spark 之上(可能遇到 OOM 問題)。目前 Spark 在處理大數據集方面尚不完善,用戶需根據作業特點選擇性的將一部分作業遷移到 Spark 上,而不是整體遷移。隨著 Spark 的完善,很多內部關鍵模塊的設計思路將變得與 MapReduce 升級版 Tez 非常類似。

上述內容就是如何進行 Spark Shuffle 實現,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-17發表,共計3016字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 抚松县| 清徐县| 灵寿县| 湖北省| 隆子县| 曲沃县| 乌鲁木齐市| 清涧县| 宁明县| 毕节市| 迁安市| 宿迁市| 贵南县| 普洱| 湘西| 泰安市| 芦溪县| 通许县| 汾阳市| 江华| 伊春市| 蚌埠市| 南丹县| 水富县| 棋牌| 伊通| 探索| 甘南县| 安远县| 巴塘县| 新干县| 定远县| 高碑店市| 霍邱县| 锡林浩特市| 聂拉木县| 和田市| 嘉义市| 广平县| 海伦市| 汕头市|