久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm如何實現單詞計數

166次閱讀
沒有評論

共計 6895 個字符,預計需要花費 18 分鐘才能閱讀完成。

這篇文章主要介紹“Storm 如何實現單詞計數”,在日常操作中,相信很多人在 Storm 如何實現單詞計數問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm 如何實現單詞計數”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

1. 使用 mvn 命令創建項目

mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1

然后編輯配置文件 pom.xml,添加 storm 依賴  

dependency 
  groupId org.apache.storm /groupId 
  artifactId storm-core /artifactId 
  version 0.9.4 /version 
 /dependency

最后通過下述命令來編譯項目,編譯正確完成后導入到 IDE 中  

mvn install

當然,也可以在 IDE 中安裝 maven 插件,從而直接在 IDE 中創建 maven 項目  

2. 實現數據源,用重復的靜態語句來模擬數據源

package storm.test.v1;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SentenceSpout extends BaseRichSpout { private String[] sentences = {
  storm integrates with the queueing ,
  and database technologies you already use ,
  a storm topology consumes streams of data ,
  and processes those streams in arbitrarily complex ways ,
  repartitioning the streams between each stage of the computation however needed 
 };
 private int index = 0;
 private SpoutOutputCollector collector;
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
 }
 @Override
 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
 this.collector = spoutOutputCollector;
 }
 @Override
 public void nextTuple() { this.collector.emit(new Values(sentences[index]));
 index++;
 if (index  = sentences.length) {
 index = 0;
 }
 try { Thread.sleep(1);
 } catch (InterruptedException e) { }
 }
}

3. 實現語句分割 bolt 

package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
 private OutputCollector collector;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 }
 @Override
 public void execute(Tuple tuple) {
 String sentence = tuple.getStringByField( sentence 
 String[] words = sentence.split(   
 for (String word : words) { this.collector.emit(new Values(word));
 }
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
 }
}

4. 實現單詞計數 bolt 

package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
 private OutputCollector collector;
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 this.counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = this.counts.get(word);
 if (count == null) {
 count = 0L;
 }
 count++;
 this.counts.put(word, count);
 this.collector.emit(new Values(word, count));
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word ,  count));
 }
}

5. 實現上報 bolt 

package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportBolt extends BaseRichBolt {
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = tuple.getLongByField( count 
 this.counts.put(word, count);
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
 @Override
 public void cleanup() { // 本地模式下,終止 topology 時可以保證 cleanup() 被執行
 System.out.println( --- FINAL COUNTS --- 
 List String  keys = new ArrayList String 
 keys.addAll(this.counts.keySet());
 Collections.sort(keys);
 for (String key : keys) { System.out.println(key +   :   + this.counts.get(key));
 }
 System.out.println( ---------- 
 }
}

6. 實現單詞計數 topology 

package storm.test.v1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
 private static final String SENTENCE_SPOUT_ID =  sentence-spout 
 private static final String SPLIT_BOLT_ID =  split-bolt 
 private static final String COUNT_BOLT_ID =  count-bolt 
 private static final String REPORT_BOLT_ID =  report-bolt 
 private static final String TOPOLOGY_NAME =  word-count-topology 
 public static void main(String[] args) { SentenceSpout spout = new SentenceSpout();
 SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
 WordCountBolt countBolt = new WordCountBolt();
 ReportBolt reportBolt = new ReportBolt();
 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(SENTENCE_SPOUT_ID, spout); // 注冊數據源
 builder.setBolt(SPLIT_BOLT_ID, spiltBolt) // 注冊 bolt
 .shuffleGrouping(SENTENCE_SPOUT_ID); // 該 bolt 訂閱 spout 隨機均勻發射來的數據流
 builder.setBolt(COUNT_BOLT_ID, countBolt)
 .fieldsGrouping(SPLIT_BOLT_ID, new Fields( word)); // 該 bolt 訂閱 spiltBolt 發射來的數據流,并且保證 word 字段值相同的 tuple 會被路由到同一個 countBolt
 builder.setBolt(REPORT_BOLT_ID, reportBolt)
 .globalGrouping(COUNT_BOLT_ID); // 該 bolt 訂閱 countBolt 發射來的數據流,并且所有的 tuple 都會被路由到唯一的一個 reportBolt 中
 Config config = new Config();
 // 本地模式啟動
 LocalCluster cluster = new LocalCluster();
 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
 try { Thread.sleep(5 * 1000);
 } catch (InterruptedException e) { }
 cluster.killTopology(TOPOLOGY_NAME);
 cluster.shutdown();
 }
}

7. 運行結果:

--- FINAL COUNTS ---
a : 302
already : 302
and : 604
arbitrarily : 302
between : 302
complex : 302
computation : 302
consumes : 302
data : 302
database : 302
each : 302
however : 302
in : 302
integrates : 302
needed : 302
of : 604
processes : 302
queueing : 302
repartitioning : 302
stage : 302
storm : 604
streams : 906
technologies : 302
the : 906
those : 302
topology : 302
use : 302
ways : 302
with : 302
you : 302
----------

到此,關于“Storm 如何實現單詞計數”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計6895字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 深泽县| 西宁市| 达拉特旗| 曲阜市| 锡林浩特市| 专栏| 寿光市| 梅河口市| 正镶白旗| 河间市| 柳江县| 山东省| 安远县| 凤城市| 阿拉尔市| 翁源县| 金秀| 建阳市| 绥阳县| 镇雄县| 丹东市| 沈阳市| 光泽县| 岳阳市| 庄河市| 镇江市| 肥城市| 星子县| 阜阳市| 桦南县| 卓资县| 阳原县| 唐山市| 昭通市| 九江县| 黎城县| 江津市| 合山市| 磐石市| 湘阴县| 屏边|