共計 3670 個字符,預計需要花費 10 分鐘才能閱讀完成。
這篇文章主要介紹“MR 程序的組件 combiner 怎么使用”,在日常操作中,相信很多人在 MR 程序的組件 combiner 怎么使用問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”MR 程序的組件 combiner 怎么使用”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
用一句簡單的話語描述 combiner 組件作用:降低 map 任務輸出, 減少 reduce 任務數量, 從而降低網絡負載
工作機制:
Map 任務允許在提交給 Reduce 任務之前在本地執行一次匯總的操作,那就是 combiner 組件,combiner 組件的行為模式和 Reduce 一樣,都是接收 key/values,產生 key/value 輸出
注意:
1、combiner 的輸出是 reduce 的輸入
2、如果 combiner 是可插拔的,那么 combiner 絕不能改變最終結果
3、combiner 是一個優化組件, 但是并不是所有地方都能用到, 所以 combiner 只能用于 reduce 的輸入、輸出 key/value 類型完全一致且不影響最終結果的場景。
例子:WordCount 程序中, 通過統計每一個單詞出現的次數, 我們可以首先通過 Map 任務本地進行一次匯總 (Combiner),然后將匯總的結果交給 Reduce,完成各個 Map 任務存在相同 KEY 的數據進行一次總的匯總,圖:
Combiner 代碼:
Combiner 類,直接打開 Combiner 類源碼是直接繼承 Reducer 類,所以我們直接繼承 Reducer 類即可,最終在提交時指定咱們定義的 Combiner 類即可
package com.itheima.hadoop.mapreduce.combiner;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountCombiner extends
Reducer Text, LongWritable, Text, LongWritable {
@Override
protected void reduce(Text key, Iterable LongWritable values, Context context)
throws IOException, InterruptedException {
long count = 0 ;
for (LongWritable value : values) { count += value.get();
}
context.write(key, new LongWritable(count));
}
}
Mapper 類:
package com.itheima.hadoop.mapreduce.mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountCombinerMapper extends
Mapper LongWritable, Text, Text, LongWritable { public void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString(); // 獲取一行數據
String[] words = line.split( // 獲取各個單詞
for (String word : words) {
// 將每一個單詞寫出去
context.write(new Text(word), new LongWritable(1));
}
}
}
驅動類:
package com.itheima.hadoop.drivers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import com.itheima.hadoop.mapreduce.combiner.WordCountCombiner;
import com.itheima.hadoop.mapreduce.mapper.WordCountCombinerMapper;
public class WordCountCombinerDriver extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
/**
* 提交五重奏: * 1、產生作業
* 2、指定 MAP/REDUCE
* 3、指定 MAPREDUCE 輸出數據類型
* 4、指定路徑
* 5、提交作業
*/
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountCombinerDriver.class);
job.setMapperClass(WordCountCombinerMapper.class);
/*** 此處中間小插曲:combiner 組件 ***/
job.setCombinerClass(WordCountCombiner.class);
/*** 此處中間小插曲:combiner 組件 ***/
//reduce 邏輯和 combiner 邏輯一致且 combiner 又是 reduce 的子類
job.setReducerClass(WordCountCombiner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
主類:
package com.itheima.hadoop.runner;
import org.apache.hadoop.util.ToolRunner;
import com.itheima.hadoop.drivers.WordCountCombinerDriver;
public class WordCountCombinerRunner { public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new WordCountCombinerDriver(), args);
System.exit(res);
}
}
運行結果:
到此,關于“MR 程序的組件 combiner 怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!