Kafka中的訊息傳遞保證語義

2021-09-24 23:21:21 字數 3559 閱讀 4718

目錄

kafka提供的訊息傳遞保證語義

producer的訊息傳遞語義

at-least-once傳遞語義

exactly-once傳遞語義

request.required.acks永續性級別

consumer的offset記錄方式

記錄offset的位置

提交offset方法

自動提交offset

手動提交offset

consumer的訊息傳遞語義

at-most-once傳遞語義

at-least-once傳遞語義

exactly-once傳遞語義 總結

kafka提供的producer和consumer之間的訊息傳遞保證語義(message delivery guarantee semantics)有以下三種

可以分為兩個問題來看:傳送訊息的保證和消費訊息的保證。

當producer向broker傳送訊息時,一旦這條訊息被commit,由於副本機制(replication)的存在,它就不會丟失。但是如果producer傳送資料給broker後,遇到的網路問題而造成通訊中斷,那producer就無法判斷該條訊息是否已經提交(commit)。

在0.11.0.0之前,如果乙個producer沒有收到訊息提交的響應,它只能重新傳送訊息,確保訊息已經正確傳輸到broker中,這提供了at-least-once傳遞語義,因為如果原來的請求實際上成功了,則在重新傳送時將再次把訊息寫入到日誌中。

自0.11.0.0起,kafka producer支援冪等傳遞選項,保證重新傳送不會導致在日誌**現重複項。為了實現這個目的,broker為每個producer分配乙個id,並通過每個訊息的序列號來進行去重。

啟用冪等傳遞的方法是配置:enable.idempotence=true

從0.11.0.0開始,producer支援使用類似事務的語義將訊息傳送到多個topic分割槽:即所有訊息要麼都被成功寫入,或者都沒有。這個主要用於kafka topic之間「exactly-once「處理。

啟用事務支援的方法是:設定屬性transcational.id為乙個指定字串

並不是所有的場景都需要這麼強的保證,對於延遲敏感的情況,producer可以通過request.required.acks引數指定它期望的永續性級別。如producer指定它想要等待訊息的committed,則這可能需要10毫秒量級的延遲。然而,producer也可以指定它想要完全非同步地執行傳送,或者它只等到leader(不需要副本的響應)的響應。

引數值解釋

優缺點0

producer發出訊息即完成傳送,不等待確認

延遲最小、可靠性最差,最容易丟失訊息

1當且僅當leader收到訊息返回確認訊號後認為傳送成功

只有當leader crash,而且未被同步至其他follower時才丟訊息

-1只有當leader以及所有follower都收到訊息確認後,才傳送成功

最好的可靠性,延遲也較大。但是還是有可能丟訊息

我們知道,kafka的服務端並不會記錄consumer的消費位置,而是由consumer自己決定如何如何儲存、如何記錄其消費的offset。舊版本的kafka將消費位置記錄在zookeeper中,在新版本中,為了緩解zookeeper集群的壓力,在kafka的服務端中新增了乙個名為」__consumer_offsets「的內部topic

使用"__consumer_offsets" topic記錄consumer的消費位置只是預設選項,仍然可以根據業務需求將offset記錄在別的儲存系統中。

當記錄offset到外部系統時,需要將consumer的當前位置與實際要儲存為輸出的位置進行協調。實現這一目標的典型方法是在consumer位置的儲存和consumer輸出的儲存之間引入兩階段的」提交「。也可以更簡單一些,通過讓consumer將其offset儲存在與其輸出相同的位置。這樣最好,因為大多數的輸出系統不支援兩階段」提交「。例如,乙個kafka connect connector,它填充hdfs中的資料以及它讀取的資料的offset,以保證資料和offset都被更新,或者都不更新。 對於需要這些更強大語義的許多其他資料系統,我們遵循類似的模式,為此,訊息不具有用來去重的主鍵。

