久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何進行RocketMQ事務消息實現

154次閱讀
沒有評論

共計 3257 個字符,預計需要花費 9 分鐘才能閱讀完成。

這篇文章將為大家詳細講解有關如何進行 RocketMQ 事務消息實現,文章內容質量較高,因此丸趣 TV 小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

摘要:
            事務消息提交或回滾的實現原理就是根據 commitlogOffset 找到消息,如果是提交動作,就恢復原消息的主題與隊列,再次存入 commitlog 文件進而轉到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題 RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務消息不同的是,提交事務消息會將消息恢復原主題與隊列,再次存儲在 commitlog 文件中。

若您對 RocketMQ 技術感興趣,請加入
RocketMQ 技術交流群

丸趣 TV 小編將重點分析 RocketMQ Broker 如何處理事務消息提交、回滾命令,根據前面的介紹,其入口 EndTransactionProcessor#processRequest:

OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2
 if (result.getResponseCode() == ResponseCode.SUCCESS) { // @3
 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @4
 if (res.getCode() == ResponseCode.SUCCESS) { MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @5
 msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
 msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
 msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
 msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); // @6
 RemotingCommand sendResult = sendFinalMessage(msgInner); // @7
 if (sendResult.getCode() == ResponseCode.SUCCESS) { 
 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @8
 } return sendResult;
 } return res;
 }
}

代碼 @1:如果請求為提交事務,進入事務消息提交處理流程。
代碼 @2:提交消息,別被這名字誤導了,該方法主要是根據 commitLogOffset 從 commitlog 文件中查找消息返回 OperationResult 實例:

private MessageExt prepareMessage:消息對象。

private int responseCode:查找結果。

private String responseRemark:錯誤提示。

代碼 @3:如果成功查找到消息,則繼續處理,否則返回給客戶端,消息未找到錯誤信息。

代碼 @4:驗證消息必要字段。
驗證消息的生產組與請求信息中的生產者組是否一致。
驗證消息的隊列偏移量(queueOffset)與請求信息中的偏移量是否一致。
驗證消息的 commitLogOffset 與請求信息中的 CommitLogOffset 是否一致。

代碼 @5: 調用 endMessageTransaction 方法,該方法主要的目的就是恢復事務消息的真實的主題、隊列,并設置事務 ID。

代碼 @6:設置消息的相關屬性,這一步應該直接在 endMessageTransaction 中實現就好,統一恢復原消息的數量,特別關注的是取消了事務相關的系統標記。

代碼 @7:發送最終消息,其實現原理非常簡單,調用 MessageStore 將消息存儲在 commitlog 文件中,此時的消息,會被轉發到原消息主題對應的消費隊列,被消費者消費。

代碼 @8:刪除預處理消息 (prepare),其實是將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC 的主題中,代表這些消息已經被處理(提交或回滾)。

上述就是事務消息提交的流程,事務回滾類似,接下來大概分析一下事務消息回滾的流程。

EndTransactionProcessor#processRequest else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); // @1
 if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); // @2
 } return res;
 }
}

代碼 @1:回滾消息,其實內部就是根據 commitlogOffset 查找消息。
代碼 @2:將消息存儲在 RMQ_SYS_TRANS_OP_HALF_TOPIC 中,代表該消息已被處理,與提交事務消息不同的是,提交事務消息會將消息恢復原主題與隊列,再次存儲在 commitlog 文件中。

事務消息在 Broker 服務端的提交回滾流程就介紹到這了。其核心實現就是根據 commitlogOffset 找到消息,如果是提交動作,就恢復原消息的主題與隊列,再次存入 commitlog 文件進而轉到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題 RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務消息不同的是,提交事務消息會將消息恢復原主題與隊列,再次存儲在 commitlog 文件中。

關于如何進行 RocketMQ 事務消息實現就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-25發表,共計3257字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 南阳市| 宁河县| 洛川县| 阜平县| 渝中区| 镇宁| 黄平县| 象州县| 甘孜县| 巨野县| 界首市| 青河县| 汉寿县| 双鸭山市| 南宁市| 宝山区| 屏山县| 鄱阳县| 黄石市| 库尔勒市| 北碚区| 霍城县| 包头市| 遂平县| 临邑县| 府谷县| 甘孜| 涞水县| 芷江| 塘沽区| 平谷区| 阿拉善盟| 孙吴县| 房山区| 华亭县| 县级市| 乡宁县| 那曲县| 珠海市| 赤峰市| 扶绥县|