使用RocketMQ的小細節

2021-08-29 05:02:43 字數 4141 閱讀 1028

目錄

訊息過濾

訂閱關係一致性 總結

訊息重試

訊息冪等 總結

說到訊息過濾,就不得不說到 tag。沒錯,就是我們之前在專業術語中提到過的 tag。也稱為訊息標籤,用來標記 topic 下的不同用途的訊息。

在 rocketmq 中消費者是可以按照 tag 對訊息進行過濾。舉個電商交易場景的例子,使用者下完訂單之後,在後台會產生一系列的訊息,比如說訂單訊息、支付訊息和物流訊息。假設這些訊息都傳送到 topic 為 trade 中,同時用 tag 為 order 來標記訂單訊息,用 tag 為 pay 來標記支付訊息,用 tag 為 logistics 來標記物流訊息。需要支付訊息的支付系統(相當於乙個 consumer)訂閱 trade 中 tag 為 pay 的訊息,此時,broker 則只會把 tag 為 pay 的訊息投遞給支付系統。而如果是乙個實時計算系統,它可能需要接收所有和交易相關的訊息,那麼只要它訂閱 trade 中 tag 為 order、pay、logistics 的訊息,broker 就會把帶有這些 tag 的訊息投遞給實時計算系統。

對於訊息分類,我們可以選擇建立多個 topic 來區分,也可以選擇在同乙個 topic 下建立多個 tag 來區分。這兩種方式都是可行的,但是一般情況下,不同的 topic 之間的訊息是沒有什麼必然聯絡的,使用 tag 來區分同乙個 topic 下相互關聯的訊息則更加合適一些。

講完了訊息過濾,我們接著講講什麼是訂閱關係一致性呢?其實在講 rocketmq 消費模式的時候提到過,除了使用同乙個 group name,訂閱的 tag 也必須是一樣的,只有符合這兩個條件的 consumer 例項才能組成 consumer 集群。這裡所說的其實就是訂閱關係一致性。在 rocketmq 中,訂閱關係由 topic和 tag 組成,因此要保證訂閱關係一致性,就必須同時保證這兩點:

訂閱的 topic 必須一致

訂閱的 topic 中的 tag 必須一致

保證訂閱關係一致性是非常重要的,一旦訂閱關係不一致,訊息消費的邏輯就會混亂,甚至導致訊息丟失,這對於大部分業務場景來說都是不允許的,甚至是致命的。在實際使用中,切記同乙個消費者集群內的所有消費者例項務必要保證訂閱關係的一致性。

圖 1備註:圖中 「*」 代表訂閱該topic下所有的 tag。

我們用具體的例子來解釋一下,如圖 1 所示,消費者集群中有 3 個 consumer 例項,分別為 c1、c2、c3,各自訂閱的 topic 和 tag 各不相同。首先 c1 和 c2 都訂閱 topica,滿足了訂閱關係一致性的第一點,但是 c1 訂閱的是 topica 的 tag1,而 c2 訂閱的是 topica 的 tag2,不滿足訂閱關係一致性的第二點,所以 c1、c2 不滿足訂閱關係一致性。而 c3 訂閱的 topic 和 tag 都與 c1 和 c2不一樣,同樣也不滿足訂閱關係一致性。

圖 2備註:圖中 「||」 用來連線不用的 tag,表示與的意思。

在圖 2 中,消費者集群中有 3 個 consumer 例項,分別為 c1、c2、c3,都是訂閱 topica 下的 tag1 和 tag2,滿足了訂閱關係一致性的兩點要求,所以滿足了訂閱關係一致性。

圖 3如圖 3 所示,乙個 consumer 也可以訂閱多個 topic,同時也必須保證該 consumer 集群裡的多個消費者例項的訂閱關係一致性,才不會造成不必要的麻煩。

在實際使用中,訊息過濾可以幫助我們只消費我們所需要的訊息,這是在broker端就幫我們處理好的,大大減少了在 consumer 端的訊息過濾處理,一方面減少了**量,另一方面更減少了不必要訊息的網路傳輸消耗。

訂閱訊息一致性則保證了同乙個消費者集群中 consumer 例項的正常執行,避免訊息邏輯的混亂和訊息的丟失。所以在實際使用中,在 producer 端要做好訊息的分類,便於 consumer 可以使用 tag 進行訊息的準確訂閱,而在 consumer 端,則要保證訂閱關係一致性。

首先明確之前說過的,訊息重試只針對集群消費模式,廣播消費沒有訊息重試的特性,消費失敗之後,只會繼續消費下一條訊息。這也是為什麼我們一再強調,推薦大家使用集群消費模式,其訊息重試的特性能給開發者帶來極大的方便。

