kafka設計之冪等性和事務

2021-09-26 14:31:53 字數 4226 閱讀 1475

http/1.1中對冪等性的定義是:一次和多次請求某乙個資源對於資源本身應該具有同樣的結果(網路超時等問題除外)。也就是說,其任意多次執行對資源本身所產生的影響均與一次執行的影響相同

實現冪等的關鍵點就是服務端可以區分請求是否重複,過濾掉重複的請求。要區分請求是否重複的有兩點:

唯一標識:要想區分請求是否重複,請求中就得有唯一標識。例如支付請求中,訂單號就是唯一標識

記錄下已處理過的請求標識:光有唯一標識還不夠,還需要記錄下那些請求是已經處理過的,這樣當收到新的請求時,用新請求中的標識和處理記錄進行比較,如果處理記錄中有相同的標識,說明是重複交易,拒絕掉

kafka冪等性實現原理

為了實現 producer 的冪等語義,kafka 引入了producer id(即pid)和sequence number。每個新的 producer 在初始化的時候會被分配乙個唯一的 pid,該 pid 對使用者完全透明而不會暴露給使用者。

對於每個 pid,該 producer 傳送資料的每個都對應乙個從 0 開始單調遞增的sequence number。

類似地,broker 端也會為每個維護乙個序號,並且每次 commit 一條訊息時將其對應序號遞增。對於接收的每條訊息,如果其序號比 broker 維護的序號(即最後一次 commit 的訊息的序號)大一,則 broker 會接受它,否則將其丟棄: 實現

為了實現producer的冪等性,kafka引入了producer id(即pid)和sequence number。

kafka可能存在多個生產者,會同時產生訊息,但對kafka來說,只需要保證每個生產者內部的訊息冪等就可以了,所有引入了pid來標識不同的生產者。

對於kafka來說,要解決的是生產者傳送訊息的冪等問題。也即需要區分每條訊息是否重複。

kafka通過為每條訊息增加乙個sequence numbler,通過sequence numbler來區分每條訊息。每條訊息對應乙個分割槽,不同的分割槽產生的訊息不可能重複。所有sequence numbler對應每個分割槽

broker端在快取中儲存了這seq number,對於接收的每條訊息,如果其序號比broker快取中序號大於1則接受它,否則將其丟棄。這樣就可以實現了訊息重複提交了。但是,只能保證單個producer對於同乙個的exactly once語義。不能保證同乙個producer乙個topic不同的partion冪等。

流程kafkaproducer啟動時,會初始化乙個

transactionmanager 例項,它的作用有以下幾個部分:

記錄本地的事務狀態(事務性時必須)

記錄一些狀態資訊以保證冪等性,比如:每個 topic-partition 對應的下乙個 sequence numbers 和 last acked batch(最近乙個已經確認的 batch)的最大的 sequence number 等;

記錄 produceridandepoch 資訊(pid 資訊)。

冪等性時,producer 的傳送流程如下:

1)呼叫kafkaproducer的send方法將資料新增到 recordaccumulator 中,新增時會判斷是否需要新建乙個 producerbatch,這時這個 producerbatch 還是沒有 pid 和 sequence number 資訊的;

2)producer 後台傳送執行緒 sender,在 run() 方法中,會先根據 transactionmanager 的 shouldresetproducerstateafterresolvingsequences() 方法判斷當前的 pid 是否需要重置,重置的原因是因為:如果有topic-partition的batch已經超時還沒處理完,此時可能會造成sequence number 不連續。因為sequence number 有部分已經分配出去了,而kafka服務端沒有收到這部分sequence number 的序號,kafka服務端為了保證冪等性,只會接受同乙個pid的sequence number 等於服務端快取sequence number +1的訊息,所有這時候需要重置pid來保證冪等性

3)sender執行緒呼叫maybewaitforproducerid()方法判斷是否要申請pid,如果需要,會阻塞直到成功申請到pid

5)最後呼叫sendproducerequest方法將訊息傳送出去

上述冪等設計只能保證單個 producer 對於同乙個的exactly once語義。

另外,它並不能保證寫操作的原子性——即多個寫操作,要麼全部被 commit 要麼全部不被 commit。

更不能保證多個讀寫操作的的原子性。尤其對於 kafka stream 應用而言,典型的操作即是從某個 topic 消費資料,經過一系列轉換後寫回另乙個 topic,保證從源 topic 的讀取與向目標 topic 的寫入的原子性有助於從故障中恢復。

