9 ACK消費者訊息確認機制

2021-10-05 15:42:09 字數 4679 閱讀 7269

官方文件

部落格rabbitmq版本:3.8.3

amqp-client版本:5.7.1

由於在rabbitmq中,傳送出去的訊息不能完全保證能夠被消費者接收到,因此需要一種消費者訊息確認機制,來為訊息從rabbitmq節點到消費者的可靠傳遞提供支援。

當消費者向rabbitmq註冊之後,rabbitmq將通過basic.deliver方法投遞訊息給消費者。在每一次投遞的訊息體上都會攜帶乙個delivery tag,這個值在每隔通道上是唯一的,用來標識本次投遞。

delivery tag值是單調遞增的正整數,64位長度,值從1開始,每傳送一次訊息值遞增1,最大值為9223372036854775807

消費者在收到訊息後,向rabbitmq傳送應答訊息時,帶上這個delivery tag,告訴rabbitmq某次訊息投遞已經成功接收。

對於消費者訊息確認機制,rabbitmq提供了兩種確認方式:

rabbitmq成功將訊息傳送出去(將訊息成功寫入tcp socket)之後立即認為本次訊息投遞已經成功,不管消費者端是否成功接收到訊息並處理了本次訊息投遞。

這種訊息被傳送出去立刻被認為是成功投遞的,又被稱作fire-and-forget

優點:

缺點:

消費者受到訊息之後,手動呼叫basic.ackbasic.nackbasic.reject方法,傳送應答訊息,當rabbitmq受到應答訊息之後,才認為本次投遞成功。

三個函式的含義:

注意:

basic.rejectbasic.nack都是用於否定確認,但是多了乙個限制:一次只能拒絕單條訊息。

以上三個方法都表示訊息已經被正確投遞。但是basic.ack表示訊息已經被正確處理,basic.nackbasic.reject表示訊息沒有被正確處理,rabbitmq中仍然需要刪除這條訊息。

手動確認方式的訊息投遞效率低於自動確認方式,但是能夠彌補自動確認方式的不足。

例子:

// 下面的**片段演示了在消費者端如何手動確認訊息投遞

boolean autoack =

false

;channel.

basicconsume

(queuename, autoack,

"a-consumer-tag"

,new

defaultconsumer

(channel)})

;

手動確認方式也支援批量確認,這樣可以顯著減少網路流量。

批量確認是通過將multiple引數設定為true來實現的。這樣將會對deliverytag以及比deliverytag值小的訊息一同批量確認。

void

basicack

(long deliverytag,

boolean multiple)

throws ioexception;

例子:

// 下面的**片段演示了在消費者端如何手動確認訊息投遞,批量確認

boolean autoack =

false

;channel.

basicconsume

(queuename, autoack,

"a-consumer-tag"

,new

defaultconsumer

(channel)})

;

有時候消費者不能立即處理訊息,但是有其他消費者例項可以處理這個訊息。在這種情況下,可以通過basic.nackbasic.reject否定確認訊息。

這兩個方法是來告訴rabbitmq,訊息我們收到了,但是沒有處理它。當rabbitmq收到這樣的應答訊息之後,可以選擇刪除訊息也可以選擇重新投遞。這個行為由requeue引數來控制,如果requeue設定為true,那麼rabbitmq將會重新投遞這個訊息。

void

basicreject

(long deliverytag,

boolean requeue)

throws ioexception;

例子:

// 下面的例子用來說明消費者如何否定確認訊息

boolean autoack =

false

;channel.

basicconsume

(queuename, autoack,

"a-consumer-tag"

,new

defaultconsumer

(channel)})

;

使用basic.reject只能否定確認單個訊息。但是,使用basic.nack可以批量否定確認多條訊息。

批量否定確認也是通過將引數m設定為true來實現的。

void

basicnack

(long deliverytag,

boolean multiple,

boolean requeue)

throws ioexception;

例子:

// 下面的例子用來說明消費者如何批量否定確認訊息

boolean autoack =

false

;channel.

basicconsume

(queuename, autoack,

"a-consumer-tag"

,new

defaultconsumer

(channel)})

;

為了保證消費者能夠正確地處理投遞的訊息,還可以採用下面的這些措施。

只能在手動確認模式中使用。

為了避免消費者一次不能處理過多的訊息導致訊息堆積,可以通過basic.qos設定最大的預取值。該值定義了通道上允許的最大未確認訊息的數量,一旦未確認訊息的數量達到配置值,rabbitmq將停止在通道上傳送更多訊息,直到至少有乙個未被確認的訊息被確認。

備註:通道預取設定在basic.get(「plapi」)中是不啟作用的,即使在手動確認模式中。

rabbitmq中影響吞吐量最大的引數是訊息確認模式和qos預取值。

自動訊息確認模式或設定qos預取值為無限雖然可以最大的提高訊息的投遞速度,但是在消費者端未及時處理的訊息的數量也將增加,從而增加消費者端記憶體的消耗,很可能使消費者端崩潰。所以以上兩種情況需要謹慎使用。

rabbitmq官方推薦qos預取值設定在100300範圍內的值,通常能夠提供最佳的吞吐量,並且不會有使消費者奔潰的問題。

在訊息手動確認模式中,如果投遞訊息的所有通道或連線被突然關閉(包括消費者端丟失tcp連線、消費者應用程式(程序)掛掉、通道級別的協議異常),任何已經投遞的但是沒有被消費者端確認的訊息會自動重新排隊,等待重新被投遞。

請注意,檢測消費者端的連線不可用需要一段時間才會發現,所以會有一段時間內的所有訊息被重新投遞。

因為訊息可能被重新投遞,所以必須保證消費者端的介面的冪等性。

如果客戶端對同乙個delivery tag標記的訊息進行了多次ack確認,這將會導致rabbitmq通道上的乙個錯誤,例如precondition_failed - unknown delivery tag 100。如果使用了未知的tag,將引發相同的通道異常。

注意,必須在同乙個通道上對訊息進行確認。如果在另乙個通道上對訊息進行了確認,將會導致unknown delivery tag的錯誤。

對於不可路由的訊息,一旦exchange驗證訊息不會被路由到任何佇列,這條訊息將會由**來確認。

消費者手動ack

1.在這裡不提如何整合rabbit mq到spring。2.實現功能的配置都在消費者端 3.下面是步驟和說明 1 在消費者端的mq配置檔案上新增,配置 關鍵 為 acknowledeg manual 意為表示該消費者的ack方式為手動 此時的queue已經和生產者的exchange通過某個route...

RabbitMQ中的訊息確認ACK機制

我們將訊息持久化後,假如消費端出現異常,rabbitmq伺服器會將訊息快取到記憶體,當生產者傳送一直傳送訊息而消費者都沒有正常消費時訊息就會將這些訊息全部儲存在記憶體,當我們的訊息過多時,就可能導致rabbitmq伺服器記憶體洩漏,解決辦法 1.開啟ack確認機制,2.消費端設定重試機制 預設是三次...

rabbitMQ 消費者ack機制與拉取模式

public static void main string args throws exception以下三種根據實際情況使用 long deliverytag envelope.getdeliverytag 訊息序列號 boolean multiple false 是否多條訊息 boolean ...