那麼什麼是訊息重試呢?簡單來說,就是當消費者消費訊息失敗後,broker 會重新投遞該訊息,直到消費成功。在 rocketmq 中,當消費者使用集群消費模式時,消費者接收到訊息並進行相應的邏輯處理之後,最後都要返回乙個狀態值給 broker。這樣 broker 才知道是否消費成功,需不需要重新投遞訊息。也就是說,我們可以通過設定返回的狀態值來告訴 broker 是否重新投遞訊息。

到這裡,可能大家會有乙個疑問,那如果這條訊息本身就是一條髒資料,就算你消費 100 次也不會消費成功,難道還是一直去重試嘛?其實 rocketmq 並不會無限制地重試下去,預設每條訊息最多重試 16 次,而每次重試的間隔時間如下表所示:

第幾次重試

每次重試間隔時間

110 秒

230 秒

31 分鐘

42 分鐘

53 分鐘

64 分鐘

75 分鐘

86 分鐘

97 分鐘

108 分鐘

119 分鐘

1210 分鐘

1320 分鐘

1430 分鐘

151 小時

162 小時

那麼如果訊息重試 16 次之後還是消費失敗怎麼辦呢?那麼訊息就不會再投遞給消費者,而是將訊息放到相對應的死信佇列中。這時候我們就需要對死信佇列的訊息做一些人工補償處理,因為這些訊息可能本身就有問題,也有可能和消費邏輯呼叫的服務有關等,所以需要人工判斷之後再進行處理。

返回 consumeconcurrentlystatus.reconsume_later

返回 null

丟擲異常

前兩種情況都比較好理解,就是前面說過的設定狀態值,也就是說,只需要消費者返回 consumeconcurrentlystatus.reconsume_later 或者 null,就相當於告訴 broker 說,這條訊息我消費失敗了,你給我重新投遞一次。而對於丟擲異常這種情況,只要在你處理消費邏輯的地方丟擲了異常,那麼 broker 也重新投遞這條訊息。注意一點,如果是**獲的異常,則不會進行訊息重試。

首先什麼是消費冪等呢?簡單來說就是對於一條訊息的處理結果,不管這條訊息被處理多少次,最終的結果都一樣。比如說,你收到一條訊息是要更新乙個商品的**為 6.8 元,那麼當這條訊息執行 1 次,還是執行 100 次,最終在資料庫裡的該商品**就是 6.8 元,這就是所謂的冪等。 那麼為什麼消費需要冪等呢?因為在實際使用中,尤其在網路不穩定的情況下,rocketmq 的訊息有可能會出現重複,包括兩種情況:

傳送時訊息重複;

投遞時訊息重複;

第一種情況是生產者傳送訊息的場景,訊息已成功傳送到 broker ,但是此時可能發生網路閃斷或者生產者宕機了,導致 broker 發回的響應失敗。這時候生產者由於沒有收到響應,認為訊息傳送失敗,於是嘗試再次傳送訊息給 broker。這樣一來,broker 就會再收到一條一摸一樣內容的訊息,最終造成了消費者也收到兩條內容一摸一樣的訊息。

第二種情況是消費者消費訊息的場景,訊息已投遞到消費者並完成消費邏輯處理,當消費者給 broker 反饋消費狀態時可能發生網路閃斷。broker 收不到消費者的消費狀態,為了保證至少消費一次的語義,broker 將在網路恢復後再次嘗試投遞之前已經被處理過的訊息,最終造成消費者收到兩條內容一摸一樣的訊息。

當然對於一些允許訊息重複的場景,大可以不必關心消費冪等。但是對於那些不允許訊息重複的業務場景來說,處理建議就是通過業務上的唯一標識來作為冪等處理的依據。

訊息重試,保證了消費訊息的容錯性,即使消費失敗,也不需要開發者自己去編寫**來做補償,大大提高了開發效率,同時也是 rocketmq 相較於其他 mq 的乙個非常好的特性。而消費冪等主要是針對那些不允許訊息重複的場景,應該說大部分 mq 都需要冪等處理,這屬於**邏輯或者說業務上的需要,最好的處理方式就是前面所說的根據業務上唯一標識來作為冪等處理的依據。

Groovy Map使用的小細節

def s hello def m m s string1 m hello string2 println m assertequals m.hello,string2 assertequals m hello string2 assertequals m.get hello string2 ass...

typedef使用時的小細節

基礎不牢,地動山搖。今天在編譯的時候出現了個小錯誤困惑一下,主要是對typedef的理解不清晰造成的。其實typedef的使用跟取別名差不多。可以對比 很多情況下,typedef都是用於結構體的定義中。這裡是給tagdatastructnode這個結構體取了乙個別名,叫datastructnode。...

Vue使用小細節點

動態class繫結 新增text 平級別的css text vue的插值表示式,不僅能使用變數,還能使用js表示式,如下 計算屬性,方法,偵聽器 var vm new vue watch lastname function 計算屬性 有快取機制 當依賴的元素,沒有改變就不會再重新計算,用之前計算的結...