解決RabbitMQ訊息丟失與重複消費問題

2021-08-21 02:46:14 字數 2329 閱讀 5500

最近使用者反饋提交的sql查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致使用者提交的查詢未被正常接收,繼而長時間無響應。

現象:即使sql控制台提交10個簡單sql查詢 -> 訊息傳送方:傳送10條訊息至訊息佇列 -> 訊息消費方:只消費了7條訊息

訊息佇列:rabbitmq

消費者:python

結論:訊息傳送正常

排查步驟:檢視log

結論:訊息數量正常

診斷步驟:

執行機安裝rabbitmq-dump-queue外掛程式,用於dump佇列的訊息;

1. 執行機:停止服務;

2. 使用者:提交10個sql查詢:

3. 傳送方:檢視web服務端的輸出日誌,確定10個訊息已經往訊息佇列寫;

4. 執行機:通過rabbitmq-dump-queue檢視佇列的訊息,確認是正常10個訊息寫入;

watch -n 1

'$gopath/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:guest@***xx:5672" -queue ph_open_task'

5. 執行機:啟動服務,訊息佇列中的訊息全部被接收;**邏輯:

try:

pool = pool(processes=40)

def callback(ch, method, properties, body):

try:

dosomething...

except exception as e:

print traceback.format_exc()

logger_msg.info(traceback.format_exc())

finally:

// 這裡會有問題,即使訊息未被處理也會反饋ack給rabbitmq

ch.basic_ack(delivery_tag=method.delivery_tag)

while

true:

try:

connection = pika.blockingconnection(

pika.connectionparameters(host='******xx'))

channel = connection.channel()

channel.queue_declare(queue=queue_name, durable=true)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback, queue=queue_name, no_ack=false)

channel.start_consuming()

except pika.exceptions.connectionclosed as e:

continue

except exception as e:

logger_msg.info(traceback.format_exc())

finally:

channel.basic_ack(delivery_tag=method.delivery_tag)

pool.close

()pool.join

()

結論:本例中消費者主程序將持續監聽mq,一旦mq有訊息將會拉取,隨後從程序池中啟動子程序來處理訊息,但是從程序池啟動子程序的過程並不一定成功(若當前程序池沒有空閒子程序),而主程序不管任何情況下都給mq傳送ack狀態碼,從而mq將未處理的訊息移除掉,導致訊息丟失

問題是在消費者環節產生,因此對消費者做改動,需要調整消費者的架構:

目前方案的問題以及解決方案:

RabbitMQ防止訊息丟失

rabbitmq中,訊息丟失可以簡單的分為兩種 客戶端丟失和服務端丟失。針對這兩種訊息丟失,rabbitmq都給出了相應的解決方案。回到目錄 如圖,生產者p向佇列中生產訊息,c1和c2消費佇列中的訊息,預設情況下,rabbitmq會平均的分發消費給c1c2 round robin dispatchi...

RabbitMQ防止訊息丟失

rabbitmq一般情況很少丟失,但是不能排除意外,為了保證系統高可用,我們必須作出更好完善措施,保證系統的穩定性。1.訊息持久化 2.ack確認機制 3.設定集群映象模式 4.訊息補償機制 第一種 訊息持久化 rabbitmq 的訊息預設存放在記憶體上面,如果不特別宣告設定,訊息不會持久化儲存到硬...

rabbitmq 重複ACK導致訊息丟失

背景 rabbitmq 在應用場景中,大多採用工作佇列 work queue的模式。在乙個常見的工作佇列模式中,消費者 worker 將不斷的輪詢從佇列中拉取最新訊息,當佇列負載壓力增大時允許新增多個worker 進行處理。然而執行乙個任務可能需要相當的時長,這是由業務特性所決定的 如果 worke...