RocketMq 訊息重試

2021-10-25 11:01:26 字數 2751 閱讀 7742

順序訊息的重試

對於順序訊息,當消費者消費訊息失敗後,訊息佇列 rocketmq 會自動不斷進行訊息重試(每次間隔時間為 1 秒),這時,應用會出現訊息消費被阻塞的情況。因此,在使用順序訊息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。

defaultmqpushconsumer consumer =

newdefaultmqpushconsumer

("consumer_grp_04_01");

consumer.

setnamesrvaddr

("node1:9876");

consumer.

setconsumemessagebatchmaxsize(1

);consumer.

setconsumethreadmin(1

);consumer.

setconsumethreadmax(1

);// 訊息訂閱

consumer.

subscribe

("tp_demo_04"

,"*");

// 併發消費

// consumer.setmessagelistener(new messagelistenerconcurrently()

// });

// 順序消費

consumer.

setmessagelistener

(new

messagelistenerorderly()

return null;}}

);consumer.

start()

;

對於無序訊息(普通、定時、延時、事務訊息),當消費者消費訊息失敗時,您可以通過設定返回狀態達到訊息重試的結果。

無序訊息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息

重試次數

訊息佇列 rocketmq 預設允許每條訊息最多重試 16 次,每次重試的間隔時間如下

如果訊息重試 16 次後仍然失敗,訊息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條訊息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鐘之內進行 16 次重試,超過這個時間範圍訊息將不再重試投遞。

注意: 一條訊息無論重試多少次,這些重試訊息的 message id 不會改變

配置方式

消費失敗後,重試配置方式

集群消費方式下,訊息消費失敗後期望訊息重試,需要在訊息***介面的實現中明確進行配置

(三種方式任選一種)

返回 consumeconcurrentlystatus.reconsume_later; (推薦)

返回 null

丟擲異常

public

class

myconcurrentlymessagelistener

implements

messagelistenerconcurrently

}

消費失敗後,不重試配置方式

集群消費方式下,訊息失敗後期望訊息不重試,需要捕獲消費邏輯中可能丟擲的異常,最終返回consumeconcurrentlystatus.consume_success,此後這條訊息將不會再重試。

public

class

myconcurrentlymessagelistener

implements

messagelistenerconcurrently

catch

(throwable e)

//訊息處理正常,直接返回 consumeconcurrentlystatus.consume_success

return consumeconcurrentlystatus.consume_success;

}}

最大重試次數小於等於 16 次,則重試時間間隔同上表描述。

最大重試次數大於 16 次,超過 16 次的重試時間間隔均為每次 2 小時

defaultmqpushconsumer consumer =

newdefaultmqpushconsumer

("consumer_grp_04_01");

// 設定重新消費的次數

// 共16個級別,大於16的一律按照2小時重試

consumer.

setmaxreconsumetimes(20

);

注意

訊息最大重試次數的設定對相同 group id 下的所有 consumer 例項有效。

如果只對相同 group id 下兩個 consumer 例項中的其中乙個設定了

maxreconsumetimes,那麼該配置對兩個 consumer 例項均生效。

配置採用覆蓋的方式生效,即最後啟動的 consumer 例項會覆蓋之前的啟動例項的配置

獲取訊息重試次數

消費者收到訊息後,可按照如下方式獲取訊息的重試次數

public

class

myconcurrentlymessagelistener

implements

messagelistenerconcurrently

doconsumemessage

(msgs)

;return consumeconcurrentlystatus.consume_success;

}}

RocketMQ訊息型別

普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...

rocketMq訊息查詢

最近有人問我知道rocketmq是怎麼查詢訊息的,我發現我貌似回答不上來,所以抽空就把這塊內容補充一下,主要是講清楚根據key查詢訊息和根據msgid查詢訊息兩塊內容。看下引數列表中我們可以看到 k指出了核心key的引數,指定了根據key查詢訊息的方法,這個命令返回的是msgid,據說還有一些坑,可...