kafka負載均衡相關資料收集(二)

2021-09-07 09:34:47 字數 2019 閱讀 4579

***關於kafka producer 分割槽策略的思考

from:

今天跑了乙個簡單的kafka produce程式,如下所示

public class kafkaproducer  extends thread @override public void run()  try  catch (interruptedexception e)  } } private producer createproducer()  public static void main(string args)  } 

發現其只向topic:user11中的某乙個partiton中寫資料。一下子感覺不對啊,kafka不是號稱可以實現producer的訊息均發嗎?後來查了一下相關的引數:partitioner.class

# 分割槽的策略

# 預設為kafka.producer.defaultpartitioner,取模

partitioner.class = kafka.producer.defaultpartitioner

在上面的程式中,我在producer中沒有定義分割槽策略,也就是說程式採用預設的kafka.producer.defaultpartitioner,來看看原始碼中是怎麼定義的:

class defaultpartitioner(props: verifiableproperties = null) extends partitioner  }

其核心思想就是對每個訊息的key的hash值對partition數取模得到。再來看看我的程式中有這麼一段:

producer.send(new keyedmessage(topic,string))

來看看keymessage:

case class keyedmessage[k, v](val topic: string, val key: k, val partkey: any, val message: v)  def haskey = key != null }

由於上面生產者**中沒有傳入key,所以程式呼叫:

def this(topic: string, message: v) = this(topic, null.asinstanceof[k], null, message)

但是如果key為null時會傳送到哪個分割槽?我在實驗的時候發現,每次執行生產者執行緒好像傳送的分割槽都不太相同。具體的解釋可以參考博文:

好的問題發現了該怎麼解決呢?只需要在生產者執行緒中對每條訊息指定key,如下:

producer.send(new keyedmessage(topic,string.valueof(i),string)); 

如下所示為自定義的分割槽函式,分割槽函式實現了partitioner介面

public class personalpartition implements partitioner public int partition(object arg0, int arg1)  else } }

然後修改配置即可:

properties.put("partitioner.class", "com.xx.kafka.personalpartition"); 

當然,也可以向topic中指定的partition中寫資料,如下**為向」user11」中partition 1中寫入資料:

public class kafkaproducer  extends thread @override public void run()  try  catch (interruptedexception e)  } } private kafkaproducer createproducer()  } 

kafka負載均衡相關資料收集(二)

關於kafka producer 分割槽策略的思考 from 今天跑了乙個簡單的kafka produce程式,如下所示 public class kafkaproducer extends thread override public void run try catch interruptede...

kafka手動負載均衡

針對執行中kafka的集群,因為特定原因,部分kafka節點負荷量超載,可以進行手動topic重新分配 還可以根據需求只重新分配特定topic到特定kafka節點,以實現只想在固定節點使用特定topic的目的。vim topics.json version 1 usr local kafka bin...

負載均衡相關

現在記下關閉linux防火牆的方法 1.即時生效,重啟後失效 開啟 service iptables start 關閉 service iptables stop 2 重啟後生效 開啟 chkconfig iptables on 關閉 chkconfig iptables off 關閉selinux...