使用RabbitMQ實現延遲佇列

2021-08-25 05:31:44 字數 3065 閱讀 8506

在專案中

1.使用者確認乙個訂單,若30分鐘之類沒有支付,則需要取消訂單,若用定時任務去掃瞄訂單表,第一,定時任務時間如何定義,有存在漏掃的風險,第二,訂單表資料龐大,掃瞄表非常消耗效能,這時候該功能可以引入rabbitmq延遲佇列來做

2.某條活動通知在指定的一天推送給使用者,可以用延遲佇列

延遲佇列即傳送一條訊息給目標佇列,並非讓目標佇列立即接受到訊息,而是讓訊息等待一段延遲時間才到達目標佇列讓它消費

rabbitmq本身沒有直接支援延遲佇列功能,我們可以利用它的dxl特性來擴充套件

rabbitmq的queue可配置x-dead-letter-exchange和x-dead-letter-routing-key兩個引數,意為如果佇列中出現了dead letter,則按照這兩個引數重新路由**到指定佇列

x-dead-letter-exchange : 出現dead letter之後,將dead letter重新傳送到exchange

x-dead-letter-routing-key : 出現dead letter之後,將dead letter重新按照指定的routing-key傳送

佇列出現dead letter情況有:

由此可知,設定ttl規則之後,當訊息在乙個佇列中變成死信之後,利用dlx特性,它能重新被**到另一exchange或routing-key,從而被重新消費

在之前的文章中,我們說過,rabbitmq的訊息生成器是不直接將訊息傳送到佇列,而是傳送到交換器,交換器可以**到單個或多個佇列,由此,我們需要兩個佇列、兩個交換機

queue1: 死信佇列,將訊息傳送到死信exchange,繫結此佇列,讓訊息進入該佇列成為死信,設定ttl過期時間,該佇列沒有消費者,等待時間過期進入dead letter,當queue1佇列有死信產生時,會**到交換器x-dead-letter-exchange,以路由鍵x-dead-letter-rouuting-key**到指定queue

queue2: **佇列,也是訊息最終消費的目標佇列,此佇列需要從死信佇列接收訊息,所以需要繫結死信**到的交換器x-dead-letter-exchange

fanout sender**整合, messagepostprocessor訊息處理器是個functional介面

/**

* 延遲傳送訊息佇列,客戶端發目標佇列傳送一條延遲訊息 :

* 此時,將訊息傳送到dxl死信佇列,而非直接傳送到queuename佇列,並設定延遲時間 times 秒

* * 死信佇列沒有消費者,它用來儲存超時的訊息,並**到另一佇列,**佇列等待訊息延遲之後接收到訊息,

* 已過了times 秒,處理業務邏輯

** @param payload 訊息體

*/public void senddelaymessage(delaymessagepayload payload) , correlation);

log.info("fanout send delay message success exchange:{}, message:{}, correlationid:{}",payload.getdeadexchange(),

jsonutils.obj2json(payload), correlation.getid());

}

訊息體:

@allargsconstructor

@noargsconstructor

@data

public class delaymessagepayload implements serializable

客戶端:

delaymessagepayload payload = new delaymessagepayload(rabbitconstant.fanoutexchange.seckill_dead_letter,

rabbitconstant.fanoutqueue.seckill_forward, rabbitconstant.fanoutexchange.seckill_dead_letter,

rabbitconstant.fanoutqueue.seckill_dead_letter,

"123456789",10 * 1000);

fanoutsender.senddelaymessage(payload);

初始化佇列和交換機:

@configuration

public class seckilltimeoutpayconfig

@bean

public fanoutexchange initdeadexchange()

@bean

public binding binddead()

/*** 初始化**佇列

*/@bean

public queue initforwardqueue()

@bean

public binding bindforwardexchange()

}

消費監聽:

@rabbitlistener(queues = rabbitconstant.fanoutqueue.seckill_forward)

@component

@slf4j

public class seckilltimeoutpayreceive ", jsonutils.obj2json(payload));

}catch (exception e)

}

注意: 訊息佇列是先進先出的棧列結構,也就意味著,如果先進去的延遲時間比後進去的延遲時間要大,那麼會一直阻塞等待先進去的消費,後面的就消費不到了,需要將延遲時間一致的業務放入同一佇列即可

rabbitmq實現延遲佇列

延遲佇列應用場景 使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段...

RabbitMQ 延遲佇列實現

1 延遲佇列,可以通過rabbitmq自帶機制實現 ttl 死信佇列 通過設定訊息或者佇列的ttl,過期後進行訊息的投遞,從而達到delay的效果 但存在問題 1 設定佇列ttl 同乙個佇列的所有訊息從入佇列到ttl的時間,過期後會投遞到相應死信交換機。這樣如果訊息的過期時間不盡相同,會建立n個不同...

基於PHP使用rabbitmq實現訊息佇列

1.從github上面獲取amqp基於php的實現擴充套件 2.建立生產者 send.php 1 require dir protected vendor autoload.php 23 usephpamqplib connection amqpstreamconnection 4use phpam...