Writing Data to Kafka and HDFS Simultaneously with Flume


Component    Version
flume        flume-ng-1.6.0-cdh5.7.0.tar.gz
zookeeper    zookeeper-3.4.5
kafka        kafka_2.11-0.10.0.0.tgz

Zookeeper deployment: see Part 4 of this series.

Flume deployment

# Extract the Flume and Kafka tarballs under ~/soft
[hadoop@hadoop001 soft]$ cd ~/soft

Kafka deployment

# Edit the broker configuration
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ vim config/server.properties
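The settings that usually need attention in server.properties for a single-node setup are sketched below; the keys are standard Kafka 0.10 broker properties, but the specific values (broker id, log directory) are assumptions for this layout rather than values from the original setup:

broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/home/hadoop/data/kafka-logs
zookeeper.connect=localhost:2181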

# Add environment variables
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ vim ~/.bash_profile

export PATH=$KAFKA_HOME/bin:$PATH
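The PATH entry above references $KAFKA_HOME, so that variable has to be exported as well. A minimal ~/.bash_profile sketch, assuming Kafka was extracted to ~/soft/kafka_2.11-0.10.0.0 (adjust to the actual install directory):

export KAFKA_HOME=/home/hadoop/soft/kafka_2.11-0.10.0.0
export PATH=$KAFKA_HOME/bin:$PATH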

[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ source ~/.bash_profile

[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ which kafka-topics.sh

# Start the Kafka broker
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-server-start.sh config/server.properties
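Started this way, the broker runs in the foreground and occupies the terminal. A sketch of starting it in the background instead, assuming the -daemon flag of kafka-server-start.sh is used:

[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties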

# Test: create a topic
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wsk_test

# Test: list topics
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# Test: console producer
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wsk_test

# Test: console consumer
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wsk_test --from-beginning

Use Flume's Taildir Source to collect data and deliver it to both Kafka and HDFS. The agent configuration is as follows:

taildir-hdfsandkafka-agent.sources = taildir-source
taildir-hdfsandkafka-agent.channels = c1 c2
taildir-hdfsandkafka-agent.sinks = hdfs-sink kafka-sink

taildir-hdfsandkafka-agent.sources.taildir-source.type = TAILDIR
taildir-hdfsandkafka-agent.sources.taildir-source.filegroups = f1
taildir-hdfsandkafka-agent.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/hdfsandkafka/input/.*
taildir-hdfsandkafka-agent.sources.taildir-source.positionFile = /home/hadoop/data/flume/hdfsandkafka/taildir_position/taildir_position.json
taildir-hdfsandkafka-agent.sources.taildir-source.selector.type = replicating

taildir-hdfsandkafka-agent.channels.c1.type = memory
taildir-hdfsandkafka-agent.channels.c2.type = memory

taildir-hdfsandkafka-agent.sinks.hdfs-sink.type = hdfs
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/hdfsandkafka/%Y%m%d%H%M
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.filePrefix = wsktest-
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.rollInterval = 10
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.rollCount = 0
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
taildir-hdfsandkafka-agent.sinks.hdfs-sink.hdfs.writeFormat = Text

taildir-hdfsandkafka-agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
taildir-hdfsandkafka-agent.sinks.kafka-sink.brokerList = localhost:9092
taildir-hdfsandkafka-agent.sinks.kafka-sink.topic = wsk_test

taildir-hdfsandkafka-agent.sources.taildir-source.channels = c1 c2
taildir-hdfsandkafka-agent.sinks.hdfs-sink.channel = c1
taildir-hdfsandkafka-agent.sinks.kafka-sink.channel = c2
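Before starting the agent, it helps to prepare the local directories the Taildir source reads from and writes its position file under; the HDFS sink creates its target path on its own. A quick sketch, assuming the paths configured above:

[hadoop@hadoop001 ~]$ mkdir -p /home/hadoop/data/flume/hdfsandkafka/input
[hadoop@hadoop001 ~]$ mkdir -p /home/hadoop/data/flume/hdfsandkafka/taildir_position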

Start the agent:

flume-ng agent \
--name taildir-hdfsandkafka-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/taildir-hdfsandkafka-agent.conf \
-Dflume.root.logger=INFO,console
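To verify the dual write, append a line to a file under the monitored directory and check both destinations; a sketch assuming the paths and topic configured above:

[hadoop@hadoop001 ~]$ echo "hello flume" >> /home/hadoop/data/flume/hdfsandkafka/input/test.log
# The line should appear in a console consumer on the wsk_test topic
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wsk_test
# And in a file under the time-bucketed HDFS directory
[hadoop@hadoop001 ~]$ hdfs dfs -ls /flume/hdfsandkafka/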
