filebeat 資料採集流程

2021-09-25 13:03:47 字數 4169 閱讀 8545

filebeat啟動流程 講解了filebeat的啟動流程,filebeat在構建完crawler物件,開始採集流程。

crawlerstart方法內,會啟動inputs

func

(c *crawler)

start

( pipeline beat.pipeline,

r *registrar.registrar,

configinputs *common.config,

configmodules *common.config,

pipelineloade***ctory fileset.pipelineloade***ctory,

overwritepipelines bool,)

error

}...

}

c.startinput(pipeline, inputconfig, r.getstates())方法初始化input

首先構建input物件

執行input

func

(c *crawler)

startinput

( pipeline beat.pipeline,

config *common.config,

states [

]file.state,

)error

connector := channel.

connectto

(pipeline, c.out)

p, err := input.

new(config, connector, c.beatdone, states,

nil)

...// 開始收集

p.start()

return

nil}

p.start()方法內啟動input,他在乙個單獨的協程裡執行。

這裡的p是對input的封裝,他的run方法是對某個介面的實現,因為我們用來收集日誌,所以我們只需要關心filebeat/input/log/input.go檔案內的run方法。run方法內部呼叫了inputscan方法,開始採集資料。

// run runs the input

func

(p *input)

run(

)

scan方法內首先獲取所有的檔案。其次獲取檔案狀態,根據狀態來判定收集最新資料,還是從歷史檔案收集。檔案收集會構建harvester物件。

// scan starts a scanglob for each provided path/glob

func

(p *input)

scan()

else

select

newstate, err :=

getfilestate

(path, info, p)

if err !=

nil// load last state

laststate := p.states.

findprevious

(newstate)

...// decides if previous state exists

if laststate.

isempty()

if err !=

nil}

else

}}

p.startharvester(newstate, 0)內構建harvester。(harvester是另乙個filebeat官網描述的核心元件之一)

func

(p *input)

startharvester

(state file.state, offset int64

)error

// set state to "not" finished to indicate that a harvester is running

state.finished =

false

state.offset = offset

// create harvester with state

// 這部分構建了 harvester

h, err := p.

createharvester

(state,

func()

)if err !=

nil// 配置 harvester

err = h.

setup()

if err !=

nil// update state before staring harvester

// this makes sure the states is set to finished: false

// this is synchronous state update as part of the scan

h.sendstateupdate()

// 啟動 harvester

if err = p.harvesters.

start

(h); err !=

nilreturn err

}

p.createharvester構建harvesterp.setup配置harvestersetup方法內會初始化檔案相關的內容,以及構建檔案reader

p.harvesters.start(h)執行harvester

主要還是要看harvesters.start方法,會在單獨的協程內執行harvester

func

(r *registry)

start

(h harvester)

error()

// 非同步執行

err := h.

run(

)if err !=

nil}()

return

nil}

harvester.run方法真是長。。

func

(h *harvester)

run(

)error

return

nil}

state := h.

getstate()

startingoffset := state.offset

state.offset +=

int64

(message.bytes)

...// 讀取到的檔案內容

text :=

string

(message.content)

...// 資料內容都包裝在 data 內,harvester 傳送 data,其實就是 forwarder **的

if!h.

sendevent

(data, forwarder)

// update state of harvester as successfully sent

h.state = state

}}

h.sendevent(data, forwarder)這段**將採集的資料傳送到下游,內部其實就是用forwarder**了資料。

到這裡資料的採集流程應該就差不多了,剩下的是資料的傳送流程。

資料採集流程整理

資料採集2013年11月24日到現在已經有38天了。這中間斷斷續續的,但卻始終沒有停止過。回頭想想我們走過的路程可分為以下六個方面 一開始,公尺老師就讓我們拿出一套方案來,關於這次採集資料的執行方案。每次任務的執行都要有乙個規劃指導。以指導為方向,不至於走錯方向。有了指導,可以彌補執行中的不足。有了...

filebeat 啟動流程

因為各種各樣的原因,好久沒有寫部落格了,還是希望能夠堅持下來 講解一下filebeat的啟動流程吧,核心功能先不描述了0.0 filebeat啟動入口在main.go檔案內,cmd.rootcmd.execute 啟動filebeat func main 在filebeat cmd root.go檔...

Filebeat採集日誌講解(一)

總結在elkf中,filebeat作為日誌採集端,採集日誌併發送到kafka。input就是以檔案形式儲存的log檔案,output就是kafka集群。在採集日誌中一般情況有以下幾點需要注意的 輸出內容確定,一般包括時間戳,主機名,日誌型別,日誌內容,其他的根據業務的實際需求,無用資訊可以直接過濾掉...