Kafka 從Consumer消費能力低下談起

2021-09-03 07:35:05 字數 2872 閱讀 5216

近期在生產環境發下日誌入庫延遲,導致很多準實時的監控圖表獲取不到資訊,這問題以前沒有出現過,可能跟最近業務量上公升有關,畢竟日均小兩億的平台了。梳理系統架構發現,日誌是快取在kafka中,由乙個後台程序task從kafka中消費,存放到資料庫中的,日誌入庫延遲,跟task關係很大。由於之前對kafka了解很少,以此為契機進行下學習,並以此部落格為記錄,溫故知新。

什麼是kafka呢?

官方說kafka是乙個高吞吐量的分布式發布訂閱訊息系統。它有很多術語,比如producer,consumer,broker,如下簡單解釋下。

producer:訊息的生產者,向kafka傳送訊息的客戶端

consumer:訊息的消費者,從kafka接取訊息的客戶端

broker:kafka集群中的單個例項

group:特指consumer group,多個consumer可以組成乙個組,每個訊息只能被組中的乙個consumer消費,如果乙個訊息可以被多個consumer消費的話,那麼這些consumer必須在不同的組,這個特性很重要

topic:kafka主題,乙個topic可以認為是一類訊息

partition:分割槽,乙個topic又可以分成多個分割槽,乙個分割槽的訊息只能被組內乙個consumer消費

offset:偏移量,offset為乙個long型數字,它是唯一標記一條訊息。它唯一的標記一條訊息,我把他理解成游標卡尺的游標,標記訊息的位置

接著我們來看乙個簡單的kafka系統結構:

需要補充的是,kafka本身不維護producer和consumer的資訊,這些資訊是由zookeeper來儲存維護的,所以producer和consumer的加入離開,不會影響到kafka集群本身

kafka一些特點

訊息狀態:

訊息狀態儲存在consumer中,broker不關心訊息是否消費被誰消費了,只會記錄乙個偏移量(offset),正常情況下,broker中訊息將會被順序消費,consumer驅動offset線性向前移動

支援批量傳送:

kafka支援以訊息集合為單位進行批量傳送,以提高push效率

push-and-pull :

kafka中的producer和consumer採用的是push-and-pull模式,即producer只管向broker

push訊息,consumer只管從broker pull訊息,兩者對訊息的生產和消費是非同步的。

分割槽機制partition:

kafka的broker端支援訊息分割槽,producer可以決定把訊息發到哪個分割槽,在乙個分割槽中訊息的順序就是producer傳送訊息的順序,乙個topic中可以有多個分割槽,具體分割槽的數量是可配置的。

producer將訊息發布到指定的topic中,同時producer也能決定將此訊息歸屬於哪個partition;比如基於」round-robin"方式或者通過其他的一些演算法等。本質上kafka只支援topic.每個consumer屬於乙個consumer group;反過來說,每個group中可以有多個consumer.傳送到topic的訊息,只會被訂閱此topic的每個group中的乙個consumer消費。這一句很重要,只會被訂閱topic的每個group中的乙個consumer消費!

上面提到,乙個topic的訊息,會在乙個group中的consumer中負載均衡。乙個topic中的每個partions,只會被乙個"訂閱者"中的乙個consumer消費,不過乙個consumer可以消費多個partitions中的訊息。

partitions和consumer之間的關係如下:

設定乙個topic

partition情形: p0, p1, p2, p3,

consumer情形(同group): c0, c1,c2

那麼如果按範圍分配策略,分配結果是: c0: p0, c1: p1, c2: p2, p3

如果按輪詢分配策略,分配結果是: c0: p1,p3, c1: p1, c2: p2

分配過程:

常見問題

提示很明顯了,心跳檢測失敗,kafka認為這個consumer已經掛掉了,於是將其移除group,重新對partitions和組員進行分配(rebanlence),但實際上這個consumer還在處理,待它處理完提交offset時,發現,哎,我的分割槽呢?失敗!

對於這個問題,主要思路有如下幾個:

1,提高心跳檢測的超時時間,session.timeout.ms:超時時間設定大一點

2,減小每次取kafka的訊息大小,max.partition.fetch.bytes:一次拉取最大byte這個屬性 小一點.預設1m

3,提高partitions的例項數量(和consumer匹配,以》=consumer數量為宜)

4,提高consumer效率,取資料和處理資料非同步,consumer中不要有額外的時間消耗,以免影響取資料或心跳檢測

5,在網上資料中,還有提到,如果用的是spring-kafka的,可以關閉掉自動提交的設定enable.auto.commit

kafka動態修改 consumer

在新版本kafka中,consumer offsets這個topic是存放消費者偏移量的,但是該主題預設配置副本數量只有1,容易造成單點故障,我們可以動態修改 無需重啟服務 副本因子,提高kafka的可靠性 1.1動態地增加相關主題的副本數非常的簡單,同樣是使用kafka reassign part...

Kafka學習整理五 Consumer配置

property default description group.id 用來唯一標識consumer程序所在組的字串,如果設定同樣的group id,表示這些processes都是屬於同乙個consumer group zookeeper.connect 指定zookeeper的連線的字串,格式...

Kafka的Consumer負載均衡演算法

有乙個topic test,然後這個topic的partition和他們所在的broker的圖如下 1.其中 broker有兩個,也就是伺服器有兩台。2.partition有6個,分布按照如圖所示,按照雜湊取模的演算法分配。3.消費者有8個,他們屬於同乙個消費組。複製 如果按照如圖所示,那麼這乙個消...