kafka重複消費問題

2021-09-20 06:14:22 字數 1030 閱讀 2786

問題描述 

採用kafka讀取訊息進行處理時,consumer會重複讀取afka佇列中的資料。

問題原因 

kafka的consumer消費資料時首先會從broker裡讀取一批訊息資料進行處理,處理完成後再提交offset。而我們專案中的consumer消費能力比較低,導致取出的一批資料在session.timeout.ms時間內沒有處理完成,自動提交offset失敗,然後kafka會重新分配partition給消費者,消費者又重新消費之前的一批資料,又出現了消費超時,所以會造成死迴圈,一直消費相同的資料。

解決方案 

專案中使用的是spring-kafka,所以把kafka消費者的配置enable.auto.commit設為false,禁止kafka自動提交offset,從而使用spring-kafka提供的offset提交策略。spring-kafka中的offset提交策略可以保證一批訊息資料沒有完成消費的情況下,也能提交offset,從而避免了提交失敗而導致永遠重複消費的問題。

首先來看看spring-kafka的消費執行緒邏輯

if (isrunning() && this.definedpartitions != null) 

}

上面可以看到,如果auto.commit關掉的話,spring-kafka會啟動乙個invoker,這個invoker的目的就是啟動乙個執行緒去消費資料,他消費的資料不是直接從kafka裡面直接取的,那麼他消費的資料從**來呢?他是從乙個spring-kafka自己建立的阻塞佇列裡面取的。

然後會進入乙個迴圈,從源**中可以看到如果auto.commit被關掉的話, 他會先把之前處理過的資料先進行提交offset,然後再去從kafka裡面取資料。

然後把取到的資料丟給上面提到的阻塞列隊,由上面建立的執行緒去消費,並且如果阻塞佇列滿了導致取到的資料塞不進去的話,spring-kafka會呼叫kafka的pause方法,則consumer會停止從kafka裡面繼續再拿資料。

接著spring-kafka還會處理一些異常的情況,比如失敗之後是不是需要commit offset這樣的邏輯。

解決Kafka重複消費問題

某服務 用了springboot spring kafka 處理kafka訊息時,發現每條訊息處理時間長達60 秒。幾百條訊息處理完後,又重新從第一條開始重複消費。kafka消費者有兩個配置引數 max.poll.interval.ms 兩次poll操作允許的最大時間間隔。單位毫秒。預設值30000...

kafka的重複消費問題

1.消費端消費能力比較低,處理訊息速度慢 2.根據kafka消費特性,消費者在每個partion上的位置都是乙個整數,即消費下一條訊息的偏移量。這個狀態可以定期檢查點,使得訊息的確認變得非常的方便,消費者可以倒退回舊的偏移量,重新消費。3.訊息處理完之後提交下乙個消費的offset,而在sessio...

kafka重複消費 漏消費情況

kafka重複消費的情況 資料沒有丟,只是資料重複消費了。丟不丟資料指的是producer到broker的過程,以及broker儲存資料的過程。重複消費 漏消費指的是消費結果,所以我們記憶這些過程的時候,或者定位問題的時候,首先應該明確,是丟資料了還是重複消費了。重複消費 ack 1 produce...