Kafka 消費者 位移提交

2021-10-10 18:13:41 字數 767 閱讀 5156

kafka位移機制

broker維護消費者的消費位移資訊,老的版本儲存在zk上,新版本儲存在內部的topic裡。本質上,位移資訊消費者自己維護也可以,但是如果消費者掛了或者重啟,對於某乙個分割槽的消費位移不就丟失了嗎?所以,還是需要提交到broker端做持久化的。

提交方式分為自動和手動。

自動是預設的行為,當然可以通過配置覆蓋關掉自動提交。自動提交是consumer以乙個時間間隔周期性地提交offset,比如每隔5s提交一次。提交發生的時機是poll呼叫時,會將上一次消費的位移資訊提交到broker。自動提交的弊端在於有時間間隔,可能導致重複消費。比如5s提交一次,執行到第3s時,consumer斷電掛了或者重平衡了,那麼這3s內消費的訊息可能沒有提交上去,新的consumer再消費這個分割槽時,就會重複消費。

為了避免間隔導致的重複消費,我們可以每消費一條訊息就提交一次,也就是手動呼叫commitsync提交。這樣當然是最穩的,但是手動commitsync提交是同步阻塞式的,一次不成功,會導致重試,會在一定程度上影響效能。kafka還提供了乙個非同步版本commitasync,這個方法是非同步提交,效能問題基本忽略,但是如果提交失敗並不會重試。(為啥?因為非同步提交,如果重試的話,很可能已經有新的提交了,此時非同步提交是乙個老的位移,會導致重複消費)。那咋整呢?一般會推薦sync+async結合的方式。

while(true)

sync commit()

也就是while迴圈裡非同步提交,即使有一次失敗了,沒關係,後面的非同步提交總會成功的。但是如果是退出了,那麼必須來一次同步提交,確保退出前的位移可以被提交。

Kafka學習之旅 十五 重設消費者組位移

簡介 相信大家也遇到過需要重某個位置或者時間點重新消費的情況那麼本篇講下重新設定消費組位移。kafka 乙個比較有特色的設計是由於它是基於日誌結構 log based 的訊息引擎,消費者在消費訊息時,僅僅是從磁碟檔案上讀取資料而已,是唯讀的操作,因此消費者不會刪除訊息資料。同時,由於位移資料是由消費...

kafka消費者無法消費異常

今天被乙個kafka消費異常折磨了一天,頭差點炸了,還好最後解決了它 異常 伺服器 record is corrupt 記錄損壞 不明原因 有可能磁碟空間不足導致 導致消費者無法正常消費訊息 卡在某乙個offset 不能繼續消費 解決辦法 先停掉消費者程式 殺掉程序 不可關閉kafka服務 然後手動...

kafka 主動消費 Kafka消費者的使用和原理

publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...