RocketMQ 9 訊息堆積與消費延遲

2022-09-20 10:21:11 字數 3189 閱讀 2251

訊息處理流程中,如果consumer的消費速度跟不上producer的傳送速度,mq中未處理的訊息會越來越多(進的多出的少),這部分訊息就被稱為堆積訊息。訊息出現堆積進而會造成訊息的消費延遲。 以下場景需要重點關注訊息堆積和消費延遲問題:

consumer使用長輪詢pull模式消費訊息時,分為以下兩個階段:

1.訊息拉取

consumer通過長輪詢pull模式批量拉取的方式從服務端獲取訊息,將拉取到的訊息快取到本地緩衝佇列中。對於拉取式消費,在內網環境下會有很高的吞吐量,所以這一階段一般不會成為訊息堆積的瓶頸。

乙個單執行緒單分割槽的低規格主機(consumer,4c8g),其可達到幾萬的tps。如果是多個分割槽多 個執行緒,則可以輕鬆達到幾十萬的tps。

2. 訊息消費consumer將本地快取的訊息提交到消費執行緒中,使用業務消費邏輯對訊息進行處理,處理完畢後獲取 到乙個結果。這是真正的訊息消費過程。此時consumer的消費能力就完全依賴於訊息的消費耗時和消 費併發度了。

如果由於業務處理邏輯複雜等原因,導致處理單條訊息的耗時較長,則整體的訊息吞吐量肯定不會高,此時就會導致consumer本地緩衝佇列達到上限,停止從服務端拉取訊息。

結論: 訊息堆積的主要瓶頸在於客戶端的消費能力,而消費能力由消費耗時和消費併發度決定。注意,消費 耗時的優先順序要高於消費併發度。即在保證了消費耗時的合理性前提下,再考慮消費併發度問題。

影響訊息處理時長的主要因素是**邏輯。而**邏輯中可能會影響處理時長**主要有兩種型別:cpu內部計算型**外部i/o操作型**。

通常情況下**中如果沒有複雜的遞迴和迴圈的話,內部計算耗時相對外部i/o操作來說幾乎可以忽略。所以外部io型**是影響訊息處理時長的主要癥結所在。

外部io操作型**舉例:

關於下游系統呼叫邏輯需要進行提前梳理,掌握每個呼叫操作預期的耗時,這樣做是為了能夠 判斷消費邏輯中io操作的耗時是否合理。通常訊息堆積是由於下游系統出現了服務異常或達到 了dbms容量限制,導致消費耗時增加。

一般情況下,消費者端的消費併發度由單節點執行緒數和節點數量共同決定,其值為單節點執行緒數*節點數量

不過,通常需要優先調整單節點的執行緒數,若單機硬體資源達到了上限,再需要通過橫向擴充套件來提高消費併發度。

單節點執行緒數,即單個consumer所包含的執行緒數量

節點數量,即consumer group所包含的consumer數量

對於普通資訊、延時訊息及事務訊息,併發度計算都是單節點執行緒數*節點數量

但對於順序 訊息則是不同的。

順序訊息的消費併發度等於topic的queue分割槽數量。

1)全域性順序訊息:該型別訊息的topic只有乙個queue分割槽(一台機器)。其可以保證該topic的所有訊息被順序消費。

為了保證這個全域性順序性,consumer group中的任何乙個機器在同一時刻只能有乙個consumer消費者進行消費。所以其併發度為1。

2)分割槽順序訊息:該型別訊息的topic有多個queue分割槽(乙個topic下的queue分布在集群中的多個broker中)。

其僅可以保證該topic的某台broker的訊息被順序消費,不能保證整個topic中訊息的順序消費。

為了保證這個分割槽順序性, 每個queue分割槽中的訊息在consumer group中的同一時刻只能有乙個consumer的乙個執行緒進行 消費。即,在同一時刻最多會出現多個queue分蘖有多個consumer的多個執行緒並行消費。所以其併發度為topic的分割槽數量(該topic的broker分割槽數量)。

對於一台主機中線程池中線程數的設定需要謹慎,不能盲目直接調大執行緒數,設定過大的執行緒數反而會

帶來大量的執行緒切換的開銷。理想環境下單節點的最優執行緒數計算模型為:c *(t1 + t2)/ t1。

最優執行緒數 = c **(t1 + t2)/ t1 = c * t1/t1 + c * t2/t1 = c + c * t2/t1*

注意,該計算出的數值是理想狀態下的理論資料,在生產環境中,不建議直接使用。而是根據

當前環境,先設定乙個比該值小的數值然後觀察其壓測效果,然後再根據效果逐步調大執行緒

數,直至找到在該環境中效能最佳時的值。

為了避免在業務使用時出現非預期的訊息堆積和消費延遲問題,需要在前期設計階段對整個業務邏輯進

行完善的排查和梳理。其中最重要的就是梳理訊息的消費耗時和設定訊息消費的併發度。

梳理訊息的消費耗時

通過壓測獲取訊息的消費耗時,並對耗時較高的操作的**邏輯進行分析。梳理訊息的消費耗時需要關

注以下資訊:

設定消費併發度

對於訊息消費併發度的計算,可以通過以下兩步實施

節點數 = 流量峰值 / 單個節點訊息吞吐量

訊息被消費過後會被清理掉嗎?不會的。

訊息是被順序儲存在commitlog檔案的,且訊息大小不定長,所以訊息的清理是不可能以訊息為單位進 行清理的,而是以commitlog檔案為單位進行清理的。否則會急劇下降清理效率,並且實現起來邏輯複雜。

commitlog檔案存在乙個過期時間,預設為72小時,即三天。除了使用者手動清理外,在以下情況下也 會被自動清理,無**件中的訊息是否被消費過:

需要注意以下幾點:

1)對於rocketmq系統來說,刪除乙個1g大小的檔案,是乙個壓力巨大的io操作。在刪除過程 中,系統效能會驟然下降。所以,其預設清理時間點為凌晨4點,訪問量最小的時間。也正因如 果,我們要保障磁碟空間的空閒率,不要使系統出現在其它時間點刪除commitlog檔案的情況。

2)官方建議rocketmq服務的linux檔案系統採用ext4。因為對於檔案刪除操作,ext4要比ext3性 能更好

RabbitMQ訊息堆積問題?

有時可能因為消費者自身 問題,導致沒辦法正常消費訊息,那麼就會導致訊息佇列中會堆積大量的訊息 或因為同一時間來了非常多的訊息,消費者沒辦法及時消費,導致訊息佇列中堆積了大量訊息。1.去優化消費者 提高消費能力。減少消費時間 2.可以給消費設定年齡 生命週期 如果超時就丟棄掉。可以不讓訊息大量堆積在訊...

RocketMQ訊息型別

普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...