共計 4769 個字符,預計需要花費 12 分鐘才能閱讀完成。
這篇文章主要講解了“Storm 如何接收數據”，文中的講解內容簡單清晰，易於學習與理解。下面請大家跟著丸趣 TV 小編的思路慢慢深入，一起來研究和學習“Storm 如何接收數據”吧！
簡要的模擬如何接收數據:
package com.cc.storm.spout;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RandomEmitSpout extends BaseRichSpout {
private Random _random;
private static final long serialVersionUID = 4092527421163270357L;
static Logger LOG = Logger.getLogger(RandomEmitSpout.class);
private SpoutOutputCollector _collector;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
_random = new Random();
@Override
public void nextTuple() {
try {Thread.sleep(1000);
} catch (Exception e) {e.printStackTrace();
String[] userIds = { 1 , 2 , 3 , 4 };
String[] merchandiseIDS = { 1 };
_collector.emit(new Values(userIds[_random.nextInt(userIds.length)],
merchandiseIDS[_random.nextInt(merchandiseIDS.length)]));
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields( userIdS , merchandiseIDS));
@Override
public void close() {}
補充：如果您採用的是 Redis，
那麼可以這樣寫：
package com.cc.storm.spout;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RedisPubSubSpout extends BaseRichSpout {
* @Fields serialVersionUID : TODO
*/
private static final long serialVersionUID = 4092527421163270357L;
static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);
private SpoutOutputCollector _collector;
private final String host;
private final int port;
private final String pattern;
LinkedBlockingQueue String queue;
JedisPool pool;
public RedisPubSubSpout(String host, int port, String pattern) {
// TODO Auto-generated constructor stub
this.host = host;
this.port = port;
this.pattern = pattern;
// 監聽線程,從 redis 訂閱的興趣事件中獲取數據
class ListenerThread extends Thread {
private LinkedBlockingQueue String queue;
JedisPool pool;
String pattern;
public ListenerThread(LinkedBlockingQueue String queue,
JedisPool pool, String pattern) {
// TODO Auto-generated constructor stub
this.queue = queue;
this.pool = pool;
this.pattern = pattern;
@Override
public void run() {JedisPubSub listener = new JedisPubSub() {
@Override
public void onUnsubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
@Override
public void onSubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
@Override
public void onPUnsubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
@Override
public void onPSubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
@Override
public void onPMessage(String pattern, String channel,
String message) {
// TODO Auto-generated method stub
queue.offer(message);
@Override
public void onMessage(String channel, String message) {
// TODO Auto-generated method stub
queue.offer(message);
Jedis jedis = pool.getResource();
try {jedis.psubscribe(listener, pattern);
} finally {pool.returnResource(jedis);
@SuppressWarnings(rawtypes)
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// TODO Auto-generated method stub
_collector = collector;
// 隊列最大支持 1000 個
queue = new LinkedBlockingQueue String (1000);
JedisPoolConfig config = new JedisPoolConfig();
// error
pool = null;
ListenerThread listener = new ListenerThread(queue, pool, pattern);
// 啟動線程
listener.start();
@Override
public void nextTuple() {
// TODO Auto-generated method stub
String ret = queue.poll();
if (null == ret) {
// 如果隊列中暫無數據可取,休息 500ms
Utils.sleep(500);
} else {
// 數據格式為 “userID:merchandiseID”,可以依據需求更改此處
String[] s = ret.split( :
_collector.emit(new Values(s[0], s[1]));
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
declarer.declare(new Fields( userIdS , merchandiseIDS));
@Override
public void close() {
// TODO Auto-generated method stub
pool.destroy();}
感謝各位的閱讀，以上就是“Storm 如何接收數據”的內容了。經過本文的學習後，相信大家對 Storm 如何接收數據這一問題有了更深刻的體會，具體使用情況還需要大家實踐驗證。這裡是丸趣 TV，丸趣 TV 小編將為大家推送更多相關知識點的文章，歡迎關注！
正文完