Kafka重複消費,不丟失資料

2021-09-27 04:01:31 字數 1965 閱讀 5003

kafka0.11.0.0版本正式支援精確一次處理語義exactly once semantic–eos

kafka冪等性參考

1)冪等producer 保證單個分割槽的只會傳送一次,不會出現重複訊息

2)事務(transation):保證原子性的寫入多個分割槽,即寫入到多個分割槽的訊息要麼全部成功,要麼全部回滾

3)流式eos:流處理本質上可看成是「」讀取-處理-寫入的管道「」。此eos保證整個過程的操作是原子性。注意,只使用kafka stream

重複問題:rebalance問題,

1.通常會遇到消費的資料處理很耗時,導致超過了kafka的session timeout時間(0.10.x版本預設是30秒),那麼就會rebalance重平衡,此時有一定機率offset沒提交,會導致重平衡後重複消費。

2.或者關閉kafka時,如果在close之前,呼叫consumer.unsubscribe()則可能有部分offset沒提交,下次重啟會重複消費。

3.消費程式和業務邏輯在乙個執行緒,導致offset提交超時。

try  catch (exception e) 

try catch (exception e)

解決方法

配置解決:offset自動提交為false!

業務邏輯解決:自己提交偏移量,讓consumer邏輯冪等

啟用冪等producer:在producer程式中設定屬性enabled.idempotence=true,若要實現多分割槽上的原子性,需要引入事務,啟用事務支援:在producer程式中設定屬性transcational.id為乙個指定字串(你可以認為這是你的額事務名稱,故最好七個有意義的名字),同時設定enable.idempotence=true

冪等實現業務實現參考

生產者資料不丟失

同步模式:配置=1(只有leader收到,-1所有副本成功,0不等待)。leader partition掛了,資料就會丟失。

解決:設定為-1保證produce寫入所有副本算成功

producer.type=sync

request.required.acks=-1

非同步模式,當緩衝區滿了,如果配置為0(沒有收到確認,一滿就丟棄),資料立刻丟棄

解決:不限制阻塞超時時間。就是一滿生產者就阻塞

producer.type=async

request.required.acks=1

queue.buffering.max.ms=5000

queue.buffering.max.messages=10000

queue.enqueue.timeout.ms = -1

batch.num.messages=200

丟包問題傳送資料過快,導致伺服器網絡卡爆滿,或者磁碟處於繁忙狀態,可能會出現丟包現象。

丟包解決方法

1.啟用重試機制,重試間隔時間設定長一些

2.設定生產者(ack=all 代表至少成功傳送一次) ,即需要相應的所有處於isr的分割槽都確認收到該訊息後,才算傳送成功。

3.對kafka進行限速(限速可能會引起rebalance問題)

rebalance問題參考可解決方案

消費者資料不丟失

receiver(開啟wal,失敗可恢復)和director(checkpoint保證)

kafka丟失和重複消費資料

kafka作為當下流行的高併發訊息中介軟體,大量用於資料採集,實時處理等場景,我們在享受他的高併發,高可靠時,還是不得不面對可能存在的問題,最常見的就是丟包,重發問題。1 丟包問題 訊息推送服務,每天早上,手機上各終端都會給使用者推送訊息,這時候流量劇增,可能會出現kafka傳送資料過快,導致伺服器...

Kafka重複消費和丟失資料研究

kafka重複消費原因 底層根本原因 已經消費了資料,但是offset沒提交。原因1 強行kill執行緒,導致消費後的資料,offset沒有提交。原因2 設定offset為自動提交,關閉kafka時,如果在close之前,呼叫 consumer.unsubscribe 則有可能部分offset沒提交...

kafka如何保證訊息不丟失不被重複消費

在解決這個問題之前,我們首先梳理一下kafka訊息的傳送和消費機制。kafka的訊息傳送機制分為同步和非同步機制。可以通過producer.type屬性進行配置。使用同步模式的時候,有三種狀態來保證訊息的安全生產。可以通過配置request.required.acks屬性。三個屬性分別如下 當ack...