RocketMQ 訊息有序實現方案

2021-10-05 15:46:17 字數 1877 閱讀 1579

在整個mq集群內,訊息都是按照生產者的生產順序存放,消費者也是按此順序進行消費

優點:生產者不需要額外的處理,由mq集群保證訊息全域性有序

缺點:犧牲了高可用、效能

高可用mq集群的常見部署

全域性有序部署方案

為了保證mq集群內訊息有序,對於全域性有序的topic,只會在一台broker機器上建立乙個queue,而queue內的訊息是保證有序的,這樣生產者和消費都都只能按序消費

這種方案很明顯的缺點就是當broker2機器掛了,《topic-全域性有序》相關的業務將無法寫入和消費訊息,並且由於乙個queue只能由乙個consumer例項消費(clustering模式下),所以效能也就可想而知的差了

反觀topica,由於在broker1,broker2上都有建立queue,當其中一台broker不可用時,寫入和消費將由另外一台broker的其他queue接管,所以對於topica來說實現高可用和高效能

參見上圖中的topica部署

訊息在queue裡是保證有序的

優點:高可用,效能好

缺點:

生產者需要在生產訊息時做特殊處理

在可用queue數量變化情況下(例如broker1掛了),會導致短暫的訊息亂序

生產者處理**示例
message msg =

newmessage

("topictest"

, tags[i % tags.length]

,"key"

+ i, body.

getbytes()

);sendresult sendresult = producer.

send

(msg,

newmessagequeueselector()

}, orderlist.

get(i)

.getorderid()

);//訂單id

既然兩種方式都有各自的優缺點,那能不能結合一把,在大部分情況下保證訊息有序,在異常情況下通過業務方自己的控制處理達到高可用、高效能、訊息有序的效果呢?

假定場景
a元件負責使用者資訊修改,a通過mq將變更通知b元件,b元件收到訊息後進行業務處理

class

user

方案:時間戳+全量訊息體

在使用者資訊發生修改時,傳全量訊息體

例如只改了age屬性,但還是將id,name,age,address等所有屬性資訊一起放在訊息體中傳送到mq

b在收到這條訊息時,通過與本地db中該使用者的updatetime對比,若本地時間》訊息體中的updatetime,則這條訊息可能是舊訊息,可以選擇丟棄這條資訊,或者告警處理

問題上面的方案還是有問題的,如果a元件是分布式部署在不同機器上,兩次變更是在不同的例項上操作的

不巧的是這兩台機器的時間還是錯亂的,m1機器時間大於m2機器時間,第一次操作是在m1上進行產生msga(updatetime=100),第二次操作在m2上進行產生msgb(updatetime=90),此時就會導致b將msgb丟棄,造成異常。這種情況就類似全域性id生成的問題,由一台伺服器單點生成時間

RocketMQ事務訊息實現分析

這週rocketmq發布了4.3.0版本,new feature中最受關注的一點就是支援了事務訊息 今天花了點時間看了下具體的實現內容,下面是簡單的總結。通過馮嘉發布的 rocketmq 4.3正式發布,支援分布式事務 一文可以看到rocketmq採用了2pc的方案來提交事務訊息,同時增加乙個補償邏...

RocketMQ事務訊息實現分析

這週rocketmq發布了4.3.0版本,new feature中最受關注的一點就是支援了事務訊息 今天花了點時間看了下具體的實現內容,下面是簡單的總結。通過馮嘉發布的 rocketmq 4.3正式發布,支援分布式事務 一文可以看到rocketmq採用了2pc的方案來提交事務訊息,同時增加乙個補償邏...

RocketMQ訊息型別

普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...