This article explains in detail how to use MapReduce in Hadoop to implement WordCount and movie rating prediction. It is shared here as a reference; hopefully you will come away with a solid understanding of the topic after reading it.
In MapReduce, map refers to mapping and reduce refers to reduction.
MapReduce is a programming model that processes data as key-value pairs: the map phase transforms one set of key-value pairs into another set of key-value pairs,
which the framework then hands to the reduce phase. In reduce, all the key-value pairs emitted by the map phase are aggregated, so that values sharing the same key end up grouped together. MapReduce also sorts the keys internally before they reach the reduce phase.
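For example (with illustrative input), counting words in the line "to be or not to be" works as follows: the map phase emits the pairs (to,1), (be,1), (or,1), (not,1), (to,1), (be,1); the framework groups them by key; and the reduce phase sums each group, producing (to,2), (be,2), (or,1), (not,1). This is exactly what the WordCount job below does.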
1. WordCount
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class WordCount {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b");

    // Custom Map class implementing the "map" (mapping) phase
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // In MapReduce, Text corresponds to String and IntWritable to int.
        // LongWritable is a data type that implements WritableComparable.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        @Override
        // Override the map() method of the parent class
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line of input
            String line = value.toString();
            // Convert the whole line to lower case
            line = line.toLowerCase();
            // Split the line using the regular expression defined above
            Matcher matcher = DELIMITER.matcher(line);
            while (matcher.find()) {
                // Wrap each extracted word in a Text object
                word.set(matcher.group());
                // Emit the key-value pair: the key is the word, the value is 1
                context.write(word, one);
            }
        }
    }
    // Custom Combine step: run a local reduce over each map task's output first,
    // to cut down the amount of data shuffled to the reducers.
    public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // All values for the same key arrive in a single Iterable; add them up.
            for (IntWritable line : values) {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            // Write the key-value pair in the configured output format
            context.write(key, value);
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable line : values) {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            context.write(key, value);
        }
    }
    public static void main(String[] args) throws Exception {
        JobConf conf = WordCount.config();
        String input = "data/1.txt";
        String output = HDFS + "/user/hdfs/wordcount";
        // Custom HDFS file-manipulation helper class
        HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
        // Remove any existing output directory, otherwise the job fails
        // because the output directory already exists
        hdfs.rmr(output);
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        // Output key type
        job.setOutputKeyClass(Text.class);
        // Output value type
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WordCount.Map.class);
        job.setCombinerClass(WordCount.Combine.class);
        job.setReducerClass(WordCount.Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Output format: the custom FileOutputFormat subclass shown below
        job.setOutputFormatClass(ParseTextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        // conf.set("io.sort.mb", ...);
        return conf;
    }
}
Custom file output format
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }
        /**
         * Write the object to the byte stream, handling Text as a special
         * case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", ":");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }
}
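With the separator read in getRecordWriter above (mapred.textoutputformat.separator, defaulting to ":" here), each line written by the WordCount job takes the form word:count, for example (illustrative values) hadoop:12.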
2. Movie rating prediction
The implementation uses the Slope One algorithm to predict ratings; the custom output format class is the same as the one above.
The input file format is userId::movieId::score
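As a reference for the computation the MapReduce steps below carry out, here is a minimal in-memory sketch of Slope One prediction. It is illustrative only and not part of the job code; the class and method names are made up, and it assumes all ratings fit in memory as Map<userId, Map<movieId, score>>.

import java.util.Map;

public class SlopeOneSketch {
    // Predict the user's score for targetMovie: for every other movie the user has rated,
    // add the average deviation dev(targetMovie, otherMovie) to the user's own rating
    // of otherMovie, then average these per-movie estimates.
    public static double predict(Map<String, Map<String, Double>> ratings,
                                 String user, String targetMovie) {
        double total = 0;
        int count = 0;
        for (Map.Entry<String, Double> rated : ratings.get(user).entrySet()) {
            String otherMovie = rated.getKey();
            if (otherMovie.equals(targetMovie)) {
                continue;
            }
            // Average deviation between targetMovie and otherMovie,
            // taken over the users who rated both movies.
            double devSum = 0;
            int devCount = 0;
            for (Map<String, Double> userRatings : ratings.values()) {
                if (userRatings.containsKey(targetMovie) && userRatings.containsKey(otherMovie)) {
                    devSum += userRatings.get(targetMovie) - userRatings.get(otherMovie);
                    devCount++;
                }
            }
            if (devCount == 0) {
                continue;
            }
            total += rated.getValue() + devSum / devCount;
            count++;
        }
        return count == 0 ? 0 : total / count;
    }
}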
package main.java.org.conan.myhadoop.recommend;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
import main.java.org.conan.myhadoop.hdfs.HdfsDAO;
public class Recommend {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    public static final Pattern STRING = Pattern.compile("[\t,:]");

    // public final static int movieListLength = 100000;
    // public static int []movieList = new int[movieListLength];
    public static List movieList = new ArrayList();
    public static Map userScore = new HashMap();
    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        String in = "logfile/4.txt";
        String out = HDFS + "/user/hdfs/recommend" + "/step5";
        // path.put("data", "logfile/small.csv");
        // path.put("data", "logfile/ratings.dat");
        if (args.length == 2) {
            in = args[0];
            out = HDFS + args[1];
            System.out.println(out);
        }
        // Input data path
        path.put("data", in);
        // Step 1 input path
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        // Step 1 output path
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        // Step 2 input path
        path.put("Step2Input", path.get("Step1Output"));
        // Step 2 output path
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        // Step 3 input path
        path.put("Step3Input1", path.get("data"));
        // path.put("Step3Input2", "logfile/movie/movies.dat");
        // Step 3 output path
        path.put("Step3Output", path.get("Step1Input") + "/step3");
        // path.put("Step3Input2", path.get("Step2Output"));
        // path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
        //
        // Step 4 input path 1
        path.put("Step4Input1", path.get("Step2Output"));
        // Step 4 input path 2
        path.put("Step4Input2", path.get("Step3Output"));
        // Step 4 output path
        path.put("Step4Output", path.get("Step1Input") + "/step4");
        //
        // Step 5 input path
        path.put("Step5Input", path.get("Step4Output"));
        // path.put("Step5Input2", path.get("Step3Output2"));
        // Step 5 output path
        path.put("Step5Output", out);
        // Step 1: from the given user rating file, build each user's rating vector over the movies
        Step1.run(path);
        // Step 2: from Step 1's output, compute the co-occurrence matrix of movie ratings
        Step2.run(path);
        // Step 3: collect all movies that have been rated, and output every user's rating
        // for every movie, recording 0 for movies the user has not rated
        Step3.run(path);
        // Step 4: combine the results of Step 2 and Step 3 to predict each user's rating for each movie
        Step4.run(path);
        // Step 5: clean up the output format
        Step5.run(path);
        System.exit(0);
    }
    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommend");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        // conf.set("io.sort.mb", ...);
        return conf;
    }
}
// Build each user's rating matrix over the items, i.e. the user-movie rating matrix.
// Each output line represents one user's ratings for all the movies they scored.
// The key is the userId; the value is movieId:score,movieId:score,...
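// For example (illustrative values): the input lines 1::1193::5 and 1::661::3
// would be combined into a single record for user 1: key = 1, value = 1193:5,661:3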