共計 6103 個字符,預計需要花費 16 分鐘才能閱讀完成。
這篇文章將為大家詳細講解有關 Storm 如何和 Kafka 進行整合,文章內容質量較高,因此丸趣 TV 小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
對于 Storm 如何和 Kafka 進行整合,下面先給出 KafkaSpout 的完整源代碼,隨后再逐段分析:
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;
* @author Yin Shuai
*/
public class KafkaSpout extends BaseRichSpout {public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
* 內部類,Message 和 Offset 的偏移量對象
*
* @author Yin Shuai
*/
public static class MessageAndRealOffset {
public Message msg;
public long offset;
public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;
* 發射的枚舉類
* @author Yin Shuai
*/
static enum EmitState {
EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
// 分區的協調器,getMyManagedPartitions 拿到我所管理的分區
PartitionCoordinator _coordinator;
// 動態的分區鏈接:保存到 kafka 各個節點的連接,以及負責的 topic 的 partition 號碼
DynamicPartitionConnections _connections;
// 提供了從 zookeeper 讀寫 kafka 消費者信息的功能
ZkState _state;
// 上次更新的毫秒數
long _lastUpdateMs = 0;
// 當前的分區
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
@SuppressWarnings(unchecked)
@Override
public void open(Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {
_collector = collector;
List String zkServers = _spoutConfig.zkServers;
// 初始化的時候如果 zkServers 為空,那么初始化 默認的配置 Zookeeper
if (zkServers == null) {zkServers = new ArrayList String () {
add( 192.168.50.144
add( 192.168.50.169
add( 192.168.50.207
// zkServers =
// (List String)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
System.out.println( 使用的是 Storm 默認配置的 Zookeeper List : + zkServers);
Integer zkPort = _spoutConfig.zkPort;
// 在這里我們也同時 來檢查 zookeeper 的端口是否為空
if (zkPort == null) {
zkPort = 2181;
// zkPort = ((Number)
// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
// 通過保存的配置文件,我們持有了一個 zookeeper 的 state,支持節點內容的創建和刪除
_state = new ZkState(stateConf);
// 對于連接的維護
_connections = new DynamicPartitionConnections(_spoutConfig,
KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
// 拿到總共的任務次數
int totalTasks = context
.getComponentTasks(context.getThisComponentId()).size();
// 判斷當前的主機是否是靜態的 statichost
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, _uuid);
// 當你拿到的 spoutConfig 是 zkhost 的時候
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
_state, context.getThisTaskIndex(), totalTasks, _uuid);
context.registerMetric(kafkaOffset , new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List PartitionManager pms = _coordinator
.getMyManagedPartitions();
Set Partition latestPartitions = new HashSet();
for (PartitionManager pm : pms) {latestPartitions.add(pm.getPartition());
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
return _kafkaOffsetMetric.getValueAndReset();}, _spoutConfig.metricsTimeBucketSizeInSecs);
context.registerMetric(kafkaPartition , new IMetric() {
@Override
public Object getValueAndReset() {
List PartitionManager pms = _coordinator
.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
return concatMetricsDataMaps;
}, _spoutConfig.metricsTimeBucketSizeInSecs);
@Override
public void close() {_state.close();
@Override
public void nextTuple() {
// Storm-spout 是從 kafka 消費數據, 把 kafka 的 consumer
// 當成是一個 spout,并且向其他的 bolt 的發送數據
// 拿到當前我管理的這些 PartitionsManager
List PartitionManager managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i managers.size(); i++) {
// 對于每一個分區的 PartitionManager
// in case the number of managers decreased
// 當前的分區
_currPartitionIndex = _currPartitionIndex % managers.size();
// 拿到當前的分區,并且發送,這里把 SpoutOutputCollector 傳遞進去了,由他發射元祖
EmitState state = managers.get(_currPartitionIndex)
.next(_collector);
// 如果發送狀態為:發送 - 還有剩余
if (state != EmitState.EMITTED_MORE_LEFT) {_currPartitionIndex = (_currPartitionIndex + 1)
% managers.size();
// 如果發送的狀態為: 發送 - 沒有剩余
if (state != EmitState.NO_EMITTED) {
break;
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) _spoutConfig.stateUpdateIntervalMs) {commit();
@Override
public void ack(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {m.ack(id.offset);
@Override
public void fail(Object msgId) {KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {m.fail(id.offset);
@Override
public void deactivate() {
// 停止工作
commit();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {System.out.println(_spoutConfig.scheme.getOutputFields());
declarer.declare(_spoutConfig.scheme.getOutputFields());
private void commit() {_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {manager.commit();
}
在粗淺的代碼閱讀之后,在這里進行詳細的分析:
1 KafkaSpout 之中持有了一個 MessageAndRealOffset 的內部類
public static class MessageAndRealOffset {
    public Message msg;
    public long offset;

    public MessageAndRealOffset(Message msg, long offset) {
        this.msg = msg;
        this.offset = offset;
    }
}
2 在 Spout 之中我們還持有了一個 PartitionCoordinator 的分區協調器,默認的情況下我們實例化的對象是 ZkCoordinator。
關于 Storm 如何和 Kafka 進行整合就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
正文完