事務保證可使得應用程式將生產資料和消費資料當作乙個原子單元來處理,要麼全部成功,要麼全部失敗,即使該生產或消費跨多個。

另外,有狀態的應用也可以保證重啟後從斷點處繼續處理,也即事務恢復。

為了實現這種效果,應用程式必須提供乙個穩定的(重啟後不變)唯一的 id,也即transaction id。transactin id與pid可能一一對應。區別在於transaction id由使用者提供,而pid是內部的實現對使用者透明。

另外,為了保證新的 producer 啟動後,舊的具有相同transaction id的 producer 即失效,每次 producer 通過transaction id拿到 pid 的同時,還會獲取乙個單調遞增的 epoch。由於舊的 producer 的 epoch 比新 producer 的 epoch 小,kafka 可以很容易識別出該 producer 是老的 producer 並拒絕其請求。

有了transaction id後,kafka 可保證:

需要注意的是,上述的事務保證是從 producer 的角度去考慮的。從 consumer 的角度來看,該保證會相對弱一些。尤其是不能保證所有被某事務 commit 過的所有訊息都被一起消費,因為:

這一節所說的事務主要指原子性,也即 producer 將多條訊息作為乙個事務批量傳送,要麼全部成功要麼全部失敗。

為了實現這一點,kafka 0.11.0.0 引入了乙個伺服器端的模組,名為transaction coordinator,用於管理 producer 傳送的訊息的事務性。

該transaction coordinator維護transaction log,該 log 存於乙個內部的 topic 內。由於 topic 資料具有永續性,因此事務的狀態也具有永續性。

producer 並不直接讀寫transaction log,它與transaction coordinator通訊,然後由transaction coordinator將該事務的狀態插入相應的transaction log。

transaction log的設計與offset log用於儲存 consumer 的 offset 類似。

許多基於 kafka 的應用,尤其是 kafka stream 應用中同時包含 consumer 和 producer,前者負責從 kafka 中獲取訊息,後者負責將處理完的資料寫回 kafka 的其它 topic 中。

為了實現該場景下的事務的原子性,kafka 需要保證對 consumer offset 的 commit 與 producer 對傳送訊息的 commit 包含在同乙個事務中。否則,如果在二者 commit 中間發生異常,根據二者 commit 的順序可能會造成資料丟失和資料重複:

如果先 commit producer 傳送資料的事務再 commit consumer 的 offset,即at least once語義,可能造成資料重複。

如果先 commit consumer 的 offset,再 commit producer 資料傳送事務,即at most once語義,可能造成資料丟失。

為了區分寫入 partition 的訊息被 commit 還是 abort,kafka 引入了一種特殊型別的訊息,即control message。該類訊息的 value 內不包含任何應用相關的資料,並且不會暴露給應用程式。它只用於 broker 與 client 間的內部通訊。

對於 producer 端事務,kafka 以 control message 的形式引入一系列的transaction marker。consumer 即可通過該標記判定對應的訊息被 commit 了還是 abort 了,然後結合該 consumer 配置的隔離級別決定是否應該將該訊息返回給應用程式。

Kafka冪等性和事務

訊息交付可靠性保障 所謂的訊息交付可靠性保障,是指kafka對producer和consumer要處理的訊息提供什麼樣的承諾,常見三種如下 最多一次 at most once 訊息可能會丟失,但絕不會被重 送。只需要設定producer禁止重複即可。訊息要麼寫入成功,要麼寫入失敗,不會進行重試。至少...

Kafka生產者的冪等性和事務性

訊息交付可靠性保證,即指kafka對producer和consumer要處理的資訊的保證。最多一次 訊息可能會丟失,但絕對不會重複 至少一次 訊息不會丟失,但可能重複 精確一次 訊息不會丟失,也不會重 送 kafka預設對訊息的保證是至少一次 當然我們都希望精確一次。kafka同樣也提供了對精確一次...

分布式事務七 冪等性設計

分布式事務二 分布式事務處理三 分布式事務四 基於可靠訊息的最終一致性 分布式事務五 基於可靠訊息的最終一致性 異常流程 分布式事務六 常規mq佇列 分布式事務七 冪等性設計 分布式事務八 可靠訊息最終一致性方案 分布式事務九 基於可靠訊息的最終一致性 分布式事務10 最大努力通知形勢 柔性事務解決...