kafka 訊息的儲存分析

2021-10-24 02:22:32 字數 4967 閱讀 2273

為了規避隨機讀寫帶來的時間消耗,kafka採用順序寫的方式儲存資料。即使是這樣,但是i/o操作仍然會造成磁碟的效能瓶頸,所以kafka還有乙個效能策略。

一般應用程式有乙個buffer空間在使用者空間中,來自於網路或者磁碟,無論來自網路或者磁碟,都需要通過核心,也就是說核心中也要有buffer。

1)磁碟到核心 --> 2)核心到應用程式buffer 寫資料時 --> 3)應用程式buffer寫到核心buffer --> 4)核心buffer寫到磁碟

這個過程多了兩次拷貝,kafka本身因為不處理資料,所以沒有必要把資料放入應用程式的buffer中。所以搞了個基於核心的資料儲存和傳輸,使用sendfile機制,直接基於核心kernel處理。

offset 即 每個訊息針對每個consumer group 的偏移量,記錄該consumer group 消費到了具體的位置。

在kafka 中,體用了乙個__consumer_offsets-*的乙個topic ,把offset 資訊寫入到這個topic中。預設有50個分割槽。

檢視groupid的offset儲存在哪個分割槽中,計算公式為

system.out.

println

(math.

abs(

("kafkaconsumerdemo1"

.hashcode()

)%50)

);

bin/kafka-******-consumer-shell.sh --topic __consumer_offsets  --partition 4 --broker-list 192.168.45.135:9092,192.168.45.131:9092,192.168.45.134:9092 --formatter "kafka.coordinator.group.groupmetadatamanager\$offsetsmessageformatter"
[groupid,topic,partition]::[offsetmetadata[offset,..]....]

[kafkaconsumerdemo1,demo,0]::[offsetmetadata[165,no_metadata],committime 1547543212536,expirationtime 1547629612536]

乙個topic 可以有多個partition 在物理磁碟上進行儲存,進入到logs目錄中,可以找到對應partition下的日誌內容

cd /guaoran/kafka/logs/guaoran-0/

ls00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint

kafka 是通過分段的方式將log分為多個logsegment,logsegment是乙個邏輯上的概念,乙個logsegment對應磁碟上的乙個日誌檔案和乙個索引檔案,其中(.log)日誌檔案是用來記錄訊息的,(.index)索引檔案時用來儲存訊息的索引。

當kafka producer 不斷傳送訊息,必然會引起partition檔案的五險擴張,這樣對於訊息檔案的維護以及被消費的訊息的清理都會帶來非常大的挑戰,所以kafka 以segment 為單位又把partition進行細分。每個partition相當於乙個巨型檔案被平均分配到多個大小相等的segment資料檔案中(每個segment檔案中的訊息不一定相等),這種特性方便已經被消費的訊息的清理,提高磁碟的利用率。

server.properties 中有以下幾個配置

# 分段檔案的大小

log.segment.bytes=107370

## 訊息清理

# 日誌訊息預設儲存7天

log.retention.hours=168

# 訊息的大小,超過這個大小,會清理

log.retention.bytes=1073741824

為了看到明顯的效果,將分段檔案大小改小了,並進行傳送多個訊息到 guaoran 的topic中,再次檢視

segment 檔案由三部分組成,分別是.index , .log , .timeindex 字尾,

採用以下命令對 .index 檔案進行檢視

/guaoran/kafka/kafka_2.11-1.1.0/bin/kafka-run-class.sh kafka.tools.dumplogsegments --files 00000000000000000000.index --print-data-log
結果如下:

offset: 53 position: 4124

offset: 106 position: 8264..

.offset: 1302 position: 103050

offset: 1354 position: 107210

採用以下命令對 .log檔案進行檢視

/guaoran/kafka/kafka_2.11-1.1.0/bin/kafka-run-class.sh kafka.tools.dumplogsegments --files 00000000000000000000.log --print-data-log
結果如下:

offset: 1301 position: 102970 createtime: 1547557716588 payload: message_1301

offset: 1302 position: 103050 createtime: 1547557716601 payload: message_1302

offset: 1303 position: 103130 createtime: 1547557716612 payload: message_1303

