共計 4406 個字符,預計需要花費 12 分鐘才能閱讀完成。
這篇文章主要介紹“怎么使用 Storm”,在日常操作中,相信很多人在怎么使用 Storm 問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么使用 Storm”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!
項目 Pom(Storm jar 沒有提交到 Maven 中央倉庫,需要在項目中加入下面的倉庫地址):
repositories
repository
id central /id
name Maven Repository Switchboard /name
layout default /layout
url http://maven.oschina.net/content/groups/public/ /url
snapshots
enabled false /enabled
/snapshots
/repository
repository
id clojars /id
url https://clojars.org/repo/ /url
snapshots
enabled false /enabled
/snapshots
releases
enabled true /enabled
/releases
/repository
/repositories
dependencies
dependency
groupId org.yaml /groupId
artifactId snakeyaml /artifactId
version 1.13 /version
/dependency
dependency
groupId org.apache.zookeeper /groupId
artifactId zookeeper /artifactId
version 3.3.3 /version
/dependency
dependency
groupId org.clojure /groupId
artifactId clojure /artifactId
version 1.5.1 /version
/dependency
dependency
groupId storm /groupId
artifactId storm /artifactId
version 0.9.0.1 /version
/dependency
dependency
groupId storm /groupId
artifactId libthrift7 /artifactId
version 0.7.0 /version
/dependency
/dependencies
下面是一個 Storm 的 HelloWord 的例子,代碼有刪減,熟悉 Storm 的讀者自然能把代碼組織成一個完整的例子。
public static void main(String[] args) {Config conf = new Config();
conf.put(Config.STORM_LOCAL_DIR, /Volumes/Study/data/storm
conf.put(Config.STORM_CLUSTER_MODE, local
//conf.put( storm.local.mode.zmq , false
conf.put( storm.zookeeper.root , /storm
conf.put(storm.zookeeper.session.timeout , 50000);
conf.put( storm.zookeeper.servers , nowledgedata-n15
conf.put(storm.zookeeper.port , 2181);
//conf.setDebug(true);
//conf.setNumWorkers(2);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(words , new TestWordSpout(), 2);
builder.setBolt(exclaim2 , new DefaultStringBolt(), 5)
.shuffleGrouping( words
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(test , conf, builder.createTopology());
}
Config.STORM_LOCAL_DIR 是配置一個本地路徑,Storm 會在這個路徑寫入一些配置信息和臨時數(shù)據(jù)。
Config.STORM_CLUSTER_MODE 是運行模式,local 和 distributed 兩個選項,即本地模式和分布式模式。本地模式在運行時時多線程模擬的,開發(fā)測試用;分布式模式在分布式集群下是多進程的,真正的分布式。
Storm 的 Spout 和 Blot 高可用是通過 ZooKeeper 協(xié)調的,storm.zookeeper.root 是一個 ZooKeeper 地址,并且有對應的端口號
Debug 是測試模式,有更詳細的日志信息。
TestWordSpout 是一個 Storm 自帶的例子,用來隨機的產生 code new String[] { nathan , mike , jackson , golda , bertels /code 列表中的字符串,用來提供數(shù)據(jù)源。
其中 DefaultStringBolt 的源碼:
OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
public void execute(Tuple tuple) {log.info( rev a message: + tuple.getString(0));
collector.emit(tuple, new Values(tuple.getString(0) + !!! ));
collector.ack(tuple);
}
運行日志:
10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: mike 10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: bertels 10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson 11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: nathan 11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: bertels 11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt – rev a message: jackson
數(shù)據(jù)由一個 Storm 叫做噴嘴(Spout,也相當一個水龍頭,能產生數(shù)據(jù)的來源端)產生,然后傳遞給后端一連串的的 Blot,最終被轉換和消費。而 Spout 和 Blot 都是并行的,并行度都可以自己設置(本地運行是靠多線程模擬的)。如:
builder.setSpout(words , new TestWordSpout(), 2);
builder.setBolt(exclaim2 , new DefaultStringBolt(), 5)
噴嘴 TestWordSpout 的并行度是 2,DefaultStringBolt 的并行度是 5.
從日志可以看出,數(shù)據(jù)經(jīng)過噴嘴到達預先定于的一個 Blot,打印了日志。我測試代碼設置的并行度是 5,日志中統(tǒng)計,確實是 5 個線程:
Thread-29-exclaim2
Thread-31-exclaim2
Thread-26-exclaim2
Thread-33-exclaim2
Thread-35-exclaim2
借用 OSC 網(wǎng)友的話說,Hadoop 就是商場里自動升降式的電梯,用戶需要排隊等待,選按樓層,然后到達;而 Storm 就像是自動扶梯,扶梯預先設置好運行后,來人就立即運走,目的地是明確的。
Storm 按我的理解,Storm 和 Hadoop 是完全不同的,設計上也沒有半點擬合的部分。Storm 更像是我之前介紹過的 Spring Integration,是一個數(shù)據(jù)流系統(tǒng)。它能把數(shù)據(jù)按照預設定的流程,把數(shù)據(jù)做各種轉換,傳遞,分解,合并,最后數(shù)據(jù)到達后端存儲。只不過 Storm 是可以分布式,而且分布式的能力也是可以自己設置。
Storm 的這種特性很適合大數(shù)據(jù)類的 ETL 系統(tǒng)開發(fā)。
到此,關于“怎么使用 Storm”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注丸趣 TV 網(wǎng)站,丸趣 TV 小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>