背景:
乙個簡單的用scala往kafka裡寫資料demo,每次執行只往乙個分割槽寫入資料,下次執行又選另乙個分割槽一直寫入。
傳送例子:
def main(args: array[string]): unit =
}
原因探索:
keyedmessage有兩種例項化方式導致:
def this(topic: string構建produce時, message: v) =this(topic,
null.asinstanceof[k],
null, message)
def this(topic: string
, key: k
, message: v) =this(topic, key, key, message)
valproducer =newproducer[string呼叫預設defaulteventhandler實現, string](kafkaconfig)
def this(config: producerconfig) =this(config,
newdefaulteventhandler[k
,v](config,
utils.createobject[partitioner](config.partitionerclass
, config.props),
utils.createobject[encoder[v]](config.serializerclass
, config.props),
utils.createobject[encoder[k]](config.keyserializerclass
, config.props),
newproducerpool(config)))
defaulteventhandler中
private defgetpartition(topic: string, key: any, topicpartitionlist: seq[partitionandleader]): int =
}elsepartitioner.partition(key, numpartitions)
if(partition < 0 || partition >= numpartitions)
throw newunknowntopicorpartitionexception("invalid partition id: " + partition + " for topic " + topic +
"; valid values are in the inclusive range of [0, " + (numpartitions-1) + "]")
trace("assigning message of topic %s and key %s to a selected partition %d".format(topic,
if(key ==null) "[none]"
elsekey.tostring, partition))
partition
}
如果 keyedmessage 例項化時 key為 null,則 生成乙個隨機數作為分割槽id,此topic就用此分割槽id寫入資料,就造成每次啟動只往某個分割槽寫入資料
如果key不為null,則根據配置中實現類的partition方法進行分割槽id計算
props.getstring("partitioner.class", "kafka.producer.defaultpartitioner")
classdefaultpartitioner(props: verifiableproperties =null)extendspartitioner獲取分割槽id,進行資料寫入。}
總結:1.如果keyedmessage例項化是key為空,導致只往某個分割槽寫入資料
2.可以自己實現partitioner,重寫獲取分割槽id演算法,不然就用預設的,key不可為空
kafka 訊息的分割槽分配策略
訊息的消費原理 觸發分割槽分配策略的條件 topic 在kafka 中,topic是乙個儲存訊息的邏輯概念,可以認為是乙個訊息的集合。每條訊息傳送到 kafka 集群的訊息都有乙個類別。每個topic可以有多個生產者向他傳送訊息,也可以有多個消費者去消費訊息 partition 每個topic 可以...
詳解Kafka分割槽分配策略
前言 乙個consumer group中有多個consumer,乙個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。kafka有兩種分配策略,一是roundrobin,二是range 1.roundrobin...
Kafka 訊息傳送
建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...