久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Transactional topology怎么使用

198次閱讀
沒有評論

共計 4536 個字符,預計需要花費 12 分鐘才能閱讀完成。

本篇內容介紹了“Transactional topology 怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

你可以通過使用 TransactionalTopologyBuilder 來創建 transactional topology. 下面就是一個 transactional topology 的定義,它的作用是計算輸入流里面的 tuple 的個數。這段代碼來自 storm-starter 里面的 TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields( word), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(global-count , spout , spout, 3);

builder.setBolt(partial-count , new BatchCount(), 5)

        .shuffleGrouping(spout

builder.setBolt(sum , new UpdateGlobalCount())

        .globalGrouping(partial-count

TransactionalTopologyBuilder 構造器中接受如下的參數:

?一個 transaction topology 的 id

?spout 在整個 topology 里面的 id。

?一個 transactional spout。

?一個可選的這個 transactional spout 的并行度。

topology 的 id 是用來在 zookeeper 里面保存這個 topology 的當前進度狀態的,所以如果你重啟這個 topology,它可以接著前面的進度繼續執行。

一個 transaction topology 里面有一個唯一的 TransactionalSpout, 這個 spout 是通過 TransactionalTopologyBuilder 的構造函數來指定的。在這個例子里面,MemoryTransactionalSpout 被用來從一個內存變量里面讀取數據 (DATA)。第二個參數指定 spout 發送的 tuple 的字段,第三個參數指定每個 batch 的最大 tuple 數量。關于如何自定義 TransactionalSpout 我們會在后面介紹。

現在說說 bolts。這個 topology 并行地計算 tuple 的總數量。第一個 bolt:BatchBolt,隨機地把輸入 tuple 分給各個 task,然后各個 task 各自統計局部數量。第二個 bolt:UpdateGlobalCount, 用全局 grouping 來匯總這個 batch 中 tuple 的數量,然后再更新到數據庫里面的全局數量。

下面是 BatchCount 的定義:

public static class BatchCount extends BaseBatchBolt {

    Object _id;

    BatchOutputCollector _collector;

    int _count = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

        _collector = collector;

        _id = id;

    }

    @Override

    public void execute(Tuple tuple) {

        _count++;

    }

    @Override

    public void finishBatch() {

        _collector.emit(new Values(_id, _count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields( id , count));

    }

}

storm 會為每個正在處理的 batch 創建一個 BatchCount 對象,這個 BatchCount 是運行在 BatchBoltExecutor 里面的。而 BatchBoltExecutor 負責創建以及清理這個對象的實例。

BatchCount 對象的 prepare 方法接收如下參數:

?Storm config

?Topology context

?Output collector

?這個 batch 的 id (txid),在 Transactional Topology 中,這個 id 則是一個 TransactionAttempt 對象。

這個 batch bolt 的抽象在 DRPC 里面也可以用,只是 txid 的類型不一樣而已。實際上,BatchBolt 可以接收一個 txid 類型的參數,所以如果你只是想在 transactioinal topology 里面使用這個 BatchBolt,你可以去繼承 BaseTransactionalBolt 類,如下定義:

public abstract class BaseTransactionalBolt extends BaseBatchBolt TransactionAttempt {

}

在 transaction topology 里面發射的所有的 tuple 都必須以 TransactionAttempt 作為第一個 field,然后 storm 可以根據這個 field 來判斷哪些 tuple 屬于一個 batch。所以你在發射 tuple 的時候需要滿足這個條件。

TransactionAttempt 包含兩個值:一個 transaction id,一個 attempt id。transaction id 的作用就是我們上面介紹的對于每個 batch 是唯一的,而且不管這個 batch 被 replay 多少次都是一樣的。attempt id 是對于每個 batch 唯一的一個 id,但是對于同一個 batch,它 replay 之后的 attempt id 跟 replay 之前就不一樣了,我們可以把 attempt id 理解成 replay-times, storm 利用這個 id 來區別一個 batch 發射的 tuple 的不同版本。

transaction id 對于每個 batch 加一,所以第一個 batch 的 transaction id 是”1″, 第二個 batch 是”2″,依次類推。

每收到一個 batch 中的 tuple,execute 方法便被調用一次。每次當該方法被調用時,你應該把這個 batch 里面的狀態保持在一個本地變量里面。對于這個例子來說,它在 execute 方法里面遞增 tuple 的個數。

最后,當這個 bolt 接收到某個 batch 的所有的 tuple 之后,finishBatch 方法會被調用。這個例子里面的 BatchCount 類會在這個時候發射它的局部數量到它的輸出流里面去。

下面是 UpdateGlobalCount 類的定義:

public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {

    TransactionAttempt _attempt;

    BatchOutputCollector _collector;

    int _sum = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {

        _collector = collector;

        _attempt = attempt;

    }

    @Override

    public void execute(Tuple tuple) {

        _sum+=tuple.getInteger(1);

    }

    @Override

    public void finishBatch() {

        Value val = DATABASE.get(GLOBAL_COUNT_KEY);

        Value newval;

        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {

            newval = new Value();

            newval.txid = _attempt.getTransactionId();

            if(val==null) {

                newval.count = _sum;

            } else {

                newval.count = _sum + val.count;

            }

            DATABASE.put(GLOBAL_COUNT_KEY, newval);

        } else {

            newval = val;

        }

        _collector.emit(new Values(_attempt, newval.count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields( id , sum));

    }

}

UpdateGlobalCount 是 Transactional Topologies 相關的類,所以它繼承自 BaseTransactionalBolt。在 execute 方法里面,UpdateGlobalCount 累積這個 batch 的計數,比較有趣的是 finishBatch 方法。

首先,注意這個 bolt 實現了 ICommitter 接口,這告訴 storm 要在這個事務的 commit 階段調用 finishBatch 方法,所以對于 finishBatch 的調用會保證強順序性(順序就是 transaction id 的升序 ),另一方面 execute 方法在 processing 或者 commit 階段都可以執行。另外一種把 bolt 標識為 commiter 的方法是調用 TransactionalTopologyBuilder 的 setCommiterBolt 來添加 Bolt(而不是 setBolt)。

UpdateGlobalCount 里面 finishBatch 方法的邏輯是首先從數據庫中獲取當前的值,并且把數據庫里面的 transaction id 與當前這個 batch 的 transaction id 進行比較。如果他們一樣,那么忽略這個 batch。否則把這個 batch 的結果加到總結果里面去,并且更新數據庫。

“Transactional topology 怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計4536字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 格尔木市| 虹口区| 甘谷县| 精河县| 兴隆县| 平陆县| 临夏市| 临汾市| 灵武市| 黑龙江省| 米泉市| 五大连池市| 璧山县| 桃园市| 五河县| 衡阳县| 司法| 闽侯县| 嵩明县| 改则县| 西乌珠穆沁旗| 新龙县| 彰化县| 紫阳县| 尚义县| 海原县| 乌什县| 古蔺县| 永川市| 新安县| 哈尔滨市| 桐乡市| 阿克苏市| 台湾省| 邹平县| 阿拉尔市| 大足县| 延庆县| 高密市| 洛浦县| 绥化市|