深入研究RocketMQ消費者是如何獲取訊息的

2021-10-24 14:58:49 字數 3513 閱讀 7531

小夥伴們,國慶都過的開心嗎?國慶後的第乙個工作日是不是很多小夥伴還沉浸在假期的心情中,沒有工作狀態呢?

那王子今天和大家聊一聊rocketmq的消費者是如何獲取訊息的,通過學習知識來找回狀態吧。

廢話不多說,我們開始吧。

首先我們了解乙個概念,什麼是消費者組

消費者組你就可以把它理解為,給一組消費者起乙個名字。

假設我們有乙個訂單topic名字是ordertopic,然後庫存系統和積分系統都要消費這個topic中的資料,我們分別給庫存系統和積分系統起乙個消費組名字:stock_consumer_group、score_consumer_group。

設定消費者組名字是在**中實現的,如下:

defaultmqpushconsumer consumer = new defaultmqpushconsumer("stock_consumer_group");
比如我們的庫存系統提供了2臺機器,每台機器上的消費者組名字都是stock_consumer_group,那麼這2臺機器就是乙個消費者組。

大體結構如上圖所示,那麼當訂單系統傳送訊息到ordertopic中後,庫存系統和積分系統是如何進行消費的呢?

預設情況下,這條訊息傳送到broker後,庫存系統和積分系統都會拉取這條訊息,而且庫存系統的兩台機器中只有一台會消費到這條訊息,積分系統也一樣。

這就是消費組的概念,不同的系統設定不同的消費組,如果不同的消費組訂閱了同乙個topic,那麼對於topic中的一條訊息,每個消費組都會獲取到這條訊息。

接下來我們思考乙個問題,對於消費者組而言,當它獲取到一條訊息後,假設消費者組內有多台機器,那麼到底是只有一台機器獲取到訊息,還是所有機器都獲取到訊息呢?

這其實是消費的兩種模式,集群模式和廣播模式

預設情況下我們都是使用的集群模式,也就是說消費者組收到訊息後,只有其中的一台機器會接收到訊息。

我們可以手動指定為廣播模式。

consumer.setmessagemodel(messagemodel.broadcasting)
指定為廣播模式後,消費者組內的每台機器都會收到這條訊息。

具體要根據業務場景選擇消費模式。

接著我們想一下,對於乙個topic下的多個messagequeue,消費者組中的多台機器是如何消費的呢?

這部分內容底層實現是很複雜的,我們可以簡單的理解為它會均勻的將多個messagequeue分配給消費者組中的多台機器消費。

舉個例子,假如我們的ordertopic有四個messagequeue,這4個messagequeue分布在兩台masterbroker上,每個masterbroker上有兩個messagequeue。

然後庫存系統作為乙個消費者組有兩台機器,那麼最好的分配方式就是每台消費者機器負責兩個messagequeue,這樣就實現了機器的負載消費,示意圖如下:

所以我們可以大致的認為,乙個topic中的多個messagequeue會被均勻的分布給乙個消費者組中的多台機器進行消費,這裡要注意一點,乙個messagequeue只能被一台消費者機器消費,但是一台消費者機器可以同時負責處理多個messagequeue。

那麼當消費者組中的機器數量發生變化時,是怎麼處理的。

機器數量發生變化一般就兩種情況,一種是有機器宕機了,另一種是增加機器進行集群擴容了。

其實這種情況下是會進行rebalance環節的,也就是會重新分配每個消費者機器要處理的messagequeue。

不知道小夥伴們還記不記得,在之前的文章rocketmq的傳送模式和消費模式中,我們已經用**說明了消費者的兩種消費模式:push和pull,當時只提供了push消費的**,而沒有提供pull消費的**。

其實這兩種模式本質上是一樣的,都是消費者主動發出請求到broker上拉取訊息。

push模式的底層也是通過消費者主動拉取的方式來實現的,只不過它的名字叫push而已,意思是broker盡可能實時的推送訊息給消費者。

我們一般在使用rocketmq的時候,消費模式基本都是使用的push模式,因為pull模式真的使用起來**特別複雜,而且push模式的底層還是pull模式,只是對時效性有了更好的支援。

push模式大體實現思路是這樣的:當消費者傳送請求到broker拉取訊息的時候,如果有新的訊息可以消費,會立馬返回訊息到消費者進行消費,消費後會接著傳送請求到broker拉取訊息。

也就說push模式下,處理完一批訊息後會理解再傳送請求給broker拉取下一批訊息,所以時效性更好,看起來就像是broker在實時推送訊息。

當請求傳送到broker發現沒有需要消費的訊息時,就會讓請求執行緒掛起,預設掛起15秒,然後會有另乙個後台執行緒每隔一段時間判斷一下是否有新訊息需要消費,一旦發現了新的訊息,就會去喚醒掛起的執行緒,將訊息返回給消費者進行消費,然後消費完畢再次傳送請求拉取訊息。

這一部分的原始碼實現是很複雜的,我們只要了解它的核心思路就可以了。就算是push模式,本質上也是對pull模式的一種封裝

接下來我們來聊聊broker是如何讀取訊息返回給消費者的。之前的文章深入研究broker是如何持久化的中我們已經知道了broker是如何持久化訊息的,小夥伴們可以複習一下。

那麼當消費者傳送請求到broker中拉取訊息時,假設是第一次拉取,就會從messagequeue中的第一條訊息開始拉取。

如何定位到第一條訊息的位置呢,首先broker會找到messagequeue對應的consumerqueue,從裡面找到這條訊息的offset,然後通過offset去commitlog中讀取訊息資料,把訊息返回給消費者。

當消費者消費完這條訊息後,會提交乙個消費的進度給broker,broker會記錄下乙個consumeroffset來標記我們的消費進度。

下次消費者再去這個messagequeue中拉取訊息時,就會從記錄的消費位置繼續拉取訊息,而不用從頭獲取了。

好了,到這裡本篇文章就結束了。

沒有從國慶中收回心的小夥伴們(ps:王子也一樣沒有進入狀態(`・ω・´))就與王子一起通過學習找回狀態吧。

往期文章推薦:

什麼是訊息中介軟體?主要作用是什麼?

常見的訊息中介軟體有哪些?你們是怎麼進行技術選型的?

你懂rocketmq 的架構原理嗎?

聊一聊rocketmq的註冊中心nameserver

broker的主從架構是怎麼實現的?

rocketmq生產部署架構如何設計

rabbitmq和kafka的高可用集群原理

rocketmq的傳送模式和消費模式

討論一下秒殺系統的技術難點與解決方案

秒殺系統中的扣減庫存和流量削峰

深入研究rocketmq生產者傳送訊息的底層原理

深入研究broker是如何持久化的

dledger是如何實現主從自動切換的

RocketMQ消費者實踐

最近工作中用到了rocketmq,現記錄下,如何正確實現消費 防止重複消費 如何快速消費 消費失敗如何處理 重複消費會造成資料不一致等問題。所以,消費者要做到消費冪等。1 每次消費,記錄messageid 如果再次消費該message,查詢messageid是否已存在,已存在,就跳過消費 2 使用具...

RocketMQ 消費者核心配置詳解

topic 下佇列的奇偶數會影響 customer 個數裡面的消費數量 如果是4個佇列,8個訊息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均。如果 consumer 例項的數量比 message queue 的總數量還多的話,多出來的 consumer 例項將無法分到 queue,也就...

RocketMQ建立多個消費者問題分析

在乙個程序中同乙個消費組建立多個消費者會出現the consumer group groupname has been created before,specify another name please.defaultmqpushconsumer consumer1 new defaultmqpu...