RabbitMQ系列 如何保證訊息的可靠傳輸

2022-03-23 09:00:33 字數 3322 閱讀 8585

訊息的可靠投遞除了需要硬體,網路,訊息中介軟體等的可靠保證外,還需要生產者,消費者來共同保證來完成。一條訊息從生產者產生,到傳送到交換機,並被投遞到佇列,並最終被消費者消費,這整個路徑上,途徑的每乙個地方都要保證訊息的可靠性。

其實,官方文件reliability guide已經總結了訊息系統安全的方方面面。

除了上面這幾點外,還需要下面一系列的方法來共同保證訊息的可靠傳輸。

官方文件:data safety on the broker side

將交換機、佇列和訊息設為durable

spring**中,交換機和佇列預設都是持久化的

需要將訊息的投遞模式(delivery_mode)設定為2(也就是持久化)。

當我們使用rabbittemplate呼叫了convertandsend(string exchange, string routingkey, final object object) 方法。預設就是持久化模式。

注意:但要注意的是,

將所有的訊息都設定為持久化,會嚴重影響rabbitmq的效能,寫入硬碟的速度比寫入記憶體的速度慢的不只一點點。對於可靠性不是那麼高的訊息可以不採用持久化處理以提高整體的吞吐率,在選擇是否要將訊息持久化時,需要在可靠性和吞吐量之間做乙個權衡。 

在某種應用場景,如大流量的訂單交易系統,為了不影響效能,我們可以不設定持久化,但是我們會定時掃瞄資料庫中的未傳送成功的訊息,進行重試傳送,實際應用場景,我們其實有很多解決方案。

當訊息傳送出去之後,我們如何知道訊息有沒有正確到達exchange呢?如果在這個過程中,訊息丟失了,我們根本不知道發生了什麼,也不知道是什麼原因導致訊息傳送失敗了,為解決這個問題,主要有如下兩種方案:

但是使用事務機制實現會嚴重降低rabbitmq的訊息吞吐量,我們採用一種輕量級的方案——生產者訊息確認機制。

訊息確認機制就是生產者傳送的訊息一旦被投遞到匹配的佇列之後,交換機就會傳送乙個確認訊息給生產者,生產者就知曉訊息已經正確到達了目的地。 如果訊息和佇列是持久化儲存的,那麼確認訊息會在訊息寫入磁碟之後發出。

具體實現:通過實現confirmcallback介面,訊息傳送到exchange後觸發**。

rabbittemplate.setconfirmcallback(new

rabbittemplate.confirmcallback() ", cause);} }

});

這裡僅僅是記錄了日誌檔案,後續還要對訊息進行重試傳送,可以根據業務的需要使用立即重試或者在未來的某個時間點重試。

重試機制

當然,使用生產者訊息確認機制需要考慮的另外乙個問題是由於網路斷開等原因導致生產者收不到ack確認,那麼對於生產者來說可能會有兩種結果:

對於後者,一種可能的處理方式是在生產者確認**方法中去驗證(查詢)一下消費者對應的業務是否對該訊息進行了處理(根據訊息對應的業務的id),但很顯然這樣是非常不合理的。因為消費者可能有多個,需要一一去驗證,同時這也與引入訊息佇列實現生產者和消費者的解耦相悖。所以最合理的處理方式就是生產者進行重試,但消費者要進行冪等處理了。可以參考rocketmq訊息冪等處理。

對於立即重試,因為可能是網路故障,所以依然於事無補,所以可以指定一定的重試次數,或者可以同時調整重試時間間隔(1,2,4,8……)。對於後者,可以將未成功傳送的訊息記錄到資料庫日誌表,然後使用乙個定時器去定時掃瞄日誌表,然後呼叫生產者重新傳送訊息。

具體實現如下:

