久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何理解TopK算法及其實現(xiàn)

166次閱讀
沒有評論

共計 5967 個字符,預計需要花費 15 分鐘才能閱讀完成。

今天就跟大家聊聊有關(guān)如何理解 TopK 算法及其實現(xiàn),可能很多人都不太了解,為了讓大家更加了解,丸趣 TV 小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

1、問題描述

在大數(shù)據(jù)規(guī)模中,經(jīng)常遇到一類需要求出現(xiàn)頻率最高的 K 個數(shù),這類問題稱為“TOPK”問題!例如:統(tǒng)計歌曲中最熱門的前 10 首歌曲,統(tǒng)計訪問流量最高的前 5 個網(wǎng)站等。

2、例如統(tǒng)計訪問流量最高的前 5 個網(wǎng)站:

數(shù)據(jù) test.data 文件:

數(shù)據(jù)格式解釋:域名   上行流量   下行流量

思路:

1、Mapper 每解析一行內(nèi)容, 按照 \t 獲取各個字段

2、因為 URL 有很多重復記錄, 所以將 URL 放到 key(通過分析 MapReduce 原理), 流量放在 value

3、在 reduce 統(tǒng)計總流量, 通過 TreeMap 進行對數(shù)據(jù)進行緩存, 最后一并輸出(值得注意的是要一次性輸出必須要用到 Reduce 類的 cleanup 方法)

程序如下:

Mapper 類:

