共計 3111 個字符,預計需要花費 8 分鐘才能閱讀完成。
這期內容當中丸趣 TV 小編將會給大家帶來有關 storm java 的編程思路是什么,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
總體思路
storm 編程和 hadoop 的 mapreduce 的編程很類似,hadoop 的 mapreduce 需要自己實現 map 函數,reduce 函數,還有一個主類驅動;storm 需要自己實現 spout,bolt 和一個主函數。storm 編程為以下三步:
創建一個 Spout 讀取數據
創建 bolt 處理數據
創建一個主類,在主類中創建拓撲和一個集群對象,將拓撲提交到集群
Topology 運行方式
Topology 的運行可以分為本地模式和分布式模式,模式的設置可以在配置文件中設定,也可以在代碼中設置。本地模式其實什么都不需要安裝,有 storm jar 包就夠了
(1)本地運行的提交方式:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf, topology);
cluster.killTopology(topologyName);
cluster.shutdown();
(2)分布式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
需要注意的是,在 Storm 代碼編寫完成之后,需要打包成 jar 包放到 Nimbus 中運行,打包的時候,不需要把依賴的 jar 都打進去,否則如果把依賴的 storm.jar 包打進去的話,運行時會出現重復的配置文件錯誤導致 Topology 無法運行。因為 Topology 運行之前,會加載本地的 storm.yaml 配置文件。
在 Nimbus 運行的命令如下:
storm jar StormTopology.jar maincalss args
Topology 運行流程
有幾點需要說明的地方:
(1)Storm 提交后,會把代碼首先存放到 Nimbus 節點的 inbox 目錄下,之后,會把當前 Storm 運行的配置生成一個 stormconf.ser 文件放到 Nimbus 節點的 stormdist 目錄中,在此目錄中同時還有序列化之后的 Topology 代碼文件;
(2) 在設定 Topology 所關聯的 Spouts 和 Bolts 時,可以同時設置當前 Spout 和 Bolt 的 executor 數目和 task 數目,默認情況下,一個 Topology 的 task 的總和是和 executor 的總和一致的。之后,系統根據 worker 的數目,盡量平均的分配這些 task 的執行。worker 在哪個 supervisor 節點上運行是由 storm 本身決定的;
(3)任務分配好之后,Nimbes 節點會將任務的信息提交到 zookeeper 集群,同時在 zookeeper 集群中會有 workerbeats 節點,這里存儲了當前 Topology 的所有 worker 進程的心跳信息;
(4)Supervisor 節點會不斷的輪詢 zookeeper 集群,在 zookeeper 的 assignments 節點中保存了所有 Topology 的任務分配信息、代碼存儲目錄、任務之間的關聯關系等,Supervisor 通過輪詢此節點的內容,來領取自己的任務,啟動 worker 進程運行;
(5)一個 Topology 運行之后,就會不斷的通過 Spouts 來發送 Stream 流,通過 Bolts 來不斷的處理接收到的 Stream 流,Stream 流是無界的。
最后一步會不間斷的執行,除非手動結束 Topology。
Topology 方法調用流程
Topology 中的 Stream 處理時的方法調用過程如下:
有幾點需要說明的地方:
(1)每個組件 (Spout 或者 Bolt) 的構造方法和 declareOutputFields 方法都只被調用一次。
(2)open 方法、prepare 方法的調用是多次的。入口函數中設定的 setSpout 或者 setBolt 里的并行度參數指的是 executor 的數目,是負責運行組件中的 task 的線程 的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每個 executor 運行的時候調用一次。相當于一個線程的構造方法。
(3)nextTuple 方法、execute 方法是一直被運行的,nextTuple 方法不斷的發射 Tuple,Bolt 的 execute 不斷的接收 Tuple 進行處理。只有這樣不斷地運行,才會產生無界的 Tuple 流,體現實時性。相當于線程的 run 方法。
(4)在提交了一個 topology 之后,Storm 就會創建 spout/bolt 實例并進行序列化。之后,將序列化的 component 發送給所有的任務所在的機器 (即 Supervisor 節點),在每一個任務上反序列化 component。
(5)Spout 和 Bolt 之間、Bolt 和 Bolt 之間的通信,是通過 zeroMQ 的消息隊列實現的。
(6) 上圖沒有列出 ack 方法和 fail 方法,在一個 Tuple 被成功處理之后,需要調用 ack 方法來標記成功,否則調用 fail 方法標記失敗,重新處理這個 Tuple。
Topology 并行度
在 Topology 的執行單元里,有幾個和并行度相關的概念。
(1)worker: 每個 worker 都屬于一個特定的 Topology,每個 Supervisor 節點的 worker 可以有多個,每個 worker 使用一個單獨的端口,它對 Topology 中的每個 component 運行一個或者多個 executor 線程來提供 task 的運行服務。
(2)executor:executor 是產生于 worker 進程內部的線程,會執行同一個 component 的一個或者多個 task。
(3)task: 實際的數據處理由 task 完成,在 Topology 的生命周期中,每個組件的 task 數目是不會發生變化的,而 executor 的數目卻不一定。executor 數目小于等于 task 的數目,默認情況下,二者是相等的。
在運行一個 Topology 時,可以根據具體的情況來設置不同數量的 worker、task、executor,而設置的位置也可以在多個地方。
(1)worker 設置:
(1.1) 可以通過設置 yaml 中的 topology.workers 屬性
(1.2) 在代碼中通過 Config 的 setNumWorkers 方法設定
(2)executor 設置:
通過在 Topology 的入口類中 setBolt、setSpout 方法的最后一個參數指定,不指定的話,默認為 1;
(3)task 設置:
(3.1) 默認情況下,和 executor 數目一致;
(3.2) 在代碼中通過 TopologyBuilder 的 setNumTasks 方法設定具體某個組件的 task 數目;
終止 Topology
通過在 Nimbus 節點利用如下命令來終止一個 Topology 的運行:
storm kill topologyName
kill 之后,可以通過 UI 界面查看 topology 狀態,會首先變成 KILLED 狀態,在清理完本地目錄和 zookeeper 集群中的和當前 Topology 相關的信息之后,此 Topology 就會徹底消失了。
Topology 跟蹤
Topology 提交后,可以在 Nimbus 節點的 web 界面查看,默認的地址是 http://NimbusIp:8080。
上述就是丸趣 TV 小編為大家分享的 storm java 的編程思路是什么了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注丸趣 TV 行業資訊頻道。