久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

IBatchSpout API怎么使用

173次閱讀
沒有評論

共計 2678 個字符,預計需要花費 7 分鐘才能閱讀完成。

這篇文章主要介紹“IBatchSpout API 怎么使用”,在日常操作中,相信很多人在 IBatchSpout API 怎么使用問題上存在疑惑,丸趣 TV 小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”IBatchSpout API 怎么使用”的疑惑有所幫助!接下來,請跟著丸趣 TV 小編一起來學習吧!

IBatchSpout 是 storm trident 推出的一種可以批量發(fā)射的 Spout。非事務性,基本的 spout

1:Map getComponentConfiguration(); 定義配置,可以用 backtype.storm.Config。

2:void open(Map conf, TopologyContext context); Spout 的初始化方法,參數(shù) conf 即是 getComponentConfiguration 定義的配置

3:Fields getOutputFields(); 聲明輸出的 fields

4:void emitBatch(long batchId, TridentCollector collector); 批量發(fā)射 tuple,本次的批次號為 batchId

5:void ack(long batchId); 批次號為 batchId 的數(shù)據(jù)處理成功

6:  void close();

一個例子

package storm.projectA;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MySpout implements IBatchSpout{
 /**
 * 
 */
 private static final long serialVersionUID = 1L;
 private long maxBatchSize;// 每批次最大的數(shù)量
 private BufferedReader br;// 源文件流
 HashMap Long, List List Object  batches = new HashMap Long, List List Object // 保存發(fā)送過的所有數(shù)據(jù),以便于重復發(fā)送
 /**
 * @param conf  配置
 * @param context 
 */
 @Override
 public void open(Map conf, TopologyContext context) { String filePath = (String)conf.get( filePath 
 maxBatchSize = (Long)conf.get( maxBatchSize 
 try { br = new BufferedReader(new FileReader(filePath));
 } catch (FileNotFoundException e) { e.printStackTrace();
 }
 }
 /*** spout 的發(fā)送方法
 * @param batchId  批次 id
 * @param collector  批量發(fā)射器
 */
 @Override
 public void emitBatch(long batchId, TridentCollector collector) { List List Object  batch = batches.get(batchId);
 if (batch == null) { batch = new ArrayList List Object ();
 for (int i = 0; i   maxBatchSize; i++) {
 try { String line = br.readLine();
 if(line == null){
 break;
 }
 batch.add(new Values(line));
 } catch (IOException e) { e.printStackTrace();
 }
 }
 }
 for(List Object  list : batch){ collector.emit(list);
 }
 }
 @Override
 public void ack(long batchId) { batches.remove(batchId);
 }
 /**
 * close  方法
 */
 @Override
 public void close() { if(br!=null){
 try { br.close();
 } catch (IOException e) { e.printStackTrace();
 }
 }
 
 }
 @Override
 public Map getComponentConfiguration() { Config conf = new Config();
 // 最大并行度   本地模式設置為 1
 conf.setMaxTaskParallelism(1);
 conf.put( filePath ,  D:\\aaa.txt 
 conf.put(maxBatchSize , 2);
 return conf;
 }
 /**
 *  輸出的 fileds
 */
 @Override
 public Fields getOutputFields() {
 return new Fields( sentence 
 }
}

到此,關于“IBatchSpout API 怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注丸趣 TV 網(wǎng)站,丸趣 TV 小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

正文完
 
丸趣
版權聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計2678字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網(wǎng)絡搜集發(fā)布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 普兰县| 昔阳县| 门源| 安丘市| 襄城县| 闵行区| 兰坪| 宁安市| 贵阳市| 徐汇区| 林甸县| 望奎县| 星子县| 仙游县| 北宁市| 葵青区| 阆中市| 盐边县| 资溪县| 淮安市| 交城县| 得荣县| 海安县| 黎平县| 方正县| 塘沽区| 洛阳市| 镇坪县| 东港市| 通山县| 定边县| 克拉玛依市| 江川县| 平原县| 兖州市| 临沂市| 肥西县| 原平市| 三江| 敦煌市| 荥经县|