在consumer消費訊息的過程中,提交offset的時機顯得非常重要,因為它決定了consumer故障重啟後的消費位置。

通過將enable.auto.commit設定為true,可以啟用自動提交,auto.commit.interval.ms則設定了自動提交的時間間隔。

自動提交是由輪詢迴圈驅動的。當輪詢時,consumer檢查是否提交,如果是的話,它將提交上次輪詢中返回的偏移量。

如果consumer從未崩潰,它可以將這個位置儲存在記憶體中,但是如果consumer失敗了,我們希望這個topic分割槽被另乙個程序接管,那麼新程序將需要選擇乙個合適的位置開始處理。假設consumer讀取了一些資訊 - 它有幾個選項用於處理訊息並更新其位置。

讀取訊息,然後在日誌中儲存它的位置,最後處理訊息。

在這種情況下,有可能consumer儲存了位置之後,但是處理訊息輸出之前崩潰了。如下圖

在這種情況下,接管處理的程序會在已儲存的位置開始,即使該位置之前有幾個訊息尚未處理。在consumer處理失敗訊息的情況下,不進行處理。

讀取訊息,處理訊息,最後儲存訊息的位置。

在這種情況下,可能消費程序處理訊息之後,但儲存它的位置之前崩潰了。如下圖

在這種情況下,當新的程序接管了它,這將接收已經被處理的前幾個訊息。這就符合了「至少一次」的語義。在多數情況下訊息有乙個主鍵,以便更新冪等(其任意多次執行所產生的影響均與一次執行的影響相同)。

當從乙個kafka topic消費並produce到另乙個topic時(例如kafka stream),我們可以利用之前提到0.11.0.0中的producer的新事務功能。consumer的位置可以像乙個訊息一樣儲存到topic中,我們就可以使用接收處理資料並輸出topic相同的事務,將offset寫入到kafka。如果事務中斷,則consumer的位置將恢復到老的值,根據其」隔離級別「,其他consumer將不會看到輸出topic產生的資料,在預設的」讀取未提交「隔離級別中,所有訊息對consumer都是可見的,即使是被中斷的事務的訊息。但是在」讀取已提交「中,consumer將只從已提交的事務中返回訊息。

kafka預設是保證「至少一次」訊息傳遞;

通過禁止producer重試和在處理訊息前提交它的offset可以實現「最多一次」訊息傳遞;

在kafka streams中支援「正好一次」訊息傳遞;

在kafka topics之間傳遞和處理資料時,通過帶事務的producer/consumer也可以提供「正好一次」的訊息傳遞;

當和其它儲存系統傳遞資料時,如果要保證「正好一次」訊息傳遞,需要與其配合,但kafka提供了偏移量,所以實現起來也很簡單(可參考kafka connect)。

kafka如何保證訊息有序

兩種方案 方案一,kafka topic 只設定乙個partition分割槽 方案二,producer將訊息傳送到指定partition分割槽 解析 方案一 kafka預設保證同乙個partition分區內的訊息是有序的,則可以設定topic只使用乙個分割槽,這樣訊息就是全域性有序,缺點是只能被co...

kafka保證訊息順序性

順序保證 kafka 可以保證同乙個分割槽裡的訊息是有序的。也就是說,如果生產者按照一定的順序傳送訊息,broker 就會按照這個順序把它們寫入分割槽,消費者也會按照同樣的順序讀取它們。1.單分割槽 2.如果把 retries 設為非零整數,必須把 max.in.flight.requests.pe...

kafka保證訊息不丟失

一 消費端保證訊息不丟失 消費端從broker取到訊息以後,先處理業務邏輯,然後再手動提交,這樣就可以避免消費端訊息丟失。二 生產端訊息不丟失 首先是設定每個訊息分割槽的副本,一本是幾個broker就配置幾個分割槽,然後設定如下,保證生產這生產的訊息傳送到broker時,不但leader確認收到訊息...