rabbitmq 學習 之 消費者確認機制(6)

2021-09-24 11:50:43 字數 2723 閱讀 5120

在rabbitmq中,即使將queue,exchange, message等都設定了持久化之後,還是不能保證100%保證資料不丟失了。為了實現訊息不丟失,我們需要從consumer端和productor端同時進行處理。本篇文章先介紹consumer端,在ampq-0-9-1中有定義從消費者到rabbitmq的訊息確認機制,通過此機制可以保證訊息能夠從rabbitmq正確到達消費者端。

在消費者端確認的方式

rabbitmq中的兩種確認方式:

1 自動確認方式:rabbitmq成功將訊息發出(即將訊息成功寫入tcp socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞

在自動確認模式下,訊息傳送後即被認為成功投遞,又稱為」fire-and-forget」 

優點:這種模式下吞吐量非常高。 

缺點:a. 有可能出現投遞丟失的情況,不同於手動確認模式,如果消費者的tcp連線或通道在訊息成功互動之前關閉,則此訊息會丟失 b. 消費者端過載的問題。在手動確認模式中,可以設定一次最多同時處理多少訊息,而自動模式不能設定此值。因此,消費者有可能因為訊息無法及時處理,堆積中記憶體中,記憶體耗盡而奔潰 c. 此種模式只推薦在消費者可以快速且穩定處理投遞的訊息的場景中使用

2 手動處理方式:消費者收到訊息後,手動呼叫basic.ack/basic.nack/basic.reject後,rabbitmq收到這些訊息後,才認為本次投遞成功

手動訊息確認方法有: 

§ basic.ack用於肯定確認 

§ basic.nack用於否定確認(注意:這是amqp 0-9-1的rabbitmq擴充套件) 

§ basic.reject用於否定確認,但與basic.nack相比有乙個限制:一次只能拒絕單條訊息 

消費者端以上的3個方法都表示訊息已經被正確投遞,但是basic.ack表示訊息已經被正確處理,但是basic.nack,basic.reject表示沒有被正確處理,但是rabbitmq中仍然需要刪除這條訊息。 

手動的確認模式的投遞效率略低於自動,但是可以彌補自動確認模式的不足。

批量手動投遞確認

訊息手動除了一次確認一條,也可以一次確認多條。為了減少網路流量,可以批量手動確認。在應答時,設定basic.nack的multiple 欄位為true,可以同時對delivery_tag和比delivery_tag值小的投遞訊息進行確認 

例如,假設在通道上沒有確認訊息的delivery_tag是5,6,7和8,當basic.nack中delivery_tag被設定為8並且multiple 被設定為true時,方法執行成功後,從5到8的所有訊息將被確認。 如果multiple 設定為false,那麼交貨5,6和7仍然是未確認的。

投遞唯一碼: delivery tags

當消費者向rabbitmq註冊後,rabbitmq使用basic.deliver向消費者投遞訊息時,訊息體上會帶上delivery tag,這個值會唯一標識本次投遞,在同一通道上,此值是唯一的。delivery tag值有64位長度,值從1開始,每傳送一次訊息值遞增1,最大值為9223372036854775807。消費者端在應答訊息時,帶上此引數,告訴rabbitmq某次投遞已經正確應答。

消費者**:

a. channel.basicconsume設定接收非自動確認 

b. 在處理完訊息後,呼叫channel.basicack進行手動訊息確認

// 預設消費者實現

consumer consumer = new defaultconsumer(channel)

};// 接收訊息:設定非自動確認

channel.basicconsume(queue_name, false, consumer);

1 其他消費者端的為了保證正確處理資料的機制有哪些?在消費者端為了正確處理資料,還可以設定qos和投遞失敗

通道預取設定(channel prefetch setting (qos))

只能在訊息手動確認模式中啟作用。 

為了避免消費者端一次同時處理過多的訊息,可以通過basic.qos設定最大的預取值。該值定義了通道上允許的最大未確認訊息,一旦未確認訊息的數量達到配置值,rabbitmq將停止在通道上傳送更多訊息,直到至少有乙個未被確認的訊息被確認。 

備註:通道預取設定在basic.get (「pull api」)中是不啟作用,即使在訊息手動確認模式中

2 在rabbitmq中影響吞吐量最大的引數是 什麼? 消費者確認模式,預取和吞吐量

自動訊息確認模式或設定qos預取值為無限雖然可以最大的提高訊息的投遞速度,但是在消費者端未及時處理的訊息的數量也將增加,從而增加消費者ram消耗,使用消費者端奔潰。所以以上兩種情況需要謹慎使用。

rabbitmq官方推薦qos預取值設定在 100到300範圍內的值通常提供最佳的吞吐量,並且不會有使消費者奔潰的問題

3 消費者失敗或失去連線時 訊息去**了?  重新放入佇列,自動重新排隊

在訊息手動確認模式中,如果發生以下情況投遞訊息所有的通道或連線被突然關閉(包括消費者端丟失tcp連線、消費者應用程式(程序)掛掉、通道級別的協議異常)任何已經投遞的訊息但是沒有被消費者端確認的訊息會自動重新排隊。 

請注意,連線檢測不可用客戶端需要一段時間才會發現,所以會有一段時間內的所有訊息會重新投遞 

因為訊息的可能重新投遞,所有必須保證消費者端的介面的冪等。

4 多次確認和對未知delivery_tag進行確認的結果是什麼樣的?

如果消費者對同一delivery_tag進行多次確認,則丟擲通道異常precondition_failed。如果對 

未知的delivery_tag進行確認,也會丟擲通道異常。

rabbitmq直連模式 消費者

pom org.springframework.boot spring boot starter amqp org.springframework.boot spring boot starter web org.springframework.boot spring boot starter te...

RabbitMQ生產者消費者模型

生產者 mport pika connection pika.blockingconnection pika.connectionparameters host 127.0.0.1 建立乙個例項 channel connection.channel 宣告乙個管道 channel.queue decl...

rabbit mq消費者怎麼限制 速度

如果併發數量很高,那麼這個時候佇列中就會有很多訊息等待處理,如果不限制消費者的拉取數量,消費者就會每秒拉取很多的訊息,最後還是會達到乙個很高的併發數,消費者伺服器照樣存在崩潰的可能性。使用前提 消費者採用的是手動確認模式 修改配置檔案,這裡使用的是yml格式 spring rabbitmq list...