This article shows how to use the Direct Grouping strategy in Storm, walking through a complete word-count example: a spout that reads sentences from a file, a bolt that splits them into words and routes each word directly to a specific downstream task, and a counting bolt.
With direct grouping, the producer of a tuple decides which task of the consuming bolt receives it. In this example we use the Direct Grouping strategy to send all words that share the same first letter to the same task for counting.
Data source spout
package com.zhch.v3;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]", e);
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
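Two details are worth noting. First, emit(values, msgId) anchors every tuple to a random UUID kept in the pending map, so Storm later calls ack (which removes the tuple from pending) or fail (which re-emits it from pending), giving simple at-least-once replay. Second, once the file is exhausted, completed is set in the finally block, so subsequent nextTuple calls sleep for a second before the read loop finds nothing more to emit, which keeps the spout from busy-spinning.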
Sentence-splitting bolt
package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> numCounterTasks;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        // get the taskId list of the downstream bolt
        this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty())
            return 0;
        else {
            // the word's first character modulo the length of the downstream taskId list
            return word.charAt(0) % numCounterTasks.size();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
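To make the routing arithmetic concrete, here is a minimal standalone sketch of the same rule getWordCountIndex applies (the sample words are drawn from the output further below): the first character's uppercase code, modulo the number of downstream tasks, selects an index into the taskId list, so all words sharing a first letter always land on the same task.

// Minimal standalone sketch of the first-letter routing rule (illustration only)
public class RoutingSketch {
    public static void main(String[] args) {
        int numTasks = 2; // the count bolt in this topology runs 2 tasks
        for (String word : new String[]{"Storm", "simple", "to", "data"}) {
            int index = word.trim().toUpperCase().charAt(0) % numTasks;
            // prints: Storm and simple -> index 1, to and data -> index 0
            System.out.println(word + " -> task index " + index);
        }
    }
}

One caveat: Storm's documentation states that direct groupings can only be declared on streams that have been declared as direct streams, so depending on your Storm version you may need outputFieldsDeclarer.declare(true, new Fields("word")) in declareOutputFields for emitDirect to be accepted.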
Word-count bolt
package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            Iterator<String> keys = this.counts.keySet().iterator();
            while (keys.hasNext()) {
                String w = keys.next();
                Long c = this.counts.get(w);
                writer.write(w + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
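Each count task keeps its own in-memory HashMap and rewrites the entire map to /home/grid/stormData/result.txt after every tuple. Rewriting the whole file per tuple is expensive, but it keeps the demo easy to observe; note also that the path is local to whichever worker node the task runs on, which is why the results below are read from two different machines.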
Word-count topology
package com.zhch.v3;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID); // use the Direct Grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            // launch on the cluster
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
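The count bolt is registered with a parallelism of 2, so getComponentTasks(COUNT_BOLT_ID) in the split bolt returns a two-element taskId list and every word is routed by firstLetter % 2. The split bolt itself gets 2 executors but 4 tasks via setNumTasks(4), so each of its executors runs two tasks.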
Submit to the Storm cluster
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3
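Here args[0] (the input file path) becomes the wordsFile entry that the spout reads in open(), and the presence of args[1] (the topology name) makes main() take the cluster-mode branch.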
Run results:
[grid@hadoop5 stormData]$ cat result.txt
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1
[grid@hadoop6 stormData]$ cat result.txt
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1
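Note how the split falls out of the routing rule: every word in the hadoop5 file starts with a letter whose uppercase character code is odd (S, C, U, I, O, A, M, E, G, W, Y), while every word in the hadoop6 file starts with one whose code is even (T, F, D, H, P, R, B, L, N), so the two count tasks received disjoint sets of first letters.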
This concludes the example: the Direct Grouping strategy lets the split bolt choose the exact downstream task for every word, so words sharing a first letter are always counted by the same task.