kafka訊息回溯

2021-10-23 06:37:34 字數 3840 閱讀 8031

kafka支援兩種方式回溯。一種是基於訊息偏移量回溯,一種是基於時間點的訊息回溯。

基於訊息偏移量回溯

在kafka的每個分割槽中,每條訊息都有乙個唯一的offset值 ,即訊息偏移量,用來

表示訊息在partition分割槽中的位置。消費者每次消費了訊息,都會把消費的此條訊息的offset提交到

broker(訊息節點),用於記錄消費到分割槽中的位置,下條訊息從這個位置之後開始消費。所以基於訊息偏移量回溯很簡單,只需要重置offset,然後消費者會從該offset之後開始消費

基於時間點的訊息回溯

要想講清楚kafka基於時間點的訊息回溯的原理,得先從kafka儲存訊息的檔案格式開始講。

kafka儲存訊息是以日誌的形式儲存的,每乙個partition都對應乙個日誌,但是日誌不是乙個檔案,是多個檔案組成的。日誌檔案都儲存在乙個資料夾裡面的,檔案格式為: topic-0 。

其中topic是kafka對應的主題名稱、0是partition所在的分割槽號。資料夾裡面儲存的是什麼檔案呢,日誌分段檔案、偏移量索引檔案、時間戳索引檔案。

日誌分段檔案

kafka訊息儲存在乙個.log的日誌檔案中,但是隨著日誌檔案越來越大不利於訊息的維護與清理,也不利於集群擴容時訊息的複製。所以kafka需要對日誌進行分段。

日誌分段檔名稱的定義:

日誌分段名稱是由日誌從這日誌片段開始的基準偏移量( baseoffset )命名的,名稱固定為 20 位數字。因為baseoffset是long型的,long型最大值19位,所以檔名20位即可滿足所以的偏移量要求

例如:00000000000000000054.log

當kafka判斷乙個日誌檔案很大了時,就會重新開闢乙個.log的日誌檔案進行訊息寫入,對老的.log檔案設定為唯讀,不能寫入。kafka判斷訊息需要分片的策略有4中,滿足其一即可。

1.當前日誌分段檔案的大小超過了broker端引數 log.segmentbytes配置的值。log.segmentbytes引數的預設值為 1073741824,即lgb

2.當前日誌分段中訊息的最大時間戳與當前系統的時間戳的差值大於log.roll.hours。預設情況下,配置了 log.roll.hours引數,其值為168天。

3.偏移量索引檔案或時間戳索引檔案的大小達到broker端引數log.index.size.max.bytes配置的值。log.index.size.max.bytes 的預設值為10485760,即10mb

4.追加的訊息的偏移量與當前日誌分段的偏移量之間的差值大於 integer.max_value,

偏移量索引檔案檔案儲存的是訊息對應在物理磁碟的位址。是以key、value形式存在的。索引檔案採用稀疏索引( sparse index )的方式構造訊息的索引。它並不保證每個訊息在索引檔案中都有對應的索引 每當寫入一定量(由 broker 端引數 log.index.interval.bytes 指定,預設值為 4096 ,即 4kb )的訊息時,偏移量索引檔案和時間戳索引檔案分別增加乙個偏移量索引項和時間戳索引項。

檔案格式:00000000000054.index 。其中54依然是索引檔案中存在的第乙個訊息的offset(偏移量)

索引格式如下:

每個索引占用 8 個位元組,分為兩個部分。

1. relativeoffset :日誌索引的相對偏移量。相對於檔名的基準偏移量來說的,比如索引的第乙個訊息,那麼relativeoffset是0

2. position :訊息儲存在磁碟的物理位置

那現在應該知道,通過訊息的偏移量怎麼找到對應的日誌分段檔案,然後之後的所有訊息都知道了。

例如現在日誌分段檔案是這樣的:                 那麼對應的索引檔案是這樣的:

0000000000000000012.log                           0000000000000000012.index

0000000000000000034.log                           0000000000000000034.index

0000000000000000078.log                           0000000000000000078.index

其中0000000000000000034.index偏移量索引檔案內容如下:

那麼我們要找到偏移量為56的訊息怎麼找呢?

1. 首先定位到0000000000000000034.log 的日誌檔案。那麼它是怎麼找到這個日誌檔案的呢,kafka中用跳躍表儲存了日誌檔案baseoffset對應的日誌檔名。通過跳躍表很容易查到不大於56的最大的baseoffset,然後定位到日誌檔案。可以計算出56相對於34的相對偏移量為22。

2. 既然定位到日誌檔案是0000000000000000034.log。那麼索引檔案也肯定是0000000000000000034.index。在步驟1中,已經算出了相對偏移量為22,即對應在index索引檔案是relativeoffset。那麼我們需要在0000000000000000034.index中找到不大於22的最大的relativeoffset,是按照第一索引訊息順序找到的,即定位到22的索引。拿到對應的position

3.拿到了對應的物理磁碟position,既能直接找到訊息,然後順序往後查詢,既能拿到所有的訊息進行訊息回溯

時間索引檔案檔案格式:檔案格式:0001586662165087.timeindex 。其中1586662165087對應的是這個時間點生成的時間索引檔案

儲存的時間索引內容格式:

每個索 項占用 12 個位元組,分為兩個部分。

1.timestamp :時間戳。

2.relativeoffset :時間戳所對應的訊息的相對偏移量。

那麼是怎麼通過時間戳找到定義的訊息的呢?

我們先通過時間索引檔案找到時間對應的offset偏移量,在通過偏移量索引檔案找到訊息位置。

例如我們要回溯2020-4-12 09:00:00的之後訊息:

1.首先換算成時間戳為1586653200000

2.根據時間戳1586653200000在時間索引中找到不大於1586653200000的最大的偏移量

3.找了偏移量,按照偏移量索引講解的步驟,逐一去查詢,即可找到對應的訊息position

4.通過position定位了訊息,獲取訊息的生成時間,比1586653200000進行比對,然後按順序逐漸和後面的訊息一一進行時間戳比對,如果前乙個訊息的時間戳<1586653200000 & 後乙個訊息的時間戳 > 1586653200000 。 那麼這個位置是就是訊息回溯點,拿到訊息的offset,對消費者消費記錄的offset進行重置,那麼整個回溯就完成了

訊息佇列 訊息佇列 kafka

kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...

Kafka訊息模型

一 訊息傳遞模型 傳統的訊息佇列最少提供兩種訊息模型,一種p2p,一種pub sub,而kafka並沒有這麼做,巧妙的,它提供了乙個消費者組的概念,乙個訊息可以被多個消費者組消費,但是只能被乙個消費者組裡的乙個消費者消費,這樣當只有乙個消費者組時就等同與p2p模型,當存在多個消費者組時就是pub s...

Kafka 訊息傳送

建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...