RocketMQ之事務訊息

2022-02-15 16:59:48 字數 2573 閱讀 6000

由於工作流引擎專案中,工作流引擎服務和業務服務是分開的,所以就涉及到了分布式事務的問題。綜合考慮到併發量和分布式事務的保障,最終選擇了事務訊息的方式。

首先我們來介紹下本地訊息表這種方案,當訊息佇列不支援事務訊息的時候,我們可以考慮這種方案。

基本流程

1、a 系統在自己本地乙個事務裡操作同時,插入一條資料到訊息表;

2、接著 a 系統將這個訊息傳送到 mq 中去;

3、b 系統接收到訊息之後,在乙個事務裡,往自己本地訊息表裡插入一條資料,同時執行其他的業務操作,如果這個訊息已經被處理過了,那麼此時這個事務會回滾,這樣保證不會重複處理訊息;

4、b 系統執行成功之後,就會更新自己本地訊息表的狀態以及 a 系統訊息表的狀態;

5、如果 b 系統處理失敗了,那麼就不會更新訊息表狀態,那麼此時 a 系統會定時掃瞄自己的訊息表,如果有未處理的訊息,會再次傳送到 mq 中去,讓 b 再次處理;

6、這個方案保證了最終一致性,哪怕 b 事務失敗了,但是 a 會不斷重發訊息,直到 b 那邊成功為止。

備註:a的訊息表用於保證b正確消費了a傳送的訊息,b的訊息表用於保證不重複消費同一條訊息。

這個方案說實話最大的問題就在於嚴重依賴於資料庫的訊息表來管理事務,高平發場景下不好擴充套件,所以應用好像也不太多。

本地訊息表是 base 理論,是最終一致模型,適用於對一致性要求不高的。實現這個模型時需要注意重試的冪等。

這個方案的思路其實跟上面講的本地訊息表基本相同,但是不基於資料庫,而是基於mq來實現事務,rocketmq提供了事務訊息來支援這種方式。

訊息傳送:

(a)傳送方將半事務訊息傳送至訊息佇列 rocketmq 版服務端。

(b)訊息佇列 rocketmq 服務端將訊息持久化成功之後,向傳送方返回 ack 確認訊息已經傳送成功,此時訊息為半事務訊息。

(c)傳送方開始執行本地事務邏輯。

(d)傳送方根據本地事務執行結果向服務端提交二次確認(commit 或是 rollback),服務端收到 commit 狀態則將半事務訊息標記為可投遞,訂閱方最終將收到該訊息;服務端收到 rollback 狀態則刪除半事務訊息,訂閱方將不會接受該訊息。

訊息回查:

(a)在斷網或者是應用重啟的特殊情況下,上述步驟 d 提交的二次確認最終未到達服務端,經過固定時間後rocketmq服務端將對該訊息發起訊息回查。(mq會自動定時輪詢所有 prepared 訊息**你的介面,問你,這個訊息是不是本地事務處理失敗了,所有沒傳送確認的訊息,是繼續重試還是回滾?一般來說這裡你就可以查下資料庫看之前本地事務是否執行,如果回滾了,那麼這裡也回滾吧。這個就是避免可能本地事務執行成功了,而確認訊息卻傳送失敗了。)

(b)傳送方收到訊息回查後,需要檢查對應訊息的本地事務執行的最終結果。

(c)傳送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 d 對半事務訊息進行操作。

在分布式訊息佇列中,目前唯一提供完整的事務訊息的,只有 rocketmq 。

可能會有網友說,rabbitmq 和 kafka 也有事務訊息啊,也支援傳送事務訊息的傳送,以及後續的事務訊息的 commit提交或 rollbackc 回滾。但是要考慮乙個極端的情況,在本地資料庫事務已經提交的時時候,如果因為網路原因,又或者崩潰等等意外,導致事務訊息沒有被 commit ,最終導致這條事務訊息丟失,分布式事務出現問題。

相比來說,rocketmq 提供事務回查機制,如果應用超過一定時長未 commit 或 rollback 這條事務訊息,rocketmq 會主動回查應用,詢問這條事務訊息是 commit 還是 rollback ,從而實現事務訊息的狀態最終能夠被 commit 或是 rollback ,達到最終事務的一致性。

// rocketmq事務訊息監聽

@rocketmqtransactionlistener(txproducergroup = tx_producer_group)

public class transactionlistenerimpl implements rocketmqlocaltransactionlistener arg:{}]", msg, arg);

return rocketmqlocaltransactionstate.unknown;

}@override

public rocketmqlocaltransactionstate checklocaltransaction(message msg) ]", msg);

return rocketmqlocaltransactionstate.commit;}}

一般來說,有兩種方式實現本地事務回查時,返回事務訊息的狀態。

第一種,通過msg訊息,獲得某個業務上的標識或者編號,然後去資料庫中查詢業務記錄,從而判斷該事務訊息的狀態是提交還是回滾。

第二種,記錄msg的事務編號,與事務狀態到資料庫中。

芋道 spring boot 訊息佇列 rocketmq 入門

阿里雲事務訊息文件

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...

RocketMQ事務訊息思路

通過訊息佇列 rocketmq 事務訊息,能達到分布式事務的最終一致。模擬a賬戶轉賬給b賬戶操作,這個分布式事務有兩個子事務 子事務a areducetransaction 代表a賬戶扣款 子事務b bincreasetransaction 代表b賬戶收款 一 向訊息佇列伺服器傳送半訊息 半訊息無法...

RocketMQ事務訊息實現分析

這週rocketmq發布了4.3.0版本,new feature中最受關注的一點就是支援了事務訊息 今天花了點時間看了下具體的實現內容,下面是簡單的總結。通過馮嘉發布的 rocketmq 4.3正式發布,支援分布式事務 一文可以看到rocketmq採用了2pc的方案來提交事務訊息,同時增加乙個補償邏...