如何通過RabbitMq實現動態定時任務詳解

2022-09-21 14:36:08 字數 4186 閱讀 8724

目錄

定時任務的需求所謂是數不勝數,其中實現方式也是百花齊放,用得最多的大概率為springboot中的 @scheduled(cron = 「0 0 1 1 * ?」) 註解,或者是定時任務xxl-job框架,這兩者我接觸的比較多,除此之外還有,quartz 、elastic-job、但這兩個在分布式領域而言,和xxl-jobb比較,xxl-job更為受歡迎。無論是這些框架或者是springboot自帶的定時任務元件,基本上都能滿足固定定時任務的需求。而我們今天討論的是動態定時任務的實現。

動態定時任務的需求其實在現實生活中隨處可見,如花費到期多久之後傳送資訊提醒使用者?時間間隔是多少。又或者客戶下單之後多久提醒商家發貨,提醒的頻率又是多少…。這樣的需求還有很多。今天我們針對此類需求進行**。

對於此類需求相比於傳統的定時任務無非多了可控性, 其可控性包括了定時任務開始和結束時間的可控性,週期性可控性,只要解決了這兩個問題,實際上此類的需求也就迎刃而解了。

前面提供的方案只做文字探索性描述。

2.1、 採用重寫springboot 的定時框架,從資料庫中讀取cron表示式來實現可控性週期。

其本質是通過如下執行緒進行動態定時任務的建立,從而實現對應的週期可控性。

threadpooltaskscheduler threadpooltaskscheduler = new threadpooltaskscheduler();

其具體的細節不再說,其存在的痛點包括了

1 . 需要另外邏輯去實現可控性開始時間和結束時間

2. 此任務開啟的入參是corn表示式,需要另外的邏輯將其進行轉化,太過於猥瑣

2.2、採用時間執行緒池

時間執行緒池我忘記叫什麼,他是可以指定開始時間,週期時間的,相對而言,比第一種方案來得更為直觀,其我考慮到的痛點如下。其實上面那種方案也是有這個痛點的。

2.3、採用延時操作

簡單言之,實際上只要實現了延時操作 便是實現了動態的開始時間以及週期性執行,可以利用其遞迴的概念實現所謂的動態週期。

redis 佇列來實現延時

redis的體量本身定位就不高,在資料量(任務量)過大時,對redis的壓力也很大,redis不一定扛得住。但其實通過redis來實現延時訊息這樣的成功案列還是有很多的。在這裡就不細說了。

rabbitmq實現延時訊息。

通過mq實現延時訊息是本文的重點,在標題三會細說。

通過建立死信佇列來實現延時任務,然後再通過遞迴思想實現對應的邏輯,就可以實現對應的動態延時任務,但是這個會存在以下下幾個痛點。

佇列順序消費

通過死信,我們確實可以動態的控制訊息的消費時間,但是訊息在佇列裡面,如果佇列裡面存在多個資訊任務,前乙個未到消費時間,後乙個已經到了消費時間,這就好導致了,即使後面任務資訊消費時間到了,卻沒法被消費的問題。解決方法,對佇列進行排序邏輯,但如果這樣做的話,就有點猥瑣了。

開銷過大。

對於通過死信來實現延時訊息,網上有挺多優秀的部落格介紹,在此就不做說明了。

使用延時外掛程式需要mq在3.6以上(實際上我在嘗試**的時候並未發現git上有對應3.6的外掛程式,所以還是選擇較高的版本比較好)。

這裡不做過多說明,重點在於編碼的實現,主要步驟如下。

去官網**對應版本的外掛程式,位址為**位址

外掛程式名字為rabbitmq_delayed_message_exchange

將外掛程式放到mq外掛程式目錄下,然後cmd命令解壓網(網上有命令),然後重啟mq服務。大概就這樣的乙個過程。

建立佇列

這裡只弄了對應的核心**,大致就是建立延時交換機,延時佇列,以及繫結器,對應的key,value如下

