久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm怎么實現單詞計數

162次閱讀
沒有評論

共計 7777 個字符,預計需要花費 20 分鐘才能閱讀完成。

本篇內容主要講解“Storm 怎么實現單詞計數”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Storm 怎么實現單詞計數”吧!

在上一次單詞計數的基礎上做如下改動:使用 自定義   分組策略,將首字母相同的單詞發送給同一個 task 計數
自定義  CustomStreamGrouping

package com.zhch.v4;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ModuleGrouping implements CustomStreamGrouping, Serializable {
 private List Integer  tasks;
 @Override
 public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List Integer  targetTasks) {
 this.tasks = targetTasks;
 }
 @Override
 public List Integer  chooseTasks(int taskId, List Object  values) {
 List Integer  taskIds = new ArrayList Integer 
 if (values.size()   0) { String str = values.get(0).toString();
 if (str.isEmpty()) { taskIds.add(0);
 } else { Integer index = str.charAt(0) % tasks.size();
 taskIds.add(tasks.get(index));
 }
 }
 return taskIds;
 }
}

數據源 spout

package com.zhch.v4;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SentenceSpout extends BaseRichSpout {
 private FileReader fileReader = null;
 private boolean completed = false;
 private ConcurrentHashMap UUID, Values  pending;
 private SpoutOutputCollector collector;
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
 }
 @Override
 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
 this.collector = spoutOutputCollector;
 this.pending = new ConcurrentHashMap UUID, Values 
 try { this.fileReader = new FileReader(map.get( wordsFile).toString());
 } catch (Exception e) { throw new RuntimeException( Error reading file [  + map.get( wordsFile) +  ] 
 }
 }
 @Override
 public void nextTuple() { if (completed) {
 try { Thread.sleep(1000);
 } catch (InterruptedException e) { }
 }
 String line;
 BufferedReader reader = new BufferedReader(fileReader);
 try { while ((line = reader.readLine()) != null) { Values values = new Values(line);
 UUID msgId = UUID.randomUUID();
 this.pending.put(msgId, values);
 this.collector.emit(values, msgId);
 }
 } catch (Exception e) { throw new RuntimeException( Error reading tuple , e);
 } finally {
 completed = true;
 }
 }
 @Override
 public void ack(Object msgId) { this.pending.remove(msgId);
 }
 @Override
 public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId);
 }
}

實現語句分割 bolt

package com.zhch.v4;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
 private OutputCollector collector;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 }
 @Override
 public void execute(Tuple tuple) {
 String sentence = tuple.getStringByField( sentence 
 String[] words = sentence.split(   
 for (String word : words) { collector.emit(tuple, new Values(word));
 }
 this.collector.ack(tuple);
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
 }
}

實現單詞計數 bolt 

package com.zhch.v4;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
 private OutputCollector collector;
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 this.counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = this.counts.get(word);
 if (count == null) {
 count = 0L;
 }
 count++;
 this.counts.put(word, count);
 BufferedWriter writer = null;
 try { writer = new BufferedWriter(new FileWriter( /home/grid/stormData/result.txt));
 List String  keys = new ArrayList String 
 keys.addAll(this.counts.keySet());
 Collections.sort(keys);
 for (String key : keys) { Long c = this.counts.get(key);
 writer.write(key +   :   + c);
 writer.newLine();
 writer.flush();
 }
 } catch (Exception e) { e.printStackTrace();
 } finally { if (writer != null) {
 try { writer.close();
 } catch (Exception e) { e.printStackTrace();
 }
 writer = null;
 }
 }
 this.collector.ack(tuple);
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word ,  count));
 }
}

實現單詞計數 topology 

package com.zhch.v4;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopology {
 public static final String SENTENCE_SPOUT_ID =  sentence-spout 
 public static final String SPLIT_BOLT_ID =  split-bolt 
 public static final String COUNT_BOLT_ID =  count-bolt 
 public static final String TOPOLOGY_NAME =  word-count-topology-v4 
 public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout();
 SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
 WordCountBolt countBolt = new WordCountBolt();
 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
 builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)
 .shuffleGrouping(SENTENCE_SPOUT_ID);
 builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
 .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); // 使用   自定義   分組策略
 Config config = new Config();
 config.put(wordsFile , args[0]);
 if (args != null   args.length   1) { config.setNumWorkers(2);
 // 集群模式啟動
 StormSubmitter.submitTopology(args[1], config, builder.createTopology());
 } else { LocalCluster cluster = new LocalCluster();
 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
 try { Thread.sleep(5 * 1000);
 } catch (InterruptedException e) { }
 cluster.killTopology(TOPOLOGY_NAME);
 cluster.shutdown();
 }
 }
}

提交到 Storm 集群  

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4

運行結果:

[grid@hadoop5 stormData]$ cat result.txt 
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1
[grid@hadoop6 stormData]$ cat result.txt 
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1

到此,相信大家對“Storm 怎么實現單詞計數”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計7777字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 米脂县| 余姚市| 潮州市| 德保县| 县级市| 会理县| 陕西省| 南充市| 封开县| 竹山县| 汶上县| 交口县| 林西县| 滨州市| 抚宁县| 柳江县| 泰州市| 武宣县| 定兴县| 南岸区| 兴化市| 绥芬河市| 綦江县| 澳门| 南涧| 平武县| 彰化县| 克东县| 洪泽县| 饶河县| 保定市| 张家川| 屏东县| 大渡口区| 上犹县| 化德县| 买车| 教育| 鄢陵县| 邢台市| 焦作市|