flume sink執行過程簡單分析

2022-03-21 13:31:12 字數 2555 閱讀 2460

沒有執行,直接看原始碼得到sink簡單執行過程

sinkrunner負責執行sink程式

內部類pollingrunner implements runnable

private sinkprocessor policy

;負責執行sink

run方法

while (!shouldstop.get()) else

} catch (interruptedexception e) catch (exception e) else

try catch (interruptedexception ex) }}

policy 對應具體的sink處理器,這裡以failoversinkprocessor舉例子

這裡面,針對failoversinkprocessor可以參照 講解,這裡大致說下便可

configure方法

livesinks = new treemap();

failedsinks = new priorityqueue();

從配置檔案中定義的sinks中遍歷每乙個sink,獲得其優先順序,然後放到livesinks中,無論sink是否可用。

最後,activesink = livesinks.get(livesinks.lastkey());,從livesinks按照key排序,獲得最後乙個key(優先順序,最大)對應的sink初始化 activesink 

policy.process().equals(sink.status.backoff))執行的是failoversinkprocessor的process()方法

process()方法

首先乙個while迴圈,遍歷所有的failedsinks ,拿出每乙個failed的sink,如果拿出來的failed sink能夠訪問了,則把他付給activesink ,並returnsink.process()

的狀態。在輪詢的過程中,如果failed sink還是不能到達,則重新放入到failedsinks 中並重新整理時間,否則,如果能夠聯通,但是狀態不是ready,也放入到failedsinks 中且不重新整理。

之後,是對activesink進行while迴圈,呼叫activesink中的每乙個sink.proccess().呼叫成功,則return狀態。否則,出現異常,將當前active的sink移動到failedsinks 中,同時獲得下乙個active的sink從activesink中。繼續while判斷

函式的最後是乙個異常,即沒有任何乙個sink可用。

sink.process()是啥?是從channel中拿出資料的。

這裡以nullsink為例

根據事務和batchsize從chanel中拿出資料來,並寫入到相應的位置

public status process() throws eventdeliveryexception successful processed {} events.", getname(), eventcounter);

}if(event == null)

}transaction.commit();

countergroup.addandget("events.success", (long) math.min(batchsize, i));

countergroup.incrementandget("transaction.success");

} catch (exception ex) finally

return status;

}sinkprocessor之loadbalancingsinkprocessor

同樣也有configure、process方法,只不過內部邏輯不同,要實現loadbalance功能。

configure()方法主要是根據使用者的設定,初始化selector,selector_name_round_robin, selector_name_random或者使用定義的config_selector類(sinkselector子類)

if

(selectortypename.equalsignorecase(selector_name_round_robin))

else

if(selectortypename.equalsignorecase(selector_name_random))

else

catch

(exception ex)

}

具體這兩個selector怎麼搞的,這裡不講,可以參考: 

process()方法就是使用selector獲得乙個sink,呼叫其process方法,成功則返回status

@override

public status process() throws

eventdeliveryexception

catch

(exception ex)

}if (status == null

)

return

status;

}

make 執行過程簡單概述

make的引入,使得乙個大型複雜的linux 可以被裁剪定製為都有功能的專案,這也很容易理解,不可能任何乙個專案都要包含linux的所有功能。功能類似於c語言程式中的 條件編譯 make的執行過程可以簡單的分為2個階段,第乙個階段,它會讀取所有的makefile檔案以及包含的makefile檔案等,...

C 程式的簡單執行過程

編譯階段 main.c 預編譯 main.i 編譯 main.s 彙編 o obj 二進位制可重定位目標檔案 預編譯 刪注釋,預編譯指令 編譯 語法 語義分析,的優化,彙總所有的符號 彙編 把彙編指令轉化成特定平台的機器碼 資料產生符號。指令只產生乙個符號 函式名 1.合併所有obj檔案的段 所有相...

tomcat執行過程的簡單理解

1,tomcat 的結構主要由這幾個類或介面組成 catalina,server,service,connector,container,engine,host,context catalina主要負責tomcat的啟動和關閉 server對應的就是tomcat 可以當做乙個伺服器吧 connect...