public static final string delay_exchange = "delay.exchange";

public static final string delay_route_key = "delay.rou程式設計客棧tekey";

public static final string delay_queue = "delay.queue";

/*** 延時交換機

* @return 延時交換機

*/@bean

public customexchange delayexchange()

/*** mq已經安裝了延時外掛程式使用,否則得使用延時外掛程式

* @return 延時傳送佇列。

*/@bean

public queue delayqueue()

/*** 延時繫結區

* @return 延時繫結區

*/@bean

public binding delaybind()

生產者這裡寫得比較隨意,也直接使用了lombok,還直接用了 @service ,有點草率,主要為了讓讀者看得清晰。還用了hutool工具類的jsonutil。

可以清晰的看到主方法裡面需要傳乙個integer型別的入參,這個時間我將其轉換成了秒,其mq實際入參為毫秒,所以讀者不要被誤導。入參time通俗的講就是這個訊息多久之後被消費。不需要在乎順序。

package com.linkyoyo.bill.mq.impl;

import cn.hutool.core.util.objectutil;

import cn.hutool.json.jsonobject;

import cn.hutool.json.jsonutil;

import com.linkyoyo.bill.bo.workorderdelaysenmailactionbo;

import com.linkyoyo.bill.config.rabbitmqconfig;

import com.linkyoyo.bill.mq.delaysenderservice;

import lombok.allargsconstructor;

import lombok.requiredargsconstructor;

import lombok.extern.slf4j.slf4j;

import org.springframework.amqp.rabbit.core.rabbittemplate;

import org.springframework.scheduling.annotation.async;

import org.springframework.stereotype.service;

/** * 延時傳送

* @author 鄒 [[email protected]]

* @date 2022/1/4 20:33

*/@slf4j

@requiredargsconstructor

@allargsconstructor

@service

public class delaysenderserviceimpl implements delaysenderservice

rabbittemplate.convertandsend(rabbitmqconfig.delay_exchange, rabbitmqconfig.delay_route_key, message, msg -> );

log.info("延時傳送成功:延時週期時間{}毫秒,訊息內容為{}", time * 1000, message);

}@override

public void sendmessagebydelay(workorderdelaysenmailactionbo actionbo)

this.sendmessagebydelay(jsonutil.parseobj(actionbo), aftersecond);}}

消費者消費者的demo不太好寫,只是做了乙個簡單的偽**。 以定時任務發郵箱為例

1- 消費者執行緒開始,先執行發郵箱任務

2- 傳送完郵箱之後判斷是否還需要發郵箱,如果需要,就再通過生產者傳送延時郵箱 此時可以指定下一次消費的時間,以此流程走下去便是一套動態任務的流程實現。可以參考後續的流程圖。

這樣就實現乙個簡易的定時任務傳送郵箱的邏輯

private final delaysenderservice delaysenderservice;

rtequshemp@rabbithandler

@rabbitlistener(queues = rabbitmqconfig.delay_queue)

public void delayconsumer(message message)

log.info("資訊為:{}", message.getbody());

}大致流程就這麼多了,以下是整套步驟流閉環程圖

Python通過RabbitMQ實現RPC

usr bin env python coding utf 8 import pika import uuid import time class fibonaccirpcclient object def init self 生成socket self.connection pika.blocki...

RabbitMQ如何實現延遲佇列?

延遲佇列儲存的物件肯定是對應的延遲訊息,所謂 延遲訊息 是指當訊息被傳送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。場景一 在訂單系統中,乙個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那麼這個訂單將進行一場處理。這是就可以使用延...

通過docker安裝rabbitmq

docker ubuntu安裝docker 直接使用官網安裝指令碼自動安裝,安裝命令如下 curl fssl bash s docker mirror aliyun 也可以使用手動安裝,具體安裝方法如下 docker安裝完成之後,可以了解一些簡單的命令 使用docker安裝rabbitmq 1 拉取...