kafka處理訊息寫入和備份的全流程

2021-10-10 00:20:45 字數 3219 閱讀 9584

base offset:是起始位移,該副本中第一條訊息的offset,如下圖,這裡的起始位移是0,如果乙個日誌檔案寫滿1g後(預設1g後會log rolling),這個起始位移就不是0開始了。

hw(high watermark):副本的高水印值;

leo包括leader副本和follower副本。

leader leo:leader的leo就儲存在其所在的broker的快取裡,當leader副本log檔案寫入訊息後,就會更新自己的leo。

remote leo和follower leo:remote leo是儲存在leader副本上的follower副本的leo,可以看出leader副本上儲存所有副本的leo,當然也包括自己的。follower leo就是follower副本的leo,因此follower相關的leo需要考慮上面兩種情況。

hw包括leader副本和follower副本。

leader hw:它的更新是有條件的,參考書籍中給出了四種情況,如下是其中的一種,就是producer向leader副本寫訊息的情況,當滿足四種情況之一,就會觸發hw嘗試更新。如下圖所示更新時會比較所有滿足條件的副本的leo,包括自己的leo和remote leo,選取最小值作為更新後的leader hw。

四種情況如下,其中最常見的情況就是前兩種。

1.producer向leader寫訊息,會嘗試更新。

2.leader處理follower的fetch請求,先讀取log資料,然後嘗試更新hw。

3.副本成為leader副本時,會嘗試更新hw。

4.broker崩潰可能會波及leader副本,也需要嘗試更新。

follower hw:更新發生在follower副本更新leo之後,一旦follower向log寫完資料,它就會嘗試更新hw值。比較自己的leo值與fetch響應中leader副本的hw值,取最小者作為follower副本的hw值。可以看出,如果follower的leo值超過了leader的hw值,那麼follower hw值是不會超過leader hw值的。

前提條件:考慮乙個主題,只有乙個分割槽,兩個副本的情況,並且剛開始都沒有任何訊息在log日誌檔案。

在考慮fetch請求時,需要考慮兩種情況,接下來就只考慮第二種情況,第一種情況也可以參考第二種情況。

producer暫時無法響應follower partition的請求,如沒有資料可以返回,這時fetch請求會快取在乙個叫做purgatory的物件裡(請求不會無限期快取,預設500ms)。在快取期間,如果producer傳送produce請求,則被喚醒,接下來會正常處理fetch請求。

producer正常響應follower partition的請求。

下面分析第二種情況,即producer正常響應follower的情況。

當leader副本接受到了producer的訊息,並且此時沒有follower副本fetch請求,在這樣的前提下,它會先做如下操作。

寫入訊息到log日誌檔案,更新leader leo為1。

嘗試更新remote leo,由於沒有fetch請求,因此它是0,不需要更新。

做min(leader leo,remote leo)的計算,結果為0,這樣leader hw無需更新,依然是0。

第一次fetch請求,分leader端和follower端:

leader端:

讀取底層log資料。

根據fetch帶過來的offset=0的資料(就是follower的leo,因為follower還沒有寫入資料,因此leo=0),更新remote leo為0。

嘗試更新hw,做min(leader leo,remote leo)的計算,結果為0。

把讀取到的log資料,加上leader hw=0,一起發給follower副本。

follower端:

寫入資料到log檔案,更新自己的leo=1。

更新hw,做min(leader hw,follower leo)的計算,由於leader hw=0,因此更新後hw=0。

可以看出,第一次fetch請求後,leader和follower都成功寫入了一條訊息,但是hw都依然是0,對消費者來說都是不可見的,還需要第二次fetch請求。

第二次fetch請求,分leader端和follower端:

leader端:

讀取底層log資料。

根據fetch帶過來的offset=1的資料(上一次請求寫入了資料,因此leo=1),更新remote leo為1。

嘗試更新hw,做min(leader leo,remote leo)的計算,結果為1。

把讀取到的log資料(其實沒有資料),加上leader hw=1,一起發給follower副本。

follower端:

寫入資料到log檔案,沒有資料可以寫,leo依然是1。

更新hw,做min(leader hw,follower leo)的計算,由於leader hw=1,因此更新後hw=1。

這個時候,才完成資料的寫入,並且分割槽hw(分割槽hw指的就是leader副本的hw)更新為1,代表消費者可以消費offset=0的這條訊息了,上面的過程就是kafka處理訊息寫入和備份的全流程。

最後,使用hw來記錄訊息在副本中提交或備份的進度,其實是存在缺陷的,在kafka 0.11.0.0後的版本中,使用leader epoch解決了。

leader 故障:

leader 發生故障後,會從副本同步佇列isr 協會去找乙個新的leader ,然後他下面的所有員工小弟就必須聽自己

的話,以為我為hw為標準,大於我的都個字擷取掉。然後繼續同步資料;

follower故障:

當故障後會被isr協會踢出去,待恢復後,首先讀本地磁碟的log記錄,高於hw部分截掉,然後從hw開始向leader進行同步,等到追上了leader後,才能加入isr協會;

php處理kafka訊息

php如果要使用kafka的話,需要安裝一下kafka php composer require nmred kafka phpkafka php的github位址 先寫乙個kafka producer.php,用來做為生產者 require var www extend vendor autolo...

關於kafka處理大訊息的方法

最近發現kafka在傳送一些大訊息的時候會報錯,修改了配置max.request.size。問題依舊。後來查閱了一下,都說要調大限制message大小的引數,不過試過之後發現貌似沒什麼作用。查閱文件發現之前用的客戶端kafka已經三年沒更新了0.0,後改為目前官方推薦的客戶端confluent ka...

訊息的寫入和讀取流程

接之前幾篇訊息中介軟體元件的模組劃分,本篇內容講述訊息的寫入和讀取流程。佇列模型 在描述訊息的寫入和讀取流程之前,首先要弄清楚訊息佇列的模型是怎麼樣的,包括訊息是怎麼儲存的。簡化的佇列模型大致如上圖所示。儲存模型 為了便於理解,上面的佇列模型中僅僅將訊息的乙個寫入佇列抽象成乙個topic parti...