檔案通過flume匯入到kafka

2021-10-11 19:12:26 字數 1800 閱讀 7847

//a1:agent

//flume三大元件 source、channel、sink

a1.sources=f1

a1.channels=c1

a1.sinks=k1

//檔案***

a1.sources.f1.type = spooldir

a1.sources.f1.channels = c1

//將users.csv檔案備份至/opt/fd,然後開始監聽

a1.sources.f1.spooldir =

/opt/fd

a1.sources.f1.batchsize =

10000

//***:將表的表頭過濾掉

a1.sources.f1.interceptors=i1

a1.sources.f1.interceptors.i1.type=regex_filter

//正則匹配,將行開頭為user_id的一行資料過濾掉

a1.sources.f1.interceptors.i1.regex=user_id.*

a1.sources.f1.interceptors.i1.excludeevents=

true

//設定臨時存放資料的位址以及檢查點

a1.channels.c1.type = file

a1.channels.c1.checkpointdir =

/opt/flume/checkpoint

a1.channels.c1.datadir =

/opt/flume/data

a1.sinks.k1.channel=c1

a1.sinks.k1.type=org.apache.flume.sink.kafka.kafkasink

//建立乙個kafka的topic為users,這邊也可以不設定,自己在kafka中建立

a1.sinks.k1.kafka.topic=users

a1.sinks.k1.kafka.bootstrap.servers=

192.168

.153

.200

:9092

a1.sinks.k1.kafka.flumebatchsize =

100//設定應答機制acks=1,表示將生產者生產資料後,leader進行同步資料備份,follower非同步備份

//由於這邊只有乙個分割槽乙個consumer,所以這邊表示當這個consumer資料同步備份,資料不會丟失

檔案中定義的agent名稱

通過flume把oracle資料匯入到kafka

版本flume 1.6 kafka2.11 第二步 我用的是oracle所以,就把oracle的jdbc包放到flume的lib目錄下。我放的是ojdbc5.jar 第三步 配置flume的conf配置檔案。vi sql kafka.conf 具體配置如下 agenttest.channels ch...

使用Flume監控檔案並匯入到HIVE表中

首先因為hive的儲存是基於hdfs的,所以目標等同於,flume監控檔案並上傳hdfs上 hive建表 create table test name string,gender string row format delimited fields terminated by flume配置檔案如下...

csv檔案本地匯入和通過web匯入到伺服器

檔案本地匯入和通過微博導入到伺服器 本地匯入,主要是讀取本地檔案 如下 public class csvutils bw new bufferedwriter osw if datalist null datalist.isempty issucess true catch exception e ...