如何使用訊息佇列的事務訊息

2021-10-08 17:59:08 字數 4023 閱讀 8606

「發訊息」過程,往往是為通知另外乙個系統更新資料,mq的「事務」,主要解決訊息生產者和訊息消費者的資料一致性問題。

先把商品加到購物車

然後幾件商品一起下單

最後支付

完成購物流程,就可以愉快地等待收貨

該過程中有個需用mq。

訂單系統建立訂單後,發訊息給購物車模組,將已下單商品從購物車刪除。

從購物車刪除已下單商品步驟,並非使用者下單支付這個主要流程的必需步驟,所以使用mq非同步清理購物車更合理。

訂單模組建立訂單的過程實際執行了倆操作:

在訂單db插一條訂單資料,用來建立訂單

發訊息給mq,訊息內容即剛建立的訂單

購物車模組訂閱相應主題,接收訂單建立的訊息,然後清理購物車,在購物車中刪除訂單中的商品。

分布式下的這些步驟都有失敗可能性,若不做處理,就可能導致訂單資料與購物車資料不一致:

建立了訂單,沒有清理購物車

訂單沒建立成功,購物車裡面的商品卻被清了

購物車系統收到訂單建立成功訊息清理購物車操作,只要成功執行購物車清理後再提交消費確認即可

如果失敗,由於沒有提交消費確認,mq會自動重試。

問題關鍵點在訂單系統,建立訂單和傳送訊息不允許乙個成功而另乙個失敗。

這就是事務問題。

單體關係型資料庫都完整的實現acid,但對分布式系統

分布式系統在保證可用性和不嚴重犧牲效能的前提下,要實現資料一致性非常困難,所以出現很多「殘血版」一致性,比如順序一致性、最終一致性。

所以分布式事務更多是在分布式系統中事務的不完整實現。在不同場景有不同實現,都是通過一些妥協解決問題。

常見分布式事務實現有2pc、tcc和事務訊息。

每種實現都有其特定的使用場景,也有各自問題,都不是完美方案。

主要是那些需要非同步更新資料,並且對資料實時性要求不高。

比如在建立訂單後,如果出現短暫幾秒,購物車商品沒被及時清空,也不是完全不可接受,只要最終購物車的資料和訂單資料保持一致。

事務訊息需要mq提供相應功能才能實現,kafka和rocketmq都提供事務相關功能。

第二步傳送半訊息第三步建立訂單,這2個順序反一下是等價的,即先建立訂單在傳送半訊息。

半訊息並非訊息內容不完整,包含的就是完整的訊息內容。

半訊息發成功後,訂單系統就可執行本地事務:

在訂單庫建立一條訂單記錄,並提交訂單庫的資料庫事務。

然後根據本地事務執行結果決定提交或者回滾事務訊息。

這就基本實現「都成功/失敗」的一致性要求。

但這實現過程,有個問題沒有解決:如果在第4步提交事務訊息時失敗怎麼辦?

kafka和rocketmq給了不同解決方案。

在我們這裡例子裡面,本地事務就是建立訂單這個資料庫事務。

利用資料庫的事務訊息表。

把訊息資訊的快照和對業務資料的操作作為資料庫事務運算元據庫,操作成功後從資料庫讀取訊息資訊傳送給broker,收到傳送成功的回執後刪除資料庫中的訊息快照。我個人覺得這種方案在不支援半訊息的佇列方案裡也是一種選擇,不知道您覺得這種實現方案有沒有什麼問題。

如果有個生產者和消費者都可訪問,並且效能還不錯的資料庫,肯定使用這個資料庫實現事務較好。

然而大部分事務訊息使用的場景是

如果先建立訂單,當前服務由於不可抗拒因素不能正常工作,沒給購物車系統傳送訊息,這種情況加就會出現:訂單已建立且購物車沒有清空。

而傳送半訊息,可通過定期查詢事務狀態然後根據然後具體的業務回滾操作或者重新傳送訊息(保持業務的冪等性)。

可以採用狀態機的方式

訊息資料唯一鍵+redis setnx來保障

本地訊息表,要確保插入本地訊息表和執行訊息消費業務在同一事務裡

rocketmq事務實現增加了事務反查機制來解決事務訊息提交失敗的問題。

