Kafka Consumer 消費者組

2022-06-07 07:12:10 字數 1968 閱讀 5179

官方定義:

消費者使用乙個消費者組(即group.id)來標記自己,topic的每條訊息都只會被傳送到每個訂閱它的消費者組的乙個消費者例項上。

- 所有consumer例項都屬於相同group--實現基於佇列的模型。每條訊息只會被乙個consumer例項處理。

- consumer都屬於不同group--實現基於發布/訂閱的模型。極端的情況是每個consumer例項都設定完全不同的group,這樣kakfa訊息就會被廣播到所有consumer例項上。

- kafka目前只提供單個分割槽的訊息順序,而不會維護全域性的訊息順序,如果要實現topic全域性的訊息順序,只能通過讓每個consumer group下只包含乙個consumer例項的方式來間接實現。

- consumer group下可以有乙個或者多個comsumer例項。乙個consumer例項可以是乙個執行緒,也可以是其他機器上的程序。

- group.id是consumer group唯一標識。

- 對於某個group而言,訂閱topic的每個分割槽只能分配給該group下的乙個consumer例項(當然該分割槽還可以被分配給其他訂閱該topiv的消費組)

位移:kafka在內部採用乙個map來儲存其訂閱topic所屬分割槽的offset。

位移提交:舊版本的位移提交時提交到zookeeper上面的固定節點上。該路徑是/consumer//offsets//partitonid。

新版本的位移提交到kafka內部的乙個topic(__consumer_offsets)上。

只對消費者組(consumer group)有效,如果是獨立消費者(standlone consumer),沒有rebalance的概念。

rebalance:consumer group下所有consumer如何達成一致來分配訂閱topic的所有分割槽。假設有乙個consumer group有20個consumer例項。該group訂閱了乙個具

有100分割槽的topic。正常情況下每個consumer會分配5各分割槽。

session.timeout.ms: 檢測consumer group組成員傳送崩潰的時間("coordinator檢測失敗的時間")。如果有訊息需要很長時間,那麼consumer有可能無法執行任何消費。

在0.10.1.0版本kafka對該引數做了拆分。可以指定乙個比較小的值讓coordinator能夠快速檢測consumer崩潰,開啟rebalance。預設引數值10秒。

max.poll.interval.ms: 設定訊息處理邏輯的最大時間。

auto.offset.reset: 指定了無位移資訊或位移越界是kafka的應對策略。

- earliest: 指定從最早的位移開始消費,這裡最早的位移不一定是0.

- latest:指定從最新處位移開始消費。

- none:指定如果未發現位移資訊或位移越界,則丟擲異常。很少使用。

enable.auto.commit: 是否自動提交。

fetch.max.bytes: 如果實際業務訊息很大,必須要設定該引數為乙個較大的值,不然會無法消費這些資訊。

max.poll.records: 控制單次poll呼叫返回的最大訊息數。預設500.

heartbeat.interval.ms: 當開啟新的一輪rebalance是,他會將這個以rebalance_in_progress異常的形式「塞進」consumer心跳請求中,這樣其他成員拿到response後

才知道它需要重新加入group。這個引數設定時間必須小於session.timeout.ms。

connections.max.idle.ms: kakfa會定期的關閉空閒socket連線,預設9分鐘,設定-1時,即不管吧這些空閒連線。

consumer訂閱是延遲生效的,只有在下次poll請求時才會生效。

Kafka Consumer消費壓力測試

注意該topic一定要有資料。bin kafka consumer perf test.sh broker list test01 6667,test02 6667,test05 6667 messages 2000000 topic flink test group g1 threads 24指令...

Kafka Consumer 分割槽消費策略

本文主要對 consumer 端對partition的分配策略進行分析。kafka 分割槽分配的規則是乙個分割槽只能被乙個消費者組的某個消費者進行消費。並且kafka會在此規則下實現消費資料的負載均衡。kafka 提供的分割槽策略有三種,分別是 range roundrobin和sticky,其中預...

初始 Kafka Consumer 消費者

根據 kafkaconsumer 類上的注釋上來看 kafkaconsumer 具有如下特徵 對於訊息處理時間不可 的情況下上述兩個引數可能不夠用,那將如何是好呢?通常的建議將訊息拉取與訊息消費分開,乙個執行緒負責 poll 訊息,處理這些訊息使用另外的執行緒,這裡就需要手動提交消費進度。為了控制訊...