如何保證rocketmq消費順序

2021-09-24 20:55:37 字數 2682 閱讀 3413

問題 : 我們知道訊息佇列可以在高併發的情況下,實現 削峰填谷,以及可以 非同步解偶。但是 某些業務場景下,我們需要 保證訊息是嚴格按照一定順序 去消費的,這時候我們要怎麼辦? 以rocketmq為例。

我們 知道 ,訊息從 producer 傳送到 broker 佇列中 ,一般是 輪訓傳送到 多個broker中的,而 consumer 消費拉取的時候 一般都是 同時拉取 多個queue的訊息消費,這樣就容易導致 ,消費的無序性,如下圖

舉例 : 電商平台中 訂單的建立 ,支付,推送 都需要按照嚴格的順序消費,那麼為了保證消費順序 我們如何操作? 

解決 : 1 我們要保證 producer傳送訊息時 需要 按順序傳送 ,並且傳送到 同乙個佇列 ,這裡我們可以使用 業務字段 id號或者 訂單號 按佇列數 取模的方法 ,通過messagequeueselector 來保證 同乙個訂單號的 訊息,按順序 傳送到 同乙個queue.

2 我們要保證 consumer 按順序 消費訊息 ,rocketmq中 messagelistenerconcurrently 是無法確保訊息消費順序的,只有 messagelistenerorderly 才可以保證消費順序。

**如下 :

rocketmq訊息生產端示例**如下:

/*** producer,傳送順序訊息

*/public class producer ;

// 訂單列表

listorderlist =  new producer().buildorders();

date date = new date();

******dateformat sdf = new ******dateformat("yyyy-mm-dd hh:mm:ss");

string datestr = sdf.format(date);

for (int i = 0; i < 10; i++)

}, orderlist.get(i).getorderid());//訂單id

system.out.println(sendresult + ", body:" + body);

}producer.shutdown();

} catch (mqclientexception e) catch (remotingexception e) catch (mqbrokerexception e) catch (interruptedexception e)

system.in.read();

}/**

* 生成模擬訂單資料 

*/private listbuildorders()

輸出:從圖中紅色框可以看出,orderid等於15103111039的訂單被順序放入queueid等於7的佇列。queueoffset同時在順序增長。

傳送時有序,接收(消費)時也要有序,才能保證順序消費。如下這段**演示了普通消費(非有序消費)的實現方式。

/*** 普通資訊消費

*/public class consumer

try catch (exception e)

return consumeconcurrentlystatus.consume_success;

}});

consumer.start();

system.out.println("consumer started.");}}

輸出:可見,訂單號為15103111039的訂單被消費時順序完成亂了。所以用messagelistenerconcurrently這種消費者是無法做到順序消費的,採用下面這種方式就做到了順序消費:

/*** 順序訊息消費,帶事務方式(應用可控制offset什麼時候提交)

*/public class consumerinorder

try catch (exception e)

return consumeorderlystatus.success;

}});

consumer.start();

system.out.println("consumer started.");}}

輸出:messagelistenerorderly能夠保證順序消費,從圖中我們也看到了期望的結果。圖中的輸出是只啟動了乙個消費者時的輸出,看起來訂單號還是混在一起,但是每組訂單號之間是有序的。因為訊息傳送時被分配到了三個佇列(參見前面生產者輸出日誌),那麼這三個佇列的訊息被這唯一消費者消費。

如果啟動2個消費者呢?那麼其中乙個消費者對應消費2個佇列,另乙個消費者對應消費剩下的1個佇列。

如果啟動3個消費者呢?那麼每個消費者都對應消費1個佇列,訂單號就區分開了。輸出變為這樣:

消費者1輸出:

消費者2輸出:

消費者3輸出:

很完美,有木有?!

按照這個示例,把訂單號取了做了乙個取模運算再丟到selector中,selector保證同乙個模的都會投遞到同一條queue。即: 相同訂單號的--->有相同的模--->有相同的queue。最後就會類似這樣:

總結:rocketmq的順序訊息需要滿足2點:

1.producer端保證傳送訊息有序,且傳送到同乙個佇列。

2.consumer端保證消費同乙個佇列。

RocketMQ 如何保證訊息順序消費

rocketmq支援區域性順序消費,但不支援全域性,換句話說針對topic中的每個queue是可以按照fifo進行消費。要保證乙個訂單有關的訊息順序消費,有兩點需要注意,一是將訂單有關的訊息傳送到相關topic中同乙個queue裡,二是消費者按照先進先出的原則進行消費。在訊息傳送時,需指定對應的me...

rocketmq如何保證訊息不丟失

一 大體可以從三方面來說 分別從producer傳送機制 broker的持久化機制,以及消費者的offset機制來最大程度保證訊息不易丟失 從producer的視角來看 如果訊息未能正確的儲存在mq中,或者消費者未能正確的消費到這條訊息,都是訊息丟失。從broker的視角來看 如果訊息已經存在bro...

RabbitMQ 如何保證消費不被重複消費?

為什麼會出現訊息重複?訊息重複的原因有兩個 1.生產時訊息重複,2.消費時訊息重複。由於生產者傳送訊息給mq,在mq確認的時候出現了網路波動,生產者沒有收到確認,實際上mq已經接收到了訊息。這時候生產者就會重新傳送一遍這條訊息。生產者中如果訊息未被確認,或確認失敗,我們可以使用定時任務 redis ...