久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Storm如何接收數據

217次閱讀
沒有評論

共計 4769 個字符,預計需要花費 12 分鐘才能閱讀完成。

這篇文章主要講解了“Storm 如何接收數據”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“Storm 如何接收數據”吧!

    簡要的模擬如何接收數據:

package com.cc.storm.spout;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RandomEmitSpout extends BaseRichSpout {
 private Random _random;
 private static final long serialVersionUID = 4092527421163270357L;
 static Logger LOG = Logger.getLogger(RandomEmitSpout.class);
 private SpoutOutputCollector _collector;
 @Override
 public void open(Map conf, TopologyContext context,
 SpoutOutputCollector collector) {
 _collector = collector;
 _random = new Random();
 @Override
 public void nextTuple() {
 try {Thread.sleep(1000);
 } catch (Exception e) {e.printStackTrace();
 String[] userIds = {  1 ,  2 ,  3 ,  4  };
 String[] merchandiseIDS = {  1  };
 _collector.emit(new Values(userIds[_random.nextInt(userIds.length)],
 merchandiseIDS[_random.nextInt(merchandiseIDS.length)]));
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
 // TODO Auto-generated method stub
 declarer.declare(new Fields( userIdS ,  merchandiseIDS));
 @Override
 public void close() {}

     plus:如果您采用的是 Redis

    那么:   

package com.cc.storm.spout;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RedisPubSubSpout extends BaseRichSpout {
  * @Fields serialVersionUID : TODO
  */
 private static final long serialVersionUID = 4092527421163270357L;
 static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);
 private SpoutOutputCollector _collector;
 private final String host;
 private final int port;
 private final String pattern;
 LinkedBlockingQueue String  queue;
 JedisPool pool;
 public RedisPubSubSpout(String host, int port, String pattern) {
 // TODO Auto-generated constructor stub
 this.host = host;
 this.port = port;
 this.pattern = pattern;
 //  監聽線程,從 redis 訂閱的興趣事件中獲取數據
 class ListenerThread extends Thread {
 private LinkedBlockingQueue String  queue;
 JedisPool pool;
 String pattern;
 public ListenerThread(LinkedBlockingQueue String  queue,
 JedisPool pool, String pattern) {
 // TODO Auto-generated constructor stub
 this.queue = queue;
 this.pool = pool;
 this.pattern = pattern;
 @Override
 public void run() {JedisPubSub listener = new JedisPubSub() {
 @Override
 public void onUnsubscribe(String arg0, int arg1) {
 // TODO Auto-generated method stub
 @Override
 public void onSubscribe(String arg0, int arg1) {
 // TODO Auto-generated method stub
 @Override
 public void onPUnsubscribe(String arg0, int arg1) {
 // TODO Auto-generated method stub
 @Override
 public void onPSubscribe(String arg0, int arg1) {
 // TODO Auto-generated method stub
 @Override
 public void onPMessage(String pattern, String channel,
 String message) {
 // TODO Auto-generated method stub
 queue.offer(message);
 @Override
 public void onMessage(String channel, String message) {
 // TODO Auto-generated method stub
 queue.offer(message);
 Jedis jedis = pool.getResource();
 try {jedis.psubscribe(listener, pattern);
 } finally {pool.returnResource(jedis);
 @SuppressWarnings(rawtypes)
 @Override
 public void open(Map conf, TopologyContext context,
 SpoutOutputCollector collector) {
 // TODO Auto-generated method stub
 _collector = collector;
 //  隊列最大支持 1000 個
 queue = new LinkedBlockingQueue String (1000);
 JedisPoolConfig config = new JedisPoolConfig();
 // error
 pool = null;
 ListenerThread listener = new ListenerThread(queue, pool, pattern);
 //  啟動線程
 listener.start();
 @Override
 public void nextTuple() {
 // TODO Auto-generated method stub
 String ret = queue.poll();
 if (null == ret) {
 //  如果隊列中暫無數據可取,休息 500ms
 Utils.sleep(500);
 } else {
 //  數據格式為  “userID:merchandiseID”,可以依據需求更改此處
 String[] s = ret.split( : 
 _collector.emit(new Values(s[0], s[1]));
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
 // TODO Auto-generated method stub
 declarer.declare(new Fields( userIdS ,  merchandiseIDS));
 @Override
 public void close() {
 // TODO Auto-generated method stub
 pool.destroy();}

感謝各位的閱讀,以上就是“Storm 如何接收數據”的內容了,經過本文的學習后,相信大家對 Storm 如何接收數據這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計4769字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 泽普县| 襄垣县| 军事| 中西区| 宁安市| 库伦旗| 兴文县| 成武县| 临桂县| 许昌市| 涪陵区| 巴南区| 图片| 久治县| 大足县| 桂林市| 南通市| 三江| 措勤县| 栾川县| 穆棱市| 柳江县| 崇文区| 濮阳县| 易门县| 丹东市| 五台县| 淮滨县| 治多县| 容城县| 三台县| 靖远县| 三门峡市| 泸西县| 宁强县| 银川市| 弥渡县| 德令哈市| 永昌县| 图木舒克市| 云阳县|