久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm中怎么使用Direct Grouping分組策略

214次閱讀
沒有評論

共計 7440 個字符,預計需要花費 19 分鐘才能閱讀完成。

這篇文章主要介紹“Storm 中怎么使用 Direct Grouping 分組策略”,在日常操作中,相信很多人在 Storm 中怎么使用 Direct Grouping 分組策略問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm 中怎么使用 Direct Grouping 分組策略”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

使用  Direct Grouping 分組策略,將首字母相同的單詞發送給同一個 task 計數
數據源 spout
 

package com.zhch.v3;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class SentenceSpout extends BaseRichSpout {
 private FileReader fileReader = null;
 private boolean completed = false;
 private ConcurrentHashMap UUID, Values  pending;
 private SpoutOutputCollector collector;
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( sentence));
 }
 @Override
 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
 this.collector = spoutOutputCollector;
 this.pending = new ConcurrentHashMap UUID, Values 
 try { this.fileReader = new FileReader(map.get( wordsFile).toString());
 } catch (Exception e) { throw new RuntimeException( Error reading file [  + map.get( wordsFile) +  ] 
 }
 }
 @Override
 public void nextTuple() { if (completed) {
 try { Thread.sleep(1000);
 } catch (InterruptedException e) { }
 }
 String line;
 BufferedReader reader = new BufferedReader(fileReader);
 try { while ((line = reader.readLine()) != null) { Values values = new Values(line);
 UUID msgId = UUID.randomUUID();
 this.pending.put(msgId, values);
 this.collector.emit(values, msgId);
 }
 } catch (Exception e) { throw new RuntimeException( Error reading tuple , e);
 } finally {
 completed = true;
 }
 }
 @Override
 public void ack(Object msgId) { this.pending.remove(msgId);
 }
 @Override
 public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId);
 }
}

實現語句分割 bolt

package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
 private OutputCollector collector;
 private List Integer  numCounterTasks;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 // 獲取下游 bolt 的 taskId 列表
 this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
 }
 @Override
 public void execute(Tuple tuple) {
 String sentence = tuple.getStringByField( sentence 
 String[] words = sentence.split(   
 for (String word : words) { Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
 collector.emitDirect(taskId, tuple, new Values(word));
 }
 this.collector.ack(tuple);
 }
 public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase();
 if (word.isEmpty())
 return 0;
 else {
 // 單詞首字母對下游  bolt taskId  列表長度取余
 return word.charAt(0) % numCounterTasks.size();
 }
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word));
 }
}

實現單詞計數 bolt 

package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
 private OutputCollector collector;
 private HashMap String, Long  counts = null;
 @Override
 public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 this.collector = outputCollector;
 this.counts = new HashMap String, Long 
 }
 @Override
 public void execute(Tuple tuple) {
 String word = tuple.getStringByField( word 
 Long count = this.counts.get(word);
 if (count == null) {
 count = 0L;
 }
 count++;
 this.counts.put(word, count);
 BufferedWriter writer = null;
 try { writer = new BufferedWriter(new FileWriter( /home/grid/stormData/result.txt));
 Iterator String  keys = this.counts.keySet().iterator();
 while (keys.hasNext()) { String w = keys.next();
 Long c = this.counts.get(w);
 writer.write(w +   :   + c);
 writer.newLine();
 writer.flush();
 }
 } catch (Exception e) { e.printStackTrace();
 } finally { if (writer != null) {
 try { writer.close();
 } catch (Exception e) { e.printStackTrace();
 }
 writer = null;
 }
 }
 this.collector.ack(tuple);
 }
 @Override
 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields( word ,  count));
 }
}

實現單詞計數 topology 

package com.zhch.v3;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopology {
 public static final String SENTENCE_SPOUT_ID =  sentence-spout 
 public static final String SPLIT_BOLT_ID =  split-bolt 
 public static final String COUNT_BOLT_ID =  count-bolt 
 public static final String TOPOLOGY_NAME =  word-count-topology-v3 
 public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout();
 SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
 WordCountBolt countBolt = new WordCountBolt();
 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
 builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)
 .shuffleGrouping(SENTENCE_SPOUT_ID);
 builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
 .directGrouping(SPLIT_BOLT_ID); // 使用  Direct Grouping  分組策略
 Config config = new Config();
 config.put(wordsFile , args[0]);
 if (args != null   args.length   1) { config.setNumWorkers(2);
 // 集群模式啟動
 StormSubmitter.submitTopology(args[1], config, builder.createTopology());
 } else { LocalCluster cluster = new LocalCluster();
 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
 try { Thread.sleep(5 * 1000);
 } catch (InterruptedException e) { }
 cluster.killTopology(TOPOLOGY_NAME);
 cluster.shutdown();
 }
 }
}

提交到 Storm 集群  

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3

運行結果:

[grid@hadoop5 stormData]$ cat result.txt 
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1
[grid@hadoop6 stormData]$ cat result.txt 
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1

到此,關于“Storm 中怎么使用 Direct Grouping 分組策略”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計7440字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 乌拉特前旗| 乃东县| 富蕴县| 高雄市| 凤庆县| 防城港市| 丰原市| 沾益县| 眉山市| 岳阳市| 鹤山市| 汶川县| 淳安县| 清徐县| 县级市| 都昌县| 玉树县| 达拉特旗| 田阳县| 洪泽县| 开阳县| 伊金霍洛旗| 贡觉县| 承德县| 泽州县| 淮北市| 保康县| 巧家县| 巍山| 稷山县| 临漳县| 于都县| 松原市| 南汇区| 隆化县| 措美县| 高淳县| 察雅县| 望江县| 灵台县| 老河口市|