RocketMQ訊息型別

2021-08-10 22:32:54 字數 3447 閱讀 4635

普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒***,可能先傳送的訊息先消費,也可能先傳送的訊息後消費。

舉個簡單例子,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通資訊。

因為不需要保證訊息的順序,所以訊息可以大規模併發地傳送和消費,吞吐量很高,適合大部分場景。

**示例

public class producer catch (exception e)

}//傳送完訊息之後,呼叫shutdown()方法關閉producer

producer.shutdown();}}

public class consumer 

});//呼叫start()方法啟動consumer

consumer.start();

system.out.println("consumer started.");}}

有序訊息就是按照一定的先後順序的訊息型別。

舉個例子來說,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序也就是 1、2、3 ,而不會出現普通資訊那樣的 2、1、3 等情況。

那麼有序訊息是如何保證的呢?我們都知道訊息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那麼要保證訊息的有序,勢必這兩步都是要保證有序的,即要保證訊息是按有序傳送到 broker,broker 也是有序將訊息投遞給 consumer,兩個條件必須同時滿足,缺一不可。

進一步還可以將有序訊息分成

之前我們講過,topic 只是訊息的邏輯分類,內部實現其實是由 queue 組成。當 producer 把訊息傳送到某個 topic 時,預設是會訊息傳送到具體的 queue 上。

全域性有序

舉個例子,producer 傳送 order id 為 1、2、3、4 的四條訊息到 topica 上,假設 topica 的 queue 數為 3 個(queue0、queue1、queue2),那麼訊息的分布可能就是這種情況,id 為 1 的在 queue0,id 為 2 的在 queue1,id 為 3 的在 queue2,id 為 4 的在 queue0。同樣的,consumer 消費時也是按 queue 去消費,這時候就可能出現先消費 1、4,再消費 2、3,和我們的預期不符。那麼我們如何實現 1、2、3、4 的消費順序呢?道理其實很簡單,只需要把訂單 topic 的 queue 數改為 1,如此一來,只要 producer 按照 1、2、3、4 的順序去傳送訊息,那麼 consumer 自然也就按照 1、2、3、4 的順序去消費,這就是全域性有序訊息。

由於乙個 topic 只有乙個 queue ,即使我們有多個 producer 例項和 consumer 例項也很難提高訊息吞吐量。就好比過獨木橋,大家只能乙個挨著乙個過去,效率低下。

那麼有沒有吞吐量和有序之間折中的方案呢?其實是有的,就是區域性有序訊息。

區域性有序

我們知道訂單訊息可以再細分為訂單建立、訂單付款、訂單完成等訊息,這些訊息都有相同的 order id。同時,也只有按照訂單建立、訂單付款、訂單完成的順序去消費才符合業務邏輯。但是不同 order id 的訊息是可以並行的,不會影響到業務。這時候就常見做法就是將 order id 進行處理,將 order id 相同的訊息傳送到 topicb 的同乙個 queue,假設我們 topicb 有 2 個 queue,那麼我們可以簡單的對 id 取餘,奇數的發往 queue0,偶數的發往 queue1,消費者按照 queue 去消費時,就能保證 queue0 裡面的訊息有序消費,queue1 裡面的訊息有序消費。

由於乙個 topic 可以有多個 queue,所以在效能比全域性有序高得多。假設 queue 數是 n,理論上效能就是全域性有序的 n 倍,當然 consumer 也要跟著增加才行。在實際情況中,這種區域性有序訊息是會比全域性有序訊息用的更多。

示例**

public class producer ;

// 傳送10條訊息到topic為topictestordered,tag為tags陣列按順序取值,

// key值為「key」拼接上i的值,訊息內容為「hello rocketmq」拼接上i的值

for (int i = 0; i < 10; i++)

}, orderid);

system.out.println(sendresult);

}orderedproducer.shutdown();

} catch (mqclientexception e) catch (remotingexception e) catch (mqbrokerexception e) catch (interruptedexception e) }}

至於是要實現全域性有序,還是區域性有序,在此示例**中,就取決於 topictestordered 這個 topic 的佇列數了。

public class consumer 

});//呼叫start()方法啟動consumer

consumer.start();

system.out.println("consumer started.");}}

延時訊息,簡單來說就是當 producer 將訊息傳送到 broker 後,會延時一定時間後才投遞給 consumer 進行消費。

rcoketmq的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。

這種訊息一般適用於訊息生產和消費之間有時間視窗要求的場景。比如說我們網購時,下單之後是有乙個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那麼在訂單建立的時候就會就需要傳送一條延時訊息(延時15分鐘)後投遞給 consumer,consumer 接收訊息後再對訂單的支付狀態進行判斷是否關閉訂單。

設定延時非常簡單,只需要在message設定對應的延時級別即可:

message msg = new message("topictest",// topic

"taga",// tag

("hello rocketmq " + i).getbytes(remotinghelper.default_charset)// body

);// 這裡設定需要延時的等級即可

msg.setdelaytimelevel(3);

sendresult sendresult = producer.send(msg);

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...

rocketMq訊息查詢

最近有人問我知道rocketmq是怎麼查詢訊息的,我發現我貌似回答不上來,所以抽空就把這塊內容補充一下,主要是講清楚根據key查詢訊息和根據msgid查詢訊息兩塊內容。看下引數列表中我們可以看到 k指出了核心key的引數,指定了根據key查詢訊息的方法,這個命令返回的是msgid,據說還有一些坑,可...

順序訊息 RocketMQ

訊息有序指的是可以按照訊息的傳送順序來消費。rocketmq可以嚴格的保證訊息有序。但這個順序,不是全域性順序,只是分割槽 queue 順序。要全域性順序只能乙個分割槽。之所以出現你這個場景看起來不是順序的,是因為傳送訊息的時候,訊息傳送預設是會採用輪詢的方式傳送到不通的queue 分割槽 如圖 而...