共計 2577 個字符,預計需要花費 7 分鐘才能閱讀完成。
本篇內容主要講解“hadoop map-reduce 中的文件并發操作介紹”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“hadoop map-reduce 中的文件并發操作介紹”吧!
這樣的操作在 map 端或者 reduce 端均可。下面以一個實際業務場景中的例子來簡要說明。
問題簡要描述:
假如 reduce 輸入的 key 是 Text(String),value 是 BytesWritable(byte[]), 不同 key 的種類為 100 萬個,value 的大小平均為 30k 左右,每個 key 大概對應 100 個 value, 要求對每一個 key 建立兩個文件,一個用來不斷添加 value 中的二進制數據,一個用來記錄各個 value 在文件中的位置索引。(大量的小文件會影響 HDFS 的性能,所以最好對這些小文件進行拼接)
當文件數量較小時,可以考慮使用 MultipleOutput 來進行 key-value 的分流,可以按照 key 的不同,將其輸出到不同的文件或者目錄中。但是 reduce 的數量只能為 1, 不然每個 reduce 都會生成相同的目錄或者文件,不能達到最終的目的。此外最重要的是,操作系統對每個進程打開的文件數量的限制,默認為 1024,集群的各個 datanode 可能會配置更高的值,但最多在幾萬左右,仍然是一個限制因素。不能滿足百萬文件的需求。
reduce 的主要目的是用來歸并 key-value 并輸出到 HDFS 上,我們當然也可以在 reduce 中進行其他的操作,比如文件讀寫。因為默認的 partitioner 保證同一個 key 的數據肯定會在同一個 reduce 中,所以在每個 reduce 中只用打開兩個文件進行讀寫即可 (一個索引文件,一個數據文件)。并發度由 reduce 數量決定,將 reduce 數量設為 256,那我們就可以同時處理 256 個 key 的數據 (partioner 保證了不同 reduce 處理的 key 不同,不會引起文件讀寫沖突)。這樣的并發度的效率是很客觀的,可以在較短的時間內完成需求。
思路是這樣,但同時由于 hdfs 的特性以及 hadoop 的任務調度,在文件讀寫過程中,仍有可能會出現很多問題,下面簡要說些一些常見的會碰到的問題。
1.org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException 異常
這可能是最經常碰到的一個問題。可能的原因如下:
(1) 文件流沖突。
一般創建文件時都會打開一個供寫入的文件流。而我們希望是追加,所以如果使用了錯誤的 API,就有可能引起上述問題。以 FileSystem 類為例,如果使用 create() 方法之后再調用 append() 方法,就會拋出上述異常。所以最好使用 createNewFile 方法,只創建文件,不打開流。
(2)mapreduce 推測執行機制
mapreduce 為了提高效率,會在一個任務啟動之后,同時啟動一些相同的任務 (attempt),其中有一個 attempt 成功完成之后,視為整個 task 完成,其結果 作為最終結果,并且殺掉那些較慢的 attempt。集群一般會開啟此選項以優化性能 (以空間換時間)。但在本問題環境下推測執行卻不太合適。因為我們一般希望一個 task 用來處理一個文件,但如果啟動推測執行,會有幾個 attempt 同時試圖操作同一個文件,就會引發異常。所以最好關掉此選項,將 mapred.reduce.max.attempts 設為 1, 或者將 mapred.reduce.tasks.speculative.execution 設為 false.
但此時仍有可能會出現問題。因為如果一個 task 的唯一 attempt 出現問題,在被 kill 掉之后,task 仍會另起一個 attempt,此時因為前一個 attempt 異常終止,仍有可能會影響到新起的 attempt 的文件操作,引發異常。所以最安全的方法是,借鑒推測執行的機制 (每個 attempt 各自生成自己的結果,最終選擇一個作為最終結果),以每個 attempt 的 id 號為后綴附加到所操作的文件上,同時捕獲所有文件操作的異常并處理,這樣可避免文件的讀寫沖突。Context 可以用來獲取運行時的一些上下文信息,可以很容易得到 attempt 的 id 號。注意,此時如果開啟推測執行也可以,但是會生成很多相同的文件 (每個 attempt 一份), 仍然不是最好的解決方法。
同時,我們可以利用 reduce 的輸出來記錄運行“不正常的”key. 這些 task 大多數是 attempt_0 被殺掉而重啟了一個 attempt_1,所以下面的文件一般為兩份。可以對這些情況的 key 輸出 (文件異常或者 attemptID 0),并進行一些后續處理,比如文件重命名,或者緊對這些 key 重新寫入。因為此種情況的 key 一般只占極少數,所以并不影響總體的效率。
2. 文件異常處理
最好能將 mapreduce 中的所有文件操作都設置好異常處理。不然一個文件異常就有可能會使整個 job 失敗。所以從效率來講,最好是在文件發生異常時將其 key 作為 reduce 的輸出以進行記錄。因為同時 mapreduce 會重啟一個 task attempts 重新進行文件讀寫,可保證我們得到最終的數據,最后所需的只是對那些異常的 key 進行一些簡單的文件重命名操作即可。
3. 多目錄以及文件拼接
如果我們將 key 的種類設為 1000 萬,上述方法會生成太多的小文件從而影響 hdfs 的性能,另外,因為所有文件都在同一個目錄下,會導致同一個目錄下文件數目過多而影響訪問效率。
在創建文件的同時建立多個子目錄,一個有用的方法是以 reduce 的 taskid 來建立子目錄。這樣有多少個 reduce 就可以建立多少個子目錄,不會有文件沖突。同一個 reduce 處理的 key 都會在同一個目錄下。
文件拼接要考慮的一個索引的問題。為了將文件索引建立的盡量簡單,應該盡量保證同一個 key 的所有數據都在同一個大文件中。這可以利用 key 的 hashCode 來實現。如果我們想在每個目錄下建立 1000 個文件,只需將 hashCode 對 1000 取余即可。
到此,相信大家對“hadoop map-reduce 中的文件并發操作介紹”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!