Storm pipeline: connecting Flume and Kafka

2021-07-13 08:56:42 · 2593 characters · 5176 views

For connecting Flume and Kafka, see the reference post on integrating Flume, Kafka, Storm, and MySQL.

Related resources: the flume2kafka jar packages and configuration files.

To connect Flume and Kafka, create a .conf file in the flume/conf directory and add the required jar packages to the lib directory.

Steps:

1. Create the .conf file in the flume/conf directory

(1) Create the flume2kafka.conf file

vi flume2kafka.conf

############################################
# producer config
############################################

# agent section
producer.sources = s
producer.channels = c
producer.sinks = r

# source section
# how the source reads data
producer.sources.s.type = exec
# command used to read the data (tail -f -n+1 /home/storm/work/access.log)
producer.sources.s.command = tail -f -n+1 /home/storm/work/access.log
producer.sources.s.channels = c

# each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list = master:9092
producer.sinks.r.partition.key = 0
producer.sinks.r.partitioner.class = org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 0
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.producer.type = sync
producer.sinks.r.custom.encoding = UTF-8
# set the Kafka topic to flume2kafka
producer.sinks.r.custom.topic.name = flume2kafka

# specify the channel the sink should use
producer.sinks.r.channel = c

# each channel's type is defined
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
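The flow this config defines (exec source → memory channel → Kafka sink) can be sketched in Python. This is an illustrative model only, not Flume code: the class names `MemoryChannel` and `KafkaSinkStub` are made up here, the bounded queue stands in for the memory channel's capacity of 1000, and the sink stub records messages instead of calling Kafka.

```python
from collections import deque

class MemoryChannel:
    """Toy stand-in for Flume's memory channel (hypothetical class)."""
    def __init__(self, capacity=1000):        # matches producer.channels.c.capacity
        self.capacity = capacity
        self.queue = deque()

    def put(self, event):
        if len(self.queue) >= self.capacity:  # a full channel rejects new events
            raise RuntimeError("channel full")
        self.queue.append(event)

    def take(self):
        return self.queue.popleft() if self.queue else None

class KafkaSinkStub:
    """Toy stand-in for the Kafka sink; records (topic, event) pairs."""
    def __init__(self, topic="flume2kafka"):  # matches custom.topic.name
        self.topic = topic
        self.sent = []

    def drain(self, channel):
        while (event := channel.take()) is not None:
            self.sent.append((self.topic, event))

# Source side: each log line from `tail -f` becomes one event in the channel.
channel = MemoryChannel()
for line in ["GET /index.html", "POST /login"]:  # stand-ins for access.log lines
    channel.put(line)

# Sink side: drain the channel and tag each event with the topic.
sink = KafkaSinkStub()
sink.drain(channel)
print(sink.sent)
```

The point of the sketch is the decoupling: the source only talks to the channel, and the sink only drains it, which is why the channel's capacity (1000 above) bounds how far the source can run ahead of Kafka.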

2. Add the required jar packages to the lib directory

kafka_2.9.2-0.8.0-beta1.jar

metrics-annotation-2.2.0.jar

scala-compiler-2.9.2.jar

3. Start the Flume agent

bin/flume-ng agent -n producer -f conf/flume2kafka.conf -Dflume.root.logger=INFO,console >> logs/flume.log 2>&1 &
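Once the agent is running, you can generate test traffic simply by appending lines to the file the exec source tails. A minimal sketch (the `/home/storm/work/access.log` path comes from the config above; the demo below writes to a temporary file instead, so adjust the path for a real run):

```python
import os
import tempfile

# Path the exec source tails, per flume2kafka.conf.
LOG_PATH = "/home/storm/work/access.log"

def append_events(path, lines):
    """Append one event per line; `tail -f` picks up each line as it is written."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:  # matches custom.encoding=UTF-8
        for line in lines:
            f.write(line + "\n")

# Demo against a temp file; point this at LOG_PATH on the Flume host.
demo_path = os.path.join(tempfile.mkdtemp(), "access.log")
append_events(demo_path, ["test event 1", "test event 2"])
```

Each appended line becomes one Flume event and should appear in the Kafka consumer started in step 4.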

4. Start Kafka and a Kafka consumer (to check that data is being transferred)

(1) Start Kafka

sbin/start-kafka.sh

(2) Start the consumer

bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flume2kafka --from-beginning