rabbittemplate有乙個確認**介面,該介面中有個confirm方法。只需要實現該**介面,並在confirm方法中判斷是否收到確認,如果未收到確認,則記錄日誌(訊息id,失敗原因),然後將訊息id和訊息內容儲存到redis,然後啟動乙個定時任務來重發訊息。

public

class publishconfirmcallback implements

rabbittemplate.confirmcallback ] cause [{}]", correlationdata.getid(), cause);

//todo : add id and info to redis; schedule a job to resend message;

//todo : 將id和資訊加入redis,啟動定時任務來重發訊息

} }

}

對於生產者重試的實現可以參考部落格:rabbitmq可靠傳送的自動重試機制

為了防止訊息丟失,以及對訊息收發進行有效跟蹤,需要在傳送訊息的時候記錄,訊息接收處理後刪除。可以在發訊息的時候,同時往redis或資料庫裡面同時寫乙份。

在訊息傳送和接收時記錄db日誌,定時輪詢db日誌,查明哪些傳送訊息沒有成功消費,啟動重新傳送訊息機制。

補充乙個mandatory引數。當mandatory引數設為true時,如果目的不可達,會將訊息返還給生產者,生產者通過乙個**函式可以獲取該資訊。

為了保證訊息從佇列可靠地到達消費者,rabbitmq提供了消費者訊息確認機制(message acknowledgement)。採用訊息確認機制之後,消費者就有足夠的時間來處理訊息,不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為rabbitmq會一直等待並持有訊息,直到消費者確認了該訊息。

參考部落格:spring-boot + rabbitmq訊息手動確認模式的幾點說明(重試機制)

官方文件:dead letter

dlx,dead letter exchange 的縮寫,又死信郵箱、死信交換機。dlx就是乙個普通的交換機,和一般的交換機沒有任何區別。 當訊息在乙個佇列中變成死信(dead message)時,通過這個交換機將死信傳送到死信佇列中(指定好相關引數,rabbitmq會自動傳送)。什麼是死信呢?什麼樣的訊息會變成死信呢?

在定義業務佇列的時可以考慮指定乙個死信交換機,並繫結乙個死信佇列,當訊息變成死信時該訊息就會被傳送到該死信佇列上,這樣就方便我們檢視訊息失敗的原因了。

定義業務(普通)佇列的時候指定引數:

@bean

public

queue helloqueue()

參考部落格:rabbitmq的集群模式

除了上面講的基本可靠性保證外,其實還有很多效能優化方案、可靠性保證方案:集群監控、流控、映象佇列、haproxy+keeplived高可靠負載均衡

參考:快取架構之史上講的最明白的rabbitmq可靠訊息傳輸實戰演練

RabbitMQ 如何保證訊息不丟失?

rabbitmq 如何保證訊息不丟失?rabbitmq一般情況很少丟失,但是不能排除意外,為了保證我們自己系統高可用,我們必須作出更好完善措施,保證系統的穩定性。下面來介紹下,如何保證訊息的絕對不丟失的問題,下面分享的絕對乾貨,都是在知名網際網路產品的產線中使用。1.訊息持久化 2.ack確認機制 ...

Rabbitmq如何保證訊息順序執行

訊息佇列中的若干訊息如果是對同乙個資料進行操作,這些操作具有前後的關係,必須要按前後的順序執行,否則就會造成資料異常。舉例 比如通過mysql binlog進行兩個資料庫的資料同步,由於對資料庫的資料操作是具有順序性的,如果操作順序搞反,就會造成不可估量的錯誤。比如資料庫對一條資料依次進行了 插入 ...

RabbitMQ如何保證訊息不丟失

rabbitmq 大致有三種場景會發生訊息丟失 1 consumer沒有接收到訊息,消費之前 2 consumer接收到訊息,訊息暫存記憶體,還未消費 3 consumer消費時 第一種丟失場景就像我剛剛到達便利店,還沒選好買什麼商品,這個時候便利店突然斷電,無法消費 這種場景下使用message ...