久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何理解MapReduce計數器

176次閱讀
沒有評論

共計 12654 個字符,預計需要花費 32 分鐘才能閱讀完成。

這篇文章將為大家詳細講解有關如何理解 MapReduce 計數器,文章內容質量較高,因此丸趣 TV 小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

1、計數器簡介

在許多情況下,一個用戶需要了解待分析的數據,盡管這并非所要執行的分析任務 的核心內容。以統計數據集中無效記錄數目的任務為例,如果發現無效記錄的比例 相當高,那么就需要認真思考為何存在如此多無效記錄。是所采用的檢測程序存在 缺陷,還是數據集質量確實很低,包含大量無效記錄?如果確定是數據集的質量問 題,則可能需要擴大數據集的規模,以增大有效記錄的比例,從而進行有意義的 分析。
計數器是一種收集作業統計信息的有效手段,用于質量控制或應用級統計。計數器 還可輔助診斷系統故障。如果需要將日志信息傳輸到 map 或 reduce 任務,更好的 方法通常是嘗試傳輸計數器值以監測某一特定事件是否發生。對于大型分布式作業 而言,使用計數器更為方便。首先,獲取計數器值比輸出日志更方便,其次,根據 計數器值統計特定事件的發生次數要比分析一堆日志文件容易得多。
 

2、內置計數器

Hadoop 為每個作業維護若干內置計數器, 以描述該作業的各項指標。例如,某些計數器記錄已處理的字節數和記錄數,使用戶可監控已處理的輸入數據量和已產生的輸出數據量,并以此對 job 做適當的優化。

14/06/08 15:13:35 INFO mapreduce.Job: Counters: 46
 File System Counters
 FILE: Number of bytes read=159
 FILE: Number of bytes written=159447
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 HDFS: Number of bytes read=198
 HDFS: Number of bytes written=35
 HDFS: Number of read operations=6
 HDFS: Number of large read operations=0
 HDFS: Number of write operations=2
 Job Counters 
 Launched map tasks=1
 Launched reduce tasks=1
 Rack-local map tasks=1
 Total time spent by all maps in occupied slots (ms)=3896
 Total time spent by all reduces in occupied slots (ms)=9006
 Map-Reduce Framework
 Map input records=3
 Map output records=12
 Map output bytes=129
 Map output materialized bytes=159
 Input split bytes=117
 Combine input records=0
 Combine output records=0
 Reduce input groups=4
 Reduce shuffle bytes=159
 Reduce input records=12
 Reduce output records=4
 Spilled Records=24
 Shuffled Maps =1
 Failed Shuffles=0
 Merged Map outputs=1
 GC time elapsed (ms)=13
 CPU time spent (ms)=3830
 Physical memory (bytes) snapshot=537718784
 Virtual memory (bytes) snapshot=7365263360
 Total committed heap usage (bytes)=2022309888
 Shuffle Errors
 BAD_ID=0
 CONNECTION=0
 IO_ERROR=0
 WRONG_LENGTH=0
 WRONG_MAP=0
 WRONG_REDUCE=0
 File Input Format Counters 
 Bytes Read=81
 File Output Format Counters 
 Bytes Written=35

計數器由其關聯任務維護,并定期傳到 tasktracker,再由 tasktracker 傳給 jobtracker. 因此,計數器能夠被全局地聚集。詳見第 hadoop 權威指南第 170 頁的“進度和狀態的更新”小節。與其他計數器(包括用戶定義的計數器)不同,內置的作業計數器實際上 由 jobtracker 維護,不必在整個網絡中發送。
一個任務的計數器值每次都是完整傳輸的,而非自上次傳輸之后再繼續數未完成的傳輸,以避免由于消息丟失而引發的錯誤。另外,如果一個任務在作業執行期間失 敗,則相關計數器值會減小。僅當一個作業執行成功之后,計數器的值才是完整可 靠的。
 

3、用戶定義的 Java 計數器

