Kafka 中的再均衡

2021-10-24 15:02:58 字數 4291 閱讀 8800

我們先回顧下,乙個主題可以有多個分割槽,而訂閱該主題的消費組中可以有多個消費者。每乙個分割槽只能被消費組中的乙個消費者消費,可認為每個分割槽的消費權只屬於消費組中的乙個消費者。但是世界是變化的,例如消費者會宕機,還有新的消費者會加入,而為了應對這些變化,讓分割槽所屬權的分配合理,這都需要對分割槽所屬權進行調整,也就是所謂的 「再均衡」。本文將對再均衡的相關知識進行詳細敘述。

首先,我們需要了解什麼情況下會觸發再均衡,在前文已經提到了消費者數量的變化,是需要再均衡的。在使用 kafka 時,除了消費者數量可能會變化,分割槽數量也同樣可能變化,我們可以人為的對分割槽數量進行修改,但是 kafka 只允許增加分割槽,所以我們只能把分割槽數量調大,不能調小,否則會收到invalidpartitionexception異常。關於為什麼不能減少分割槽,可參考下面的回答:

按 kafka 現有的**邏輯,此功能是完全可以實現的,不過也會使得**的複雜度急劇增大。實現此功能需要考慮的因素很多,比如刪除掉的分割槽中的訊息該作何處理?如果隨著分割槽一起消失則訊息的可靠性得不到保障;如果需要保留則又需要考慮如何保留。直接儲存到現有分割槽的尾部,訊息的時間戳就不會遞增,如此對於 spark、flink 這類需要訊息時間戳(事件時間)的元件將會受到影響;如果分散插入到現有的分割槽中,那麼在訊息量很大的時候,內部的資料複製會占用很大的資源,而且在複製期間,此主題的可用性又如何得到保障?與此同時,順序性問題、事務性問題、以及分割槽和副本的狀態機切換問題都是不得不面對的。反觀這個功能的收益點卻是很低,如果真的需要實現此類的功能,完全可以重新建立乙個分割槽數較小的主題,然後將現有主題中的訊息按照既定的邏輯複製過去即可。

簡單來說,就是做這個功能需要考慮很多因素,這樣會把**弄的很複雜,而收益卻很低,而且存在替代方案來實現該效果,建立乙個分割槽數小的主題,再把當前主題遷移過去。

除了消費者、分割槽數量的變化,還有一種情況,也需要進行再均衡。當消費者訂閱主題時使用的是正規表示式,例如 「test.*」,表示訂閱所有以 test 開頭的主題,當有新的以 test 開頭的主題被建立時,則需要通過再均衡將該主題的分割槽分配給消費者。

再均衡的三種觸發時機,我們已經清楚了,下面我們看下再均衡是如何實現的。

再均衡,將分割槽所屬權分配給消費者。因此需要和所有消費者通訊,這就需要引進乙個協調者的概念,由協調者為消費組服務,為消費者們做好協調工作。在 kafka 中,每一台 broker 上都有乙個協調者元件,負責組成員管理、再均衡和提交位移管理等工作。如果有 n 臺 broker,那就有 n 個協調者元件,而乙個消費組只需乙個協調者進行服務,那該 ** 由哪個 broker 為其服務?** 確定 broker 需要兩步:

計算分割槽號

partition = math.abs(groupid.hashcode() % offsetstopicpartitioncount)

根據groupid的雜湊值,取餘offsetstopicpartitioncount(內部主題__consumer_offsets的分割槽數,預設 50)的絕對值,其意思就是把消費組雜湊雜湊到內部主題__consumer_offsets的乙個分割槽上。確定協調者為什麼要和內部主題扯上關係。這就跟協調者的作用有關了。協調者不僅是負責組成員管理和再均衡,在協調者中還需要負責處理消費者的偏移量提交,而偏移量提交則正是提交到__consumer_offsets的乙個分割槽上。所以這裡需要取餘offsetstopicpartitioncount來確定偏移量提交的分割槽。

找出分割槽 leader 副本所在的 broker

確定了分割槽就簡單了,分割槽 leader 副本所在的 broker 上的協調者,就是我們要找的。

這個演算法通常用於幫助定位問題。當乙個消費組出現問題時,我們可以先確定協調者的 broker,然後檢視 broker 端的日誌來定位問題。

協調者,我們確定了。那協調者和消費者之間是如何互動的?協調者如何掌握消費者的狀態,又如何通知再均衡。這裡使用了心跳機制。在消費者端有乙個專門的心跳執行緒負責以heartbeat.interval.ms的間隔頻率傳送心跳給協調者,告訴協調者自己還活著。同時協調者會返回乙個響應。而當需要開始再均衡時,協調者則會在響應中加入rebalance_in_progress,當消費者收到響應時,便能知道再均衡要開始了。