package com.itheima.hadoop.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLMapper extends Mapper LongWritable, Text, Text, FlowBean  {
 /**
 * @param key
 * :  每一行偏移量
 * @param value
 * :  每一行的內(nèi)容
 * @param context
 * :  環(huán)境上下文
 */
 @Override
 public void map(LongWritable key, Text value, Context context)
 throws IOException, InterruptedException {
 /**
 *  該計數(shù)器是 org.apache.hadoop.mapreduce.Counter
 */
 Counter counter = context
 .getCounter( ExistProblem ,  ExistProblemLine  //  自定義存在問題的行錯誤計數(shù)器
 String line = value.toString(); //  讀取一行數(shù)據(jù)
 String[] fields = line.split( \t  //  獲取各個字段, 按照 \t 劃分
 try { String url = fields[0]; //  獲取 URL 字段
 long upFlow = Long.parseLong(fields[1]); //  獲取上行流量 (upFlow) 字段
 long downFlow = Long.parseLong(fields[2]); //  獲取下行流量 (downFlow) 字段
 FlowBean bean = new FlowBean(upFlow, downFlow); //  將上行流量和下行流量封裝到 bean 中
 Text tUrl = new Text(url); //  將 java 數(shù)據(jù)類型轉(zhuǎn)換 hadoop 數(shù)據(jù)類型
 context.write(tUrl, bean); //  傳遞的數(shù)據(jù)較多,封裝到 bean 進行傳輸(tips:bean 傳輸時需要注意序列化問題)
 } catch (Exception e) { e.printStackTrace();
 counter.increment(1); //  記錄錯誤行數(shù)
 }
 }
}

Reduce 類:

package com.itheima.hadoop.mapreduce.reducer;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLReducer extends Reducer Text, FlowBean, FlowBean, Text  {
 private TreeMap FlowBean, Text  treeMap = new TreeMap FlowBean, Text 
 /**
 * @param key
 * :  每一行相同 URL
 * @param values
 * :  總流量 bean
 */
 @Override
 public void reduce(Text key, Iterable FlowBean  values, Context context)
 throws IOException, InterruptedException {
 long countUpFlow = 0;
 long countDownFlow = 0;
 /*
 * 1、取出每個 bean 的總流量  2、統(tǒng)計多個 bean 的總流量  3、緩存到 treeMap 中
 */
 for (FlowBean bean : values) { countUpFlow += bean.getUpFlow(); //  統(tǒng)計上行流量
 countDownFlow += bean.getDownFlow(); //  統(tǒng)計下行總流量
 }
 //  封裝統(tǒng)計的流量
 FlowBean bean = new FlowBean(countUpFlow, countDownFlow);
 treeMap.put(bean, new Text(key)); //  緩存到 treeMap 中
 }
 @Override
 public void cleanup(Context context) throws IOException,
 InterruptedException {
 // 遍歷緩存
 for (Entry FlowBean,Text  entry : treeMap.entrySet()) { context.write(entry.getKey(), entry.getValue());
 }
 super.cleanup(context); //  不能動原本的銷毀操作
 }
}

FlowBean 類:

package com.itheima.hadoop.mapreduce.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable, Comparable FlowBean  {
 private long upFlow;
 private long downFlow;
 private long maxFlow;
 @Override
 public String toString() {
 return upFlow +  \t  + downFlow +  \t  + maxFlow;
 }
 /**
 * 1、序列化注意的問題, 序列化需要默認的構(gòu)造方法 (反射) 2、在 readFields() 和 write()方法中, 應(yīng)該遵循按照順序?qū)懗龊妥x入
 */
 public FlowBean() { }
 public FlowBean(long upFlow, long downFlow) {
 this.upFlow = upFlow;
 this.downFlow = downFlow;
 this.maxFlow = upFlow + downFlow;
 }
 public long getUpFlow() {
 return upFlow;
 }
 public void setUpFlow(long upFlow) {
 this.upFlow = upFlow;
 }
 public long getDownFlow() {
 return downFlow;
 }
 public void setDownFlow(long downFlow) {
 this.downFlow = downFlow;
 }
 public long getMaxFlow() {
 return maxFlow;
 }
 public void setMaxFlow(long maxFlow) {
 this.maxFlow = maxFlow;
 }
 @Override
 public void readFields(DataInput dataIn) throws IOException { upFlow = dataIn.readLong();
 downFlow = dataIn.readLong();
 maxFlow = dataIn.readLong();
 }
 @Override
 public void write(DataOutput dataOut) throws IOException { dataOut.writeLong(upFlow);
 dataOut.writeLong(downFlow);
 dataOut.writeLong(maxFlow);
 }
 @Override
 public int compareTo(FlowBean o) {
 return this.maxFlow   o.maxFlow ? -1
 : this.maxFlow   o.maxFlow ? 1 : 0;
 }
}

驅(qū)動類:

package com.itheima.hadoop.drivers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;
import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;
public class TopKURLDriver extends Configured implements Tool{
 @Override
 public int run(String[] args) throws Exception {
 
 /**
 * 1、創(chuàng)建 job 作業(yè)
 * 2、設(shè)置 job 提交的 Class
 * 3、設(shè)置 MapperClass, 設(shè)置 ReduceClass
 * 4、設(shè)置 Mapper 和 Reduce 各自的 OutputKey 和 OutputValue 類型
 * 5、設(shè)置處理文件的路徑, 輸出結(jié)果的路徑
 * 6、提交 job
 */
 Configuration conf = new Configuration();
 Job job = Job.getInstance(conf);
 
 job.setJarByClass(TopKURLRunner.class);
 
 job.setMapperClass(TopKURLMapper.class);
 job.setReducerClass(TopKURLReducer.class);
 
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(FlowBean.class);
 job.setOutputKeyClass(FlowBean.class);
 job.setOutputValueClass(Text.class);
 
 FileInputFormat.setInputPaths(job, new Path(args[0]));
 FileOutputFormat.setOutputPath(job,new Path(args[1]));
 
 // 參數(shù) true 為打印進度
 return job.waitForCompletion(true)?0:1;
 }
}
package com.itheima.hadoop.runner;
import org.apache.hadoop.util.ToolRunner;
import com.itheima.hadoop.runner.TopKURLRunner;
public class TopKURLRunner { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new TopKURLRunner(), args);
 System.exit(res);
 }
}

運行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData

運行結(jié)果:

看完上述內(nèi)容,你們對如何理解 TopK 算法及其實現(xiàn)有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注丸趣 TV 行業(yè)資訊頻道,感謝大家的支持。

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-17發(fā)表,共計5967字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 兴业县| 斗六市| 五指山市| 屯昌县| 云安县| 高雄市| 昌江| 盈江县| 东至县| 灌南县| 宁蒗| 泽州县| 湘潭市| 淮阳县| 武胜县| 遂宁市| 化德县| 资讯 | 张掖市| 广宁县| 克拉玛依市| 宜宾县| 沂南县| 包头市| 柘城县| 河东区| 岚皋县| 西丰县| 丽水市| 云梦县| 前郭尔| 松江区| 正镶白旗| 普兰店市| 广南县| 安乡县| 兴城市| 三亚市| 凌源市| 定陶县| 大港区|