RabbitMQ 死信 延遲佇列

2021-10-23 13:02:31 字數 4309 閱讀 2941

死信訊息:

過期訊息:

在 rabbitmq 中存在2種方可設定訊息的過期時間,第一種通過對佇列進行設定,這種設定後,該佇列中所有的訊息都存在相同的過期時間,第二種通過對訊息本身進行設定,那麼每條訊息的過期時間都不一樣。如果同時使用這2種方法,那麼以過期時間小的那個數值為準。當訊息達到過期時間還沒有被消費,那麼那個訊息就成為了乙個死信訊息。

延時佇列:

在rabbitmq中不存在延時佇列,但是我們可以通過設定訊息的過期時間和死信佇列來模擬出延時佇列。消費者監聽死信交換器繫結的佇列,而不要監聽訊息傳送的佇列。

有了以上的基礎知識,我們完成以下需求:

需求:

使用者在系統中建立乙個訂單,如果超過時間使用者沒有進行支付,那麼自動取消訂單。

分析:

1、上面這個情況,我們就適合使用延時佇列來實現,那麼延時佇列如何建立

2、延時佇列可以由 過期訊息+死信佇列 來時間

3、過期訊息通過佇列中設定 x-message-ttl 引數實現

4、死信佇列通過在佇列申明時,給佇列設定x-dead-letter-exchange引數,然後另外申明乙個佇列繫結x-dead-letter-exchange對應的交換器

一.生產者

# utf-8

import time

import pika

host = 'localhost'

port = 5672

user = 'admin'

password = '123456'

connection = pika.blockingconnection(

pika.connectionparameters(host=host, port=port, credentials=pika.plaincredentials(user, password),

virtual_host="admin"))

channel = connection.channel()

# 過時訊息接收交換機

delay_exchange = 'delay_exchange'

# 過時訊息接收佇列

delay_queue = 'delay_queue'

# 正常交換機

exchange = 'test_exchange'

# 正常佇列

queue = 'test_queue'

# 設定過時佇列引數

arguments =

# 宣告收容交換機

channel.exchange_declare(exchange=exchange, exchange_type='fanout') # 宣告收容佇列

channel.queue_declare(queue=queue, durable=true, arguments=arguments)

# 收容佇列和收容交換機繫結

channel.queue_bind(exchange=exchange, queue=queue)

for i in range(1, 10):

severity = 'info'

message = '.format(i)}

channel.basic_publish(exchange=exchange,

routing_key=severity,

body=message)

print(" [x] sent %r:%r" % (severity, message))

time.sleep(2)

二.消費者

正常消費者

# utf-8

import pika

host = 'localhost'

port = 5672

user = 'admin'

password = '123456'

connection = pika.blockingconnection(

pika.connectionparameters(host=host, port=port, credentials=pika.plaincredentials(user, password),

virtual_host="admin"))

channel = connection.channel()

# 正常交換機

delay_exchange = 'test_exchange'

# 正常佇列

delay_queue = 'test__queue'

channel.exchange_declare(exchange=delay_exchange,

exchange_type='fanout')

result = channel.queue_declare(delay_queue,exclusive=true)

queue_name = result.method.queue

channel.queue_bind(exchange=delay_exchange,queue=queue_name,

routing_key='info') # 只處理routing_key為info的訊息

print(' [*] waiting for logs. to exit press ctrl+c')

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue_name,callback,true)

channel.start_consuming()

異常消費者

# utf-8

import pika

host = 'localhost'

port = 5672

user = 'admin'

password = '123456'

connection = pika.blockingconnection(

pika.connectionparameters(host=host, port=port, credentials=pika.plaincredentials(user, password),

virtual_host="admin"))

channel = connection.channel()

# 過時訊息接收交換機

delay_exchange = 'delay_exchange'

# 過時訊息接收佇列

delay_queue = 'delay__queue'

channel.exchange_declare(exchange=delay_exchange,

exchange_type='fanout')

result = channel.queue_declare(delay_queue,exclusive=true)

queue_name = result.method.queue

channel.queue_bind(exchange=delay_exchange,queue=queue_name,

routing_key='info') # 只處理routing_key為info的訊息

print(' [*] waiting for logs. to exit press ctrl+c')

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue_name,callback,true)

channel.start_consuming()

如果消費者在30分鐘內完成了訂單支付也就是走test_queue佇列消費訊息,如果30分鐘內未支付訂單則走delay_queue佇列異常處理(通常該指令碼一直跑就行,更新訂單狀態).

關鍵點設定過時訊息接收交換機及佇列

arguments =

參考:

RabbitMq死信佇列

死信交換機有什麼用呢?在建立佇列的時候 可以給這個佇列附帶乙個交換機,那麼這個佇列作廢的訊息就會被重新發到附帶的交換機,然後讓這個交換機重新路由這條訊息。通俗的說,就是訊息產生之後,因為設定了超時時間,在這段時間內訊息沒有被消費就會被扔到死信佇列裡面。交換機名稱 private static fin...

rabbitmq死信佇列

死信佇列 dlx dead letter exchange 利用dlx,當訊息在乙個佇列中變成死信 dead message 之後,它能重新publish到另外乙個exchange,這個exchange就是dxl 訊息變成死信的幾種情況 訊息被拒絕 basic.reject basic.nack 並...

rabbitmq死信佇列

概念 當訊息成為死信時,會將該訊息放到死信交換機當中,這個交換機也繫結的其他佇列,還可以繼續進行消費。訊息什麼時候會變成死信 在配置檔案宣告佇列時指定死信交換機的名稱和死信交換機的路由key key x dead letter exchange value 死信交換機名稱 key x dead le...