Flume的多種採集方式

2021-10-01 06:31:00 字數 3672 閱讀 4933

採集需求:伺服器的某特定目錄下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到hdfs中去

修改flume安裝目錄下的conf檔案下的netcat-logger.conf檔案

新增一下配置資訊

# name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# describe/configure the source

##注意:不能往監控目中重複丟同名檔案

a1.sources.r1.type = spooldir

a1.sources.r1.spooldir = /root/logs

a1.sources.r1.fileheader = true

# describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%h%m/

a1.sinks.k1.hdfs.fileprefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundvalue = 10

a1.sinks.k1.hdfs.roundunit = minute

a1.sinks.k1.hdfs.rollinterval = 3

a1.sinks.k1.hdfs.rollsize = 20

a1.sinks.k1.hdfs.rollcount = 5

a1.sinks.k1.hdfs.batchsize = 1

a1.sinks.k1.hdfs.uselocaltimestamp = true

#生成的檔案型別,預設是sequencefile,可用datastream,則為普通文字

a1.sinks.k1.hdfs.filetype = datastream

# use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactioncapacity = 100

# bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

採集需求:比如業務系統使用log4j生成的日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到hdfs

修改flume安裝目錄下的conf檔案下的netcat-logger.conf檔案

新增一下配置資訊

# name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /root/logs/test.log

a1.sources.r1.channels = c1

# describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%h%m/

a1.sinks.k1.hdfs.fileprefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundvalue = 10

a1.sinks.k1.hdfs.roundunit = minute

a1.sinks.k1.hdfs.rollinterval = 3

a1.sinks.k1.hdfs.rollsize = 20

a1.sinks.k1.hdfs.rollcount = 5

a1.sinks.k1.hdfs.batchsize = 1

a1.sinks.k1.hdfs.uselocaltimestamp = true

#生成的檔案型別,預設是sequencefile,可用datastream,則為普通文字

a1.sinks.k1.hdfs.filetype = datastream

# use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactioncapacity = 100

# bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

預設值:30

hdfs sink間隔多長將臨時檔案滾動成最終目標檔案,單位:秒;

如果設定成0,則表示不根據時間來滾動檔案;

注:滾動(roll)指的是,hdfs sink將臨時檔案重新命名成最終目標檔案,並新開啟乙個臨時檔案來寫入資料;

預設值:1024

當臨時檔案達到該大小(單位:bytes)時,滾動成目標檔案;

如果設定成0,則表示不根據臨時檔案大小來滾動檔案;

預設值:10

當events資料達到該數量時候,將臨時檔案滾動成目標檔案;

如果設定成0,則表示不根據events資料來滾動檔案;

預設值:false

是否啟用時間上的「捨棄」,這裡的「捨棄」,類似於「四捨五入」。

預設值:1

時間上進行「捨棄」的值;

預設值:seconds

時間上進行「捨棄」的單位,包含:second,minute,hour

示例:

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%h%m/%s

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundvalue = 10

a1.sinks.k1.hdfs.roundunit = minute

當時間為2015-10-16 17:38:59時候,hdfs.path依然會被解析為:

/flume/events/20151016/17:30/00

因為設定的是捨棄10分鐘內的時間,因此,該目錄每10分鐘新生成乙個。

flume採集案例

1 採集目錄到hdfs 採集需求 某伺服器的某特定目錄下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到hdfs中去 根據需求,首先定義以下3大要素 採集源,即source 監控檔案目錄 spooldir 下沉目標,即sink hdfs檔案系統 hdfs sink source和sink之...

Flume(03) Flume採集案例

需求分析 採集需求 某伺服器的某特定目錄 export servers dirfile下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到hdfs中去。需求分析 通過flume採集資料,最重要的就是配置三大元件。這裡可以通過source來監控檔案目錄。通過channel,來將source採集...

flume案例 網路資料採集 Flume的配置

開發配置檔案 根據資料採集的需求配置採集方案,描述在配置檔案中 檔名可任意自定義 配置我們的網路收集的配置檔案 在flume的conf目錄下新建乙個配置檔案 採集方案 vim export servers apache flume 1.8.0 bin conf netcat logger.conf ...