kafka 訊息的分割槽分配策略

2021-10-24 02:20:28 字數 3911 閱讀 7458

訊息的消費原理

觸發分割槽分配策略的條件

topic

在kafka 中,topic是乙個儲存訊息的邏輯概念,可以認為是乙個訊息的集合。每條訊息傳送到 kafka 集群的訊息都有乙個類別。每個topic可以有多個生產者向他傳送訊息,也可以有多個消費者去消費訊息

partition

每個topic 可以劃分多個分割槽(每個topic至少有乙個分割槽),同乙個topic下的不同分割槽包含的訊息是不同的。每個訊息在被新增到分割槽時,都會被分配乙個offset,它是訊息再此分割槽中的唯一編號,kafka通過offset保證訊息在分區內的順序,offset 的順序不跨分割槽,即kafka只保證在同乙個分區內的訊息是有序的。

topic 和 partition 儲存

partition 是以檔案的形式儲存在檔案系統中,比如建立乙個名為 demo 的topic ,其中有三個partition ,那麼在kafka 的資料目錄中,就有3三目錄 ,demo-0~2,命名規則:topicname-partitionid

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic demo
訊息是kafka 中最基本的資料單元,在kafka中,一條訊息有 key 、value 兩部分組成,在傳送一條訊息時,我們可以指定這個 key,那麼 producer 會根據 key 和partition 機制來判斷當前這條訊息應該傳送並儲存到哪個partition中;我們可以根據需要進行擴充套件producer 的partition 機制

/**

* @author : guaoran

* @description :

* 自定義訊息分割槽演算法

* @date :2019/1/15 13:35

*/public

class

topicpartitiondemo

implements

partitioner

else

system.err.

println

("topic="

+topic+

",key="

+key+

",value="

+value+

",partitionnum="

+partitionnum)

;return partitionnum;

}}

預設情況下,kafka 採用的是hash 取模的分割槽演算法。如果key 為null,則會隨機分配乙個分割槽。這個隨機是在引數」metadata.max.age.ms」 的時間範圍內隨機選擇乙個。對於這個時間段內,如果key為null,則只會傳送到唯一的分割槽。這個值預設情況下是10分鐘更新一次。

//消費指定分割槽的時候,不需要再訂閱 

// todo 只消費分割槽 0 的訊息

topicpartition partitiondemo =

newtopicpartition

(topic,0)

;consumer.

assign

(arrays.

aslist

(partitiondemo)

);

在實際生產過程中,每個topic都會有多個partition,多個partition的好處在於,一方面能夠對broker上的資料進行分片有效減少訊息的容量從而提公升io效能。另一方面,為了提高消費端的消費能力,一般會通過多個consumer 去消費同乙個topic,也就是消費端的負載均衡機制。

在group.id相同的consumer進行消費同乙個topic時,乙個consumer消費過得資料在另一consumer中不會被消費到,那麼同乙個consumer group 裡面的consumer 去消費資料的時候,會根據分片進行分配消費分割槽的資料。如果有三個partition ,同時啟動三個group.id 相同的consumer去同時消費同乙個topic,最終的結果是三個consumer 會分別消費乙個partition 的資料。

在kafka中存在兩種分割槽分配策略,一種是range(預設),一種是roundrobin(輪詢)。通過partition.assignment.strategy 引數來設定。

range 策略是對每個主題而言的,首先對同乙個主題裡面的分割槽按照序號進行排序。並對消費者按照字母順序進行排序。假設有10個分割槽,3個消費者,排完序的分割槽將會是0-9;消費者執行緒排完序是c0-0,c1-1,c2-2 。然後將partitions的個數除於消費者執行緒的總數來決定每個消費者執行緒將會消費幾個分割槽。如果除不盡,則前面的消費者會多消費乙個分割槽。所以最終結果是:c0消費 0-3分割槽,c1消費4-6分割槽,c2消費7-9分割槽。

如果同時消費兩個主題的話,分割槽數相同,消費者相同,此時,c0消費者比其他消費者執行緒多消費2個分割槽,這就是range strategy 的乙個弊端。最好是分割槽數是消費者的整數倍。

輪詢分割槽策略是把所有的partition 和所有consumer 都列出來,然後按照hashcode進行排序。最後通過輪詢演算法分配partition給消費者。如果所有consumer例項的訂閱都是相同的,那麼partition會均勻分布。

使用輪詢分割槽策略必須滿足兩個條件

每個主題的消費者例項具有相同數量的流

每個消費者訂閱的主題必須是相同的。

當出現以下幾種情況時,kafka會進行一次分割槽分配操作,即 kafka consumer 的rebalance

同乙個consumer group 內新增了消費者

消費者離開當前的consumer group ,如:主動停機或宕機

topic 分割槽數量發生了變化

kafka 提供了乙個角色:coordinator 來執行對於consumer group的管理 ,當consumer group 的第乙個 consumer 啟動的時候,它會去跟 kafka server 確定誰是他們組的 coordinator 。之後該group 內所有成員都會和該coordinator 進行協調通訊。

確定 coordinator

消費者向kafka 集群中的任意乙個broker 傳送乙個 groupcoordinatorrequest請求,服務端會返回乙個負載最小的broker 節點的id,並將 broker 設定為 coordinator

joingroup 的過程

在rebalance 之前,需要保證 coordinator是已經確定好了的,整個rebalance 的過程分為兩個步驟,join 和 sync 。

join:表示加入到consumer group 中,在這一步中,所有成員都會想 coordinator 傳送joingroup的請求。一旦所有成員都傳送了joingroup請求,那麼 coordinator會選擇乙個consumer 擔任leader 角色,並把組成員資訊和訂閱資訊傳送消費者,並返回分割槽策略給leader。

synchronizing groupgroup statestate 階段

leader 收到 coordinator 的分割槽策略後確定分配方案,將消費者對應的partition分配方案同步給consumer group中的所有consumer。

步驟

消費者向任意乙個broker 傳送訊息, broker 返回乙個最小的broker id ,作為coordinator

消費者向coordinator 傳送 joingroup 請求,coordinator 選擇 乙個消費者作為leader,返回給leader 分割槽策略

消費者leader 根據分割槽策略確定分割槽方案後,向 coordinator 傳送 synchronizing groupgroup 請求, 就是告知broker 消費者分別消費哪些個分割槽。

kafka的分割槽分配策略

將所有broker n個 和partition排序 將第i個partition分配到第 i mode n 個broker上 當key為空時,訊息隨機傳送到各個分割槽 各個版本會有不同,有的是採用輪詢的方式,有的是隨機,有的是一定時間內只傳送給固定partition,隔一段時間後隨機換乙個 用key的...

Kafka的分割槽分配策略

用過 kafka 的同學應該都知道,每個 topic 一般會有很多個 partitions。為了使得我們能夠及時消費訊息,我們也可能會啟動多個 consumer 去消費,而每個 consumer 又會啟動乙個或多個streams去分別消費 topic 對應分割槽中的資料。我們又知道,kafka 存在...

詳解Kafka分割槽分配策略

前言 乙個consumer group中有多個consumer,乙個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。kafka有兩種分配策略,一是roundrobin,二是range 1.roundrobin...