共計 2360 個字符,預計需要花費 6 分鐘才能閱讀完成。
這篇文章主要介紹 Storm-kafka 中如何封裝 DynamicBrokerReader 類,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
在細節上把握 DynamicBrokerReder 的封裝類 – ZkBrokerReader
package com.mixbox.storm.kafka.trident;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.DynamicBrokersReader;
import com.mixbox.storm.kafka.ZkHosts;
import java.util.Map;
* 2014/07/22
* 在 ZK 中間拿到 GlobalPartitionInformation
*
* ZkBrokerReader 是對于 DynamicBrokersReader 的一個簡單的封裝
* @author Yin Shuai
*/
public class ZkBrokerReader implements IBrokerReader {
public static final Logger LOG = LoggerFactory
.getLogger(ZkBrokerReader.class);
GlobalPartitionInformation cachedBrokers;
public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,
hosts.brokerZkPath, topic);
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = System.currentTimeMillis();
refreshMillis = hosts.refreshFreqSecs * 1000L;
@Override
public GlobalPartitionInformation getCurrentBrokers() {long currTime = System.currentTimeMillis();
// 很簡單, 指定了你多長時間開始去刷新 Brokerlibiao
if (currTime lastRefreshTimeMs + refreshMillis) {
LOG.info( brokers need refreshing because + refreshMillis
+ ms have expired
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
return cachedBrokers;
@Override
public void close() {reader.close();
}
總覽我們的 Code:
ZkBrokerReader 是對于 DynamicBrokersReader 的一個簡單封裝,ZkBrokerReader 之中持有 2 個主要的 Class
1 GlobalPartitionInformatio cachedBroker;
2 DynamicBrokersReader reader;
3 long lastRefreshTimeMs; 最新的刷新時間
lastRefreshTimeMs = System.currentTimeMillis(); 最新的刷新時間為系統的當前時間
4 long refreshMillis
refreshMillis = host.refreshFreqSecs * 1000L 設定刷新的毫秒數為
5
public GlobalPartitionInformation getCurrentBrokers() {long currTime = System.currentTimeMillis();
// 很簡單, 指定了你多長時間開始去刷新 Brokerlibiao
if (currTime lastRefreshTimeMs + refreshMillis) {
LOG.info( brokers need refreshing because + refreshMillis
+ ms have expired
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
return cachedBrokers;
}
每一次調用 getCurrentBrokers,首先會取 System.currentTimeMillis 當當前的系統時間超過了 最早的刷新時間 + 刷新
的間隔,就會再次的去跟新:
cachedBrokers = reader.getBrokerInfo();getBrokerInfo() 方法每調用一次,也就重新在 zk 之中重新去取
一次。
ZkBrokerReader 是對于 DynamicBrokerReader 的一個封裝,DynamicBrokerReader 的 Dynamic 性質并不程序動態的因數,而只是簡單在讀取 ZK 數據的過程之中,Zk 數據已經動態的發生變化?
以上是“Storm-kafka 中如何封裝 DynamicBrokerReader 類”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注丸趣 TV 行業資訊頻道!