Kafka分割槽策略及自定義

2021-08-02 12:59:23 字數 1385 閱讀 2158

預設分割槽策略是:取正(bytearray生成32位hash值)%numpartitions

這個公式的結果是得到0-(numpartitions-1)間正整數的個數大致相等,也就是說kafka的預設分割槽策略是無論我們給定多少個分割槽,我們存放的資料基本上會平均的分到各個分割槽上。

private

intdefaultpartition(string topic, object key, byte keybytes, object value, byte valuebytes, cluster cluster) else

} else

return partition;

}private

static

inttopositive(int number)

//生成32位的hash 值

public

static

intmurmur2(final

byte data)

實際開發中會遇到不讓資料均勻分布,如按照範圍放到不同的分割槽中,這樣就得使用自定義的分割槽策略了

int partition = 0;

if(key<100)else

if(key<200)else

producerrecordrecords = new producerrecord(topic,partition,key,value);

kafkaproducer.send(records);

public

class

kafkacustompartitioner

implements

partitioner

public

void

close() {}

public

intpartition(string topic, object arg1, byte keybytes, object arg3, byte arg4, cluster arg5) else

if(key<200)else

return partition;

}}

2.新增配置

partitioner.class值為自定義分割槽類的完整包名,這樣生產者就會選擇自定義的分割槽策略。

props.put("partitioner.class", "xx.xx.kafkacustompartitioner");
說明:1.客戶端測試環境中,自定義分割槽類跟生產者類在乙個專案中,不需要其他操作;2.想要自定義的分割槽放到kafka的伺服器端環境時,需要將自定義的分割槽類生成jar包放到kafka環境的lib下,同樣配置檔案中指定完整包名。

Kafka自定義分割槽器

kafka通過生產者kafkaproducer的send 方法將訊息傳送到broker中,但在傳送過程中需要經過 interceptor 序列化器 serializer 和分割槽器 partitioner 的一系列作用之後才能被真正地發往broker。訊息在經過序列化後需要確定它發往的分割槽,如果訊...

flink寫入kafka之自定義分割槽器

直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章 接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器 直接上 自定義分割槽 suppresswarnings unchecke...

Kafka 自定義分割槽的生產者

1,實現 介面 patitioner public class kafkapartitioner implements partitioner override public void close override public void configure mapmap 2,預設的分割槽器 def...