久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm

184次閱讀
沒有評論

共計 2360 個字符,預計需要花費 6 分鐘才能閱讀完成。

這篇文章主要介紹 Storm-kafka 中如何封裝 DynamicBrokerReader 類,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

在細節上把握 DynamicBrokerReder 的封裝類 – ZkBrokerReader

package com.mixbox.storm.kafka.trident;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.DynamicBrokersReader;
import com.mixbox.storm.kafka.ZkHosts;
import java.util.Map;
 * 2014/07/22
 *  在 ZK 中間拿到  GlobalPartitionInformation
 * 
 * ZkBrokerReader  是對于 DynamicBrokersReader 的一個簡單的封裝
 * @author Yin Shuai
 */
public class ZkBrokerReader implements IBrokerReader {
 public static final Logger LOG = LoggerFactory
 .getLogger(ZkBrokerReader.class);
 GlobalPartitionInformation cachedBrokers;

public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) { reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = System.currentTimeMillis(); refreshMillis = hosts.refreshFreqSecs * 1000L; @Override public GlobalPartitionInformation getCurrentBrokers() {long currTime = System.currentTimeMillis(); //  很簡單,  指定了你多長時間開始去刷新 Brokerlibiao if (currTime   lastRefreshTimeMs + refreshMillis) { LOG.info( brokers need refreshing because   + refreshMillis +  ms have expired cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = currTime; return cachedBrokers; @Override public void close() {reader.close(); }

      總覽我們的 Code: 

 ZkBrokerReader   是對于  DynamicBrokersReader 的一個簡單封裝,ZkBrokerReader 之中持有 2 個主要的 Class

 

 1 GlobalPartitionInformatio  cachedBroker;

    2 DynamicBrokersReader  reader;

    3 long lastRefreshTimeMs;   最新的刷新時間

lastRefreshTimeMs = System.currentTimeMillis();  最新的刷新時間為系統的當前時間 

    4 long refreshMillis 

refreshMillis = host.refreshFreqSecs * 1000L  設定刷新的毫秒數為 

    5 

public GlobalPartitionInformation getCurrentBrokers() {long currTime = System.currentTimeMillis();
 //  很簡單,  指定了你多長時間開始去刷新 Brokerlibiao
 if (currTime   lastRefreshTimeMs + refreshMillis) {
 LOG.info( brokers need refreshing because   + refreshMillis
 +  ms have expired 
 cachedBrokers = reader.getBrokerInfo();
 lastRefreshTimeMs = currTime;
 return cachedBrokers;
 }

    每一次調用 getCurrentBrokers,首先會取 System.currentTimeMillis 當當前的系統時間超過了 最早的刷新時間 + 刷新

的間隔,就會再次的去跟新:

 cachedBrokers = reader.getBrokerInfo();getBrokerInfo() 方法每調用一次,也就重新在 zk 之中重新去取

一次。

 ZkBrokerReader 是對于 DynamicBrokerReader 的一個封裝,DynamicBrokerReader 的 Dynamic 性質并不程序動態的因數,而只是簡單在讀取 ZK 數據的過程之中,Zk 數據已經動態的發生變化?

以上是“Storm-kafka 中如何封裝 DynamicBrokerReader 類”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注丸趣 TV 行業資訊頻道!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計2360字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 苍梧县| 济南市| 常熟市| 绥江县| 曲阜市| 吉首市| 县级市| 安徽省| 修水县| 阜南县| 墨竹工卡县| 扬州市| 桃园县| 监利县| 长白| 霞浦县| 四川省| 临城县| 佳木斯市| 凤庆县| 彩票| 丹东市| 华安县| 清丰县| 灵山县| 醴陵市| 上饶市| 云和县| 临西县| 拉萨市| 武定县| 江川县| 河南省| 永泰县| 天津市| 梧州市| 大名县| 清流县| 贡嘎县| 读书| 临桂县|