MapReduce 允許用戶編寫程序來定義計數器,計數器的值可在 mapper 或 reducer 中增加。多個計數器由一個 Java 枚舉 (enum) 類型來定義,以便對計數器分組。一 個作業可以定義的枚舉類型數量不限,各個枚舉類型所包含的字段數量也不限。枚 舉類型的名稱即為組的名稱,枚舉類型的字段就是計數器名稱。計數器是全局的。換言之,MapReduce 框架將跨所有 map 和 reduce 聚集這些計數器,并在作業結束 時產生一個最終結果。

Notice1:需要說明的是,不同的 hadoop 版本定義的方式會有些許差異。

(1)在 0.20.x 版本中使用 counter 很簡單, 直接定義即可,如無此 counter,hadoop 會自動添加此 counter.
 

Counter ct = context.getCounter( INPUT_WORDS ,  count 
ct.increment(1);

(2)在 0.19.x 版本中, 需要定義 enum

enum MyCounter {INPUT_WORDS };
reporter.incrCounter(MyCounter.INPUT_WORDS, 1);
RunningJob job = JobClient.runJob(conf);
Counters c = job.getCounters();
long cnt = c.getCounter(MyCounter.INPUT_WORDS);

Notice2:使用計數器需要清楚的是它們都存儲在 jobTracker 的內存里。Mapper/Reducer 任務序列化它們,連同更新狀態被發送。為了運行正常且 jobTracker 不會出問題,計數器的數量應該在 10-100 個,計數器不僅僅只用來聚合 MapReduce job 的統計值。新版本的 hadoop 限制了計數器的數量,以防給 jobTracker 帶來損害。你最不想看到的事情就是由于定義上百個計數器而使 jobTracker 宕機。
下面咱們來看一個計數器的實例(以下代碼請運行在 0.20.1 版本以上):
 

3.1 測試數據:

hello world 2013 mapreduce
hello world 2013 mapreduce
hello world 2013 mapreduce

3.2 代碼:

/**
 * Project Name:CDHJobs
 * File Name:MapredCounter.java
 * Package Name:tmp
 * Date:2014-6- 8 下午 2:12:48
 * Copyright (c) 2014, decli#qq.com All Rights Reserved.
 *
 */
package tmp;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountWithCounter {
 static enum WordsNature {
 STARTS_WITH_DIGIT, STARTS_WITH_LETTER, ALL
  * The map class of WordCount.
  */
 public static class TokenCounterMapper extends Mapper Object, Text, Text, IntWritable  {private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();
 public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());
 while (itr.hasMoreTokens()) {word.set(itr.nextToken());
 context.write(word, one);
  * The reducer class of WordCount
  */
 public static class TokenCounterReducer extends Reducer Text, IntWritable, Text, IntWritable  {public void reduce(Text key, Iterable IntWritable  values, Context context) throws IOException,
 InterruptedException {
 int sum = 0;
 String token = key.toString();
 if (StringUtils.isNumeric(token)) {context.getCounter(WordsNature.STARTS_WITH_DIGIT).increment(1);
 } else if (StringUtils.isAlpha(token)) {context.getCounter(WordsNature.STARTS_WITH_LETTER).increment(1);
 context.getCounter(WordsNature.ALL).increment(1);
 for (IntWritable value : values) {sum += value.get();
 context.write(key, new IntWritable(sum));
  * The main entry point.
  */
 public static void main(String[] args) throws Exception {Configuration conf = new Configuration();
 Job job = new Job(conf,  WordCountWithCounter 
 job.setJarByClass(WordCountWithCounter.class);
 job.setMapperClass(TokenCounterMapper.class);
 job.setReducerClass(TokenCounterReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);
 FileInputFormat.addInputPath(job, new Path( /tmp/dsap/rawdata/june/a.txt));
 FileOutputFormat.setOutputPath(job, new Path( /tmp/dsap/rawdata/june/a_result));
 int exitCode = job.waitForCompletion(true) ? 0 : 1;
 Counters counters = job.getCounters();
 Counter c1 = counters.findCounter(WordsNature.STARTS_WITH_DIGIT);
 System.out.println(-------------- :   + c1.getDisplayName() +  :   + c1.getValue());
 // The below example shows how to get built-in counter groups that Hadoop provides basically.
 for (CounterGroup group : counters) {
 System.out.println( ========================================================== 
 System.out.println(* Counter Group:   + group.getDisplayName() +   ( + group.getName() +  ) 
 System.out.println( number of counters in this group:   + group.size());
 for (Counter counter : group) {System.out.println(  ++++   + counter.getDisplayName() +  :   + counter.getName() +  :  
 + counter.getValue());
 System.exit(exitCode);
}

3.3 結果與計數器詳解

運行結果下面會一并給出。Counter 有 組 group 的概念,用于表示邏輯上相同范圍的所有數值。MapReduce job 提供的默認 Counter 分為 7 個組,下面逐一介紹。這里也拿上面的測試數據來做詳細比對,我將會針對具體的計數器,挑選一些主要的簡述一下。 

...  前面省略  job  運行信息  xx  字  ...
 ALL=4
 STARTS_WITH_DIGIT=1
 STARTS_WITH_LETTER=3
-------------- : STARTS_WITH_DIGIT: 1
==========================================================
#MapReduce job 執行所依賴的數據來自于不同的文件系統,這個 group 表示 job 與文件系統交互的讀寫統計  
* Counter Group: File System Counters (org.apache.hadoop.mapreduce.FileSystemCounter)
 number of counters in this group: 10
 #job 讀取本地文件系統的文件字節數。假定我們當前 map 的輸入數據都來自于 HDFS,那么在 map 階段,這個數據應該是 0。但 reduce 在執行前,它   的輸入數據是經過 shuffle 的 merge 后存儲在 reduce 端本地磁盤中,所以這個數據就是所有 reduce 的總輸入字節數。 ++++ FILE: Number of bytes read: FILE_BYTES_READ: 159
 #map 的中間結果都會 spill 到本地磁盤中,在 map 執行完后,形成最終的 spill 文件。所以 map 端這里的數據就表示 map task 往本地磁盤中總共寫了多少字節。與 map 端相對應的是,reduce 端在 shuffle 時,會不斷地拉取 map 端的中間結果,然后做 merge 并   不斷 spill 到自己的本地磁盤中。最終形成一個單獨文件,這個文件就是 reduce 的輸入文件。 
 ++++ FILE: Number of bytes written: FILE_BYTES_WRITTEN: 159447
 ++++ FILE: Number of read operations: FILE_READ_OPS: 0
 ++++ FILE: Number of large read operations: FILE_LARGE_READ_OPS: 0
 ++++ FILE: Number of write operations: FILE_WRITE_OPS: 0
 #  整個 job 執行過程中,只有 map 端運行時,才從 HDFS 讀取數據,這些數據不限于源文件內容,還包括所有 map 的 split 元數據。所以這個值應該比 FileInputFormatCounters.BYTES_READ  要略大些。 
 ++++ HDFS: Number of bytes read: HDFS_BYTES_READ: 198
 #Reduce 的最終結果都會寫入 HDFS,就是一個 job 執行結果的總量。 
 ++++ HDFS: Number of bytes written: HDFS_BYTES_WRITTEN: 35
 ++++ HDFS: Number of read operations: HDFS_READ_OPS: 6
 ++++ HDFS: Number of large read operations: HDFS_LARGE_READ_OPS: 0
 ++++ HDFS: Number of write operations: HDFS_WRITE_OPS: 2
==========================================================
#這個 group 描述與 job 調度相關的統計  
* Counter Group: Job Counters (org.apache.hadoop.mapreduce.JobCounter)
 number of counters in this group: 5
 #Job 在被調度時,如果啟動了一個 data-local(源文件的幅本在執行 map task 的 taskTracker 本地) 
 ++++ Data-local map tasks 
 # 當前 job 為某些 map task 的執行保留了 slot,總共保留的時間是多少  
 ++++ FALLOW_SLOTS_MILLIS_MAPS/REDUCES
 # 所有 map task 占用 slot 的總時間,包含執行時間和創建 / 銷毀子 JVM 的時間
 ++++ SLOTS_MILLIS_MAPS/REDUCES
 #  此 job 啟動了多少個 map task 
 ++++ Launched map tasks: TOTAL_LAUNCHED_MAPS: 1
 #  此 job 啟動了多少個 reduce task 
 ++++ Launched reduce tasks: TOTAL_LAUNCHED_REDUCES: 1
 ++++ Rack-local map tasks: RACK_LOCAL_MAPS: 1
 ++++ Total time spent by all maps in occupied slots (ms): SLOTS_MILLIS_MAPS: 3896
 ++++ Total time spent by all reduces in occupied slots (ms): SLOTS_MILLIS_REDUCES: 9006
==========================================================
#這個 Counter group 包含了相當多地 job 執行細節數據。這里需要有個概念認識是:一般情況下,record 就表示一行數據,而相對地 byte 表示這行數據的大小是   多少,這里的 group 表示經過 reduce merge 后像這樣的輸入形式{aaa , [5, 8, 2, …]}。 
* Counter Group: Map-Reduce Framework (org.apache.hadoop.mapreduce.TaskCounter)
 number of counters in this group: 20
 # 所有 map task 從 HDFS 讀取的文件總行數  
 ++++ Map input records: MAP_INPUT_RECORDS: 3
 #map task 的直接輸出 record 是多少,就是在 map 方法中調用 context.write 的次數,也就是未經過 Combine 時的原生輸出條數  
 ++++ Map output records: MAP_OUTPUT_RECORDS: 12
 # Map 的輸出結果 key/value 都會被序列化到內存緩沖區中,所以這里的 bytes 指序列化后的最終字節之和  
 ++++ Map output bytes: MAP_OUTPUT_BYTES: 129
 ++++ Map output materialized bytes: MAP_OUTPUT_MATERIALIZED_BYTES: 159
 # # 與 map task  的 split 相關的數據都會保存于 HDFS 中,而在保存時元數據也相應地存儲著數據是以怎樣的壓縮方式放入的,它的具體類型是什么,這些額外的數據是  MapReduce 框架加入的,與 job 無關,這里記錄的大小就是表示額外信息的字節大小
 ++++ Input split bytes: SPLIT_RAW_BYTES: 117
 #Combiner 是為了減少盡量減少需要拉取和移動的數據,所以 combine 輸入條數與 map 的輸出條數是一致的。 ++++ Combine input records: COMBINE_INPUT_RECORDS: 0
 #  經過 Combiner 后,相同 key 的數據經過壓縮,在 map 端自己解決了很多重復數據,表示最終在 map 端中間文件中的所有條目數  
 ++++ Combine output records: COMBINE_OUTPUT_RECORDS: 0
 #Reduce 總共讀取了多少個這樣的 groups 
 ++++ Reduce input groups: REDUCE_INPUT_GROUPS: 4
 #Reduce 端的 copy 線程總共從 map 端抓取了多少的中間數據,表示各個 map task 最終的中間文件總和  
 ++++ Reduce shuffle bytes: REDUCE_SHUFFLE_BYTES: 159
 # 如果有 Combiner 的話,那么這里的數值就等于 map 端 Combiner 運算后的最后條數,如果沒有,那么就應該等于 map 的輸出條數  
 ++++ Reduce input records: REDUCE_INPUT_RECORDS: 12
 # 所有 reduce 執行后輸出的總條目數  
 ++++ Reduce output records: REDUCE_OUTPUT_RECORDS: 4
 #spill 過程在 map 和 reduce 端都會發生,這里統計在總共從內存往磁盤中 spill 了多少條數據  
 ++++ Spilled Records: SPILLED_RECORDS: 24
 # 每個 reduce 幾乎都得從所有 map 端拉取數據,每個 copy 線程拉取成功一個 map 的數據,那么增 1,所以它的總數基本等于  reduce number * map number 
 ++++ Shuffled Maps : SHUFFLED_MAPS: 1
 # copy 線程在抓取 map 端中間數據時,如果因為網絡連接異?;蚴?IO 異常,所引起的 shuffle 錯誤次數  
 ++++ Failed Shuffles: FAILED_SHUFFLE: 0
 # 記錄著 shuffle 過程中總共經歷了多少次 merge 動作  
 ++++ Merged Map outputs: MERGED_MAP_OUTPUTS: 1
 # 通過 JMX 獲取到執行 map 與 reduce 的子 JVM 總共的 GC 時間消耗  
 ++++ GC time elapsed (ms): GC_TIME_MILLIS: 13
 ++++ CPU time spent (ms): CPU_MILLISECONDS: 3830
 ++++ Physical memory (bytes) snapshot: PHYSICAL_MEMORY_BYTES: 537718784
 ++++ Virtual memory (bytes) snapshot: VIRTUAL_MEMORY_BYTES: 7365263360
 ++++ Total committed heap usage (bytes): COMMITTED_HEAP_BYTES: 2022309888
==========================================================
#這組內描述 Shuffle 過程中的各種錯誤情況發生次數,基本定位于 Shuffle 階段 copy 線程抓取 map 端中間數據時的各種錯誤。* Counter Group: Shuffle Errors (Shuffle Errors)
 number of counters in this group: 6
 # 每個 map 都有一個 ID,如 attempt_201109020150_0254_m_000000_0,如果 reduce 的 copy 線程抓取過來的元數據中這個 ID 不是標準格式,那么此 Counter 增加  
 ++++ BAD_ID: BAD_ID: 0
 # 表示 copy 線程建立到 map 端的連接有誤  
 ++++ CONNECTION: CONNECTION: 0
 #Reduce 的 copy 線程如果在抓取 map 端數據時出現 IOException,那么這個值相應增加  
 ++++ IO_ERROR: IO_ERROR: 0
 #map 端的那個中間結果是有壓縮好的有格式數據,所有它有兩個 length 信息:源數據大小與壓縮后數據大小。如果這兩個 length 信息傳輸的有誤(負值),那么此 Counter 增加
 ++++ WRONG_LENGTH: WRONG_LENGTH: 0
 # 每個 copy 線程當然是有目的: 為某個 reduce 抓取某些 map 的中間結果,如果當前抓取的 map 數據不是 copy 線程之前定義好的 map,那么就表示把數據拉錯了
 ++++ WRONG_MAP: WRONG_MAP: 0
 # 與上面描述一致,如果抓取的數據表示它不是為此 reduce 而準備的,那還是拉錯數據了。 
 ++++ WRONG_REDUCE: WRONG_REDUCE: 0
==========================================================
#這個 group 表示 map task 讀取文件內容 (總輸入數據) 的統計  
* Counter Group: File Input Format Counters (org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter)
 number of counters in this group: 1
# Map task 的所有輸入數據(字節),等于各個 map task 的 map 方法傳入的所有 value 值字節之和。 
 ++++ Bytes Read: BYTES_READ: 81
==========================================================
## 這個 group 表示 reduce task 輸出文件內容 (總輸出數據) 的統計  
* Counter Group: File Output Format Counters (org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter)
 number of counters in this group: 1
 ++++ Bytes Written: BYTES_WRITTEN: 35
==========================================================
#  自定義計數器的統計
* Counter Group: tmp.WordCountWithCounter$WordsNature (tmp.WordCountWithCounter$WordsNature)
 number of counters in this group: 3
 ++++ ALL: ALL: 4
 ++++ STARTS_WITH_DIGIT: STARTS_WITH_DIGIT: 1
 ++++ STARTS_WITH_LETTER: STARTS_WITH_LETTER: 3

關于如何理解 MapReduce 計數器就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計12654字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 偃师市| 长治县| 山东| 阜城县| 成安县| 正定县| 湾仔区| 兴安盟| 伽师县| 罗平县| 南漳县| 莲花县| 拜城县| 新丰县| 苗栗县| 茶陵县| 浙江省| 平乐县| 哈巴河县| 沐川县| 仲巴县| 杭锦后旗| 梨树县| 合川市| 彰化县| 双江| 西乡县| 静宁县| 延吉市| 浦江县| 舒城县| 曲阜市| 连平县| 江永县| 临邑县| 邢台县| 大英县| 长兴县| 隆林| 铁岭县| 红河县|