基於 rabbitmq 實現的延時佇列

2021-09-13 19:15:05 字數 2149 閱讀 7911

雖然 rabbitmq 沒有延時佇列的功能,但是稍微變動一下也是可以實現的

存在乙個倒計時機制:time to live(ttl)

當到達時間點的時候會觸發乙個傳送訊息的事件:dead letter exchanges(dlx)

基於第一點,我利用的是訊息存在過期時間這一特性, 訊息一旦過期就會變成dead letter,可以讓單獨的訊息過期,也可以設定整個佇列訊息的過期時間

rabbitmq會有限取兩個值的最小值

基於第二點,是用到了rabbitmq的過期訊息處理機制:

.x-dead-letter-exchange將過期的訊息傳送到指定的exchange

.x-dead-letter-routing-key將過期的訊息傳送到自定的route當中

在這裡例子當中,我使用的是 過期訊息+**指定exchange

首先是消費者comsumer.go

q.name, // queue name, 這裡指的是 test_logs

"", // routing key

"logs", // exchange

false,

nil)

failonerror(err, "failed to bind a queue")

// 這裡監聽的是 test_logs

msgs, err := ch.consume(

q.name, // queue name, 這裡指的是 test_logs

"", // consumer

true, // auto-ack

false, // exclusive

false, // no-local

false, // no-wait

nil, // args

)failonerror(err, "failed to register a consumer")

forever := make(chan bool)

go func()

}()log.printf(" [*] waiting for logs. to exit press ctrl+c")

<-forever

}然後是生產者productor.go

}執行一下:

go run comsumer.go

go run productor.go

具體看**和注釋就行, 這裡的關鍵點就是將要延時的訊息傳送到過期佇列當中, 然後監聽的是過期佇列**到的 exchange 下的佇列

正常情況就是始終監聽乙個佇列,然後把過期訊息傳送到延時佇列中,當訊息到達時間後就把訊息發到正在監聽的佇列

乙個自己寫的mq工具

部落格原文

RabbitMQ實現延時佇列

rabbitmq實現延時佇列一般有兩種形式 第一種方式 利用兩個特性 time to live ttl dead letter exchanges dlx a訊息佇列過期 傳送給b佇列 第二種方式 利用rabbitmq的外掛程式x delay message rabbitmq可以針對佇列設定x ex...

RabbitMQ延時佇列實現訂單關閉

配置virtual host虛擬主機 spring.rabbitmq.virtual host test order close ip位址 spring.rabbitmq.host 127.0 0.1 使用者名稱 密碼 spring.rabbitmq.username guest spring.ra...

RabbitMQ 基於RPC實現

對於使用rabbitmq執行command的情況,有時候需要有返回值資訊。此時相當於client發布乙個command後,並偵聽返回結果的queue,server接收並處理,將處理結果發布到client偵聽的queue中。簡單實現如下 1.client端 private static void rp...