久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm分布式RPC怎么配置

152次閱讀
沒有評論

共計 3175 個字符,預計需要花費 8 分鐘才能閱讀完成。

這篇文章主要介紹“Storm 分布式 RPC 怎么配置”,在日常操作中,相信很多人在 Storm 分布式 RPC 怎么配置問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm 分布式 RPC 怎么配置”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

首先需要在 storm 集群上把 DRPC 的環境準備好,在 storm.yaml 當中增加如下內容

 drpc.servers:
  – 192.168.1.118

之后通過 storm drpc 啟動分布式 RPC 服務。

之后,跟其他的 topology 并沒有什么不同,我們需要寫點代碼,我這邊直接從 storm 的例子當中找了個:

public class BasicDRPCTopology {
  public static class ExclaimBolt extends BaseBasicBolt {
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
  String input = tuple.getString(1);
  collector.emit(new Values(tuple.getValue(0), input + ! ));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields( id , result));
  }

  }

  public static void main(String[] args) throws Exception {
  LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(exclamation
  builder.addBolt(new ExclaimBolt(), 3);

  Config conf = new Config();
  conf.setNumWorkers(3);
  StormSubmitter.submitTopologyWithProgressBar(DRCP-TEST , conf, builder.createRemoteTopology());
  }
}

從 main 函數開始,簡單解釋一下:

首先 new 一個 LinearDRPCTopologyBuilder 對象,其中的參數【exclamation】就是我們在執行 rpc 調用時候的方法名。

之后我們加入一個自己的 bolt,并行數量為 3

之后用 StormSubmitter 把這個 topology 提交上去就行了。

代碼完成之后,打一個 jar 包,用 storm jar 把 topology 提交到集群上。

客戶端調用,非常簡單

  DRPCClient client = new DRPCClient(192.168.1.118 , 3772);
  String result = client.execute(exclamation , china
  System.out.println(result);

到此為止,一個最簡單的 DRPC 調用的工作已經完成了。

等等,還有點問題,LinearDRPCTopologyBuilder 這個東西是不建議使用的(我這里的版本是 0.9.3)。

源碼上有這么一行:

Trident subsumes the functionality provided by this class, so it s deprecated

大概意思就是 trident 這個東西已經包含了 LinearDRPCTopologyBuilder 當中的功能。

trident 是什么意思?翻譯了一下,【三叉戟】,靠,看起來很牛逼的樣子。必須試試。

那么上第二份代碼:

public class TridentDRPCTopology {
  public static void main(String[] args) throws Exception {
  Config conf = new Config();
  StormSubmitter.submitTopologyWithProgressBar(word-count , conf, buildTopology());
  }

  public static StormTopology buildTopology() {
  TridentTopology topology = new TridentTopology();

  topology.newDRPCStream(word-count).
  each(new Fields( args), new Split(), new Fields( word)).
  groupBy(new Fields( word)).
  aggregate(new One(), new Fields(one)).
  aggregate(new Fields( one), new Sum(), new Fields( word-count));
  return topology.build();
  }

  public static class Split extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
  String sentence = tuple.getString(0);
  for (String word : sentence.split()) {
  collector.emit(new Values(word));
  }
  }
  }

  public static class One implements CombinerAggregator Integer {
  @Override
  public Integer init(TridentTuple tuple) {
  return 1;
  }

  @Override
  public Integer combine(Integer val1, Integer val2) {
  return 1;
  }

  @Override
  public Integer zero() {
  return 1;
  }
  }
}

這個 topology 的功能要稍稍復雜一些,給出一句話,查一下一共有多少個詞,當然了,不能重復計數。main 函數當中非常簡單,提交一個 topology。而這個 topology 的構建過程是在 buildTopology 當中完成的。

  topology.newDRPCStream(word-count).
  each(new Fields( args), new Split(), new Fields( word)).  // 用空格分詞
  groupBy(new Fields( word)).  // 分組
  aggregate(new One(), new Fields(one)).  // 給每組的數量設定為 1
  aggregate(new Fields( one), new Sum(), new Fields( word-count));  //sum 計算總和

這樣的方式看起來跟 spark 當中對 RDD 的操作是有些像的。

好了,還是打包,提交。

然后是客戶端測試:

  DRPCClient client = new DRPCClient(192.168.1.118 , 3772);
  String result = client.execute(word-count , mywife asdf asdf asdfasdfasfweqw saaa weweew
  System.out.println(result);

到此,關于“Storm 分布式 RPC 怎么配置”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注丸趣 TV 網站,丸趣 TV 小編會繼續努力為大家帶來更多實用的文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計3175字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 丰顺县| 平遥县| 宁波市| 定陶县| 虹口区| 怀仁县| 肇东市| 宁蒗| 东宁县| 缙云县| 清原| 连城县| 张家港市| 库车县| 佛山市| 二连浩特市| 五常市| 象山县| 三都| 新蔡县| 丰宁| 黄山市| 蕉岭县| 宾阳县| 临澧县| 曲靖市| 辽阳市| 正蓝旗| 收藏| 射阳县| 阿拉善盟| 阿克| 尼木县| 简阳市| 仁化县| 黑山县| 渝北区| 长汀县| 太白县| 丽江市| 广平县|