rocketmq 順序消費

2021-09-29 12:44:31 字數 3854 閱讀 3502

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

有序訊息需要生產者,消費者一起配合,生產者要保證每次訊息都要投遞到broker的同乙個佇列裡,消費者需要設定

關鍵點 生產這傳送 同步訊息且指定佇列 syncsendorderly

消費者指定順序消費 consumemode = consumemode.orderly

生產者

@resource

private rocketmqtemplate rocketmqtemplate;

for (int i = 0; i < 10; i++) else

}

日誌

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]

messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]

消費者1

@service

@rocketmqmessagelistener(topic = "$", consumergroup = "string_consumer",consumemode = consumemode.orderly)

public class stringconsumer implements rocketmqlistener, rocketmqpushconsumerlifecyclelistener

@override

public void preparestart(defaultmqpushconsumer defaultmqpushconsumer)

}

消費者2

@service

@rocketmqmessagelistener(topic = "$", consumergroup = "string_consumer",consumemode = consumemode.orderly)

public class stringconsumer2 implements rocketmqlistener, rocketmqpushconsumerlifecyclelistener

@override

public void preparestart(defaultmqpushconsumer defaultmqpushconsumer)

}

結果

*****==msg0

-----------msgaa1

2019-11-11 10:49:28.167 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2330e0000 cost: 7 ms

2019-11-11 10:49:28.167 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2331f0002 cost: 7 ms

-----------msgaa3

*****==msg2

2019-11-11 10:49:28.210 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233270004 cost: 0 ms

*****==msg4

2019-11-11 10:49:28.210 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2332b0006 cost: 0 ms

2019-11-11 10:49:28.212 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233300009 cost: 0 ms

-----------msgaa5

*****==msg6

2019-11-11 10:49:28.214 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d23334000c cost: 0 ms

-----------msgaa7

2019-11-11 10:49:28.216 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2333e0010 cost: 0 ms

2019-11-11 10:49:28.216 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2333a000e cost: 0 ms

-----------msgaa9

*****==msg8

2019-11-11 10:49:28.217 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233410012 cost: 0 ms

2019-11-11 10:49:28.217 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233440014 cost: 0 ms

rocketmq實現順序消費

訊息有序指的是可以按照訊息的傳送順序來消費。rocketmq可以嚴格的保證訊息有序。但這個順序,不是全域性順序,只是分割槽 queue 順序。要全域性順序只能乙個分割槽。之所以出現你這個場景看起來不是順序的,是因為傳送訊息的時候,訊息傳送預設是會採用輪詢的方式傳送到不通的queue 分割槽 如圖 而...

如何保證rocketmq消費順序

問題 我們知道訊息佇列可以在高併發的情況下,實現 削峰填谷,以及可以 非同步解偶。但是 某些業務場景下,我們需要 保證訊息是嚴格按照一定順序 去消費的,這時候我們要怎麼辦?以rocketmq為例。我們 知道 訊息從 producer 傳送到 broker 佇列中 一般是 輪訓傳送到 多個broker...

RocketMQ 如何保證訊息順序消費

rocketmq支援區域性順序消費,但不支援全域性,換句話說針對topic中的每個queue是可以按照fifo進行消費。要保證乙個訂單有關的訊息順序消費,有兩點需要注意,一是將訂單有關的訊息傳送到相關topic中同乙個queue裡,二是消費者按照先進先出的原則進行消費。在訊息傳送時,需指定對應的me...