久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

hadoop中如何利用mapreduce實(shí)現(xiàn)wordcount和電影評(píng)分預(yù)測(cè)

共計(jì) 9080 個(gè)字符,預(yù)計(jì)需要花費(fèi) 23 分鐘才能閱讀完成。

這篇文章將為大家詳細(xì)講解有關(guān) hadoop 中如何利用 mapreduce 實(shí)現(xiàn) wordcount 和電影評(píng)分預(yù)測(cè),文章內(nèi)容質(zhì)量較高,因此丸趣 TV 小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

mapreduce 中 map 指映射,map 指的是歸約。

mapreduce 是一個(gè) key-value 來處理數(shù)據(jù)的編程模型,它使用 map 將一組 key-value 映射為另一組 key-value

通過底層傳遞給 reduce,在 reduce 中,它將所有 map 過程傳遞過來的 key-value 進(jìn)行歸約,相同的 key 值,value 值會(huì)放在一起。mapreduce 內(nèi)部還會(huì)對(duì) reduce 過程中的 key 值進(jìn)行一次排序。

一.WordCount

public class WordCount
 //
 public static final String HDFS =  hdfs://localhost:8888 
 public static final Pattern DELIMITER = Pattern.compile(\\b([a-zA-Z]+)\\b 
 
 // 自定義 Map 類型執(zhí)行   映射 這一部分
 public static class Map extends Mapper LongWritable, Text, Text, IntWritable 
 {
 //mapreduce 中,Text 相當(dāng)于 String 類型,IntWritable 相當(dāng)于 Int 類型
 //LongWritable 是實(shí)現(xiàn)了 WritableComparable 的一個(gè)數(shù)據(jù)類型。 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();
 
 @Override
 // 重寫父類 map() 函數(shù)
 public void map(LongWritable key, Text value,
 Context context)
 throws IOException, InterruptedException
 {
 // 讀取一行數(shù)據(jù)
 String line = value.toString();
 // 將該行字符全部變?yōu)樾?
 line = line.toLowerCase();
 // 根據(jù)定義好的正則表達(dá)式拆分一行字符串。 Matcher matcher = DELIMITER.matcher(line);
 while(matcher.find()){
 // 將分解的一個(gè)個(gè)單詞類型轉(zhuǎn)化為 Text。 word.set(matcher.group());
 // 將相應(yīng)的 key-value 值傳入。key 值為單詞,value 值為 1.
 context.write(word,one);
 }
 }
 }
 
 // 自定義 Combine 過程先對(duì)本地進(jìn)行的 map 進(jìn)行一次 reduce 過程,減少傳遞給主機(jī)的數(shù)據(jù)量.
 public static class Combine extends Reducer  Text, IntWritable, Text, IntWritable 
 {
 @Override
 public void reduce(Text key, Iterable IntWritable  values, Context context) throws IOException, InterruptedException {
 int sum = 0;
 // 遍歷同一個(gè) key 值的所有 value, 所有的 value 放在同一個(gè) Iterable 中。 for (IntWritable line : values)
 { sum += line.get();
 }
 IntWritable value = new IntWritable(sum);
 // 將 key-value 按照指定的輸出格式輸出。 context.write(key, value);
 }
 }
 
 public static class Reduce extends Reducer  Text, IntWritable, Text, IntWritable 
 {
 @Override
 public void reduce(Text key, Iterable IntWritable  values, Context context) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable line : values)
 { sum += line.get();
 }
 IntWritable value = new IntWritable(sum);
 context.write(key, value);
 
 
 }
 }
 public static void main(String[] args) throws Exception
 { JobConf conf = WordCount.config();
 String input =  data/1.txt 
 String output = HDFS +  /user/hdfs/wordcount 
 // 自定義 HDFS 文件操作工具類
 HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
 // 移除存在的文件否則會(huì)報(bào)文件生成文件已存在的錯(cuò)誤
 hdfs.rmr(output);
 Job job = new Job(conf);
 job.setJarByClass(WordCount.class);
 
 // 設(shè)置輸出的 key 值類型
 job.setOutputKeyClass(Text.class);
 // 設(shè)置輸出的 value 值類型
 job.setOutputValueClass(IntWritable.class);
 
 job.setMapperClass(WordCount.Map.class);
 job.setCombinerClass(WordCount.Combine.class);
 job.setReducerClass(WordCount.Reduce.class);
 
 job.setInputFormatClass(TextInputFormat.class);
 // 設(shè)置輸出的格式,這里使用的是自定義的 FileOutputFormat 類,見下文。 job.setOutputFormatClass(ParseTextOutputFormat.class);
 FileInputFormat.setInputPaths(job, new Path(input));
 FileOutputFormat.setOutputPath(job, new Path(output));
 
 
 
 
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
 
 
 public static JobConf config() { JobConf conf = new JobConf(WordCount.class);
 conf.setJobName( WordCount 
 conf.addResource( classpath:/hadoop/core-site.xml 
 conf.addResource( classpath:/hadoop/hdfs-site.xml 
 conf.addResource( classpath:/hadoop/mapred-site.xml 
// conf.set( io.sort.mb ,  
 return conf;
 }
 

}

自定義文件輸出格式

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ParseTextOutputFormat K, V  extends FileOutputFormat K, V {  protected static class LineRecordWriter K, V  extends RecordWriter K, V  {  private static final String utf8 =  UTF-8  private static final byte[] newline;  static {  try { newline =  \n .getBytes(utf8);  } catch (UnsupportedEncodingException uee) {  throw new IllegalArgumentException( can t find   + utf8 +   encoding  }  }  protected DataOutputStream out;  private final byte[] keyValueSeparator;  public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {  this.out = out;  try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  } catch (UnsupportedEncodingException uee) {  throw new IllegalArgumentException( can t find   + utf8 +   encoding  }  }  public LineRecordWriter(DataOutputStream out) {  this(out,  \t  }  /**  * Write the object to the byte stream, handling Text as a special  * case.  * @param o the object to print  * @throws IOException if the write throws, we pass it on  */  private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o;  out.write(to.getBytes(), 0, to.getLength());  } else { out.write(o.toString().getBytes(utf8));  }  }  public synchronized void write(K key, V value)  throws IOException {  boolean nullKey = key == null || key instanceof NullWritable;  boolean nullValue = value == null || value instanceof NullWritable;  if (nullKey   nullValue) {  return;  }  if (!nullKey) { writeObject(key);  }  if (!(nullKey || nullValue)) { out.write(keyValueSeparator);  }  if (!nullValue) { writeObject(value);  }  out.write(newline);  }  public synchronized   void close(TaskAttemptContext context) throws IOException { out.close();  }  }  public RecordWriter K, V    getRecordWriter(TaskAttemptContext job  ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration();  boolean isCompressed = getCompressOutput(job);  String keyValueSeparator= conf.get( mapred.textoutputformat.separator ,   :  CompressionCodec codec = null;  String extension =   if (isCompressed) {  Class ? extends CompressionCodec  codecClass =   getOutputCompressorClass(job, GzipCodec.class);  codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  extension = codec.getDefaultExtension();  }  Path file = getDefaultWorkFile(job, extension);  FileSystem fs = file.getFileSystem(conf);  if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false);  return new LineRecordWriter K, V (fileOut, keyValueSeparator);  } else { FSDataOutputStream fileOut = fs.create(file, false);  return new LineRecordWriter K, V (new DataOutputStream  (codec.createOutputStream(fileOut)),  keyValueSeparator);  }  }    }

二. 電影評(píng)分預(yù)測(cè)

整個(gè)算法的實(shí)現(xiàn)中使用了 slop one 算法來預(yù)測(cè)評(píng)分,此處自定義的輸出類與上文一致。

輸入文件格式為 userId::movieId::score

package main.java.org.conan.myhadoop.recommend;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
import main.java.org.conan.myhadoop.hdfs.HdfsDAO;
public class Recommend {
 public static final String HDFS =  hdfs://localhost:8888 
 public static final Pattern DELIMITER = Pattern.compile([\t,] 
 
 public static final Pattern STRING = Pattern.compile([\t,:] 
 
// public final static int movieListLength = 100000;
// public static int []movieList = new int[movieListLength];
 public static List movieList = new ArrayList();
 
 public static Map userScore = new HashMap();
 public static void main(String[] args) throws Exception {
 Map String, String  path = new HashMap String, String 
 String in =  logfile/4.txt 
 String out = HDFS +  /user/hdfs/recommend  +  /step5 
 
// path.put( data ,  logfile/small.csv 
 
// path.put( data ,  logfile/ratings.dat 
 if(args.length == 2){ in = args[0];
 out = HDFS + args[1];
 System.out.println(out);
 }
 // 設(shè)置數(shù)據(jù)輸入路徑
 path.put(data , in);
 
 // 設(shè)置第一步輸入文件路徑
 path.put( Step1Input , HDFS +  /user/hdfs/recommend 
 
 // 設(shè)置第一步結(jié)果輸出路徑
 path.put(Step1Output , path.get( Step1Input) +  /step1 
 
 // 設(shè)置第二步輸入文件路徑
 path.put(Step2Input , path.get( Step1Output));
 
 // 設(shè)置第二步結(jié)果輸出路徑
 path.put(Step2Output , path.get( Step1Input) +  /step2 
 
 // 設(shè)置第三步輸入文件路徑
 path.put(Step3Input1 , path.get( data));
// path.put( Step3Input2 ,  logfile/movie/movies.dat 
 // 設(shè)置第三步結(jié)果輸出路徑
 path.put(Step3Output , path.get( Step1Input) +  /step3 
// path.put(Step3Input2 , path.get( Step2Output));
// path.put(Step3Output2 , path.get( Step1Input) +  /step3_2 
// 
 // 設(shè)置第四步輸入文件路徑 1
 path.put(Step4Input1 , path.get( Step2Output));
 
 // 設(shè)置第四步輸入文件路徑 2
 path.put(Step4Input2 , path.get( Step3Output));
 // 設(shè)置第四步結(jié)果輸出路徑
 path.put(Step4Output , path.get( Step1Input) +  /step4 
// 
 // 設(shè)置第五步輸入文件路徑
 path.put(Step5Input , path.get( Step4Output));
// path.put(Step5Input2 , path.get( Step3Output2));
 // 設(shè)置第五步結(jié)果輸出路徑
 path.put(Step5Output , out);
 
 // 第一步,根據(jù)給出的用戶評(píng)分文件,求出每個(gè)用戶對(duì)物品的評(píng)分矩陣
 Step1.run(path);
 
 // 根據(jù)第一步的輸出結(jié)果計(jì)算物品評(píng)分的同現(xiàn)矩陣
 Step2.run(path);
 
 // 獲取所有用戶評(píng)過分的電影,并輸出每位用戶對(duì)每部電影的評(píng)分,未評(píng)過則記為 0
 Step3.run(path);
 
 // 根據(jù)第二步與第三步的結(jié)果計(jì)算出每位用戶對(duì)每部電影的評(píng)分
 Step4.run(path);
 
 // 整理輸出格式。 Step5.run(path);
 
 System.exit(0);
 }
 public static JobConf config() { JobConf conf = new JobConf(Recommend.class);
 conf.setJobName( Recommand 
 conf.addResource( classpath:/hadoop/core-site.xml 
 conf.addResource( classpath:/hadoop/hdfs-site.xml 
 conf.addResource( classpath:/hadoop/mapred-site.xml 
// conf.set( io.sort.mb ,  
 return conf;
 }
}
// 求出用戶對(duì)物品的評(píng)分矩陣,即得出用戶對(duì)電影   的評(píng)分矩陣 
// 每一行數(shù)據(jù)代表一個(gè)用戶對(duì)所有打分電影的結(jié)果
//key 值為 userId, value 值為 movieID:score,movieId:score

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-17發(fā)表,共計(jì)9080字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒有評(píng)論)
主站蜘蛛池模板: 彭水| 牙克石市| 武安市| 扶余县| 晋江市| 莱州市| 资兴市| 镇坪县| 通州区| 海门市| 临城县| 涿州市| 龙门县| 苏州市| 察隅县| 资溪县| 策勒县| 吉安县| 云龙县| 康平县| 阳春市| 镇沅| 仙游县| 年辖:市辖区| 讷河市| 旬阳县| 白城市| 卢氏县| 贵南县| 静安区| 绍兴市| 宣恩县| 岑巩县| 潞西市| 西华县| 南漳县| 庆元县| 托克托县| 龙岩市| 丰县| 海原县|