kafka傳送訊息分割槽策略詳解

2021-08-15 21:32:23 字數 2517 閱讀 6320

背景:

乙個簡單的用scala往kafka裡寫資料demo,每次執行只往乙個分割槽寫入資料,下次執行又選另乙個分割槽一直寫入。

傳送例子:

def main(args: array[string]): unit = 

}

原因探索:

keyedmessage有兩種例項化方式導致:

def this(topic: string

, message: v) =this(topic,

null.asinstanceof[k],

null, message)

def this(topic: string

, key: k

, message: v) =this(topic, key, key, message)

構建produce時

valproducer =newproducer[string

, string](kafkaconfig)

呼叫預設defaulteventhandler實現

def this(config: producerconfig) =

this(config,

newdefaulteventhandler[k

,v](config,

utils.createobject[partitioner](config.partitionerclass

, config.props),

utils.createobject[encoder[v]](config.serializerclass

, config.props),

utils.createobject[encoder[k]](config.keyserializerclass

, config.props),

newproducerpool(config)))

defaulteventhandler中

private defgetpartition(topic: string

, key: any, topicpartitionlist: seq[partitionandleader]): int =

}elsepartitioner.partition(key, numpartitions)

if(partition < 0 || partition >= numpartitions)

throw newunknowntopicorpartitionexception("invalid partition id: " + partition + " for topic " + topic +

"; valid values are in the inclusive range of [0, " + (numpartitions-1) + "]")

trace("assigning message of topic %s and key %s to a selected partition %d".format(topic,

if(key ==null) "[none]"

elsekey.tostring, partition))

partition

}

如果 keyedmessage 例項化時 key為 null,則 生成乙個隨機數作為分割槽id,此topic就用此分割槽id寫入資料,就造成每次啟動只往某個分割槽寫入資料

如果key不為null,則根據配置中實現類的partition方法進行分割槽id計算

props.getstring("partitioner.class"

, "kafka.producer.defaultpartitioner")

classdefaultpartitioner(props: verifiableproperties =null)extendspartitioner 

}

獲取分割槽id,進行資料寫入。

總結:1.如果keyedmessage例項化是key為空,導致只往某個分割槽寫入資料

2.可以自己實現partitioner,重寫獲取分割槽id演算法,不然就用預設的,key不可為空

kafka 訊息的分割槽分配策略

訊息的消費原理 觸發分割槽分配策略的條件 topic 在kafka 中,topic是乙個儲存訊息的邏輯概念,可以認為是乙個訊息的集合。每條訊息傳送到 kafka 集群的訊息都有乙個類別。每個topic可以有多個生產者向他傳送訊息,也可以有多個消費者去消費訊息 partition 每個topic 可以...

詳解Kafka分割槽分配策略

前言 乙個consumer group中有多個consumer,乙個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。kafka有兩種分配策略,一是roundrobin,二是range 1.roundrobin...

Kafka 訊息傳送

建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...