RabbitMQ 實戰指南 一 死信佇列

2021-09-29 00:03:21 字數 3144 閱讀 8448

dlx,全稱為 dead-letter-exchange,可以稱之為死信交換器。當訊息在乙個佇列中變成死信(dead message)之後,它能被傳送到另乙個交換器中,這個交換器就是dlx,繫結dlx的佇列就稱之為死信佇列。

dlx 也是乙個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中存在死信時,rabbitmq就會自動地將這個訊息重新發布到設定的dlx上去,進而被路由到另乙個佇列,即死信佇列。然後可以監聽這個死信佇列中的訊息進行相應的處理。

可以通過為佇列設定 x-dead-letter-exchange 引數設定 dlx,也可以通過設定 

x-dead-letter-routing-key 引數為這個dlx指定路由鍵,如果沒有特殊指定,則使用原佇列的路由鍵。

4.1 測試過程 

整個過程如下圖:

4.2 生產者** 

<?php 

require __dir__ . '/../../../../vendor/autoload.php';

usephpamqplib\wire\amqptable;

usephpamqplib\message\amqpmessage;

usephpamqplib\exchange\amqpexchangetype;

usephpamqplib\connection\amqpstreamconnection;/**

* 死信佇列測試

* 1、建立兩個交換器 exchange.normal 和 exchange.dlx, 分別繫結兩個佇列 queue.normal 和 queue.dlx

* 2、把 queue.normal 佇列裡面的訊息配置過期時間,然後通過 x-dead-letter-exchange 指定死信交換器為 exchange.dlx

* 3、傳送訊息到 queue.normal 中,訊息過期之後流入 exchange.dlx,然後路由到 queue.dlx 佇列中,進行消費 */

//todo 更改配置

$connection = new amqpstreamconnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');

$channel = $connection->channel();

$channel->exchange_declare('exchange.dlx', amqpexchangetype::direct, false, true

);$channel->exchange_declare('exchange.normal', amqpexchangetype::fanout, false, true

);//

設定 queue.normal 佇列中的訊息10s之後過期

$args = new

amqptable();

$args->set('x-message-ttl', 10000);

$args->set('x-dead-letter-exchange', 'exchange.dlx');

$args->set('x-dead-letter-routing-key', 'routingkey');

$channel->queue_declare('queue.normal', false, true, false, false, false, $args

);$channel->queue_declare('queue.dlx', false, true, false, false

);$channel->queue_bind('queue.normal', 'exchange.normal');

$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey');

$message = new amqpmessage('hello dlx message');

$channel->basic_publish($message, 'exchange.normal', 'rk');

$channel->close();

$connection->close();

執行生成者**之後,queue.normal 佇列會有一條訊息,如下圖:

10秒之後,訊息會過期,然後被進入 exchange.dlx, 進而路由到 queue.dlx 佇列中:

4.3、消費者**

<?php 

require __dir__ . '/../../../../vendor/autoload.php';

usephpamqplib\message\amqpmessage;

usephpamqplib\exchange\amqpexchangetype;

usephpamqplib\connection\amqpstreamconnection;

//todo 更改配置

$connection = new amqpstreamconnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');

$channel = $connection->channel();

$channel->exchange_declare('exchange.dlx', amqpexchangetype::direct, false, true

);$channel->queue_declare('queue.dlx', false, true, false, false

);$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey');

function process_message($message

)$channel->basic_consume('queue.dlx', 'consumer_tag', false, false, false, false, 'process_message');

function shutdown($channel, $connection

)register_shutdown_function('shutdown', $channel, $connection

);while ($channel ->is_consuming())

執行消費者**之後,消費會從 queue.dlx 中消費掉:

RabbitMQ 實戰指南 一 延遲佇列

延遲佇列中儲存延遲訊息,延遲訊息是指當訊息被傳送到佇列中不會立即消費,而是等待一段時間後再消費該訊息。延遲佇列很多應用場景,乙個典型的應用場景是訂單未支付超時取消,使用者下單之後30分鐘內未支付成功,則把訂單取消。rabbitmq 本身沒有直接支援延遲佇列的功能,但是可以通過過期時間ttl和死信佇列...

RabbitMQ 實戰指南 一 過期時間TTL

目前有兩種方式可以設定訊息的ttl 如果兩種方法一起使用,則訊息的ttl已較小的數值為準。1.1 通過設定佇列屬性來控制訊息的ttl 在宣告佇列的時候可以通過 x message ttl 屬性來控制訊息的ttl,這個引數的單位是毫秒。如果不設定 ttl.則表示此訊息不會過期 如果將 ttl 設定為 ...

《RabbitMQ實戰指南》整理(七)網路分割槽

當乙個集 生網路分割槽時,集群會分為兩個部分或者更多,它們各自為政,互相都認為對方分區內的節點已經掛了,包括佇列 交換器及繫結等元資料的建立和銷毀都處於自身分區內,與其他分割槽無關。分割槽的引入是為了配合rabbitmq的資料一致性複製原理。一般情況下,網路分割槽都是由於單個節點的故障引起的,且通常...