基於redis的延遲訊息佇列設計

2021-08-07 04:20:51 字數 2544 閱讀 3665

需求背景

佇列設計

目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的mq中介軟體。

開發前需要考慮的問題?

簡單定義乙個訊息資料結構

private string topic;

/***topic**/

private string id;

/***自動生成 全域性惟一 snowflake**/

private string bizkey;

private

long delay;

/***延時毫秒數**/

private

int priority;

//優先順序

private

long ttl;

/**消費端消費的ttl**/

private string body;

/***訊息體**/

private

long createtime=system.currenttimemillis();

private

int status= status.waitput.ordinal();

執行原理:

map來儲存元資料。id作為key,整個訊息結構序列化(json/…)之後作為value,放入元訊息池中。

id放入其中(有n個)乙個zset有序列表中,以createtime+delay+priority作為score。修改狀態為正在延遲中

使用timer實時監控zset有序列表中top 10的資料 。 如果資料score>=當前時間毫秒就取出來,根據topic重新放入乙個新的可消費列表(list)中,在zset中刪除已經取出來的資料,並修改狀態為待消費

客戶端獲取資料只需要從可消費佇列中獲取就可以了。並且狀態必須為待消費 執行時間需要<=當前時間的 如果不滿足 重新放入zset列表中,修改狀態為正在延遲。如果滿足修改狀態為已消費。或者直接刪除元資料。

客戶端因為涉及到不同程式語言的問題,所以當前預設支援http訪問方式。

新增延時訊息新增成功之後返回消費唯一id post /push

刪除延時訊息 需要傳遞訊息id get /delete?id=

恢復延時訊息 get /restore?expire=true|false expire是否恢復已過期未執行的訊息。

恢復單個延時訊息 需要傳遞訊息id get /restore/id

獲取訊息 需要長連線 get /get/topic

用nginx暴露服務,配置為輪詢 在新增延遲訊息的時候就可以流量平均分配。

目前系統中客戶端並沒有採用http長連線的方式來消費訊息,而是採用mq的方式來消費資料這樣客戶端就可以不用關心延遲訊息佇列。只需要在傳送mq的時候攔截一下 如果是延遲訊息就用延遲訊息系統處理。

訊息可恢復

實現恢復的原理 正常情況下一般都是記錄日誌,比如mysqlbinlog等。

這裡我們直接採用mysql資料庫作為記錄日誌。

目前打算建立以下2張表:

訊息表 字段包括整個訊息體

訊息流轉表 字段包括訊息id、變更狀態、變更時間、zset掃瞄線程name、host/ip

定義zset掃瞄線程name是為了更清楚的看到訊息被分發到具體哪個zset中。前提是zset的key和監控zset的執行緒名稱要有點關係 這裡也可以是zset key。

舉個栗子

假如redis伺服器宕機了,重啟之後發現資料也沒有了。所以這個恢復是很有必要的,只需要從表1也就是訊息表中把訊息狀態不等於已消費的資料全部重新分發到延遲佇列中去,然後同步一下狀態就可以了。

當然恢復單個任務也可以這麼幹。

關於高可用

分布式協調還是選用zookeeper吧。

如果有多個例項最多同時只能有1個例項工作 這樣就避免了分布式競爭鎖帶來的壞處,當然如果業務需要多個例項同時工作也是支援的,也就是乙個訊息最多只能有1個例項處理,可以選用zookeeper或者redis就能實現分布式鎖了。

最終做了一下測試多例項同時執行,可能因為會涉及到鎖的問題效能有所下降,反而單機效果很好。所以比較推薦基於docker的主備部署模式。

擴充套件支援zset佇列個數可配置 避免大資料帶來高延遲的問題。

目前存在日誌和redis元資料有可能不一致的問題 如mysql掛了,寫日誌不會成功。

設計圖:

基於redis的延遲訊息佇列設計

需求背景 佇列設計 目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的mq中介軟體。開發前需要考慮的問題?簡單定義乙個訊息資料結構 private string topic topic private string id 自動生成 全域性惟一 snowflak...

基於redis的延遲訊息佇列設計

需求背景 佇列設計 目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的mq中介軟體。開發前需要考慮的問題?簡單定義乙個訊息資料結構 private string topic topic private string id 自動生成 全域性惟一 snowflak...

基於redis構建訊息佇列

一般來說,訊息佇列有兩種場景 一種是發布者訂閱者模式 一種是生產者消費者模式。利用redis這兩種場景的訊息佇列都能夠實現。定義 1 redis作為訊息中介軟體 1 producer consumermode 該方式是借助redis的list結構實現的。producer呼叫redis的lpush往特...