Flume ng FileChannel原理解析

2021-09-21 09:57:51 字數 4653 閱讀 3397

對於

flume

來說主要有兩個

channel

:memory

,file

;對於線上環境主要以

filechannel

為主,因此這裡主要討論它的實現: 在

filechannel

裡主要由乙個

wal的

log和乙個記憶體佇列組成:

filechannel

的queue

主要又以下幾個部分組成:

privatefinaleventqueuebackingstore 

backingstore;

privatefinal

inflighttakes;

privatefinal

inflightputs;

其中backingstore

代表了queue

在持久化存在,使用了記憶體對映檔案的方式;每次對

queue

的讀寫操作都記錄在

backingstore

的overwritemap

(update in place

)中,當進行

checkpoint

的時候合併到

elementsbuffer

並持久化到磁碟;所有未提交的正在讀寫資料都分別儲存在

inflight

結構中,當

checkpoint

時一併進行持久化,為回滾時使用; 在

inflight

中儲存了

transactionid->fileid

以及transactionid->eventptr

的對映,具體儲存在

backingstore

裡的則是

eventptr

(fileid,offset);

checkpoint file

的檔案結構如下:

file header

:1029 bytes

eventptr; 在

file header裡前8

個位元組儲存了版本號,接下來

24個位元組是

sequeuece no.(

類似rdbms

的scn)

,接下來

4個位元組儲存了

checkpoint

的狀態;

作為wal的

log主要儲存了(

transactionid,sequenceno,event),

每次讀寫都先在

log裡寫入

event

,對於寫操作會拿到

eventptr

放入queue

中;而commit

和rollback

操作在log

中的記錄形式是

(transactionid,sequenceno

,op=);

這兩個結構主要是體現在

filebackedtransaction

中如下:

filebackedtransaction extendsbasictransactionsemantics

......

linkedblockingdeque

takelist;

linkedblockingdeque

putlist;

longtransactionid;

log 

log;

flumeeventqueue 

queue

: eventqueuebackingstorefile

其中queue = log.getflumeeventqueue();

首先看put/take path

以及commit:

1. doput(eventevent)->

queue.addwithoutcommit(ptr, transactionid)

log.put(transactionid, event)->

synchronized logfile.writer.put(bytebufferbuffer)

putlist.offer(ptr)

2. dotake()->

flumeeventpointer ptr = 

queue

.removehead(

transactionid);

takelist

.offer(ptr),

log.take(

transactionid

, ptr); 

->

synchronizedlogfile.writer.take(bytebuffer buffer)

event event = 

log.get(ptr);

3. docommit()->

if(puts > 0) }

elseif(takes > 0) }

從上面的**可以看出,對於每乙個

put/take

都會記錄一條

oplog

到log裡,

當commit

的時候會對

log進行

sync

到磁碟持久化,同時會把

event

指標存放到

queue

上;這裡的

log就類似於

mysql

裡的binlog(binlog_format=statement)

,而這裡的

queue

存放的是指向

event

的指標;

簡例:filechannel如下,對filechannel put了2個訊息,a,b;則在log,queue裡的儲存狀態如下,log裡儲存了(transactionid,sequenceno,event),queue則儲存了eventptr;

queue:ptr->a,ptr->b

wal log:(1,1,put a),(1,2,put b),(1,3,commit)

當例項crash

時,通過

log來恢復

queue

的狀態,類似

rdbms

一樣,replay

是很耗時的操作,因此會定期對

queue

進行checkpoint:

log在初始化的時候會啟動乙個排程執行緒

workerexecutor,

由排程執行緒定期(

checkpoint interval

)排程乙個

backgroupworkder

來進行非強制性

checkpoint;

log.writecheckpoint(boolean force):trylockexclusive->

synchronized queue.checkpoint->

backingstore

.begincheckpoint();//

檢查是否

checkpoint

正在進行

;同時進行標記

checkpoint

開始,並同步

mmap file;

inflightputs

.serializeandwrite();//

inflighttakes

.serializeandwrite();//

將inflightputs/takes

序列化並寫到相應檔案

backingstore.checkpoint();->

setlogwriteorderid(writeorderoracle.next());

writecheckpointmetadata();

//copy from overwritemap toelementsbuffer(mmap)

//標記checkpoint

結束,並同步檔案

簡例:接上例,在

a,b提交後,這時進行了一次

checkpoint

(儲存在磁碟上的

checkpoint則是2

個指標ptr->a,ptr->b

),此時

scn=4

;之後,又完成了乙個

take transaction ,ptr to a 

也同時被刪除;如果這時

flume crash

,queue

從checkpoint

中重建,並且取得

checkpoint scn=4,

則replay

這之後的

log進行

crash recovery

;在恢復後,立刻執行一次

checkpoint.

queue:ptr->b

wal log:(1,1,put a),(1,2,put b),(1,3,commit),(2,5,take a),(2,6,commit)

ConcurrentHashMap原理解析

什麼是concurrenthashmap?眾所周知,hashmap是一種非常高效的資料結構,但是依舊有它的缺陷。那就是在併發插入資料時,有可能會出現帶環鍊錶,讓下一次的讀操作出現死迴圈。於是為了避免hashmap的執行緒安全問題,concurrenthashmap應運而生。concurrenthas...

ConcurrentHashMap原理解析

concurrenthashmap是jdk提供的乙個執行緒安全的集合類,它內部的結構原理和我們常用的hashmap基本是一致,那我們可以先來認識一下hashmap,這樣基本上也能大致明白concurrenthashmap了。hashmap與concurrenthashmap都是用來存放一種鍵值對形式...

理解爬蟲原理

本次作業 於 1.簡單說明爬蟲原理 請求 並提取資料的自動化流程 2.理解爬蟲開發過程 1 簡要說明瀏覽器工作原理 web瀏覽器提交請求後,通過http協議傳送給web伺服器。web伺服器接到後,進行事務處理,處理結果又通過http傳回給web瀏覽器,從而在web瀏覽器上顯示出所請求 的頁面。2 使...