久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm如何和Kafka進行整合

143次閱讀
沒有評論

共計 6103 個字符,預計需要花費 16 分鐘才能閱讀完成。

這篇文章將為大家詳細講解有關 Storm 如何和 Kafka 進行整合,文章內容質量較高,因此丸趣 TV 小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

  對于 Storm 如何和 Kafka 進行整合

package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;
 * @author Yin Shuai
 */
public class KafkaSpout extends BaseRichSpout {public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
  *  內部類,Message 和 Offset 的偏移量對象
  * 
  * @author Yin Shuai
  */
 public static class MessageAndRealOffset {
 public Message msg;
 public long offset;
 public MessageAndRealOffset(Message msg, long offset) {
 this.msg = msg;
 this.offset = offset;
  *  發射的枚舉類
  * @author Yin Shuai
  */
 static enum EmitState {
 EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
 String _uuid = UUID.randomUUID().toString();
 SpoutConfig _spoutConfig;
 SpoutOutputCollector _collector;
 //  分區的協調器,getMyManagedPartitions  拿到我所管理的分區
 PartitionCoordinator _coordinator;
 //  動態的分區鏈接:保存到 kafka 各個節點的連接,以及負責的 topic 的 partition 號碼
 DynamicPartitionConnections _connections;
 //  提供了從 zookeeper 讀寫 kafka  消費者信息的功能
 ZkState _state;
 //  上次更新的毫秒數
 long _lastUpdateMs = 0;
 //  當前的分區
 int _currPartitionIndex = 0;
 public KafkaSpout(SpoutConfig spoutConf) {
 _spoutConfig = spoutConf;
 @SuppressWarnings(unchecked)
 @Override
 public void open(Map conf, final TopologyContext context,
 final SpoutOutputCollector collector) {
 _collector = collector;
 List String  zkServers = _spoutConfig.zkServers;
 //  初始化的時候如果 zkServers  為空,那么初始化   默認的配置 Zookeeper
 if (zkServers == null) {zkServers = new ArrayList String () {
 add( 192.168.50.144 
 add( 192.168.50.169 
 add( 192.168.50.207 
 // zkServers =
 // (List String)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
 System.out.println(  使用的是 Storm 默認配置的 Zookeeper List :   + zkServers);
 Integer zkPort = _spoutConfig.zkPort;
 //  在這里我們也同時   來檢查 zookeeper 的端口是否為空
 if (zkPort == null) {
 zkPort = 2181;
 // zkPort = ((Number)
 // conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
 Map stateConf = new HashMap(conf);
 stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
 stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
 stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
 //  通過保存的配置文件,我們持有了一個 zookeeper 的 state,支持節點內容的創建和刪除
 _state = new ZkState(stateConf);
 //  對于連接的維護
 _connections = new DynamicPartitionConnections(_spoutConfig,
 KafkaUtils.makeBrokerReader(conf, _spoutConfig));
 // using TransactionalState like this is a hack
 //  拿到總共的任務次數
 int totalTasks = context
 .getComponentTasks(context.getThisComponentId()).size();
 //  判斷當前的主機是否是靜態的 statichost
 if (_spoutConfig.hosts instanceof StaticHosts) {
 _coordinator = new StaticCoordinator(_connections, conf,
 _spoutConfig, _state, context.getThisTaskIndex(),
 totalTasks, _uuid);
 //  當你拿到的 spoutConfig 是 zkhost 的時候
 } else {
 _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
 _state, context.getThisTaskIndex(), totalTasks, _uuid);
 context.registerMetric(kafkaOffset , new IMetric() {
 KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
 @Override
 public Object getValueAndReset() {
 List PartitionManager  pms = _coordinator
 .getMyManagedPartitions();
 Set Partition  latestPartitions = new HashSet();
 for (PartitionManager pm : pms) {latestPartitions.add(pm.getPartition());
 _kafkaOffsetMetric.refreshPartitions(latestPartitions);
 for (PartitionManager pm : pms) {
 _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
 return _kafkaOffsetMetric.getValueAndReset();}, _spoutConfig.metricsTimeBucketSizeInSecs);
 context.registerMetric(kafkaPartition , new IMetric() {
 @Override
 public Object getValueAndReset() {
 List PartitionManager  pms = _coordinator
 .getMyManagedPartitions();
 Map concatMetricsDataMaps = new HashMap();
 for (PartitionManager pm : pms) {concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
 return concatMetricsDataMaps;
 }, _spoutConfig.metricsTimeBucketSizeInSecs);
 @Override
 public void close() {_state.close();
 @Override
 public void nextTuple() {
 // Storm-spout  是從 kafka  消費數據, 把  kafka  的  consumer
 //  當成是一個 spout,并且向其他的 bolt 的發送數據
 //  拿到當前我管理的這些 PartitionsManager
 List PartitionManager  managers = _coordinator.getMyManagedPartitions();
 for (int i = 0; i   managers.size(); i++) {
 //  對于每一個分區的  PartitionManager
 // in case the number of managers decreased
 //  當前的分區
 _currPartitionIndex = _currPartitionIndex % managers.size();
 //  拿到當前的分區,并且發送,這里把 SpoutOutputCollector 傳遞進去了,由他發射元祖
 EmitState state = managers.get(_currPartitionIndex)
 .next(_collector);
 //  如果發送狀態為:發送 - 還有剩余
 if (state != EmitState.EMITTED_MORE_LEFT) {_currPartitionIndex = (_currPartitionIndex + 1)
 % managers.size();
 //  如果發送的狀態為:  發送 - 沒有剩余
 if (state != EmitState.NO_EMITTED) {
 break;
 long now = System.currentTimeMillis();
 if ((now - _lastUpdateMs)   _spoutConfig.stateUpdateIntervalMs) {commit();
 @Override
 public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
 PartitionManager m = _coordinator.getManager(id.partition);
 if (m != null) {m.ack(id.offset);
 @Override
 public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
 PartitionManager m = _coordinator.getManager(id.partition);
 if (m != null) {m.fail(id.offset);
 @Override
 public void deactivate() {
 //  停止工作
 commit();
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {System.out.println(_spoutConfig.scheme.getOutputFields());
 declarer.declare(_spoutConfig.scheme.getOutputFields());
 private void commit() {_lastUpdateMs = System.currentTimeMillis();
 for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {manager.commit();
}

        在粗淺的代碼閱讀之后,在這里進行詳細的分析:

      1  KafkaSpout 之中持有了一個  MessageAndRealOffset 的內部類

public static class MessageAndRealOffset
 public Message msg;
 
 public long offset;
 
 public MessageAndRealOffset(Message msg,long offset)
 {
 this.msg = msg;
 this.offset = offset;
 }
}

    2 在 Spout 之中我們還持有了一個 PartitionCoordinator 的分區協調器,默認的情況我們實例化的對象

是 ZKCoordinator

 

關于 Storm 如何和 Kafka 進行整合就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-17發表,共計6103字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 松潘县| 米泉市| 尼勒克县| 长海县| 德江县| 麻城市| 庆城县| 灵丘县| 西畴县| 赤壁市| 扎兰屯市| 镇雄县| 桐庐县| 招远市| 平遥县| 孟村| 五寨县| 财经| 柳林县| 岱山县| 双柏县| 宁远县| 辉县市| 枣庄市| 汽车| 开远市| 论坛| 手机| 吴江市| 思南县| 颍上县| 德州市| 册亨县| 屏南县| 准格尔旗| 凉城县| 越西县| 灯塔市| 黄冈市| 崇文区| 澳门|