久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm的Topology怎么配置

170次閱讀
沒有評論

共計 6433 個字符,預計需要花費 17 分鐘才能閱讀完成。

這篇文章主要介紹“Storm 的 Topology 怎么配置”,在日常操作中,相信很多人在 Storm 的 Topology 怎么配置問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm 的 Topology 怎么配置”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

數(shù)據(jù)流組

設計一個拓撲時,你要做的最重要的事情之一就是定義如何在各組件之間交換數(shù)據(jù)(數(shù)據(jù)流是如何被 bolts 消費的)。一個數(shù)據(jù)流組指定了每個 bolt 會消費哪些數(shù)據(jù)流,以及如何消費它們。

NOTE:一個節(jié)點能夠發(fā)布一個以上的數(shù)據(jù)流,一個數(shù)據(jù)流組允許我們選擇接收哪個。

數(shù)據(jù)流組在定義拓撲時設置,就像我們在第二章看到的:

···
 builder.setBolt(word-normalizer , new WordNormalizer())
 .shuffleGrouping( word-reader 
···

在前面的代碼塊里,一個 bolt 由 TopologyBuilder 對象設定,然后使用隨機數(shù)據(jù)流組指定數(shù)據(jù)源。數(shù)據(jù)流組通常將數(shù)據(jù)源組件的 ID 作為參數(shù),取決于數(shù)據(jù)流組的類型不同還有其它可選參數(shù)。

NOTE:每個 InputDeclarer 可以有一個以上的數(shù)據(jù)源,而且每個數(shù)據(jù)源可以分到不同的組。

隨機數(shù)據(jù)流組(隨機分組)

隨機流組是最常用的數(shù)據(jù)流組。它只有一個參數(shù)(數(shù)據(jù)源組件),并且數(shù)據(jù)源會向隨機選擇的 bolt 發(fā)送元組,保證每個消費者收到近似數(shù)量的元組。

隨機數(shù)據(jù)流組用于數(shù)學計算這樣的原子操作。然而,如果操作不能被隨機分配,就像第二章為單詞計數(shù)的例子,你就要考慮其它分組方式了。

域數(shù)據(jù)流組(字段分組)

域數(shù)據(jù)流組允許你基于元組的一個或多個域控制如何把元組發(fā)送給 bolts。它保證擁有相同域組合的值集發(fā)送給同一個 bolt?;氐絾卧~計數(shù)器的例子,如果你用 word 域為數(shù)據(jù)流分組,word-normalizer bolt 將只會把相同單詞的元組發(fā)送給同一個 word-counterbolt 實例。

···
 builder.setBolt(word-counter , new WordCounter(),2)
 .fieldsGrouping(word-normalizer , new Fields( word));
···

NOTE:  在域數(shù)據(jù)流組中的所有域集合必須存在于數(shù)據(jù)源的域聲明中。

全部數(shù)據(jù)流組(全部分組)

全部數(shù)據(jù)流組,為每個接收數(shù)據(jù)的實例復制一份元組副本。這種分組方式用于向 bolts 發(fā)送信號。比如,你要刷新緩存,你可以向所有的 bolts 發(fā)送一個刷新緩存信號。在單詞計數(shù)器的例子里,你可以使用一個全部數(shù)據(jù)流組,添加清除計數(shù)器緩存的功能(見拓撲示例)

 public void execute(Tuple input) {
 String str = null;
 try{ if(input.getSourceStreamId().equals(signals)){
 str = input.getStringByField( action 
 if(refreshCache .equals(str))
 counters.clear();
 }
 }catch (IllegalArgumentException e){
 // 什么也不做
 }
 ···
 }

我們添加了一個 if 分支,用來檢查源數(shù)據(jù)流。Storm 允許我們聲明具名數(shù)據(jù)流(如果你不把元組發(fā)送到一個具名數(shù)據(jù)流,默認發(fā)送到名為”default“的數(shù)據(jù)流)。這是一個識別元組的極好的方式,就像這個例子中,我們想識別 signals 一樣。在拓撲定義中,你要向 word-counter bolt 添加第二個數(shù)據(jù)流,用來接收從 signals-spout 數(shù)據(jù)流發(fā)送到所有 bolt 實例的每一個元組。

 builder.setBolt(word-counter , new WordCounter(),2)
 .fieldsGroupint(word-normalizer ,new Fields( word))
 .allGrouping(signals-spout , signals

signals-spout 的實現(xiàn)請參考 git 倉庫。

自定義數(shù)據(jù)流組(自定義分組)

你可以通過實現(xiàn) backtype.storm.grouping.CustormStreamGrouping 接口創(chuàng)建自定義數(shù)據(jù)流組,讓你自己決定哪些 bolt 接收哪些元組。

讓我們修改單詞計數(shù)器示例,使首字母相同的單詞由同一個 bolt 接收。

public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
 int numTasks = 0;
 @Override
 public List Integer  chooseTasks(List Object  values) {
 List Integer  boltIds = new ArrayList Integer 
 if(values.size() 0){ String str = values.get(0).toString();
 if(str.isEmpty()){ boltIds.add(0);
 }else{ boltIds.add(str.charAt(0) % numTasks);
 }
 }
 return boltIds;
 }
 @Override
 public void prepare(TopologyContext context, Fields outFields, List Integer  targetTasks) { numTasks = targetTasks.size();
 }
 }

這是一個 CustomStreamGrouping 的簡單實現(xiàn),在這里我們采用單詞首字母字符的整數(shù)值與任務數(shù)的余數(shù),決定接收元組的 bolt。

按下述方式 word-normalizer 修改即可使用這個自定義數(shù)據(jù)流組。

 builder.setBolt(word-normalizer , new WordNormalizer())
 .customGrouping(word-reader , new ModuleGrouping());

直接數(shù)據(jù)流組(直接分組)

這是一個特殊的數(shù)據(jù)流組,數(shù)據(jù)源可以用它決定哪個組件接收元組。與前面的例子類似,數(shù)據(jù)源將根據(jù)單詞首字母決定由哪個 bolt 接收元組。要使用直接數(shù)據(jù)流組,在 WordNormalizer bolt 中,使用 emitDirect 方法代替 emit。

public void execute(Tuple input) {
 ...
 for(String word : words){ if(!word.isEmpty()){
 ...
 collector.emitDirect(getWordCountIndex(word),new Values(word));
 }
 }
 // 對元組做出應答
 collector.ack(input);
 }
 public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase();
 if(word.isEmpty()){
 return 0;
 }else{ return word.charAt(0) % numCounterTasks;
 }
 }

在 prepare 方法中計算任務數(shù)

public void prepare(Map stormConf, TopologyContext context, 
 OutputCollector collector) {
 this.collector = collector;
 this.numCounterTasks = context.getComponentTasks( word-counter 
 }

在拓撲定義中指定數(shù)據(jù)流將被直接分組:

builder.setBolt(word-counter , new WordCounter(),2)
 .directGrouping(word-normalizer

全局數(shù)據(jù)流組(全局分組)

全局數(shù)據(jù)流組把所有數(shù)據(jù)源創(chuàng)建的元組發(fā)送給單一目標實例(即擁有最低 ID 的任務)。

不分組(無分組)

寫作本書時(Stom0.7.1 版),這個數(shù)據(jù)流組相當于隨機數(shù)據(jù)流組。也就是說,使用這個數(shù)據(jù)流組時,并不關心數(shù)據(jù)流是如何分組的。

LocalCluster VS StormSubmitter

到目前為止,你已經(jīng)用一個叫做 LocalCluster 的工具在你的本地機器上運行了一個拓撲。Storm 的基礎工具,使你能夠在自己的計算機上方便的運行和調(diào)試不同的拓撲。但是你怎么把自己的拓撲提交給運行中的 Storm 集群呢?Storm 有一個有趣的功能,在一個真實的集群上運行自己的拓撲是很容易的事情。要實現(xiàn)這一點,你需要把 LocalCluster 換成 StormSubmitter 并實現(xiàn) submitTopology 方法,它負責把拓撲發(fā)送給集群。

下面是修改后的代碼:

//LocalCluster cluster = new LocalCluster();
 //cluster.submitTopology( Count-Word-Topology-With-Refresh-Cache , conf, 
 //builder.createTopology());
 StormSubmitter.submitTopology( Count-Word-Topology-With_Refresh-Cache , conf,
 builder.createTopology());
 //Thread.sleep(1000);
 //cluster.shutdown();

NOTE:  當你使用 StormSubmitter 時,你就不能像使用 LocalCluster 時一樣通過代碼控制集群了。

接下來,把源碼壓縮成一個 jar 包,運行 Storm 客戶端命令,把拓撲提交給集群。如果你已經(jīng)使用了 Maven,你只需要在命令行進入源碼目錄運行:mvn package。

現(xiàn)在你生成了一個 jar 包,使用 storm jar 命令提交拓撲(關于如何安裝 Storm 客戶端請參考附錄 A)。命令格式:storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3。

對于這個例子,在拓撲工程目錄下面運行:

storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt

通過這些命令,你就把拓撲發(fā)布集群上了。

如果想停止或殺死它,運行:

storm kill Count-Word-Topology-With-Refresh-Cache

NOTE:拓撲名稱必須保證惟一性。

NOTE:如何安裝 Storm 客戶端,參考附錄 A

DRPC 拓撲

有一種特殊的拓撲類型叫做分布式遠程過程調(diào)用(DRPC),它利用 Storm 的分布式特性執(zhí)行遠程過程調(diào)用(RPC)(見下圖)。Storm 提供了一些用來實現(xiàn) DRPC 的工具。第一個是 DRPC 服務器,它就像是客戶端和 Storm 拓撲之間的連接器,作為拓撲的 spout 的數(shù)據(jù)源。它接收一個待執(zhí)行的函數(shù)和函數(shù)參數(shù),然后對于函數(shù)操作的每一個數(shù)據(jù)塊,這個服務器都會通過拓撲分配一個請求 ID 用來識別 RPC 請求。拓撲執(zhí)行最后的 bolt 時,它必須分配 RPC 請求 ID 和結(jié)果,使 DRPC 服務器把結(jié)果返回正確的客戶端。

NOTE:單實例 DRPC 服務器能夠執(zhí)行許多函數(shù)。每個函數(shù)由一個惟一的名稱標識。

Storm 提供的第二個工具(已在例子中用過)是 LineDRPCTopologyBuilder,一個輔助構(gòu)建 DRPC 拓撲的抽象概念。生成的拓撲創(chuàng)建 DRPCSpouts——它連接到 DRPC 服務器并向拓撲的其它部分分發(fā)數(shù)據(jù)——并包裝 bolts,使結(jié)果從最后一個 bolt 返回。依次執(zhí)行所有添加到 LinearDRPCTopologyBuilder 對象的 bolts。

作為這種類型的拓撲的一個例子,我們創(chuàng)建了一個執(zhí)行加法運算的進程。雖然這是一個簡單的例子,但是這個概念可以擴展到復雜的分布式計算。

bolt 按下面的方式聲明輸出:

public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields( id , result));
 }

因為這是拓撲中惟一的 bolt,它必須發(fā)布 RPC ID 和結(jié)果。execute 方法負責執(zhí)行加法運算。

public void execute(Tuple input) { String[] numbers = input.getString(1).split( \\+ 
 Integer added = 0;
 if(numbers.length 2){
 throw new InvalidParameterException( Should be at least 2 numbers 
 }
 for(String num : numbers){ added += Integer.parseInt(num);
 }
 collector.emit(new Values(input.getValue(0),added));
 }

包含加法 bolt 的拓撲定義如下:

public static void main(String[] args) { LocalDRPC drpc = new LocalDRPC();
 LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder( add 
 builder.addBolt(AdderBolt(),2);
 Config conf = new Config();
 conf.setDebug(true);
 LocalCluster cluster = new LocalCluster();
 cluster.submitTopology( drpcder-topology , conf,
 builder.createLocalTopology(drpc));
 String result = drpc.execute( add ,  1+-1 
 checkResult(result,0);
 result = drpc.execute( add ,  1+1+5+10 
 checkResult(result,17);
 cluster.shutdown();
 drpc.shutdown();
 }

創(chuàng)建一個 LocalDRPC 對象在本地運行 DRPC 服務器。接下來,創(chuàng)建一個拓撲構(gòu)建器(譯者注:LineDRpctopologyBuilder 對象),把 bolt 添加到拓撲。運行 DRPC 對象(LocalDRPC 對象)的 execute 方法測試拓撲。

NOTE:使用 DRPCClient 類連接遠程 DRPC 服務器。DRPC 服務器暴露了 Thrift API,因此可以跨語言編程;并且不論是在本地還是在遠程運行 DRPC 服務器,它們的 API 都是相同的。對于采用 Storm 配置的 DRPC 配置參數(shù)的 Storm 集群,調(diào)用構(gòu)建器對象的 createRemoteTopology 向 Storm 集群提交一個拓撲,而不是調(diào)用 createLocalTopology。

到此,關于“Storm 的 Topology 怎么配置”的學習就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注丸趣 TV 網(wǎng)站,丸趣 TV 小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

正文完
 
丸趣
版權聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計6433字。
轉(zhuǎn)載說明:除特殊說明外本站除技術相關以外文章皆由網(wǎng)絡搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 唐海县| 哈密市| 鲜城| 临清市| 磐安县| 武义县| 承德县| 永嘉县| 尼勒克县| 河北省| 金塔县| 丹凤县| 北流市| 永嘉县| 仁寿县| 合肥市| 银川市| 台州市| 会同县| 清苑县| 福安市| 崇左市| 洛浦县| 吐鲁番市| 白朗县| 英吉沙县| 营口市| 兴宁市| 安西县| 阳东县| 宁武县| 泰安市| 姜堰市| 屯留县| 高台县| 吐鲁番市| 承德市| 禄丰县| 正阳县| 山西省| 江川县|