久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行Twitter Storm Stream Grouping編寫自定義分組實現

151次閱讀
沒有評論

共計 3652 個字符,預計需要花費 10 分鐘才能閱讀完成。

本篇文章為大家展示了如何進行 Twitter Storm Stream Grouping 編寫自定義分組實現,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

## 自定義 Grouping 測試

Storm 是支持自定義分組的,本篇文章就是探究 Storm 如何編寫一個自定義分組器,以及對 Storm 分組器如何分組數據的理解。

這是我寫的一個自定義分組,總是把數據分到第一個 Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);
 private List Integer  tasks;
 @Override
 public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
 List Integer  targetTasks) {
  this.tasks = targetTasks;
  log.info(tasks.toString());
 } 
 @Override
 public List Integer  chooseTasks(int taskId, List Object  values) { log.info(values.toString());
  return Arrays.asList(tasks.get(0));
 }
}

從上面的代碼可以看出,該自定義分組會把數據歸并到第一個 Task code Arrays.asList(tasks.get(0)); /code,也就是數據到達后總是被派發到第一組。

測試代碼:

TopologyBuilder builder = new TopologyBuilder();builder.setSpout(words , new TestWordSpout(), 2); 
// 自定義分組,builder.setBolt(exclaim1 , new DefaultStringBolt(), 3)
  .customGrouping(words , new MyFirstStreamGrouping());

和之前的測試用例一樣,Spout 總是發送 code new String[] {“nathan”,“mike”,“jackson”,“golda”,“bertels”} /code 列表的字符串。我們運行驗證一下:

11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

從這個運行日志我們可以看出,數據總是派發到一個 Blot:Thread-25-exclaim1。因為我時本地測試,Thread-25-exclaim1 是線程名。而派發的線程是數據多個線程的。因此該測試符合預期,即總是發送到一個 Task,并且這個 Task 也是第一個。

## 理解自定義分組實現

自己實現一個自定義分組難嗎?其實如果你理解了 Hadoop 的 Partitioner,Storm 的 CustomStreamGrouping 和它也是一樣的道理。

Hadoop MapReduce 的 Map 完成后會把 Map 的中間結果寫入磁盤,在寫磁盤前,線程首先根據數據最終要傳送到的 Reducer 把數據劃分成相應的分區,然后不同的分區進入不同的 Reduce。我們先來看看 Hadoop 是怎樣把數據怎樣分組的,這是 Partitioner 唯一一個方法:

public class Partitioner K, V  {
 @Override
 public int getPartition(K key, V value, int numReduceTasks) {
 return 0;
 }
}

上面的代碼中:Map 輸出的數據都會經過 getPartition()方法,用來確定下一步的分組。numReduceTasks 是一個 Job 的 Reduce 數量,而返回值就是確定該條數據進入哪個 Reduce。返回值必須大于等于 0,小于 numReduceTasks,否則就會報錯。返回 0 就意味著這條數據進入第一個 Reduce。對于隨機分組來說,這個方法可以這么實現:

public int getPartition(K key, V value, int numReduceTasks) { return hash(key) % numReduceTasks;
}

其實 Hadoop 默認的 Hash 分組策略也正是這么實現的。這樣好處是,數據在整個集群基本上是負載平衡的。

搞通了 Hadoop 的 Partitioner,我們來看看 Storm 的 CustomStreamGrouping。

這是 CustomStreamGrouping 類的源碼:

public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, List Integer  targetTasks);
 List Integer  chooseTasks(int taskId, List Object  values); 
}

一模一樣的道理,targetTasks 就是 Storm 運行時告訴你,當前有幾個目標 Task 可以選擇,每一個都給編上了數字編號。而 code chooseTasks(int taskId, List Object values); /code 就是讓你選擇,你的這條數據 values,是要哪幾個目標 Task 處理?

如上文文章開頭的自定義分組器實現的代碼,我選擇的總是讓第一個 Task 來處理數據,code return Arrays.asList(tasks.get(0)); /code。和 Hadoop 不同的是,Storm 允許一條數據被多個 Task 處理,因此返回值是 List Integer . 就是讓你來在提供的 List Integer targetTasks Task 中選擇任意的幾個(必須至少是一個)Task 來處理數據。

上述內容就是如何進行 Twitter Storm Stream Grouping 編寫自定義分組實現,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注丸趣 TV 行業資訊頻道。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-17發表,共計3652字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 德州市| 独山县| 剑阁县| 梨树县| 舒城县| 济阳县| 蒲江县| 襄汾县| 偏关县| 海原县| 临潭县| 新巴尔虎右旗| 民勤县| 怀仁县| 大余县| 琼结县| 富民县| 册亨县| 娱乐| 同心县| 绍兴县| 乌拉特前旗| 淳安县| 蓬莱市| 花莲县| 松阳县| 体育| 巴林右旗| 钦州市| 永泰县| 西丰县| 忻州市| 类乌齐县| 达孜县| 开阳县| 武宣县| 伊吾县| 叶城县| 姜堰市| 磐安县| 措美县|