Flume(03) Flume採集案例

2021-08-28 15:28:39 字數 4739 閱讀 4754

需求分析

採集需求:某伺服器的某特定目錄/export/servers/dirfile下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到hdfs中去。

需求分析:

通過flume採集資料,最重要的就是配置三大元件。

這裡可以通過source來監控檔案目錄。

通過channel,來將source採集到的內容傳送到sink

通過sink,將檔案上傳到hdfs檔案系統。

資料來源元件source ——監控檔案目錄 的type為: spooldir

spooldir具有以下一些特性:

1、監視乙個目錄,只要目錄中出現新檔案,就會採集檔案中的內容

2、採集完成的檔案,會被agent自動新增乙個字尾:completed

3、所監視的目錄中不允許重複出現相同檔名的檔案

開發配置檔案

flume的開發,大多數時候是flume配置檔案的開發。

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

mkdir -p /export/servers/dirfile

vim spooldir.conf

# name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# describe/configure the source

##定義type為spooldir,必配項。不能往監控目錄中重複丟同名檔案

a1.sources.r1.type = spooldir

## spooldir是要監控的目錄,這個選項為必配項。

a1.sources.r1.spooldir = /export/servers/dirfile

a1.sources.r1.fileheader = true

# describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%h%m/

a1.sinks.k1.hdfs.fileprefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundvalue = 10

a1.sinks.k1.hdfs.roundunit = minute

a1.sinks.k1.hdfs.rollinterval = 3

a1.sinks.k1.hdfs.rollsize = 20

a1.sinks.k1.hdfs.rollcount = 5

a1.sinks.k1.hdfs.batchsize = 1

a1.sinks.k1.hdfs.uselocaltimestamp = true

#生成的檔案型別,預設是sequencefile,可用datastream,則為普通文字

a1.sinks.k1.hdfs.filetype = datastream

# use a channel which buffers events in memory

a1.channels.c1.type = memory

#capacity:預設該通道中最大的可以儲存的event數量

a1.channels.c1.capacity = 1000

#trasactioncapacity:每次最大可以從source中拿到或者送到sink中的event數量

a1.channels.c1.transactioncapacity = 100

# bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

啟動flume

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -dflume.root.logger=info,console
測試:上傳檔案到指定目錄

將不同的檔案上傳到目錄/export/servers/dirfile裡面去,注意檔案不能重名。會發現檔案匯被自動收集。並且儲存到hdfs://node01:8020/spooldir/files/%y-%m-%d/%h%m/中。

需求分析:

採集需求:某業務系統使用log4j生成日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到hdfs

根據需求,首先定義以下3大要素

採集源,即source——監控檔案內容更新 : exec 『tail -f file』

下沉目標,即sink——hdfs檔案系統 : hdfs sink

source和sink之間的傳遞通道——channel,可用file channel 也可以用 記憶體channel

開發配置檔案

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim tail-file.conf

配置檔案內容:

a1.sources = source1

a1.sinks = sink1

a1.channels = channel1

# describe/configure tail -f source1

a1.sources.source1.type = exec

a1.sources.source1.command = tail -f /export/servers/taillogs/access_log

a1.sources.source1.channels = channel1

#configure host for source

#a1.sources.source1.interceptors = i1

#a1.sources.source1.interceptors.i1.type = host

#a1.sources.source1.interceptors.i1.hostheader = hostname

# describe sink1

a1.sinks.sink1.type = hdfs

#a1.sinks.k1.channel = c1

a1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%h-%m

a1.sinks.sink1.hdfs.fileprefix = access_log

a1.sinks.sink1.hdfs.maxopenfiles = 5000

a1.sinks.sink1.hdfs.batchsize= 100

a1.sinks.sink1.hdfs.filetype = datastream

a1.sinks.sink1.hdfs.writeformat =text

a1.sinks.sink1.hdfs.rollsize = 102400

a1.sinks.sink1.hdfs.rollcount = 1000000

a1.sinks.sink1.hdfs.rollinterval = 60

a1.sinks.sink1.hdfs.round = true

a1.sinks.sink1.hdfs.roundvalue = 10

a1.sinks.sink1.hdfs.roundunit = minute

a1.sinks.sink1.hdfs.uselocaltimestamp = true

# use a channel which buffers events in memory

a1.channels.channel1.type = memory

a1.channels.channel1.keep-alive = 120

a1.channels.channel1.capacity = 500000

a1.channels.channel1.transactioncapacity = 600

# bind the source and sink to the channel

a1.sources.source1.channels = channel1

a1.sinks.sink1.channel = channel1

(agent的名字也是可以隨便起的,不一定非要叫a1)

啟動flume

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

bin/flume-ng agent -c conf -f conf/tail-file.conf -n a1  -dflume.root.logger=info,console
增加檔案測試程式

flume採集案例

1 採集目錄到hdfs 採集需求 某伺服器的某特定目錄下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到hdfs中去 根據需求,首先定義以下3大要素 採集源,即source 監控檔案目錄 spooldir 下沉目標,即sink hdfs檔案系統 hdfs sink source和sink之...

flume配置採集日誌

上傳dir hdfs.conf 到flume的conf目錄下 定義三大元件的名稱 ag1.sources source1 ag1.sinks sink1 ag1.channels channel1 配置source元件 ag1.sources.source1.type spooldir ag1.so...

flume資料採集扇入

conf檔案的定義 1 agent a3.sources r3 r4 a3.sinks k3 a3.channels c3 2 source 監控目錄的型別 a3.sources.r3.type spooldir 監控目錄的路徑 a3.sources.r3.spooldir opt model ha...