Kafka筆記4(消費者)

2022-07-24 20:54:13 字數 2323 閱讀 9647

消費者和消費群組:

kafka消費者從屬於消費者群組,乙個群組裡的消費者訂閱的是同乙個主題,每個消費者接收主題的一部分分割槽訊息

消費者的數量不要超過主題分割槽的數量,多餘的消費者只會被閒置

乙個主題可以被多個消費群組使用,消費者群組之間互不影響

當乙個消費者加入群組時,他讀取的資料是原本由其他消費者讀取的資訊

分割槽的所有權從乙個消費者轉移至另乙個消費者的行為稱為「再均衡」

再均衡期間,消費者當前的讀取狀態會丟失,消費者無法讀取資訊,造成集群一小段時間的不可用,在恢復狀態之前會拖慢應用程式

消費者通過向群組協調器broker傳送心跳維持他們和群組的從屬關係以及他們對分割槽的所有權關係,如果broker認為消費者死亡會觸發再均衡行為

分配分割槽過程:

當消費者加入群組時,他會向群組協調器傳送乙個joingroup請求,第乙個加入群組的消費者稱為群主,群主從協調器那裡獲得群組的成員列表,並負責給每乙個消費者分配分割槽。他使用乙個實現partitionassignor介面的類來決定哪些分割槽應該被分配給消費者,分配完畢之後,群主把分配情況列表傳送給broker,broker再把這些資訊傳送給所有消費者,每個消費者只能看到自己的分配資訊,只有群主知道群組的所有消費者的分配資訊

訊息輪詢是消費者api核心,通過從乙個簡單的輪詢向伺服器請求資料,一旦消費者訂閱了主題,輪詢就會處理所有細節,包括群組協調/分割槽再均衡/傳送心跳/獲取資料

乙個消費者使用乙個執行緒

消費者重要的屬性引數配置:

fetch.min.bytes

指定了消費者從伺服器獲取記錄的最小位元組數,如果broker收到消費者請求,但資料可用量小於fetch.min.bytes,就會等到有足夠的可用資料才把它返回給消費者

fetch.max.wait.ms

指定broker等待時間,預設500ms  

max.partition.fetch.bytes

指定伺服器從每個分割槽裡返回給消費者的最大位元組數,預設1mb    max.partition.fetch.size的值必須比broker能接收的最大訊息位元組數(max.message.size)大

session.timeout.ms

指定消費者在被認為死亡之前可以與伺服器斷開連線的時間,預設3s

heartbeat.interval.ms =  session.timeout.ms / 3

auto.offset.reset

指定消費者在讀取乙個沒有偏移量的分割槽或者偏移量無效的情況下該如何處理

=latest  消費者從最新的記錄開始讀取資料

=earliest  消費者從起始位置讀取分割槽記錄

enable.auto.commit

指定消費者是否自動提交偏移量,預設true

auto.commit.interval.ms 控制提交頻率

partition.assignment.strategy

=org.apache.kafka.clients.consumer.rangeassignor 把主題的若干連續分割槽分配給消費者

=org.apache.kafka.clients.consumer.roundrobinassignor  把主題的所有分割槽逐個分配給消費者

client.id  

任意字串,broker用來標識從客戶端傳送來的訊息

max.poll.records

用於控制單次呼叫call() 方法返回的記錄數量,可以幫你控制在輪詢裡需要處理的資料量

receive.buffer.bytes 和 send.buffer.bytes

預設-1   

更新分割槽當前位置的操作叫提交

消費者會向乙個叫做 _consumer_offset 的特殊主題傳送訊息,訊息裡包含了每個分割槽的偏移量

kafka可以設定消費者自動提交偏移量,設定enable.auto.commit=true,提交時間間隔auto.commit.interval.ms=5s

自動提交是在輪詢裡進行的,消費者每次輪詢時會檢查是否該提交偏移量了,是則提交上一次輪詢返回的偏移量

提交當前偏移量,使用api函式 commitsync()

非同步提交偏移量,使用api函式commitasync()

可以使用乙個單調遞增的序列號來維護非同步提交順序

kafka學習筆記4 kafka消費者

消費者和消費者群組 kafka消費者分為消費者群組和消費者。每乙個kafka消費者都隸屬於乙個kafka消費者群組。每個消費者群組可以對應乙個或多個topic,每個topic內的分割槽只能對應消費者群組內的乙個消費者,當消費者比topic中的分割槽數多時,多餘的消費者不會接收topic中的資訊。這種...

kafka消費者無法消費異常

今天被乙個kafka消費異常折磨了一天,頭差點炸了,還好最後解決了它 異常 伺服器 record is corrupt 記錄損壞 不明原因 有可能磁碟空間不足導致 導致消費者無法正常消費訊息 卡在某乙個offset 不能繼續消費 解決辦法 先停掉消費者程式 殺掉程序 不可關閉kafka服務 然後手動...

kafka 主動消費 Kafka消費者的使用和原理

publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...