RabbitMQ 核心概念 死信 延時佇列

2022-03-17 21:03:02 字數 2825 閱讀 8014

rabbitmq訊息傳遞模型的核心思想是生產者從不將任何訊息直接傳送到佇列。實際上,生產者經常甚至根本不知道是否將訊息傳遞到任何佇列。

send.py

import pika

connection = pika.blockingconnection(

pika.connectionparameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='hello world!')

print(" [x] sent 'hello world!'")

connection.close()

recive.py

import pika, sys, os

def main():

connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):

print(" [x] received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=true)

print(' [*] waiting for messages. to exit press ctrl+c')

channel.start_consuming()

if __name__ == '__main__':

try:

main()

except keyboardinterrupt:

print('interrupted')

try:

sys.exit(0)

except systemexit:

os._exit(0)

producer exchange、routing_key queue consumer

producer 將訊息發往exchange

預設使用的是direct型別,exchange為空,根據routing_key找到queue或者建立queue

channel.basic_publish(exchange='', routing_key='hello', body='hello world!')
生產者只能將訊息傳送到exchange,交流是一件非常簡單的事情。一方面,它接收來自生產者的訊息,另一方面,將它們推入佇列。交易所必須確切知道如何處理收到的訊息。是否應將其附加到特定佇列?是否應該將其附加到許多佇列中?還是應該丟棄它。規則由交換型別定義 。

有幾種交換型別可用:direct,topic,headers 和fanout。我們將集中討論最後乙個-fanout。讓我們建立該型別的交換,並將其稱為log:

channel.exchange_declare(exchange = 'logs',

exchange_type = 'fanout')

發布或者訂閱時指定routing_key

exchange 在direct型別時:routing_key 和exchange 訊息所屬佇列;

exchange 在topic 模式下時:

"#":匹配所有的路由鍵

"work.#":無匹配

exchangezai fanout型別時:

到達該exchange的所有訊息將會傳送到繫結到該exchange的所有佇列

訊息被否定確認,使用channel.basic_nackchannel.basic_reject,並且此時requeue屬性被設定為false

訊息在佇列的存活時間超過設定的ttl時間。

訊息佇列的訊息數量已經超過最大佇列長度。

那麼該訊息將成為「死信」。

「死信」訊息會被rabbitmq進行特殊處理,如果配置了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有配置,則該訊息將會被丟棄。

死信訊息的生命週期:

業務訊息被投入業務佇列

消費者消費業務佇列的訊息,由於處理過程中發生異常,於是進行了nck或者reject操作

被nck或reject的訊息由rabbitmq投遞到死信交換機中

死信交換機將訊息投入相應的死信佇列

死信佇列的消費者消費死信訊息

延時佇列,首先,它是一種佇列,佇列意味著內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出。

其次,延時佇列,最重要的特性就體現在它的延時屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等著希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務。

簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列。

官網基礎

可靠訊息|死信佇列|延時佇列

RabbitMq死信佇列

死信交換機有什麼用呢?在建立佇列的時候 可以給這個佇列附帶乙個交換機,那麼這個佇列作廢的訊息就會被重新發到附帶的交換機,然後讓這個交換機重新路由這條訊息。通俗的說,就是訊息產生之後,因為設定了超時時間,在這段時間內訊息沒有被消費就會被扔到死信佇列裡面。交換機名稱 private static fin...

rabbitmq死信佇列

死信佇列 dlx dead letter exchange 利用dlx,當訊息在乙個佇列中變成死信 dead message 之後,它能重新publish到另外乙個exchange,這個exchange就是dxl 訊息變成死信的幾種情況 訊息被拒絕 basic.reject basic.nack 並...

rabbitmq死信佇列

概念 當訊息成為死信時,會將該訊息放到死信交換機當中,這個交換機也繫結的其他佇列,還可以繼續進行消費。訊息什麼時候會變成死信 在配置檔案宣告佇列時指定死信交換機的名稱和死信交換機的路由key key x dead letter exchange value 死信交換機名稱 key x dead le...