使用Flume監控檔案並匯入到HIVE表中

2021-10-01 08:40:59 字數 2093 閱讀 4710

首先因為hive的儲存是基於hdfs的,所以目標等同於,flume監控檔案並上傳hdfs上

hive建表

create table test(name string,gender string)row format delimited fields terminated by ',';
flume配置檔案如下

監控檔案 /usr/local/hive.log

上傳路徑  hdfs://hadoop:9000/user/hive/warehouse/driver.db/test

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# describe/configure the source

a3.sources.r3.type = exec

a3.sources.r3.command = tail -f /usr/local/hive.log

# describe the sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/hive/warehouse/driver.db/test

#上傳檔案的字首

a3.sinks.k3.hdfs.fileprefix = upload-

#是否按照時間滾動資料夾

a3.sinks.k3.hdfs.round = true

#多少時間單位建立乙個新的資料夾

a3.sinks.k3.hdfs.roundvalue = 1

#重新定義時間單位

a3.sinks.k3.hdfs.roundunit = hour

#是否使用本地時間戳

a3.sinks.k3.hdfs.uselocaltimestamp = true

#積攢多少個event才flush到hdfs一次

a3.sinks.k3.hdfs.batchsize = 100

#設定檔案型別,可支援壓縮

a3.sinks.k3.hdfs.filetype = datastream

#多久生成乙個新的檔案

a3.sinks.k3.hdfs.rollinterval = 30

#設定每個檔案的滾動大小大概是128m

a3.sinks.k3.hdfs.rollsize = 134217700

#檔案的滾動與event數量無關

a3.sinks.k3.hdfs.rollcount = 0

# use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactioncapacity = 100

# bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

啟動flume 監控

flume-ng agent --name a3 --conf $flume_home/conf --conf-file $flume_home/conf/flume-dir-hdfs.conf -dflume.root.logger=info,console
向檔案中寫入資料

[root@hadoop local]# echo lisi,nan >> hive.log

[root@hadoop local]# echo wangwu,nan >> hive.log

[root@hadoop local]# echo xiaoming,nan >> hive.log

[root@hadoop local]# echo xiaohong,nv >> hive.log

檔案通過flume匯入到kafka

a1 agent flume三大元件 source channel sink a1.sources f1 a1.channels c1 a1.sinks k1 檔案 a1.sources.f1.type spooldir a1.sources.f1.channels c1 將users.csv檔案備...

上傳Excel檔案並匯入到資料

上傳檔案 上傳excel檔案 protected void btnfileload click object sender,eventargs e catch exception ex else 匯入檔案 將excel中的檔案匯入到資料庫 protected void btninsertdata c...

通過flume把oracle資料匯入到kafka

版本flume 1.6 kafka2.11 第二步 我用的是oracle所以,就把oracle的jdbc包放到flume的lib目錄下。我放的是ojdbc5.jar 第三步 配置flume的conf配置檔案。vi sql kafka.conf 具體配置如下 agenttest.channels ch...