offset: 1304 position: 103210 createtime: 1547557716624 payload: message_1304..

.offset: 1353 position: 107130 createtime: 1547557717167 payload: message_1353

offset: 1354 position: 107210 createtime: 1547557717179 payload: message_1354

offset: 1355 position: 107290 createtime: 1547557717183 payload: message_1355

第乙個log檔案的最後乙個offset為1355,所以下乙個segment的檔案命名為00000000000000001356.log

如上面所看檢視的index 和log 的檔案內容,進行分析

為了提高查詢訊息的效能,為每乙個日誌檔案新增2個索引,索引檔案:offsetindex 和 timeindex ,分別對應 .index 和 .timeindex

.index 檔案中儲存了索引 以及物理偏移量。.log 檔案中儲存了訊息的內容。索引檔案的元資料執行對應資料檔案中message 的物理偏移位址。以【1302,103050】為例, log檔案中,對應的是滴1302條記錄,物理偏移量(position)為103050,position 是bytebuffer 的指標位置。

根據offset 的值,查詢 segment 段中的 index 索引檔案。由於索引檔案命名是以上乙個檔案的最後乙個offset進行命令的,所以,使用二分查詢演算法能夠根據offset快速定位到指定的索引檔案。

找到索引檔案後,根據offset進行定位,找到索引檔案中的復合範圍的索引。(kafka 採用稀疏索引的方式來提高查詢效能)

得到position以後,在到對應的log檔案中,從position處開始查詢offset對應的訊息,將每條訊息的offset與目標offset進行比較,知道找到訊息

比如找 offset=1303的訊息,那麼會先找到000.index 檔案,找到【1302,103050】 這個索引,在到log檔案中,根據 103050 這個position 開始查詢offset = 1303的訊息,當確定對應的訊息後進行返回。

日誌是分段儲存的,一方面能夠減少單個檔案內容的大小,另一方面,方便kafka 進行日誌清理。日誌的清理策略有兩個:

根據訊息的保留時間,當訊息在kafka中儲存的時間超過了指定的時間,就會觸發清理過程log.retention.hours=168預設7天

根據topic儲存的資料大小,當topic所佔的日誌檔案大小大於一定的閾值,則開始刪除最久的訊息。kafka會啟動乙個後台執行緒,定期檢查是否存在可以刪除的訊息。log.retention.bytes=1073741824預設1g

通過上面這兩個引數來設定,當其中任意乙個達到要求,都會執行刪除。

kafka 還提供了日誌壓縮功能,通過這個功能可以有效的減少日誌檔案的大小,緩解磁碟緊張的情況,在很多實際場景中,訊息的key和value的值之間的對應關係是不斷變化的,就像資料庫中的資料會不斷被修改一樣消費者只關心key對應的最新value值。因此,我們可以開啟kafka的日誌壓縮功能,服務端會在後台啟動cleaner 執行緒池,定期將相同的key進行合併,只保留最新的value值。

預設情況下啟動日誌清理程式,要在特定主題上啟用日誌清理,您可以新增特定於日誌的屬性log.cleanup.policy=compact,日誌清理程式可以配置為保留最小量的日誌的未壓縮「頭」。通過設定壓縮時間延遲來啟用此功能。log.cleaner.min.compaction.lag.ms.

日誌壓縮的原理

Kafka訊息儲存概覽

kafka作為乙個訊息中介軟體系統,面臨的首要問題就是訊息如何持久化,如何方便地進行讀寫和解析。本文將就kafka的訊息儲存問題開乙個頭,後續將會對重要的 部分一一講解。kafka的訊息概念,首先我們在此談論的不是網路傳遞中的訊息,而更多偏向於記錄的意思,也就是消費者和生產者所在意的實際物件。訊息是...

Kafka 日誌訊息儲存時間

分段策略屬性 屬性名含義 預設值log.roll.日誌滾動的週期時間,到達指定週期時間時,強制生成乙個新的segment 168 7day log.segment.bytes 每個segment的最大容量。到達指定容量時,將強制生成乙個新的segment 1g 1為不限制 log.retention...

訊息佇列 訊息佇列 kafka

kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...