共計 6320 個字符,預計需要花費 16 分鐘才能閱讀完成。
本篇內容介紹了“怎么掌握 Flink on YARN 應用啟動流程”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Flink on YARN 流程圖
Flink on YARN 集群部署模式涉及 YARN 和 Flink 兩大開源框架,應用啟動流程的很多環節交織在一起,為了便于大家理解,在一張圖上畫出了 Flink on YARN 基礎架構和應用啟動全流程,并對關鍵角色和流程進行了介紹說明,整個啟動流程又被劃分成客戶端提交(流程標注為紫色)、Flink Cluster 啟動和 Job 提交運行(流程標注為橙色)兩個階段分別闡述,由于分支和細節太多,本文會忽略掉一些,只介紹關鍵流程(基于 Flink 開源 1.9 版本源碼整理)。
客戶端提交流程
1. 執行命令:bin/flink run -d -m yarn-cluster … 或 bin/yarn-session.sh … 來提交 per-job 運行模式或 session 運行模式的應用;
2. 解析命令參數項并初始化,啟動指定運行模式,如果是 per-job 運行模式將根據命令行參數指定的 Job 主類創建 job graph;
如果可以從命令行參數 (-yid) 或 YARN properties 臨時文件 (${java.io.tmpdir}/.yarn-properties-${user.name}) 中獲取應用 ID,向指定的應用提交 Job;
否則當命令行參數中包含 -d(表示 detached 模式)和 -m yarn-cluster(表示指定 YARN 集群模式),啟動 per-job 運行模式;
否則當命令行參數項不包含 -yq(表示查詢 YARN 集群可用資源)時,啟動 session 運行模式;
3. 獲取 YARN 集群信息、新應用 ID 并啟動運行前檢查;
通過 YarnClient 向 YARN ResourceManager(下文縮寫為:YARN RM,YARN Master 節點,負責整個集群資源的管理和調度)請求創建一個新應用(YARN RM 收到創建應用請求后生成新應用 ID 和 container 申請的資源上限后返回),并且獲取 YARN Slave 節點報告(YARN RM 返回全部 slave 節點的 ID、狀態、rack、http 地址、總資源、已使用資源等信息);
運行前檢查:(1) 簡單驗證 YARN 集群能否訪問;(2) 最大 node 資源能否滿足 flink JobManager/TaskManager vcores 資源申請需求;(3) 指定 queue 是否存在 (不存在也只是打印 WARN 信息,后續向 YARN 提交時排除異常并退出);(4) 當預期應用申請的 Container 資源會超出 YARN 資源限制時拋出異常并退出;(5) 當預期應用申請不能被滿足時(例如總資源超出 YARN 集群可用資源總量、Container 申請資源超出 NM 可用資源最大值等)提供一些參考信息。
4. 將應用配置 (flink-conf.yaml、logback.xml、log4j.properties) 和相關文件 (flink jars、ship files、user jars、job graph 等) 上傳至分布式存儲 (例如 HDFS) 的應用暫存目錄(/user/${user.name}/.flink/);
5. 準備應用提交上下文(ApplicationSubmissionContext,包括應用的名稱、類型、隊列、標簽等信息和應用 Master 的 container 的環境變量、classpath、資源大小等),注冊處理部署失敗的 shutdown hook(清理應用對應的 HDFS 目錄),然后通過 YarnClient 向 YARN RM 提交應用;
6. 循環等待直到應用狀態為 RUNNING,包含兩個階段:
循環等待應用提交成功(SUBMITTED):默認每隔 200ms 通過 YarnClient 獲取應用報告,如果應用狀態不是 NEW 和 NEW_SAVING 則認為提交成功并退出循環,每循環 10 次會將當前的應用狀態輸出至日志:Application submission is not finished, submitted application is still in,提交成功后輸出日志:Submitted application
循環等待應用正常運行(RUNNING):每隔 250ms 通過 YarnClient 獲取應用報告,每輪循環也會將當前的應用狀態輸出至日志:Deploying cluster, current state。應用狀態成功變為 RUNNING 后將輸出日志 YARN application has been deployed successfully. 并退出循環,如果等到的是非預期狀態如 FAILED/FINISHED/KILLED, 就會在輸出 YARN 返回的診斷信息(The YARN application unexpectedly switched to state during deployment. Diagnostics from YARN: …)之后拋出異常并退出。
Flink Cluster 啟動流程
1.YARN RM 中的 ClientRMService(為普通用戶提供的 RPC 服務組件,處理來自客戶端的各種 RPC 請求,比如查詢 YARN 集群信息,提交、終止應用等)接收到應用提交請求,簡單校驗后將請求轉交給 RMAppManager(YARN RM 內部管理應用生命周期的組件);
2.RMAppManager 根據應用提交上下文內容創建初始狀態為 NEW 的應用,將應用狀態持久化到 RM 狀態存儲服務(例如 ZooKeeper 集群,RM 狀態存儲服務用來保證 RM 重啟、HA 切換或發生故障后集群應用能夠正常恢復,后續流程中的涉及狀態存儲時不再贅述),應用狀態變為 NEW_SAVING;
3. 應用狀態存儲完成后,應用狀態變為 SUBMITTED;RMAppManager 開始向 ResourceScheduler(YARN RM 可拔插資源調度器,YARN 自帶三種調度器 FifoScheduler/FairScheduler/CapacityScheduler,其中 CapacityScheduler 支持功能最多使用最廣泛,FifoScheduler 功能最簡單基本不可用,今年社區已明確不再繼續支持 FairScheduler,建議已有用戶遷至 CapacityScheduler)提交應用,如果無法正常提交(例如隊列不存在、不是葉子隊列、隊列已停用、超出隊列最大應用數限制等)則拋出拒絕該應用,應用狀態先變為 FINAL_SAVING 觸發應用狀態存儲流程并在完成后變為 FAILED;如果提交成功,應用狀態變為 ACCEPTED;
4. 開始創建應用運行實例(ApplicationAttempt,由于一次運行實例中最重要的組件是 ApplicationMaster,下文簡稱 AM,它的狀態代表了 ApplicationAttempt 的當前狀態,所以 ApplicationAttempt 實際也代表了 AM),初始狀態為 NEW;
5. 初始化應用運行實例信息,并向 ApplicationMasterService(AM RM 協議接口服務,處理來自 AM 的請求,主要包括注冊和心跳)注冊,應用實例狀態變為 SUBMITTED;
6.RMAppManager 維護的應用實例開始初始化 AM 資源申請信息并重新校驗隊列,然后向 ResourceScheduler 申請 AM Container(Container 是 YARN 中資源的抽象,包含了內存、CPU 等多維度資源),應用實例狀態變為 ACCEPTED;
7.ResourceScheduler 會根據優先級(隊列 / 應用 / 請求每個維度都有優先級配置)從根隊列開始層層遞進,先后選擇當前優先級最高的子隊列、應用直至具體某個請求,然后結合集群資源分布等情況作出分配決策,AM Container 分配成功后,應用實例狀態變為 ALLOCATED_SAVING,并觸發應用實例狀態存儲流程,存儲成功后應用實例狀態變為 ALLOCATED;
8.RMAppManager 維護的應用實例開始通知 ApplicationMasterLauncher(AM 生命周期管理服務,負責啟動或清理 AM container)啟動 AM container,ApplicationMasterLauncher 與 YARN NodeManager(下文簡稱 YARN NM,與 YARN RM 保持通信,負責管理單個節點上的全部資源、Container 生命周期、附屬服務等,監控節點健康狀況和 Container 資源使用)建立通信并請求啟動 AM container;
9.ContainerManager(YARN NM 核心組件,管理所有 Container 的生命周期)接收到 AM container 啟動請求,YARN NM 開始校驗 Container Token 及資源文件,創建應用實例和 Container 實例并存儲至本地,結果返回后應用實例狀態變為 LAUNCHED;
10.ResourceLocalizationService(資源本地化服務,負責 Container 所需資源的本地化。它能夠按照描述從 HDFS 上下載 Container 所需的文件資源,并盡量將它們分攤到各個磁盤上以防止出現訪問熱點)初始化各種服務組件、創建工作目錄、從 HDFS 下載運行所需的各種資源至 Container 工作目錄(路徑為: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);
11.ContainersLauncher(負責 container 的具體操作,包括啟動、重啟、恢復和清理等)將待運行 Container 所需的環境變量和運行命令寫到 Container 工作目錄下的 launch_container.sh 腳本中,然后運行該腳本啟動 Container;
12.Container 進程加載并運行 ClusterEntrypoint(Flink JobManager 入口類,每種集群部署模式和應用運行模式都有相應的實現,例如在 YARN 集群部署模式下,per-job 應用運行模式實現類是 YarnJobClusterEntrypoint,session 應用運行模式實現類是 YarnSessionClusterEntrypoint),首先初始化相關運行環境:
輸出各軟件版本及運行環境信息、命令行參數項、classpath 等信息;
注冊處理各種 SIGNAL 的 handler: 記錄到日志
注冊 JVM 關閉保障的 shutdown hook:避免 JVM 退出時被其他 shutdown hook 阻塞
打印 YARN 運行環境信息:用戶名
從運行目錄中加載 flink conf
初始化文件系統
創建并啟動各類內部服務(包括 RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等)
將 RPC address 和 port 更新到 flink conf 配置
13. 啟動 ResourceManager(Flink 資源管理核心組件,包含 YarnResourceManager 和 SlotManager 兩個子組件,YarnResourceManager 負責外部資源管理,與 YARN RM 建立通信并保持心跳,申請或釋放 TaskManager 資源,注銷應用等;SlotManager 則負責內部資源管理,維護全部 Slot 信息和狀態)及相關服務,創建異步 AMRMClient,開始注冊 AM,注冊成功后每隔一段時間(心跳間隔配置項:${yarn.heartbeat.interval},默認 5s)向 YARN RM 發送心跳來發送資源更新請求和接受資源變更結果。YARN RM 內部該應用和應用運行實例的狀態都變為 RUNNING,并通知 AMLivelinessMonitor 服務監控 AM 是否存活狀態,當心跳超過一定時間(默認 10 分鐘)觸發 AM failover 流程;
14. 啟動 Dispatcher(負責接收用戶提供的作業,并且負責為這個新提交的作業拉起一個新的 JobManager)及相關服務(包括 REST endpoint 等),在 per-job 運行模式下,Dispatcher 將直接從 Container 工作目錄加載 JobGrap 文件;在 session 運行模式下,Dispatcher 將在接收客戶端提交的 Job(_通過 BlockServer 接收 job grap 文件)后再進行后續流程;
15. 根據 JobGraph 啟動 JobManager(負責作業調度、管理 Job 和 Task 的生命周期),構建 ExecutionGraph(JobGraph 的并行化版本,調度層最核心的數據結構);
16.JobManager 開始執行 ExecutionGraph,向 ResourceManager 申請資源;
17.ResourceManager 將資源請求加入等待請求隊列,并通過心跳向 YARN RM 申請新的 Container 資源來啟動 TaskManager 進程;后續流程如果有空閑 Slot 資源,SlotManager 將其分配給等待請求隊列中匹配的請求,不用再通過 18. YarnResourceManager 申請新的 TaskManager;
**18.YARN ApplicationMasterService 接收到資源請求后,解析出新的資源請求并更新應用請求信息;
**
19.YARN ResourceScheduler 成功為該應用分配資源后更新應用信息,ApplicationMasterService 接收到 Flink JobManager 的下一次心跳時返回新分配資源信息;
20.Flink ResourceManager 接收到新分配的 Container 資源后,準備好 TaskManager 啟動上下文(ContainerLauncherContext,生成 TaskManager 配置并上傳至分布式存儲,配置其他依賴和環境變量等),然后向 YARN NM 申請啟動 TaskManager 進程,YARN NM 啟動 Container 的流程與 AM Container 啟動流程基本類似,區別在于應用實例在 NM 上已存在并未 RUNNING 狀態時則跳過應用實例初始化流程,這里不再贅述;
21.TaskManager 進程加載并運行 YarnTaskExecutorRunner(Flink TaskManager 入口類),初始化流程完成后啟動 TaskExecutor(負責執行 Task 相關操作);
22.TaskExecutor 啟動后先向 ResourceManager 注冊,成功后再向 SlotManager 匯報自己的 Slot 資源與狀態;
SlotManager 接收到 Slot 空閑資源后主動觸發 Slot 分配,從等待請求隊列中選出合適的資源請求后,向
TaskManager 請求該 Slot 資源
23.TaskManager 收到請求后檢查該 Slot 是否可分配(不存在則返回異常信息)、Job 是否已注冊(沒有則先注冊再分配 Slot),檢查通過后將 Slot 分配給 JobManager;
24.JobManager 檢查 Slot 分配是否重復,通過后通知 Execution 執行部署 task 流程,向 TaskExecutor 提交 task;
TaskExecutor 啟動新的線程運行 Task。
“怎么掌握 Flink on YARN 應用啟動流程”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!