Flume Basic Examples

2021-10-10

Collecting files into HDFS

Requirement: a specific directory on a server continuously receives new files; whenever a new file appears, it must be collected into HDFS.

Based on this requirement, first define the three key elements: a spooldir source, a memory channel, and an HDFS sink.

Writing the configuration file

cd  /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

mkdir -p /export/servers/dirfile

vim spooldir.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## Note: never drop a file with the same name into the monitored directory twice
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/dirfile
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type of the generated files; the default is SequenceFile,
# while DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Channel parameter notes:

capacity: the maximum number of events the channel can hold

transactionCapacity: the maximum number of events taken from the source, or delivered to the sink, in one transaction

keep-alive: the timeout (in seconds) allowed for adding an event to, or removing one from, the channel
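These parameters are validated together: transactionCapacity must not exceed capacity. A minimal memory-channel definition putting them side by side might look like this (the values are purely illustrative, not a tuning recommendation):

```properties
# hypothetical tuning for channel c1 of agent a1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.keep-alive = 3
```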

bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console

Upload different files into the directory below; note that file names must not repeat:

cd /export/servers/dirfile
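Since the spooldir source rejects duplicate file names, a small helper like the following can generate uniquely named test files. This is a sketch: /export/servers/dirfile matches the spoolDir in the config above, and the script falls back to a temporary directory so it can be tried anywhere.

```shell
#!/bin/bash
# Drop uniquely named test files into the Flume spool directory.
SPOOL_DIR=${SPOOL_DIR:-/export/servers/dirfile}
[ -d "$SPOOL_DIR" ] || SPOOL_DIR=$(mktemp -d)
for i in 1 2 3; do
  # a nanosecond timestamp keeps file names unique even in a tight loop
  echo "test event $i" > "$SPOOL_DIR/test-$(date +%s%N)-$i.log"
done
echo "created $(ls "$SPOOL_DIR" | wc -l) files in $SPOOL_DIR"
```

Once Flume has ingested a file, the source renames it with a .COMPLETED suffix, which is why re-dropping a file under its original name fails.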
Collecting appended file content into HDFS

Requirement: a business system generates logs with log4j, and the log file keeps growing; the data appended to the log file must be collected into HDFS in real time.

Based on this requirement, first define the three key elements: an exec source (running tail -f), a memory channel, and an HDFS sink.

Develop the configuration file on node03:

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf

vim tail-file.conf

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure the tail -f source
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /export/servers/taillogs/access_log
agent1.sources.source1.channels = channel1

# Configure a host interceptor for the source (optional)
#agent1.sources.source1.interceptors = i1
#agent1.sources.source1.interceptors.i1.type = host
#agent1.sources.source1.interceptors.i1.hostHeader = hostname

# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize = 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

cd  /export/servers/apache-flume-1.6.0-cdh5.14.0-bin

bin/flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console

mkdir -p /export/servers/shells/

cd /export/servers/shells/

vim tail-file.sh

#!/bin/bash
# Append the current date to the log file every half second
while true
do
  date >> /export/servers/taillogs/access_log
  sleep 0.5
done

Create the log directory:

mkdir -p /export/servers/taillogs
Run the script:

sh /export/servers/shells/tail-file.sh
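Before pointing Flume at the log, the generator loop can be sanity-checked locally: run the same while loop against a temporary file for a couple of seconds and confirm that lines accumulate. This sketch needs no cluster; the temporary path comes from mktemp.

```shell
#!/bin/bash
# Simulate tail-file.sh against a temporary file for ~2 seconds,
# then report how many lines were appended.
LOG=$(mktemp)
( while true; do date >> "$LOG"; sleep 0.5; done ) &
WRITER=$!
sleep 2
kill "$WRITER" 2>/dev/null
echo "appended $(wc -l < "$LOG") lines to $LOG"
```

With a 0.5-second interval you should see roughly four lines after two seconds; tail -f in the Flume source picks up each appended line the same way.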
