如何用RabbitMQ實現延遲佇列

2022-01-09 21:11:20 字數 4100 閱讀 3801

jdkjuc工具包中,提供了一種延遲佇列delayqueue。延遲佇列用處非常廣泛,比如我們最常見的場景就是在網購或者外賣平台中發起乙個訂單,如果不付款,一般15分鐘後就會被關閉,這個直接用定時任務是不好實現的,因為每個使用者下單的時間並不確定,所以這時候就需要用到延遲佇列。

延遲佇列本身也是佇列,只不過這個佇列是延遲的,意思就是說當我們把一條訊息放入延遲佇列,訊息並不會立刻出隊,而是會在到達指定時間之後(或者說過了指定時間)才會出隊,從而被消費者消費。

rabbitmq中的死信佇列就是用來儲存特定條件下的訊息,那麼假如我們把這個條件設定為指定時間過期(設定帶ttl的訊息或者佇列),就可以用來實現延遲佇列的功能。

新建乙個ttldelayrabbitconfig配置類(省略了包名和匯入),訊息最開始傳送至ttl訊息佇列,這個佇列中所有的訊息在5秒後過期,後期後會進入死信佇列:

@configuration

public class ttldelayrabbitconfig

//ttl訊息佇列

@bean("ttldelayqueue")

public queue ttlqueue()

//fanout交換機和productqueue繫結

@bean

public binding bindttlfanoutexchange(@qualifier("ttldelayqueue") queue queue, @qualifier("ttldelayfanoutexchange") fanoutexchange fanoutexchange)

//fanout死信交換機

@bean("ttldelaydeadletterexchange")

public fanoutexchange deadletterexchange()

//死信佇列

@bean("ttldelaydeadletterqueue")

public queue ttldelaydeadletterqueue()

//死信佇列和死信交換機繫結

@bean

public binding deadletterqueuebindexchange(@qualifier("ttldelaydeadletterqueue") queue queue, @qualifier("ttldelaydeadletterexchange") fanoutexchange fanoutexchange)

}

新建乙個消費者ttldelayconsumer類,監聽死信佇列,這裡收到的訊息都是生產者生產訊息之後的5秒,也就是延遲了5秒的訊息:

@component

public class ttldelayconsumer

}

新建乙個delayqueuecontroller類做生產者來傳送訊息:

@restcontroller

public class delayqueuecontroller

}

最後我們在瀏覽器輸入位址http://localhost:8080/delay/ttl/send?msg=測試ttl延遲佇列進行測試,可以看到每條訊息都是在傳送5秒之後才能收到訊息:

假如我們實際中,有的訊息是10分鐘過期,有的是20分鐘過期,這時候我們就需要建立多個佇列,一旦時間維度非常龐大,那麼就需要維護非常多的佇列。說到這裡,可能很多人會有疑問,我們可以針對單條資訊設定過期時間,大可不必去定義多個佇列?

然而事實真的是如此嗎?接下來我們通過乙個例子來驗證下。

把上面示例中ttldelayrabbitconfig類中的佇列定義函式x-message-ttl屬性去掉,不過需要注意的是我們需要先把這個佇列後台刪除掉,否則同名佇列重複建立無效:

@bean("ttldelayqueue")

public queue ttlqueue()

public string ttlmsgsend(@requestparam(value = "msg",defaultvalue = "no message") string msg,

@requestparam(value = "time") string milltimes)

然後執行2條訊息傳送,一條10秒過期,一條5秒過期,先傳送10秒的:

http://localhost:8080/delay/ttl/send?msg=10秒過期訊息&time=10000

http://localhost:8080/delay/ttl/send?msg=5秒過期訊息&time=5000

執行之後得到如下結果:

我們看到,兩條訊息都是10秒後過期,這是巧合嗎?並不是,這是因為rabbitmq中的機制就是如果前一條訊息沒有出隊,那麼即使後一條訊息已經失效,也必須要等前一條訊息出隊之後才能出隊,所以這就是為什麼一般都盡量避免同乙個佇列單條訊息設定不同過期時間的做法。

通過以上兩個例子,使用死信佇列來實現延遲佇列,我們可以得到幾個很明顯的缺點:

為了避免ttl和死信佇列可能造成的問題,所以就非常有必要用一種新的更好的方案來替代實現延遲佇列,這就是延時佇列外掛程式。

rabbitmq3.5.7版本之後,提供了乙個外掛程式(rabbitmq-delayed-message-exchange)來實現延遲佇列 ,同時需保證erlang/opt版本為18.0之後。

wget
新建乙個plugindelayrabbitconfig配置類:

@configuration

public class plugindelayrabbitconfig

@bean("plugindelayqueue")

public queue plugindelayqueue()

@bean

public binding plugindelaybinding(@qualifier("plugindelayqueue") queue queue,@qualifier("plugindelayexchange") customexchange customexchange)

}

新建乙個消費者類plugindelayconsumer

@component

public class plugindelayconsumer

}

在上面示例中的delayqueuecontroller類,新增乙個方法:

public string pluginmsgsend(@requestparam(value = "msg",defaultvalue = "no message") string msg)

接下來就可以訪問位址http://localhost:8080/delay/plugin/send?msg=外掛程式延遲佇列訊息進行測試,可以看到,訊息在延時5秒之後被消費:

如何用RabbitMQ實現延遲佇列

利用外掛程式實現延遲佇列 總結在jdk的juc工具包中,提供了一種延遲佇列delayqueue。延遲佇列用處非常廣泛,比如我們最常見的場景就是在網購或者外賣平台中發起乙個訂單,如果不付款,一般15分鐘後就會被關閉,這個直接用定時任務是不好實現的,因為每個使用者下單的時間並不確定,所以這時候就需要用到...

RabbitMQ如何實現延遲佇列?

延遲佇列儲存的物件肯定是對應的延遲訊息,所謂 延遲訊息 是指當訊息被傳送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。場景一 在訂單系統中,乙個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那麼這個訂單將進行一場處理。這是就可以使用延...

rabbitmq實現延遲佇列

延遲佇列應用場景 使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段...