rabbitmq 延遲佇列

2022-09-08 21:24:26 字數 3456 閱讀 1849

延時佇列

佇列內部是有序的,最重要的特性就體現在它的延時屬性上,延時佇列中的元素是希望 在指定時間到了以後或之前取出和處理,簡單來說,延時佇列就是用來存放需要在指定時間被處理的 元素的佇列。

延遲佇列使用場景

1.訂單在十分鐘之內未支付則自動取消

2.新建立的店鋪,如果在十天內都沒有上傳過商品,則自動傳送訊息提醒。

3.使用者註冊成功後,如果三天內沒有登陸則進行簡訊提醒。

4.使用者發起退款,如果三天內沒有得到處理則通知相關運營人員。

5.預定會議後,需要在預定的時間點前十分鐘通知各個與會人員參加會議

首先要只要乙個rabbitmq的特性ttl

ttl是什麼呢?ttl 是 rabbitmq 中乙個訊息或者佇列的屬性,表明一條訊息或者該佇列中的所有 訊息的最大存活時間,

單位是毫秒。換句話說,如果一條訊息設定了 ttl 屬性或者進入了設定ttl 屬性的佇列,那麼這 條訊息如果在ttl 設定的時間內沒有被消費,則會成為"死信"。如果同時配置了佇列的ttl 和訊息的 ttl,那麼較小的那個值將會被使用,有兩種方式設定 ttl

1 訊息設定ttl

2 佇列設定ttl

兩者的區別:

如果設定了佇列的 ttl 屬性,那麼一旦訊息過期,就會被佇列丟棄(如果配置了死信佇列被丟到死信隊 列中),而第二種方式,訊息即使過期,也不一定會被馬上丟棄,因為訊息是否過期是在即將投遞到消費者 之前判定的,如果當前佇列有嚴重的訊息積壓情況,則已過期的訊息也許還能存活較長時間;另外,還需 要注意的一點是,如果不設定 ttl,表示訊息永遠不會過期,如果將 ttl 設定為 0,則表示除非此時可以 直接投遞該訊息到消費者,否則該訊息將會被丟棄。 前一小節我們介紹了死信佇列,剛剛又介紹了 ttl,至此利用 rabbitmq 實現延時佇列的兩大要素已 經集齊,接下來只需要將它們進行融合,再加入一點點調味料,延時佇列就可以新鮮出爐了。想想看,延時佇列,不就是想要訊息延遲多久被處理嗎,ttl 則剛好能讓訊息在延遲多久之後成為死信,另一方面, 成為死信的訊息都會被投遞到死信佇列裡,這樣只需要消費者一直消費死信佇列裡的訊息就完事了,因為 裡面的訊息都是希望被立即處理的訊息

實現:

建立兩個佇列 qa 和 qb,兩者佇列 ttl 分別設定為 10s 和 40s, 且兩個佇列都沒有消費者 ,然後在建立乙個普通交換機 x 和死信交換機 y,它們的型別都是direct,建立乙個死信佇列 qd,它們的繫結關係如下

面臨的問題:

舉個例子: 如果有兩條訊息 從x交換機 到qa佇列 , 我們對這兩條訊息設定ttl ,   "訊息1" 延遲40秒 , "訊息2"延遲10秒 , 最後會發現 兩條訊息都會延遲40秒 , 看起來 "訊息1" 擋住了路 , 沒有讓 延遲比它低的"訊息2" 提前過去;

原因:因為 rabbitmq 只會檢查第乙個訊息是否過期,如果過期則丟到死信佇列, 如果第乙個訊息的延時時長很長,而第二個訊息的延時時長很短,第二個訊息並不會優先得到執行。

解決辦法:

上文中提到的問題,確實是乙個問題,如果不能實現在訊息粒度上的 ttl,並使其在設定的ttl 時間 及時死亡,就無法設計成乙個通用的延時佇列。那如何解決呢,接下來我們就去解決該問題。

rabbitmq 外掛程式實現延遲佇列 !!!

進入 rabbitmq 的安裝目錄下的 plgins 目錄,執行下面命令讓該外掛程式生效,然後重啟 rabbitmq /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

使用:

在我們自定義的交換機中,這是一種新的交換型別,該型別訊息支援延遲投遞機制 訊息傳遞後並 不會立即投遞到目標佇列中,而是儲存在 mnesia(乙個分布式資料系統)表中,當達到投遞時間時,才 投遞到目標佇列中。

定義交換機:

@configuration

public

class

delayedqueueconfig

//自定義交換機 我們在這裡定義的是乙個延遲交換機

@bean

public

customexchange delayedexchange()

@bean

public binding bindingdelayedqueue(@qualifier("delayedqueue") queue queue,

@qualifier("delayedexchange") customexchange delayedexchange)

}

生產者:

public

static

final string delayed_exchange_name = "delayed.exchange";

public

static

final string delayed_routing_key = "delayed.routingkey";

public

void

sendmsg(@pathvariable string message, @pathvariable integer delaytime) );

log.info(" 當 前 時 間 : {}, 發 送 一 條 延 遲 {} 毫秒的資訊給佇列 delayed.queue:{}", new

date(), delaytime, message);

}

消費者正常操作

延時佇列在需要延時處理的場景下非常有用,使用 rabbitmq 來實現延時佇列可以很好的利用 rabbitmq 的特性,如:訊息可靠傳送、訊息可靠投遞、死信佇列來保障訊息至少被消費一次以及未被正 確處理的訊息不會被丟棄。另外,通過 rabbitmq 集群的特性,可以很好的解決單點故障問題,不會因為 單個節點掛掉導致延時佇列不可用或者訊息丟失。 當然,延時佇列還有很多其它選擇,比如利用 j**a 的 delayqueue,利用 redis 的 zset,利用 quartz 或者利用 kafka 的時間輪,這些方式各有特點,看需要適用的場景

Rabbitmq延遲佇列

建立乙個自定義列表 如何建立乙個註腳 注釋也是必不可少的 katex數學公式 新的甘特圖功能,豐富你的文章 uml 圖表 flowchart流程圖 匯出與匯入 你好!這是你第一次使用markdown編輯器所展示的歡迎頁。如果你想學習如何使用markdown編輯器,可以仔細閱讀這篇文章,了解一下mar...

RabbitMQ 延遲佇列

rabbitmq實現延遲佇列一 在佇列上設定ttl publish delaysync.exchange delay.5m.queue 延遲佇列 delay.exchange test.queue 正常佇列 consumer 延遲佇列start mapmap new hashmap map.put ...

rabbitmq實現延遲佇列

延遲佇列應用場景 使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段...