久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm

150次閱讀
沒有評論

共計 4003 個字符,預計需要花費 11 分鐘才能閱讀完成。

Storm-kafka 中如何理解 ZkCoordinator 的過程,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

梳理 ZkCoordinator 的過程

package com.mixbox.storm.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
import static com.mixbox.storm.kafka.KafkaUtils.taskId;
 * 
 * 
 * ZKCoordinator  協調器
 * 
 * @author Yin Shuai
 */
public class ZkCoordinator implements PartitionCoordinator {
 public static final Logger LOG = LoggerFactory
 .getLogger(ZkCoordinator.class);
 SpoutConfig _spoutConfig;
 int _taskIndex;
 int _totalTasks;
 String _topologyInstanceId;
 //  每一個分區對應著一個分區管理器
 Map Partition, PartitionManager  _managers = new HashMap();
 // 緩存的 List
 List PartitionManager  _cachedList;
 // 上次刷新的時間
 Long _lastRefreshTime = null;
 // 刷新頻率   毫秒
 int _refreshFreqMs;
 // 動態分區連接
 DynamicPartitionConnections _connections;
 // 動態 BrokersReader
 DynamicBrokersReader _reader;

public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader;  * @param stormConf  * @param spoutConfig  * @return  */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); @Override public List PartitionManager  getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime)   _refreshFreqMs) {refresh(); _lastRefreshTime = System.currentTimeMillis(); return _cachedList;  *  簡單的刷新的行為  *   */ void refresh() { try {LOG.info(taskId(_taskIndex, _totalTasks) +  Refreshing partition manager connections //  拿到所有的分區信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); //  拿到自己任務的所有分區 List Partition  mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); //  拿到當前任務的分區 Set Partition  curr = _managers.keySet(); //  構造一個集合 Set Partition  newPartitions = new HashSet Partition (mine); //  在 new 分區中,移除掉所有   自己擁有的分區 newPartitions.removeAll(curr); //  要刪除的分區 Set Partition  deletedPartitions = new HashSet Partition (curr); deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) +  Deleted partition managers:  + deletedPartitions.toString()); for (Partition id : deletedPartitions) {PartitionManager man = _managers.remove(id); man.close(); LOG.info(taskId(_taskIndex, _totalTasks) +  New partition managers:   + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } catch (Exception e) {throw new RuntimeException(e); _cachedList = new ArrayList PartitionManager (_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) +  Finished refreshing @Override public PartitionManager getManager(Partition partition) {return _managers.get(partition); }

   1:首先 ZKCoorDinator 實現  PartitionCoordinator 的接口

package com.mixbox.storm.kafka;
import java.util.List;
 * @author Yin Shuai
 */
public interface PartitionCoordinator { *  拿到我管理的分區列表  List{PartitionManager}
  * @return
  */
 List PartitionManager  getMyManagedPartitions();

PartitionManager getManager(Partition partition); }

          第一個方法拿到所有的   PartitionManager

          第二個方法依據特定的   Partition 去得到一個分區管理器

關于 Storm-kafka 中如何理解 ZkCoordinator 的過程問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注丸趣 TV 行業資訊頻道了解更多相關知識。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-17發表,共計4003字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 嘉鱼县| 台东市| 宿州市| 滕州市| 吉木萨尔县| 肥东县| 五河县| 建湖县| 磴口县| 黎平县| 治县。| 河津市| 彭阳县| 民权县| 广河县| 溧水县| 龙里县| 基隆市| 龙口市| 祁阳县| 昌都县| 安图县| 乐清市| 南雄市| 原阳县| 双桥区| 旬阳县| 石林| 华亭县| 怀来县| 滦平县| 贡觉县| 措美县| 开封市| 丁青县| 分宜县| 积石山| 周宁县| 迁安市| 京山县| 无极县|