解決Kafka重複消費問題

2021-09-09 05:26:54 字數 802 閱讀 4336

某服務(用了springboot + spring-kafka)處理kafka訊息時,發現每條訊息處理時間長達60+秒。幾百條訊息處理完後,又重新從第一條開始重複消費。

kafka消費者有兩個配置引數:

max.poll.interval.ms

兩次poll操作允許的最大時間間隔。單位毫秒。預設值300000(5分鐘)。

兩次poll超過此時間間隔,kafka服務端會進行rebalance操作,導致客戶端連線失效,無法提交offset資訊,從而引發重複消費。

max.poll.records

一次poll操作獲取的訊息數量。預設值50。

如果每條訊息處理時間超過60秒,那麼一批訊息處理時間將超過5分鐘,從而引發poll超時,最終導致重複消費。

每條訊息的處理時間不要超過5分鐘。

如果超過5分鐘,則考慮進行架構上的優化。

比如a執行緒消費訊息,放到程序內部佇列中,提交offset;其他執行緒從內部佇列取訊息,並處理業務邏輯。為防止內部佇列訊息積壓,a執行緒需要監控佇列中訊息數量,超過一定量時進入等待。

springboot沒有提供可調節此數值的引數。如果修改此數值,需要自己封裝方法建立kafka客戶端:

org.apache.kafka.clients.consumer.kafkaconsumer.kafkaconsumer(properties)

可通過springboot配置引數"spring.kafka.consumer.max-poll-records"調整。

kafka重複消費問題

問題描述 採用kafka讀取訊息進行處理時,consumer會重複讀取afka佇列中的資料。問題原因 kafka的consumer消費資料時首先會從broker裡讀取一批訊息資料進行處理,處理完成後再提交offset。而我們專案中的consumer消費能力比較低,導致取出的一批資料在session....

kafka的重複消費問題

1.消費端消費能力比較低,處理訊息速度慢 2.根據kafka消費特性,消費者在每個partion上的位置都是乙個整數,即消費下一條訊息的偏移量。這個狀態可以定期檢查點,使得訊息的確認變得非常的方便,消費者可以倒退回舊的偏移量,重新消費。3.訊息處理完之後提交下乙個消費的offset,而在sessio...

Kafka重複消費解決方案

1.問題背景 某服務 用了springboot spring kafka 處理kafka訊息時,發現每條訊息處理時間長達60 秒。幾百條訊息處理完後,又重新從第一條開始重複消費。2.原因分析 kafka消費者有兩個配置引數 max.poll.interval.ms 兩次poll操作允許的最大時間間隔...