rocketmq broker 訊息分發流程分析

2021-09-26 21:19:19 字數 1271 閱讀 8848

文章基於rocket-mq4.0 **分析

server接收到訊息後,會將訊息分發到對應的consumequeue中,等待client端拉取消費。

核心類:

org.apache.rocketmq.store.defaultmessagestore.reputmessageservice
類圖:

該類繼承   servicethread -runnable,是乙個執行緒類;在run方法裡會每間隔1ms執行方法:

org.apache.rocketmq.store.defaultmessagestore.reputmessageservice#doreput
首先會將  reputfromoffset 和  defaultmessagestore已經確認的offset作比較

if (defaultmessagestore.this.getmessagestoreconfig().isduplicationenable() //

&& this.reputfromoffset >= defaultmessagestore.this.getconfirmoffset())

如果當前值已經超過defaultmessagestore已經確認的值,則不會往下走了

然後通過當前的 reputfromoffset 去 commitlog 裡去獲取資料,如果獲取到了則繼續往下走

拿到資料最終呼叫 dodispatch 方法 處理 consumequeue 和 index 的重建

public void dodispatch(dispatchrequest req) 

//index的重建

if (defaultmessagestore.this.getmessagestoreconfig().ismessageindexenable())

}

往consumequeue(指定的queueid)裡放訊息,等待消費端的拉取

public void putmessagepositioninfo(string topic, int queueid, long offset, int size, long tagscode, long storetimestamp,

long logicoffset)

RocketMQ Broker的最佳實踐

翻譯自rocket官方文件 broker的方式有非同步主,同步主,或者從。如果不能容忍訊息丟失,建議以同步主從方式部署。如果對丟失沒那麼嚴格,但是希望高可用,可以部署為非同步主從,如果你想更簡單,可以使用非同步主而不需要從機。非同步刷盤是推薦的方式,因為同步刷盤太多的消耗而且造成大量的效能丟失,如果...

RocketMQ Broker 快速失敗機制

版本 4.2.0,原始碼 class檔案 broker的快速失敗機制是為了防止請求過載,導致broker處理請求效率變低,從而影響訊息的消費 broker啟動的時候會初始化broke stfailure this.broke stfailure new broke stfailure this th...

Rocketmq broker寫入能力檢視

如何證明 rocketmq 集群本身沒有問題呢?其實也很簡單,我們通常乙個常用的技巧是檢視 rocketmq 訊息寫入的效能,執行如下命令 cd logs rocketmqlogs grep pagecachert store.log more 其輸出的結果如下圖所示 在 rocketmq brok...