共計 6644 個字符,預計需要花費 17 分鐘才能閱讀完成。
這篇文章主要介紹“Storm 怎么改變并行度”,在日常操作中,相信很多人在 Storm 怎么改變并行度問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm 怎么改變并行度”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
package bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt{
private OutputCollector collector;
public void cleanup(){}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public void execute(Tuple input) { String sentence = input.getString(0);
String[]words= sentence.split(
for(String word:words){ word =word.trim();
if(!word.isEmpty()){ word =word.toLowerCase();
//Emit the word
List a =new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
this.collector=collector;
}
/**
* The bolt will only emit the field word
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields( word));
}
}
提示:在這個類中,每調用一次 execute()方法,會發送多個元組。例如,當 execute()方法收到“This is the Storm book”這個句子時,該方法會發送 5 個新元組。
第二個 bolt,WordCounter,負責統計每個單詞個數。當 topology 結束時 (cleanup() 方法被調用時),顯示每個單詞的個數。
提示:第二個 bolt 中什么也不發送,本例中,將數據添加到一個 map 對象中,但是現實生活中,bolt 可以將數據存儲到一個數據庫中。
package bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounter implements IRichBolt{
Integer id;
String name;
Map String,Integer counters;
private OutputCollector collector;
/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public void cleanup(){ System.out.println( -- Word Counter [ +name+ - +id+]--
for(Map.Entry String,Integer entry: counters.entrySet()){ System.out.println(entry.getKey()+ : +entry.getValue());
}
}
/**
* On each word We will count
*/
@Override
public void execute(Tuple input) { String str =input.getString(0);
/**
* If the word dosn t exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(str)){ counters.put(str,1);
}else{ Integer c =counters.get(str) +1;
counters.put(str,c);
}
//Set the tuple as Acknowledge
collector.ack(input);
}
/**
* On create
*/
@Override
public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
this.counters=newHashMap String,Integer
this.collector=collector;
this.name=context.getThisComponentId();
this.id=context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
execute()方法使用一個映射 (Map 類型) 采集單詞并統計這些單詞個數。當 topology 結束的時候,cleanup()方法被調用并且打印出 counter 映射。(這僅僅是個例子,通常情況下,當 topology 關閉時,你應該使用 cleanup()方法關閉活動鏈接和其他資源。)
主類
在主類中,你將創建 topology 和一個 LocalCluster 對象,LocalCluster 對象使你可以在本地測試和調試 topology。LocalCluster 結合 Config 對象允許你嘗試不同的集群配置。例如,如果不慎使用一個全局變量或者類變量,當配置不同數量的 worker 測試 topology 的時候,你將會發現這個錯誤。(關于 config 對象在第三章會有更多介紹)
提示:所有的 topology 結點應該可以在進程間沒有數據共享的情形下獨立運行(也就是說沒有全局或者類變量),因為當 topology 運行在一個真實的集群上時,這些進程可能運行在不同的機器上。
你將使用 TopologyBuilder 創建 topology,TopologyBuilder 會告訴 Storm 怎么安排節點順序、它們怎么交換數據。
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(word-reader ,new WordReader());
builder.setBolt(word-normalizer ,new WordNormalizer()).shuffleGrouping( word-reader
builder.setBolt(word-counter ,new WordCounter(),2).fieldsGrouping(word-normalizer ,new Fields( word));
本例中 spout 和 bolt 之間使用隨機分組 (shuffleGrouping) 連接,這種分組類型告訴 Storm 以隨機分布的方式從源節點往目標節點發送消息。
接著,創建一個包含 topology 配置信息的 Config 對象,該配置信息在運行時會與集群配置信息合并,并且通過 prepare()方法發送到所有節點。
Config conf =new Config();
conf.put(wordsFile ,args[0]);
conf.setDebug(false);
將 wordFile 屬性設置為將要被 spout 讀取的文件名稱(文件名在 args 參數中傳入),并將 debug 屬性設置為 true,因為你在開發過程中,當 debug 為 true 時,Storm 會打印節點間交換的所有消息和其他調試數據,這些信息有助于理解 topology 是如何運行的。
前面提到,你將使用 LocalCluster 來運行 topology。在一個產品環境中,topology 會持續運行,但是在本例中,你僅需運行 topology 幾秒鐘就能看到結果。
LocalCluster cluster =new LocalCluster();
cluster.submitTopology(Getting-Started-Toplogie ,conf,builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
使用 createTopology 和 submitTopology 創建、運行 topology,睡眠兩秒(topology 運行在不同的線程中),然后通過關閉集群來停止 topology。
例 2 - 3 將上面代碼拼湊到一起。
例 2 -3.src/main/java/TopologyMain.java
import spouts.WordReader;
import bolts.WordCounter;
import bolts.WordNormalizer;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain{ public static void main(String[]args)throws InterruptedException{
//Topology definition
TopologyBuilder builder =new TopologyBuilder();
builder.setSpout(word-reader ,new WordReader());
builder.setBolt(word-normalizer ,new WordNormalizer()).shuffleGrouping( word-reader
builder.setBolt(word-counter ,new WordCounter(),2).fieldsGrouping(word-normalizer ,new Fields( word));
//Configuration
Config conf =new Config();
conf.put(wordsFile ,args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
LocalCluster cluster =new LocalCluster();
cluster.submitTopology(Getting-Started-Toplogie ,conf,builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}
運行本項目
現在開始準備運行第一個 topology!如果你新建一個文本文件(src/main/resources/words.txt)并且每行一個單詞,則可以通過如下命令運行這個 topology:
mvn exec:java -Dexec.main >
例如,如果你使用如下 words.txt 文件:
Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great
在日志中,你將會看到類似如下信息:
is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1
在本例中,你只使用了每個結點的一個單一實例,假如此時有一個非常大的日志文件怎么去統計每個單詞的個數?此時可以很方便地改系統中節點數量來并行工作,如創建 WordCounter 的兩個實例:
1builder.setBolt(
word-counter
,
new
WordCounter(),
2
).shuffleGrouping(
word-normalizer
);
重新運行這個程序,你將看到:
– Word Counter [word-counter-2] –
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
– Word Counter [word-counter-3] –
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1
太棒了!改變并行度,so easy(當然,在實際生活中,每個實例運行在不同的機器中)。但仔細一看似乎還有點問題:“is”和“great”這兩個單詞在每個 WordCounter 實例中都被計算了一次。Why?當使用隨機分組 (shuffleGrouping) 時,Storm 以隨機分布的方式向每個 bolt 實例發送每條消息。在這個例子中,將相同的單詞發送到同一個 WordCounter 實例是更理想的。為了實現這個,你可以將 shuffleGrounping(“word-normalizer”)改成 fieldsGrouping(“word-normalizer”,new Fields(“word”))。嘗試一下并重新運行本程序來確認結果。
到此,關于“Storm 怎么改變并行度”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!