kafka 多執行緒消費

2022-07-03 20:51:11 字數 1508 閱讀 8258

一、

1、kafka的消費並行度依賴topic配置的分割槽數,如分割槽數為10,那麼最多10臺機器來並行消費(每台機器只能開啟乙個執行緒),或者一台機器消費(10個執行緒並行消費)。即消費並行度和分割槽數一致。

2、(1)如果指定了某個分割槽,會只講訊息發到這個分割槽上

(2)如果同時指定了某個分割槽和key,則也會將訊息傳送到指定分割槽上,key不起作用 

(3)如果沒有指定分割槽和key,那麼將會隨機傳送到topic的分割槽中

(4)如果指定了key,那麼將會以hash的方式傳送到分割槽中 

二、多執行緒消費例項

paritition 為3,broker為3,節點為3

1、生產者隨機分割槽提交資料

這也是乙個比較關鍵步驟,只有隨機提交到不同的分割槽才能實現多分割槽消費; 

自定義隨機分割槽:

public class mypartition implements partitioner

@override

public void close()

@override

public int partition(string topic, object key, byte keybytes, object value,

byte valuebytes, cluster cluster) catch (exception e)

// system.out.println("kafkamessage topic:"+ topic+" |key:"+ key+" |value:"+value);

return math.abs(partitionnum % numpartitions);

}}

然後在初始化kafka生產者配置的時候修改如下配置

props.put("partitioner.class", properties.getproperty("com.mykafka.mypartition"));

這樣就實現了kafka生產者隨機分割槽提交資料。

2、消費者

最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,本例知識用了最簡單的固定執行緒模式:

executorservice fixedthreadpool = executors.newfixedthreadpool(3);

for (int i = 0; i < 3; i++)

});}

在消費方面得注意,這裡得遍歷所有分割槽,否則還是只消費了乙個區:

consumerrecordsrecords = consumer.poll(1000);

for (topicpartition partition : records.partitions()) else

}}

注意上面的執行緒為啥只有3個,這裡得跟上面kafka的分割槽個數相對應起來,否則如果執行緒超過分割槽數量,那麼只會浪費執行緒,因為即使使用3個以上的執行緒也只會消費三個分割槽,而少了則無法消費完全。所以這裡必須更上面的對應起來。 

kafka多執行緒消費topic的問題

案例 topic my topic,分割槽 6 消費者 部署三颱機器,每台機器上面開啟6個執行緒消費。消費結果 只有一台機器可以正常消費,另外兩台機器直接輸出六條告警日誌 no broker partitions consumed by consumer thread my topic group ...

集群下的kafka實現多執行緒消費

多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個partition,從而實現生產的時候提交到不同的分割槽,以減少統一區塊的壓力。而消費則是從不同的分割槽裡拿資料進行消費。1.首先修改server.properties裡 num.partitions 3 這裡等於3是因為本人的集群...

kafka系列 多執行緒消費者實現

看了一下kafka,然後寫了消費kafka資料的 感覺自己功力還是不夠。不能隨心所欲地運算元據,資料結構沒學好,spark的rdd操作沒學好。不能很好地組織 結構,設計模式沒學好,物件導向思想理解不夠成熟。用佇列來儲存要消費的資料。用佇列來儲存要提交的offest,然後處理執行緒將其給回消費者提交。...