RocketMQ 客戶端最佳實踐

2021-09-23 17:58:13 字數 2857 閱讀 5638

本文站在消費者和生產者的角度給出一些rocketmq客戶端使用的實踐意見。

乙個應用盡可能用乙個topic,訊息子型別用tags來標識,tags可以由應用自由設定。只有傳送訊息設定了tags,消費方在訂閱訊息時,才可以利用tags在broker做訊息過濾。

message.settags("taga");
每個訊息在業務層面的唯一標識碼,要設定到keys欄位,方便將來定位訊息丟失問題。伺服器會為每個訊息建立索引(雜湊索引),應用可以通過topic,key來查詢這條訊息內容,以及訊息被誰消費。由於是雜湊索引,請務必保證key盡可能唯一,這樣可以避免潛在的雜湊衝突。

訊息傳送成功或者失敗,要列印訊息日誌,務必要列印sendresult和key欄位。

對於訊息不可丟失應用,務必要有訊息重發機制,例如如果訊息傳送失敗,儲存到資料庫,能有定時程式嘗試重發,或者人工觸發重發。

producer的send方法本身支援內部重試,重試邏輯如下:

所以,如果本身向broker傳送訊息產生超時異常,就不會再做重試。

以上策略仍然不能保證訊息一定傳送成功,為保證訊息一定成功,建議應用這樣做:

如果呼叫send同步方法傳送失敗,則嘗試將訊息儲存到db,由後台執行緒定時重試,保證訊息一定到達broker。

上述db重試方式為什麼沒有整合到mq客戶端內部做,而是要求應用自己去完成,我們基於以下幾點考慮:

mq的客戶端設計為無狀態模式,方便任意的水平擴充套件,且對機器資源的消耗僅僅是cpu、記憶體、網路。

如果mq客戶端內部整合乙個kv儲存模組,那麼資料只有同步落盤才能較可靠,而同步落盤本身效能開銷較大,所以通常會採用非同步落盤,又由於應用關閉過程不受mq運維人員控制,可能經常會發生kill -9這樣暴力方式關閉,造成資料沒有及時落盤而丟失。

producer所在機器的可靠性較低,一般為虛擬機器,不適合儲存重要資料。

綜上,建議重試過程交由應用來控制。

乙個rpc呼叫,通常是這樣乙個過程

客戶端傳送請求到伺服器

伺服器處理該請求

伺服器向客戶端返回應答

所以乙個rpc的耗時時間是上述三個步驟的總和,而某些場景要求耗時非常短,但是對可靠性要求並不高,例如日誌收集類應用,此類應用可以採用oneway形式呼叫,oneway形式只傳送請求不等待應答,而傳送請求在客戶端實現層面僅僅是乙個os系統呼叫的開銷,即將資料寫入客戶端的socket緩衝區,此過程耗時通常在微秒級。

rocketmq目前無法避免訊息重複,所以如果業務對消費重複非常敏感,務必要在業務層面去重,有以下幾種去重方式:

將訊息的唯一鍵,可以是msgid,也可以是訊息內容中的唯一標識字段,例如訂單id等,消費之前判斷是否在db或tair(全域性kv儲存)中存在,如果不存在則插入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵衝突,則插入失敗,直接跳過)。msgid一定是全域性唯一識別符號,但是可能會存在同樣的訊息有兩個不同msgid的情況(有多種原因),這種情況可能會使業務上重複消費,建議最好使用訊息內容中的唯一標識欄位去重。

使用業務層面的狀態機去重。

消費並行度與消費吞吐量關係如下圖所示:

消費並行度與消費rt關係如下圖所示:

絕大部分訊息消費行為屬於io密集型,即可能是運算元據庫,或者呼叫rpc,這類消費行為的消費速度在於後端資料庫或者外系統的吞吐量,通過增加消費並行度,可以提高總的消費吞吐量,但是並行度增加到一定程度,反而會下降,如圖所示,呈現拋物線形式。所以應用必須要設定合理的並行度。cpu密集型應用除外。

修改消費並行度方法如下所示:

同乙個consumergroup下,通過增加consumer例項數量來提高並行度,超過訂閱佇列數的consumer例項無效。可以通過加機器,或者在已有機器啟動多個程序的方式。

提高單個consumer的消費並行執行緒,通過修改以下引數

consumethreadmin

consumethreadmax

某些業務流程如果支援批量方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理乙個訂單耗時1秒鐘,一次處理10個訂單可能也只耗時2秒鐘,這樣即可大幅度提高消費的吞吐量,通過設定consumer的consumemessagebatchmaxsize這個引數,預設是1,即一次只消費一條訊息,例如設定為n,那麼每次消費的訊息數小於等於n。

發生訊息堆積時,如果消費速度一直追不上傳送速度,可以選擇丟棄不重要的訊息,那麼如何判斷訊息是否有堆積情況呢,可以加入如下**邏輯:

public consumeconcurrentlystatus consumemessage(listmsgs, consumeconcurrentlycontext context) 

// todo 正常消費過程

return consumeconcurrentlystatus.consume_success;

}

如以上**所示,當某個佇列的訊息數堆積到100000條以上,則嘗試丟棄部分或全部訊息,這樣就可以快速追上傳送訊息的速度。

舉例如下,某條訊息的消費過程如下:

根據訊息從db查詢資料1

根據訊息從db查詢資料2

複雜的業務計算

向db插入資料3

向db插入資料4

這條訊息的消費過程與db互動了4次,如果按照每次5ms計算,那麼總共耗時20ms,假設業務計算耗時5ms,那麼總過耗時25ms,如果能把4次db互動優化為2次,那麼總耗時就可以優化到15ms,也就是說總體效能提高了40%。

rocketmq客戶端日誌配置

rocketmq客戶端會列印一些日誌如消費進度,心跳等,預設的是滾動10個日誌檔案,每個100mb,又一次看到這個日誌目錄有幾十gb,手動刪除,但是悲劇的是rocketmq沒有釋放這個日誌目錄,導致不能真正的釋放磁碟空間。於是研究了下怎麼配置rocketmq客戶端日誌配置。客戶端日誌配置client...

RocketMQ最佳實踐

rocketmq主要由nameserver broker producer以及consumer四部分構成,如下圖所示 所有的集群都具有水平擴充套件能力,無單點障礙。nameserver以輕量級的方式提供服務發現和路由功能,每個nameserver存有全量的路由資訊,提供對等的讀寫服務,支援快速擴縮容...

瘦客戶端 胖客戶端 智慧型客戶端

胖客戶端模式將應用程式處理分成了兩部分 由使用者的桌面計算機執行的處理和最適合乙個集中的伺服器執行的處理。乙個典型的胖客戶端包含乙個或多個在使用者的pc上執行的應用程式,使用者可以檢視並運算元據 處理一些或所有的業務規則 同時提供乙個豐富的使用者介面做出響應。伺服器負責管理對資料的訪問並負責執行一些...