RabbitMQ 消費端的限流策略

2021-09-11 15:01:58 字數 2058 閱讀 1080

假設乙個場景,由於我們的消費端突然全部不可用了,導致 rabbitmq 伺服器上有上萬條未處理的訊息,這時候如果沒做任何現在,隨便開啟乙個消費端客戶端,就會導致巨量的訊息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多的資料,就會導致消費端變得巨卡,有可能直接崩潰不可用了。所以在實際生產中,限流保護是很重要的。

rabbitmq 提供了一種 qos (服務質量保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息(通過基於 consume 或者 channel 設定 qos 的值)未被確認前,不進行消費新的訊息。關鍵**就是在宣告消費者**裡面的

void basicqos(unit prefetchsize , ushort prefetchcount, bool global )

複製**

prefetchsize:0

prefetchcount:會告訴 rabbitmq 不要同時給乙個消費者推送多於 n 個訊息,即一旦有 n 個訊息還沒有 ack,則該 consumer 將 block 掉,直到有訊息 ack

global:true、false 是否將上面設定應用於 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別

備註:prefetchsize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchcount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。

**演示:

複製**

生產端**基本沒變化,改了 exchange 和 routingkey 而已

public class procuder ", msg + i);

channel.basicpublish(consumer.exchange_name, consumer.routing_key, true, null, (msg + i).getbytes());

} }}

複製**

消費端**需要修改一下autoack 設定為 false **

增加 ** channel.basicqos(0, 1, false);

完整的消費端**如下

/**

* 使用自定義消費者

*/public class consumer

}複製**

自定義消費者

public class myconsumer extends defaultconsumer 

@override

public void handledelivery(string consumertag, //消費者標籤

envelope envelope,

amqp.basicproperties properties,

byte body) throws ioexception

}複製**

然後啟動消費端,上管控臺檢視 test_qos_exchange 和 test_qos_queue 是否生成了

確認 test_qos_exchange 上繫結了 test_qos_queue

啟動生產端傳送 5 條訊息

發現消費端只列印了一條訊息

從管控台上也看到總共 5 條訊息,有 4 條等待著,一條消費了但是沒有 ack 回去

修改自定義消費者裡面的**,如下所示

public class myconsumer extends defaultconsumer 

@override

public void handledelivery(string consumertag, //消費者標籤

envelope envelope,

amqp.basicproperties properties,

byte body) throws ioexception

}複製**

重啟消費端,看到消費端就按照一條一條消費,並且 ack 回去了

如上所示就是簡單的rabbitmq消費端的限流策略

消費端限流

什麼是消費端的限流 假設乙個場景,首先我們rabbitmq伺服器上面有上萬條沒有處理的訊息,我們隨便開啟乙個消費者客戶端,會出現下面情況 巨量訊息瞬間全部推送過來,但是我們當個客戶端沒有辦法進行處理這麼多的資料,可能會造成伺服器宕機。rabbitmq提供一種qos 服務質量保證 功能,即在非自動確認...

大廠如何用RabbitMQ做消費端限流

假設rabbitmq伺服器有上萬條未處理訊息,隨便開啟乙個消費端,會造成巨量訊息瞬間全部推送過來,然而我們單個客戶端無法同時處理這麼多資料。還比如說單個pro一分鐘產生了幾百條資料,但是單個con一分鐘可能只能處理60條,這時pro con不平衡。通常pro沒辦法做限制,所以con就需要做一些限流措...

消費端ACK和消費端限流

rabbitmq提供了一種qos 服務質量保證 功能,即在非自動確認訊息的前提下,如果一定數目的訊息 通過consumer或者channel設定qos的值 未被確認前,不進行消費新的訊息.自動簽收要設定成false,建議實際工作中也設定成false void basicqos int prefetc...