Publisher的訊息確認機制

2021-08-13 18:35:45 字數 2744 閱讀 6244

在前面的文章中提到了queue和consumer之間的訊息確認機制:通過設定ack。那麼publisher能不到知道他post的message有沒有到達queue,甚至更近一步,是否被某個consumer處理呢?畢竟對於一些非常重要的資料,可能publisher需要確認某個訊息已經被正確處理。

在我們的系統中,我們沒有是實現這種確認,也就是說,不管message是否被consume了,publisher不會去care。他只是將自己的狀態publish給上層,由上層的邏輯去處理。如果message沒有被正確處理,可能會導致某些狀態丟失。但是由於提供了其他強制重新整理全部狀態的機制,因此這種異常情況的影響也就可以忽略不計了。

對於某些非同步操作,比如客戶端需要建立乙個filesystem,這個可能需要比較長的時間,甚至要數秒鐘。這時候通過rpc可以解決這個問題。因此也就不存在publisher端的確認機制了。

那麼,有沒有一種機制能保證publisher能夠感知它的message有沒有被處理的?答案肯定的。在這裡感謝笑天居士同學:他在我的《

rabbitmq訊息佇列(三):任務分發機制

如果採用標準的 amqp 協議,則唯一能夠保證訊息不會丟失的方式是利用事務機制 -- 令 channel 處於 transactional 模式、向其 publish 訊息、執行 commit 動作。在這種方式下,事務機制會帶來大量的多餘開銷,並會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 publisher confirm)。

為了使能 confirm 機制,client 首先要傳送 confirm.select 方法幀。取決於是否設定了 no-wait 屬性,broker 會相應的判定是否以 confirm.select-ok 進行應答。一旦在 channel 上使用 confirm.select方法,channel 就將處於 confirm 模式。處於 transactional 模式的 channel 不能再被設定成 confirm 模式,反之亦然。

一旦 channel 處於 confirm 模式,broker 和 client 都將啟動訊息計數(以 

confirm.select

為基礎從 1 開始計數

)。broker 會在處理完訊息後,在當前 channel 上通過傳送 basic.ack 的方式對其進行 confirm 。

delivery-tag 域的值標識了被 confirm 訊息的序列號。broker 也可以通過設定 

basic.ack 中的 

multiple 域

來表明到指定序列號為止的所有訊息都已被 broker 正確的處理了。

在異常情況中,broker 將無法成功處理相應的訊息,此時 broker 將傳送 

basic.nack 來代替 

basic.ack 。在這個情形下,

basic.nack

中各域值的含義與 

basic.ack 中相應各域含義是相同的,同時 requeue 域的值應該被忽略。

通過nack 一或多條訊息

,broker 表明自身無法對相應訊息完成處理,並拒絕為這些訊息的處理負責。在這種情況下,client 可以選擇將訊息 re-publish 。

在 channel 被設定成 confirm 模式之後,所有被 publish 的後續訊息都將被 confirm(即 ack) 或者被 nack 一次。但是沒有對訊息被 confirm 的快慢做任何保證,並且同一條訊息不會既被 confirm 又被 nack 。

broker 將在下面的情況中對訊息進行 confirm :

broker 會丟失持久化訊息,如果 broker 在將上述訊息寫入磁碟前異常。在一定條件下,這種情況會導致 broker 以一種奇怪的方式執行。

例如,考慮下述情景:

1.  乙個 client 將持久訊息 publish 到持久 queue 中

2.  另乙個 client 從 queue 中 consume 訊息(注意:該訊息具有持久屬性,並且 queue 是持久化的),當尚未對其進行 ack

3.  broker 異常重啟

4.  client 重連並開始 consume 訊息

在上述情景下,client 有理由認為訊息需要被(broker)重新 deliver 。但這並非事實:重啟(有可能)會令 broker 丟失訊息。為了確保永續性,client 應該使用 confirm 機制。如果 publisher 使用的 channel 被設定為 confirm 模式,publisher 將不會收到已丟失訊息的 ack(這是因為 consumer 沒有對訊息進行 ack ,同時該訊息也未被寫入磁碟)。

首先要區別amqp協議mandatory和immediate標誌位的作用。

mandatory和immediate是amqp協議中basic.pulish方法中的兩個標誌位,它們都有當訊息傳遞過程中不可達目的地時將訊息返回給生產者的功能。具體區別在於:

1. mandatory標誌位

當mandatory標誌位設定為true時,如果exchange根據自身型別和訊息routekey無法找到乙個符合條件的queue,那麼會呼叫basic.return方法將訊息返還給生產者;當mandatory設為false時,出現上述情形broker會直接將訊息扔掉。

2. immediate標誌位

當immediate標誌位設定為true時,如果exchange在將訊息route到queue(s)時發現對應的queue上沒有消費者,那麼這條訊息不會放入佇列中。當與訊息routekey關聯的所有queue(乙個或多個)都沒有消費者時,該訊息會通過basic.return方法返還給生產者。

具體的**參考請參考參考資料1.

RabbitMQ之訊息確認 AMQP 事務機制

注意,此事務非資料庫的事務概念!將channel設定成事務模式 channel.txselect 提交事務 channel.txcommit 事務回滾 channel.txrollback 當訊息的發布者在將訊息傳送出去之後,訊息到底有沒有正確到達broker 伺服器呢?如果不進行特殊配置的話,預設...

訊息確認機制

每一次提交和請求的時候都會降低吞吐量 少用 三種模式 txselect 用於將當前channel設定為 事務 transation模式 txcmmit 提交事務 txrollback 回滾事務 用例 生產者 消費者 生產者confirm模式的原理 該模式的好處是什麼?該模式是非同步的,能提高吞吐量。...

ActiveMQ 訊息確認

一 事務性會話 當乙個事務被提交的時候,確認自動發生 connectionfactory connectionfactory new activemqconnectionfactory tcp connection connection connectionfactory.createconnect...