共計 3258 個字符,預計需要花費 9 分鐘才能閱讀完成。
本篇內容主要講解“Storm DRPC 怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學習“Storm DRPC 怎么使用”吧!
Storm 里面引入 DRPC 主要是利用 storm 的實時計算能力來并行化 CPU 密集型(CPU intensive)的計算任務。DRPC 的 stormtopology 以函數的參數流作為輸入,而把這些函數調用的返回值作為 topology 的輸出流。
DRPC 其實不能算是 storm 本身的一個特性,它是通過組合 storm 的原語 stream、spout、bolt、topology 而成的一種模式(pattern)。本來應該把 DRPC 單獨打成一個包的,但是 DRPC 實在是太有用了,所以我們我們把它和 storm 捆綁在一起。
概覽
Distributed RPC 是由一個”DPRC 服務器”協調(storm 自帶了一個實現)。DRPC 服務器協調:① 接收一個 RPC 請求 ② 發送請求到 storm topology ③ 從 storm topology 接收結果 ④ 把結果發回給等待的客戶端。從客戶端的角度來看一個 DRPC 調用跟一個普通的 RPC 調用沒有任何區別。比如下面是客戶端如何調用 RPC 計算“reach”功能(function)的結果
DRPCClient client = new DRPCClient(drpc-host , 3772);
String result = client.execute(reach , http://twitter.com
DRPC 的工作流大致是這樣的(重要☆):
客戶端給 DRPC 服務器發送要執行的函數(function)的名字,以及這個函數的參數。實現了這個函數的 topology 使用 DRPCSpout 從 DRPC 服務器接收函數調用流,每個函數調用被 DRPC 服務器標記了一個唯一的 id。這個 topology 然后計算結果,在 topology 的最后,一個叫做 ReturnResults 的 bolt 會連接到 DRPC 服務器,并且把這個調用的結果發送給 DRPC 服務器(通過那個唯一的 id 標識)。DRPC 服務器用那個唯一 id 來跟等待的客戶端匹配上,喚醒這個客戶端并且把結果發送給它。
LinearDRPCTopologyBuilder
Storm 自帶了一個稱作 LinearDRPCTopologyBuilder 的 topology builder,它把實現 DRPC 的幾乎所有步驟都自動化了。這些步驟包括:
1、設置 spout
2、把結果返回給 DRPC 服務器
3、給 bolt 提供有限聚合幾組 tuples 的能力
來看一個簡單的例子,下面是一個把輸入參數后面添加一個”!”的 DRPC topology 的實現:
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + ! ));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( id , result));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(exclamation
builder.addBolt(new ExclaimBolt(), 3);
// …
}
可以看出來,我們需要做的事情非常的少。創建 LinearDRPCTopologyBuilder 的時候,你需要告訴它你要實現的 DRPC 函數 (DRPC function) 的名字。一個 DRPC 服務器可以協調很多函數,函數與函數之間靠函數名字來區分。你聲明的第一個 bolt 會接收一個兩維 tuple,tuple 的第一個字段是 request-id,第二個字段是這個請求的參數。LinearDRPCTopologyBuilder 同時要求我們 topology 的最后一個 bolt 發送一個形如 [id, result] 的二維 tuple:第一個 field 是 request-id,第二個 field 是這個函數的結果。最后所有中間 tuple 的第一個 field 必須是 request-id。
在這里例子里面 ExclaimBolt 簡單地在輸入 tuple 的第二個 field 后面再添加一個”!”,其余的事情都由 LinearDRPCTopologyBuilder 幫我們搞定:連接到 DRPC 服務器,并且把結果發回。
本地模式 DRPC
DRPC 可以以本地模式運行,下面就是以本地模式運行上面例子的代碼:
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(drpc-demo , conf, builder.createLocalTopology(drpc));
System.out.println(Results for hello : + drpc.execute( exclamation , hello));
cluster.shutdown();
drpc.shutdown();
首先你創建一個 LocalDRPC 對象,這個對象在進程內模擬一個 DRPC 服務器(這很類似于 LocalCluster 在進程內模擬一個 Storm 集群),然后創建 LocalCluster 對象在本地模式運行 topology。LinearTopologyBuilder 有單獨的方法來創建本地的 topology 和遠程的 topology。在本地模式里面 LocalDRPC 對象不和任何端口綁定,所以我們的 topology 對象需要知道和誰交互,這就是為什么 createLocalTopology 方法接受一個 LocalDRPC 對象作為輸入的原因。
把 topology 啟動了之后,你就可以通過調用 LocalDRPC 對象的 execute 來調用 RPC 方法了。
遠程模式 DRPC
在一個真實集群上面 DRPC 也是非常簡單的,有三個步驟:
1、啟動 DRPC 服務器
2、配置 DRPC 服務器的地址
3、提交 DRPCtopology 到 storm 集群里面去。
我們可以通過“bin/storm drpc”命令來啟動 DRPC 服務器。
接著,你需要讓你的 storm 集群知道你的 DRPC 服務器的地址。DRPCSpout 需要這個地址從而可以從 DRPC 服務器來接收函數調用。這個可以配置在 storm.yaml 或者通過代碼的方式配置在 topology 里面。通過 storm.yaml 配置是這樣的:
drpc.servers:
– drpc1.foo.com
– drpc2.foo.com
最后,你通過 StormSubmitter 對象來提交 DRPC topology(這個跟你提交其它 topology 沒有區別)。如果要以遠程的方式運行上面的例子,用下面的代碼:
StormSubmitter.submitTopology(exclamation-drpc , conf, builder.createRemoteTopology());
我們用 createRemoteTopology 方法來創建運行在真實集群上的 DRPC topology。
到此,相信大家對“Storm DRPC 怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!