共計 4003 個字符,預計需要花費 11 分鐘才能閱讀完成。
Storm-kafka 中如何理解 ZkCoordinator 的過程,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
梳理 ZkCoordinator 的過程
package com.mixbox.storm.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
import static com.mixbox.storm.kafka.KafkaUtils.taskId;
*
*
* ZKCoordinator 協調器
*
* @author Yin Shuai
*/
public class ZkCoordinator implements PartitionCoordinator {
public static final Logger LOG = LoggerFactory
.getLogger(ZkCoordinator.class);
SpoutConfig _spoutConfig;
int _taskIndex;
int _totalTasks;
String _topologyInstanceId;
// 每一個分區對應著一個分區管理器
Map Partition, PartitionManager _managers = new HashMap();
// 緩存的 List
List PartitionManager _cachedList;
// 上次刷新的時間
Long _lastRefreshTime = null;
// 刷新頻率 毫秒
int _refreshFreqMs;
// 動態分區連接
DynamicPartitionConnections _connections;
// 動態 BrokersReader
DynamicBrokersReader _reader;
public ZkCoordinator(DynamicPartitionConnections connections,
Map stormConf, SpoutConfig spoutConfig, ZkState state,
int taskIndex, int totalTasks, String topologyInstanceId) {
this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks,
topologyInstanceId, buildReader(stormConf, spoutConfig));
public ZkCoordinator(DynamicPartitionConnections connections,
Map stormConf, SpoutConfig spoutConfig, ZkState state,
int taskIndex, int totalTasks, String topologyInstanceId,
DynamicBrokersReader reader) {
_spoutConfig = spoutConfig;
_connections = connections;
_taskIndex = taskIndex;
_totalTasks = totalTasks;
_topologyInstanceId = topologyInstanceId;
_stormConf = stormConf;
_state = state;
ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
_refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
_reader = reader;
* @param stormConf
* @param spoutConfig
* @return
*/
private static DynamicBrokersReader buildReader(Map stormConf,
SpoutConfig spoutConfig) {ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
return new DynamicBrokersReader(stormConf, hosts.brokerZkStr,
hosts.brokerZkPath, spoutConfig.topic);
@Override
public List PartitionManager getMyManagedPartitions() {
if (_lastRefreshTime == null
|| (System.currentTimeMillis() - _lastRefreshTime) _refreshFreqMs) {refresh();
_lastRefreshTime = System.currentTimeMillis();
return _cachedList;
* 簡單的刷新的行為
*
*/
void refresh() {
try {LOG.info(taskId(_taskIndex, _totalTasks)
+ Refreshing partition manager connections
// 拿到所有的分區信息
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
// 拿到自己任務的所有分區
List Partition mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
// 拿到當前任務的分區
Set Partition curr = _managers.keySet();
// 構造一個集合
Set Partition newPartitions = new HashSet Partition (mine);
// 在 new 分區中,移除掉所有 自己擁有的分區
newPartitions.removeAll(curr);
// 要刪除的分區
Set Partition deletedPartitions = new HashSet Partition (curr);
deletedPartitions.removeAll(mine);
LOG.info(taskId(_taskIndex, _totalTasks)
+ Deleted partition managers:
+ deletedPartitions.toString());
for (Partition id : deletedPartitions) {PartitionManager man = _managers.remove(id);
man.close();
LOG.info(taskId(_taskIndex, _totalTasks)
+ New partition managers: + newPartitions.toString());
for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(_connections,
_topologyInstanceId, _state, _stormConf, _spoutConfig,
id);
_managers.put(id, man);
} catch (Exception e) {throw new RuntimeException(e);
_cachedList = new ArrayList PartitionManager (_managers.values());
LOG.info(taskId(_taskIndex, _totalTasks) + Finished refreshing
@Override
public PartitionManager getManager(Partition partition) {return _managers.get(partition);
}
1:首先 ZKCoorDinator 實現 PartitionCoordinator 的接口
package com.mixbox.storm.kafka;
import java.util.List;
* @author Yin Shuai
*/
public interface PartitionCoordinator { * 拿到我管理的分區列表 List{PartitionManager}
* @return
*/
List PartitionManager getMyManagedPartitions();
PartitionManager getManager(Partition partition);
}
第一個方法拿到所有的 PartitionManager
第二個方法依據特定的 Partition 去得到一個分區管理器
關于 Storm-kafka 中如何理解 ZkCoordinator 的過程問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注丸趣 TV 行業資訊頻道了解更多相關知識。
正文完