Flume資料傳輸事務分析

2021-08-02 06:38:45 字數 3344 閱讀 9405

本文基於thriftsource,memorychannel,hdfssink三個元件,對flume資料傳輸的事務進行分析,如果使用的是其他元件,flume事務具體的處理方式將會不同。一般情況下,用memorychannel就好了,我們公司用的就是這個,filechannel速度慢,雖然提供日誌級別的資料恢復,但是一般情況下,不斷電memorychannel是不會丟資料的。

flume提供事物操作,保證使用者的資料的可靠性,主要體現在:

flume在對channel進行put和take操作的時候,必須要用事物包住,比如:

channel ch = new memorychannel();

transaction txn = ch.gettransaction();

//事物開始

txn.begin();

try catch (throwable t)

} finally

put事務可以分為以下階段:

我們從source資料接收到寫入channel這個過程對put事物進行分析。

thriftsource會spawn多個worker執行緒(thriftsourcehandler)去處理資料,worker處理資料的介面,我們只看batch批量處理這個介面:

@override

listflumeevents = lists.newarraylist();

for(thriftflumeevent event : events)

//channelprocessor,在source初始化的時候傳進來.將資料寫入對應的channel

getchannelprocessor().processeventbatch(flumeevents);

...return status.ok;

}

事務邏輯都在processeventbatch這個方法裡:

public

void

processeventbatch(listevents)

//提交,將資料寫入channel的佇列中

tx.commit();

} catch (throwable t)

}...

}

每個worker執行緒都擁有乙個transaction例項,儲存在channel(basicchannelsemantics)裡的threadlocal變數currenttransaction.

那麼,事務到底做了什麼?

實際上,transaction例項包含兩個雙向阻塞佇列linkedblockingdeque(感覺沒必要用雙向佇列,每個執行緒寫自己的putlist,又不是多個執行緒?),分別為:

對於put事物操作,當然是只用到putlist了。putlist就是乙個臨時的緩衝區,資料會先put到putlist,最後由commit方法會檢查channel是否有足夠的緩衝區,有則合併到channel的佇列。

channel.put -> transaction.doput:

protected

void

doput(event event) throws interruptedexception

putbytecounter += eventbytesize;

}

transaction.commit:

@override

protected

void

docommit() throws interruptedexception }}

//清除臨時佇列

putlist.clear();

...}

...}

如果在事務期間出現異常,比如channel剩餘空間不足,則rollback:

@override

protected

void

dorollback()

take事務分為以下階段:

sink其實是由sinkrunner執行緒呼叫sink.process方法來了處理資料的。我們從hdfseventsink的process方法說起,sink類都有個process方法,用來處理傳輸資料的邏輯。:

public status process() throws eventdeliveryexception 

...//寫資料到hdfs

...// flush all pending buckets before committing the transaction

for (bucketwriter bucketwriter : writers)

//commit

transaction.commit();

...} catch (ioexception eio) finally

}

大致流程圖:

接著看看channel.take,作用是將資料放到臨時緩衝區,實際呼叫的是transaction.dotake:

protected event dotake() throws interruptedexception 

...//將資料放到臨時緩衝區

takelist.put(event);

...return

event;

}

接著,hdfs寫執行緒bucketwriter將take到的資料寫到hdfs,如果批資料都寫完了,則要commit了:

protected

void

docommit() throws interruptedexception

很簡單,其實就是清空takelist而已。如果bucketwriter在寫資料到hdfs的時候出現異常,則要rollback:

protected

void

dorollback()

...}

...}

C SFTP資料傳輸

我們有些客戶公司的資料比較重要,為了安全集團公司內部都使用的是區域網,但是有時候又不得不予外界網際網路做資料互動,所以有些不重要的系統是放在外界網際網路的,這樣以來內部系統和外部系統的資料互動就成為了問題,這樣以來就使用到了sftp伺服器來作為資料傳輸的中介。下面不說了,直接上c 此處使用了第三方動...

資料傳輸方式

資料傳輸方式 1 並行傳輸與序列傳輸 並行傳輸指的是資料以成組的方式,在多條並行通道上同時進行傳輸。常用的就是將構成一 個字元 的幾位二進位製碼,分別在幾個並行通道上進行傳輸。例如,採用8單位 的字 符 可以用8個通道並行傳輸。一次傳送乙個字元,因此收 發雙方不存在字元的同步問題,不需要另加 起 止...

flex wcf 資料傳輸

最近跟哲子做個專案,本來可以用asp.net輕鬆搞定,但是鑑於大家都比較想試用下學習已久的技術,所以便出現了flex與wcf entity這樣得前後臺組合。專案開始之初,著實為兩者之間的互動頭疼一番,在經過大量資料得獲取後,終於解決,以下寫出簡要的注意事項,由於wcf 我還是個徹頭徹尾的門外漢,資料...