共計 6895 個字符,預計需要花費 18 分鐘才能閱讀完成。
這篇文章主要介紹“Storm 如何實現單詞計數”,在日常操作中,相信很多人在 Storm 如何實現單詞計數問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答“Storm 如何實現單詞計數”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
1. 使用 mvn 命令創建項目
mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -Dpackage=storm.test.v1
然後編輯配置文件 pom.xml,添加 storm 依賴
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.4</version>
</dependency>
最後通過下述命令來編譯項目,編譯正確完成後導入到 IDE 中
mvn install
當然,也可以在 IDE 中安裝 maven 插件,從而直接在 IDE 中創建 maven 項目
2. 實現數據源,用重複的靜態語句來模擬數據源
package storm.test.v1;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SentenceSpout extends BaseRichSpout { private String[] sentences = {
storm integrates with the queueing ,
and database technologies you already use ,
a storm topology consumes streams of data ,
and processes those streams in arbitrarily complex ways ,
repartitioning the streams between each stage of the computation however needed
};
private int index = 0;
private SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() { this.collector.emit(new Values(sentences[index]));
index++;
if (index = sentences.length) {
index = 0;
}
try { Thread.sleep(1);
} catch (InterruptedException e) { }
}
}
3. 實現語句分割 bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField( sentence
String[] words = sentence.split(
for (String word : words) { this.collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
}
}
4. 實現單詞計數 bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private HashMap String, Long counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
this.counts = new HashMap String, Long
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField( word
Long count = this.counts.get(word);
if (count == null) {
count = 0L;
}
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word , count));
}
}
5. 實現上報 bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportBolt extends BaseRichBolt {
private HashMap String, Long counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
counts = new HashMap String, Long
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField( word
Long count = tuple.getLongByField( count
this.counts.put(word, count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
@Override
public void cleanup() { // 本地模式下,終止 topology 時可以保證 cleanup() 被執行
System.out.println( --- FINAL COUNTS ---
List String keys = new ArrayList String
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for (String key : keys) { System.out.println(key + : + this.counts.get(key));
}
System.out.println( ----------
}
}
6. 實現單詞計數 topology
package storm.test.v1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = sentence-spout
private static final String SPLIT_BOLT_ID = split-bolt
private static final String COUNT_BOLT_ID = count-bolt
private static final String REPORT_BOLT_ID = report-bolt
private static final String TOPOLOGY_NAME = word-count-topology
public static void main(String[] args) { SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout); // 注冊數據源
builder.setBolt(SPLIT_BOLT_ID, spiltBolt) // 注冊 bolt
.shuffleGrouping(SENTENCE_SPOUT_ID); // 該 bolt 訂閱 spout 隨機均勻發射來的數據流
builder.setBolt(COUNT_BOLT_ID, countBolt)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields( word)); // 該 bolt 訂閱 spiltBolt 發射來的數據流,并且保證 word 字段值相同的 tuple 會被路由到同一個 countBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt)
.globalGrouping(COUNT_BOLT_ID); // 該 bolt 訂閱 countBolt 發射來的數據流,并且所有的 tuple 都會被路由到唯一的一個 reportBolt 中
Config config = new Config();
// 本地模式啟動
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
try { Thread.sleep(5 * 1000);
} catch (InterruptedException e) { }
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}
7. 運行結果:
--- FINAL COUNTS ---
a : 302
already : 302
and : 604
arbitrarily : 302
between : 302
complex : 302
computation : 302
consumes : 302
data : 302
database : 302
each : 302
however : 302
in : 302
integrates : 302
needed : 302
of : 604
processes : 302
queueing : 302
repartitioning : 302
stage : 302
storm : 604
streams : 906
technologies : 302
the : 906
those : 302
topology : 302
use : 302
ways : 302
with : 302
you : 302
----------
到此,關于“Storm 如何實現單詞計數”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!
正文完