如果producer(即訂單模組),在提交或回滾事務訊息時發生網路異常,broker沒有收到提交或回滾請求,broker會定期去producer反查該事務對應的本地事務的狀態,然後根據反查結果決定提交或者回滾該事務。

要支援事務反查機制,業務**需實現乙個反查本地事務狀態的介面,告知rocketmq本地事務是成功還是失敗。

反查介面的定義,它檢查的是本地事務(在我們這個例子裡面就是資料庫事務)有沒有執行成功,並不比較資料是否一致。

該例中反查本地事務邏輯簡單,只要根據訊息中訂單id,在訂單庫中查詢該訂單是否存在,若訂單存在則返回成功,否則返回失敗。

rocketmq會自動根據事務反查的結果提交或者回滾事務訊息。

反查本地事務的實現並不依賴訊息的傳送方,即訂單服務的某節點的任何資料。

這種情況下,即使傳送事務訊息的訂單服務節點宕機,rocketmq依然可通過其他訂單服務節點執行反查,確保事務完整性。

如果本地事務提交失敗,已發出去的訊息是無法撤回的,會導致資料不一致。

因為消費失敗,會自動重試,所以不會丟訊息,但可能重複消費。

如果發布者本地事務執行太久還沒執行完,訊息中心就來回查是不是有問題,所以應可以把發訊息放本地事務的後面吧,另外次數定義也是經驗值吧

反查一般是定乙個事務超時時間,超時之前會不定期回查。

**實現訂單下單:

首先通過producer.sendmessageintransaction()方法發半訊息給mq

此時會在transactionlistener中的executelocaltransaction()方法阻塞,然後在這個方法裡面進行訂單建立並提交本地事務

如果commit成功,則返回commit狀態

否則是rollback狀態,如果正常返回commit或者rollback的話,不會存在第3步的反查情況。

如果上面的本地事務提交成功以後,此節點突然斷電,那麼checklocaltransaction()反查方法就會在某個時候被mq呼叫,此方法會根據訊息中的訂單號去資料庫確認訂單是否存在,存在就返回commit狀態,否則是rollback狀態。

購物車在另一模組,只要收到mq訊息就將本次訂單的商品從購物車中刪除即可。

rocketmq實現分布式事務,使用兩階段提交,和mysql寫redo log和binlog日誌的兩階段提交類似。以訂單為例

訊息對消費者不可見,將其訊息的主題topic和佇列id修改為half topic,原先的主題和佇列id也做為訊息的屬性,如果事務提交或者回滾會將其訊息的佇列改為原先的佇列。rocketmq開啟任務,從half topic中獲取訊息,呼叫其中的生產者的監聽進行回查是否提交回滾。

rocketmq採用commitlog存放訊息,消費者使用consumequeue二級索引從commitlog獲取訊息實體內容。

理解index file:indexfile的作用就是給commitlog做的索引,提公升讀取訊息時的查詢效率。

回查借助op topic進行獲取到half訊息進行後續的回查操作。

rocketmq事務反查機制通過定期反查事務狀態,來補償提交事務訊息可能出現的通訊失敗。

在kafka的事務功能中,並沒有類似的反查機制,需要使用者自行去解決這個問題。

但不代表rocketmq的事務功能比kafka更好,只能說在該例場景,rocketmq更適合。

kafka對事務的定義、實現和適用場景,和rocketmq有較大差異。

訊息佇列 訊息佇列

輪詢排程 一次性分發所有訊息,保證訊息平均分配,不管消費者是否能正常消費 訊息應答 保證消費端能確實消費,不丟失 公平 乙個乙個分發所有訊息,在保證分發到的執行緒確認回覆後,才分發下個訊息給下個空閒的消費者,訊息持久化 保證佇列中的訊息不丟失,包括3要素 交換器 訊息佇列 訊息都必須宣告持久化 發布...

訊息佇列的使用

剛開始看的時候,由兩個疑問,我自己的答案是這樣的 1.訊息佇列在系統中的最大個數,關於這個問題,書上有明確的答案 書上有個 列明了linux free bsd,mac os x solaris中的典型值。當然也可以通過一些手段來修改。sysctl就可以修改。2.在多個執行緒 或程序 同時對乙個訊息佇...

訊息佇列 訊息佇列 kafka

kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...