由於再平衡的開始依賴於心跳的響應,所以heartbeat.interval.ms除了決定心跳的頻率,也決定了再均衡的通知頻率。

現在我們再重新看下,觸發再均衡的時機,前面說到有三種情況觸發再均衡,分別是消費者數量的增加或減少、分割槽數的增加和新建立主題,其中消費者數量增加、分割槽數增加和新建立主題,這都是必須是人為操作,算是計畫內的再均衡。而消費者數量減少則除了是人為操作,也可能因為其他原因導致,屬於計畫之外的再均衡,這是我們需要關心的,畢竟再均衡的開銷還是很大的,所有消費者都會停止工作,所以我們應盡量避免不必要的再均衡。下面我們看下影響消費者數量減少的引數有哪些:

session.timeout.ms:broker 端引數,消費者的存活時間,預設 10 秒,如果在這段時間內,協調者沒收到任何心跳,則認為該消費者已崩潰離組;

heartbeat.interval.ms:消費者端引數,傳送心跳的頻率,預設 3 秒;

max.poll.interval.ms:消費者端引數,兩次呼叫 poll 的最大時間間隔,預設 5 分鐘,如果 5 分鐘內無法消費完,則會主動離組。

session.timeout.ms ≥ 3 * heartbeat.interval.ms

為盡量避免因為偶發的網路原因,心跳無法到達協調者,在超時之前,應至少能傳送 3 輪心跳。再給出乙個經驗值的設定:session.timeout.ms=6sheartbeat.interval.ms=2s

max.poll.interval.ms的設定,則主要和下游處理時間有關,例如下游處理時間需要 6 分鐘,那按預設值是不合理的,消費者會頻繁主動離組。所以需要把值設定的比下游處理時間大一點,避免不必要的再均衡。

這一小節主要講了協調者如何通知消費者開始再均衡,以及如何設定引數避免不必要的再均衡,下面我們看下再均衡的流程是怎麼樣的。

當消費者收到協調者的再均衡開始通知時,需要立即提交偏移量

消費者在收到提交偏移量成功的響應後,再傳送 joingroup 請求,重新申**入組,請求中會含有訂閱的主題資訊;

當協調者收到第乙個 joingroup 請求時,會把發出請求的消費者指定為leader 消費者,同時等待rebalance.timeout.ms,在收集其他消費者的 joingroup 請求中的訂閱資訊後,將訂閱資訊放在 joingroup 響應中傳送給 leader 消費者,並告知他成為了 leader,同時也會傳送成功入組的 joingroup 響應給其他消費者;

leader 消費者收到 joingroup 響應後,根據消費者的訂閱資訊制定分配方案,把方案放在syncgroup 請求中,傳送給協調者。普通消費者在收到響應後,則直接傳送 syncgroup 請求,等待 leader 的分配方案;

協調者收到分配方案後,再通過syncgroup 響應把分配方案發給所有消費組。

當所有消費者收到分配方案後,就意味著再均衡的結束,可以正常開始消費工作了。

《深入理解 kafka》

《kafka 核心技術與實戰》

kafka 之 group 狀態變化分析及 rebalance 過程: https://matt3

3.com/2017/01/16/kafka-group/#consumer- 初始化時 -group- 狀態變化

Kafka消費者組再均衡問題

kafka消費者組再均衡問題 harvardfly 在kafka中,當有新消費者加入或者訂閱的topic數發生變化時,會觸發rebalance 再均衡 在同乙個消費者組當中,分割槽的所有權從乙個消費者轉移到另外乙個消費者 機制,rebalance顧名思義就是重新均衡消費者消費。rebalance的過...

再識負載均衡

以上結構是一般的分布式架構的組成,所有標記紅點的位置就是我們可以運用負載均衡的地方,顯然在使用者的請求和應用之間我們需要乙個反向 來實現負載均衡,這裡一般是七層負載均衡.還有一些地方只是需要簡單的修改ip來達到負載均衡,想象這樣乙個場景,就是所有的使用者對於乙個公司只是知道乙個ip,但是向這個ip發...

kafka的leader均衡機制

kafka的leader的均衡機制 當乙個broker停止或者crashes時,所有本來將它作為leader的分割槽將會把leader轉移到其他broker上去,極端情況下,會導致同乙個leader管理多個分割槽,導致負載不均衡,同時當這個broker重啟時,如果這個broker不再是任何分割槽的l...