Kafka生產者ack和lag機制剖析

2021-10-21 23:09:26 字數 3343 閱讀 2579

kafka有兩個很重要的配置引數,acks與min.insync.replicas。

其中acks是producer的配置引數,min.insync.replicas是broker端的配置引數,這兩個引數對於生產者不丟失資料起到了很大的作用。接下來,本文會以圖標的方式講解這兩個引數的含義和使用方式。通過本文,你可以了解到:

kafka的topic是可以分割槽的,並且可以為分割槽配置多個副本,改配置可以通過replication.factor引數實現。kafka中的分割槽副本包括兩種型別:領導者副本(leader replica)和追隨者副本(follower replica),每個分割槽在建立時都要選舉乙個副本作為領導者副本,其餘的副本自動變為追隨者副本。在 kafka 中,追隨者副本是不對外提供服務的,也就是說,任何乙個追隨者副本都不能響應消費者和生產者的讀寫請求。所有的請求都必須由領導者副本來處理. 換句話說,所有的讀寫請求都必須發往領導者副本所在的 broker,由該 broker 負責處理。追隨者副本不處理客戶端請求,它唯一的任務就是從領導者副本非同步拉取訊息,並寫入到自己的提交日誌中,從而實現與領導者副本的同步。

kafka預設的副本因子是3,即每個分割槽只有1個leader副本和2個follower副本。具體如下圖所示:

上面提到生產者客戶端僅寫入leader broker,跟隨者非同步複製資料。由於kafka是乙個分布式系統,必然會存在與 leader 不能實時同步的風險,所以需要一種方法來判斷這些追隨者是否跟上了領導者的步伐, 即追隨者是否同步了最新的資料。換句話說,kafka 要明確地告訴我們,追隨者副本到底在什麼條件下才算與 leader 同步?這就是下面所要說的isr同步副本機制.

in-sync replica(isr)稱之為同步副本,isr中的副本都是與leader進行同步的副本,所以不在該列表的follower會被認為與leader是不同步的。那麼,isr中存在是什麼副本呢?首先可以明確的是:leader副本總是存在於isr中。 而follower副本是否在isr中,取決於該follower副本是否與leader副本保持了「同步」。

(1) 上面所說的同步不是指完全的同步,即並不是說一旦follower副本同步滯後與leader副本,就會被踢出isr列表。

(2) kafka的broker端有乙個引數replica.lag.time.max.ms, 該引數表示follower副本滯後於leader副本的最長時間間隔,預設是10秒。這就意味著,只要follower副本落後於leader副本的時間間隔不超過10秒,就可以認為該follower副本與leader副本是同步的,所以哪怕當前follower副本落後於leader副本幾條訊息,只要在10秒之內趕上leader副本,就不會被踢出出局。

(3) 如果follower副本被踢出isr列表,等到該副本追上了leader副本的進度,該副本會被再次加入到isr列表中,所以isr是乙個動態列表,並不是靜態不變的。

如上圖所示:broker3上的partition1副本超過了規定時間,未與leader副本同步,所以被踢出isr列表,此時的isr為[1,3]。

acks引數指定了必須要有多少個分割槽副本收到訊息,生產者才認為該訊息是寫入成功的,這個引數對於訊息是否丟失起著重要作用,該引數的配置具體如下:

acks=0,表示生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。換句話說,一旦出現了問題導致伺服器沒有收到訊息,那麼生產者就無從得知,訊息也就丟失了。改配置由於不需要等到伺服器的響應,所以可以以網路支援的最大速度傳送訊息,從而達到很高的吞吐量。

acks=1,表示只要集群的leader分割槽副本接收到了訊息,就會向生產者傳送乙個成功響應的ack,此時生產者接收到ack之後就可以認為該訊息是寫入成功的。一旦訊息無法寫入leader分割槽副本(比如網路原因、leader節點崩潰),生產者會收到乙個錯誤響應,當生產者接收到該錯誤響應之後,為了避免資料丟失,會重新傳送資料。這種方式的吞吐量取決於使用的是非同步傳送還是同步傳送(非同步傳送如何重試?)。

acks =all,表示只有所有參與複製的節點(isr列表的副本)全部收到訊息時,生產者才會接收到來自伺服器的響應。這種模式是最高端別的,也是最安全的,可以確保不止乙個broker接收到了訊息。該模式的延遲會很高。

上面提到,當acks=all時,需要所有的副本都同步了才會傳送成功響應到生產者。其實這裡面存在乙個問題:如果leader副本是唯一的同步副本時會發生什麼呢?此時相當於acks=1。所以是不安全的。

kafka的broker端提供了乙個引數**min.insync.replicas**,該引數控制的是訊息至少被寫入到多少個副本才算是"真正寫入",該值預設值為1,生產環境設定為乙個大於1的值可以提公升訊息的永續性. 因為如果同步副本的數量低於該配置值,則生產者會收到錯誤響應,從而確保訊息不丟失.

case 1

如下圖,當min.insync.replicas=2且acks=all時,如果此時isr列表只有[1,2],3被踢出isr列表,只需要保證兩個副本同步了,生產者就會收到成功響應。

如下圖,當min.insync.replicas=2,如果此時isr列表只有[1],2和3被踢出isr列表,那麼當acks=all時,則不能成功寫入數;當acks=0或者acks=1可以成功寫入資料。

這種情況是很容易引起誤解的,如果acks=all且min.insync.replicas=2,此時isr列表為[1,2,3],那麼還是會等到所有的同步副本都同步了訊息,才會向生產者傳送成功響應的ack。因為min.insync.replicas=2只是乙個最低限制,即同步副本少於該配置值,則會拋異常,而acks=all,是需要保證所有的isr列表的副本都同步了才可以傳送成功響應。如下圖所示:

總結acks=0,生產者在成功寫入訊息之前不會等待任何來自伺服器的響應。

acks=1,只要集群的leader分割槽副本接收到了訊息,就會向生產者傳送乙個成功響應的ack。

acks=all,表示只有所有參與複製的節點(isr列表的副本)全部收到訊息時,生產者才會接收到來自伺服器的響應,此時如果isr同步副本的個數小於min.insync.replicas的值,訊息不會被寫入。

kafka同步生產者和非同步生產者深入剖析

什麼是kafka同步生產者,什麼是kafka非同步生產者?比如這裡某個topic有3個分割槽。kafka同步生產者 這個生產者寫一條訊息的時候,它就立馬傳送到某個分割槽去。kafka非同步生產者 這個生產者寫一條訊息的時候,先是寫到某個緩衝區,這個緩衝區裡的資料還沒寫到broker集群裡的某個分割槽...

Kafka之生產者

1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...

kafka 生產者(二)

想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...