久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Flume整體流程是怎樣的

169次閱讀
沒有評論

共計 9492 個字符,預(yù)計需要花費 24 分鐘才能閱讀完成。

本篇內(nèi)容介紹了“Flume 整體流程是怎樣的”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

整體流程

不管是 Source 還是 Sink 都依賴 Channel,那么啟動時應(yīng)該先啟動 Channel 然后再啟動 Source 或 Sink 即可。

Flume 有兩種啟動方式:使用 EmbeddedAgent 內(nèi)嵌在 Java 應(yīng)用中或使用 Application 單獨啟動一個進程,此處我們已 Application 分析為主。

首先進入 org.apache.flume.node.Application 的 main 方法啟動:

//1、設(shè)置默認值啟動參數(shù)、參數(shù)是否必須的
Options options = new Options();
Option option = new Option( n ,  name , true,  the name of this agent 
option.setRequired(true);
options.addOption(option);
option = new Option( f ,  conf-file , true,
 specify a config file (required if -z missing) 
option.setRequired(false);
options.addOption(option);
//2、接著解析命令行參數(shù)
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
String agentName = commandLine.getOptionValue( n 
boolean reload = !commandLine.hasOption( no-reload-conf 
if (commandLine.hasOption( z) || commandLine.hasOption(zkConnString)) {
 isZkConfigured = true;
if (isZkConfigured) { //3、如果是通過 ZooKeeper 配置,則使用 ZooKeeper 參數(shù)啟動,此處忽略,我們以配置文件講解} else {
 //4、打開配置文件,如果不存在則快速失敗
 File configurationFile = new File(commandLine.getOptionValue( f 
 if (!configurationFile.exists()) {
 throw new ParseException(  The specified configuration file does not exist:   + path);
 }
 List LifecycleAware  components = Lists.newArrayList();
 if (reload) { //5、如果需要定期 reload 配置文件,則走如下方式
 //5.1、此處使用 Guava 提供的事件總線
 EventBus eventBus = new EventBus(agentName +  -event-bus 
 //5.2、讀取配置文件,使用定期輪訓(xùn)拉起策略,默認 30s 拉取一次
 PollingPropertiesFileConfigurationProvider configurationProvider =
 new PollingPropertiesFileConfigurationProvider( agentName, configurationFile, eventBus, 30);
 components.add(configurationProvider);
 application = new Application(components); //5.3、向 Application 注冊組件
 //5.4、向事件總線注冊本應(yīng)用,EventBus 會自動注冊 Application 中使用 @Subscribe 聲明的方法
 eventBus.register(application);
 } else { //5、配置文件不支持定期 reload
 PropertiesFileConfigurationProvider configurationProvider =
 new PropertiesFileConfigurationProvider( agentName, configurationFile);
 application = new Application();
 //6.2、直接使用配置文件初始化 Flume 組件
 application.handleConfigurationEvent(configurationProvider
 .getConfiguration());
 }
//7、啟動 Flume 應(yīng)用
application.start();
//8、注冊虛擬機關(guān)閉鉤子,當(dāng)虛擬機關(guān)閉時調(diào)用 Application 的 stop 方法進行終止
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread( agent-shutdown-hook) {
 @Override
 public void run() { appReference.stop();
 }
});

以上流程只提取了核心代碼中的一部分,比如 ZK 的實現(xiàn)直接忽略了,而 Flume 啟動大體流程如下:

1、讀取命令行參數(shù);

2、讀取配置文件;

3、根據(jù)是否需要 reload 使用不同的策略初始化 Flume;如果需要 reload,則使用 Guava 的事件總線實現(xiàn),Application 的 handleConfigurationEvent 是事件訂閱者,PollingPropertiesFileConfigurationProvider 是事件發(fā)布者,其會定期輪訓(xùn)檢查文件是否變更,如果變更則重新讀取配置文件,發(fā)布配置文件事件變更,而 handleConfigurationEvent 會收到該配置變更重新進行初始化;

4、啟動 Application,并注冊虛擬機關(guān)閉鉤子。

handleConfigurationEvent 方法比較簡單,首先調(diào)用了 stopAllComponents 停止所有組件,接著調(diào)用 startAllComponents 使用配置文件初始化所有組件: 

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents();
 startAllComponents(conf);
}

MaterializedConfiguration 存儲 Flume 運行時需要的組件:Source、Channel、Sink、SourceRunner、SinkRunner 等,其是通過 ConfigurationProvider 進行初始化獲取,比如 PollingPropertiesFileConfigurationProvider 會讀取配置文件然后進行組件的初始化。

對于 startAllComponents 實現(xiàn)大體如下: 

//1、首先啟動 Channel
supervisor.supervise(Channels,
 new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//2、確保所有 Channel 是否都已啟動
for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START
   !supervisor.isComponentInErrorState(ch)){
 try { Thread.sleep(500);
 } catch (InterruptedException e) { Throwables.propagate(e);
 }
 }
//3、啟動 SinkRunner
supervisor.supervise(SinkRunners, 
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//4、啟動 SourceRunner
supervisor.supervise(SourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
//5、初始化監(jiān)控服務(wù)
this.loadMonitoring();

從如下代碼中可以看到,首先要準(zhǔn)備好 Channel,因為 Source 和 Sink 會操作它,對于 Channel 如果初始化失敗則整個流程是失敗的;然后啟動 SinkRunner,先準(zhǔn)備好消費者;接著啟動 SourceRunner 開始進行采集日志。此處我們發(fā)現(xiàn)有兩個單獨的組件 LifecycleSupervisor 和 MonitorService,一個是組件守護哨兵,一個是監(jiān)控服務(wù)。守護哨兵對這些組件進行守護,假設(shè)出問題了默認策略是自動重啟這些組件。

對于 stopAllComponents 實現(xiàn)大體如下:

//1、首先停止 SourceRunner
supervisor.unsupervise(SourceRunners);
//2、接著停止 SinkRunner
supervisor.unsupervise(SinkRunners);
//3、然后停止 Channel
supervisor.unsupervise(Channels);
//4、最后停止 MonitorService
monitorServer.stop();

此處可以看出,停止的順序是 Source、Sink、Channel,即先停止生產(chǎn),再停止消費,最后停止管道。

Application 中的 start 方法代碼實現(xiàn)如下:

public synchronized void start() { for(LifecycleAware component : components) {
 supervisor.supervise(component,
 new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
 }
}

其循環(huán) Application 注冊的組件,然后守護哨兵對它進行守護,默認策略是出現(xiàn)問題會自動重啟組件,假設(shè)我們支持 reload 配置文件,則之前啟動 Application 時注冊過 PollingPropertiesFileConfigurationProvider 組件,即該組件會被守護哨兵守護著,出現(xiàn)問題默認策略自動重啟。

而 Application 關(guān)閉執(zhí)行了如下動作: 

public synchronized void stop() { supervisor.stop();
 if(monitorServer != null) { monitorServer.stop();
 }
}

即關(guān)閉守護哨兵和監(jiān)控服務(wù)。

到此基本的 Application 分析結(jié)束了,我們還有很多疑問,守護哨兵怎么實現(xiàn)的。 

整體流程可以總結(jié)為:

1、首先初始化命令行配置;

2、接著讀取配置文件;

3、根據(jù)是否需要 reload 初始化配置文件中的組件;如果需要 reload 會使用 Guava 事件總線進行發(fā)布訂閱變化;

4、接著創(chuàng)建 Application,創(chuàng)建守護哨兵,并先停止所有組件,接著啟動所有組件;啟動順序:Channel、SinkRunner、SourceRunner,并把這些組件注冊給守護哨兵、初始化監(jiān)控服務(wù);停止順序:SourceRunner、SinkRunner、Channel;

5、如果配置文件需要定期 reload,則需要注冊 Polling***ConfigurationProvider 到守護哨兵;

6、最后注冊虛擬機關(guān)閉鉤子,停止守護哨兵和監(jiān)控服務(wù)。

輪訓(xùn)實現(xiàn)的 SourceRunner  和 SinkRunner 會創(chuàng)建一個線程進行工作,之前已經(jīng)介紹了其工作方式。接下來我們看下守護哨兵的實現(xiàn)。

首先創(chuàng)建 LifecycleSupervisor:

//1、用于存放被守護的組件
 supervisedProcesses = new HashMap LifecycleAware, Supervisoree 
 //2、用于存放正在被監(jiān)控的組件
 monitorFutures = new HashMap LifecycleAware, ScheduledFuture ? ();
 //3、創(chuàng)建監(jiān)控服務(wù)線程池
 monitorService = new ScheduledThreadPoolExecutor(10,
 new ThreadFactoryBuilder().setNameFormat(  lifecycleSupervisor-  + Thread.currentThread().getId() +  -%d)
 .build());
 monitorService.setMaximumPoolSize(20);
 monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
 //4、定期清理被取消的組件
 purger = new Purger();
 //4.1、默認不進行清理
 needToPurge = false;

LifecycleSupervisor 啟動時會進行如下操作:

public synchronized void start() { monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);
 lifecycleState = LifecycleState.START;
}

首先每隔兩個小時執(zhí)行清理組件,然后改變狀態(tài)為啟動。而 LifecycleSupervisor 停止時直接停止了監(jiān)控服務(wù),然后更新守護組件狀態(tài)為 STOP:

//1、首先停止守護監(jiān)控服務(wù)
 if (monitorService != null) { monitorService.shutdown();
 try { monitorService.awaitTermination(10, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
 logger.error( Interrupted while waiting for monitor service to stop 
 }
 }
 //2、更新所有守護組件狀態(tài)為 STOP,并調(diào)用組件的 stop 方法進行停止
 for (final Entry LifecycleAware, Supervisoree  entry : supervisedProcesses.entrySet()) { if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) { entry.getValue().status.desiredState = LifecycleState.STOP;
 entry.getKey().stop();
 }
 }
 //3、更新本組件狀態(tài)
 if (lifecycleState.equals(LifecycleState.START)) {
 lifecycleState = LifecycleState.STOP;
 }
 //4、最后的清理
 supervisedProcesses.clear();
 monitorFutures.clear();

接下來就是調(diào)用 supervise 進行組件守護了:

if(this.monitorService.isShutdown() || this.monitorService.isTerminated()
 || this.monitorService.isTerminating()){
 //1、如果哨兵已停止則拋出異常,不再接收任何組件進行守護
 }
 //2、初始化守護組件
 Supervisoree process = new Supervisoree();
 process.status = new Status();
 //2.1、默認策略是失敗重啟
 process.policy = policy;
 //2.2、初始化組件默認狀態(tài),大多數(shù)組件默認為 START
 process.status.desiredState = desiredState;
 process.status.error = false;
 //3、組件監(jiān)控器,用于定時獲取組件的最新狀態(tài),或者重新啟動組件
 MonitorRunnable monitorRunnable = new MonitorRunnable();
 monitorRunnable.lifecycleAware = lifecycleAware;
 monitorRunnable.supervisoree = process;
 monitorRunnable.monitorService = monitorService;
 supervisedProcesses.put(lifecycleAware, process);
 //4、定期的去執(zhí)行組件監(jiān)控器,獲取組件最新狀態(tài),或者重新啟動組件
 ScheduledFuture ?  future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS);
 monitorFutures.put(lifecycleAware, future);
}

如果不需要守護了,則需要調(diào)用 unsupervise:

public synchronized void unsupervise(LifecycleAware lifecycleAware) { synchronized (lifecycleAware) { Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
 //1.1、設(shè)置守護組件的狀態(tài)為被丟棄
 supervisoree.status.discard = true;
 //1.2、設(shè)置組件盼望的最新生命周期狀態(tài)為 STOP
 this.setDesiredState(lifecycleAware, LifecycleState.STOP);
 //1.3、停止組件
 lifecycleAware.stop();
 }
 //2、從守護組件中移除
 supervisedProcesses.remove(lifecycleAware);
 //3、取消定時監(jiān)控組件服務(wù)
 monitorFutures.get(lifecycleAware).cancel(false);
 //3.1、通知 Purger 需要進行清理,Purger 會定期的移除 cancel 的組件
 needToPurge = true;
 monitorFutures.remove(lifecycleAware);
}

接下來我們再看下 MonitorRunnable 的實現(xiàn),其負責(zé)進行組件狀態(tài)遷移或組件故障恢復(fù):

public void run() { long now = System.currentTimeMillis();
 try { if (supervisoree.status.firstSeen == null) {
 supervisoree.status.firstSeen = now; //1、記錄第一次狀態(tài)查看時間
 }
 supervisoree.status.lastSeen = now; //2、記錄最后一次狀態(tài)查看時間
 synchronized (lifecycleAware) {
 //3、如果守護組件被丟棄或出錯了,則直接返回
 if (supervisoree.status.discard || supervisoree.status.error) {
 return;
 }
 //4、更新最后一次查看到的狀態(tài)
 supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
 //5、如果組件的狀態(tài)和守護組件看到的狀態(tài)不一致,則以守護組件的狀態(tài)為準(zhǔn),然后進行初始化
 if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { switch (supervisoree.status.desiredState) { 
 case START: //6、如果是啟動狀態(tài),則啟動組件
 try { lifecycleAware.start();
 } catch (Throwable e) { if (e instanceof Error) {
 supervisoree.status.desiredState = LifecycleState.STOP;
 try { lifecycleAware.stop();
 } catch (Throwable e1) {
 supervisoree.status.error = true;
 if (e1 instanceof Error) { throw (Error) e1;
 }
 }
 }
 supervisoree.status.failures++;
 }
 break;
 case STOP: //7、如果是停止?fàn)顟B(tài),則停止組件
 try { lifecycleAware.stop();
 } catch (Throwable e) { if (e instanceof Error) { throw (Error) e;
 }
 supervisoree.status.failures++;
 }
 break;
 default:
 }
 } catch(Throwable t) { }
 }
}

如上代碼進行了一些簡化,整體邏輯即定時去采集組件的狀態(tài),如果發(fā)現(xiàn)守護組件和組件的狀態(tài)不一致,則可能需要進行啟動或停止。即守護監(jiān)視器可以用來保證組件如能失敗后自動啟動。默認策略是總是失敗后重啟,還有一種策略是只啟動一次。 

“Flume 整體流程是怎樣的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注丸趣 TV 網(wǎng)站,丸趣 TV 小編將為大家輸出更多高質(zhì)量的實用文章!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計9492字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 雷波县| 盘山县| 北辰区| 东至县| 巴中市| 白山市| 合阳县| 恩施市| 临洮县| 历史| 乐清市| 黑水县| 满洲里市| 安图县| 泽普县| 宣汉县| 尉氏县| 北安市| 济南市| 临澧县| 岚皋县| 手机| 新沂市| 常熟市| 南陵县| 广南县| 大冶市| 文成县| 都安| 绵竹市| 抚远县| 永州市| 商丘市| 化德县| 宜城市| 辰溪县| 西峡县| 泗水县| 子洲县| 拉孜县| 土默特右旗|