kafka整合flume詳細步驟

2021-09-06 16:33:27 字數 3768 閱讀 4966

對於初學者來說,可能對kafka和flume有一定的了解,但是並沒有過實際的應用,也不知道如何來使用。

這篇文章主要針對kafka和flume的整合

環境:linux

準備工作:搭建好zookeeper集群及kafka集群

版本:kafka_2.11-1.1.0,flume-1.8.0

步驟:啟動zookeeper集群,在每乙個節點執行zkserver.sh start

檢查kafka是否安裝成功

啟動kafka集群

進入kafka安裝的根目錄,在每乙個節點均執行命令

bin/kafka-server-start.sh config/server.properties &

使用 jps 命令檢視是否有kafka執行緒

此時原有的視窗會被占用,可再開啟乙個視窗

3.檢視所有的topic列表

bin/kafka-topics.sh --list --zookeeper huaxia01:2181,huaxia02:2181,huaxia03:2181
在kafka根目錄下執行以上命令

4.建立新的topic

bin/kafka-topics.sh --create --topic news-logs-1807 --zookeeper huaxia01:2181,huaxia02:2181,huaxia03:2181 --partitions 3 --replication-factor 3
5.建立成功後檢視所建立的topic

bin/kafka-topics.sh --describe --topic news-logs-1807 --zookeeper huaxia01:2181,huaxia02:2181,huaxia03:2181
此處可能版本不一樣導致命令報錯,可根據提示修改命令

6.建立生產者和消費者

這裡可以根據

檢視生產者和消費者的關係

7.進入到flume的conf目錄,新建flume-kafka-sink.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#對於source的配置描述 監聽檔案中的新增資料 exec

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /home/bigdata/data/projects/news/data/news_log_rt.log

#對於sink的配置描述 使用kafka日誌做資料的消費

a1.sinks.k1.type = org.apache.flume.sink.kafka.kafkasink

a1.sinks.k1.kafka.bootstrap.servers = huaxia01:9092,huaxia01:9092,huaxia01:9092

#這裡設定所建立的topic

a1.sinks.k1.kafka.topic = news-logs-1807

a1.sinks.k1.kafka.flumebatchsize = 1000

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

#對於channel的配置描述 使用檔案做資料的臨時快取 這種的安全性要高

a1.channels.c1.type = memory

a1.channels.c1.capacity = 100000

a1.channels.c1.transactioncapacity = 1000

#通過channel c1將source r1和sink k1關聯起來

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

8.驗證kafka和flume

建立檔案 /home/huaxia/data/projects/news/data/news_log_rt.log用於存放資料

9.建立news_log_rt.sh指令碼,注意要與news_log_rt.log在同乙個目錄下

任意輸入乙個指令碼,我在這裡舉個例子

arr=("hubei_wuhan" "hebei_shijiazhuang" "guangdong_guangzhou" "jiangsu_nanjing" "hunan_changsha")

function rand()

for((i=0;i<50;i++));

dornd1=$(rand 0 4)

currenttime=`date "+%y-%m-%d %h:%m:%s"`

timestamp=`date -d "$currenttime" +%s`

a=$province=`echo $a | cut -d \_ -f 1`

city=`echo $a | cut -d \_ -f 2`

rnd2=$(rand 0 10)

userid=$rnd2

rnd3="10000"$(rand 0 3)

advid=$rnd3

newstr=$","$","$","$","$

echo $newstr >> news_log_rt.log

done

10.建立kafka消費者監控news-logs-1807這個topic

在kafka的安裝目錄下執行

11.啟動flume

在flume的安裝目錄下執行

bin/flume-ng agent -n a1 -c conf -f conf/flume-kafka-sink.conf -dflume.root.logger=info,console
啟動成功:

12.啟動news_log_rt.sh檔案,在指令碼所在目錄下執行

sh news_log_rt.sh
13.觀察/home/huaxia/data/projects/news/data/news_log_rt.log的輸出結果

同時在消費者控制台上會觀察到以下現象

Kafka與flume的整合

為我們的source channel sink起名 a1.sources r1 a1.channels c1 a1.sinks k1 指定我們的source收集到的資料傳送到哪個管道 a1.sources r1.channels c1 指定我們的source資料收集策略 a1.sources r1....

Flume整合Kafka的簡單demo記錄

啟動zookeeper和kafka,單節點 bin zookeeper server start.sh config zookeeper.properties bin kafka server start.sh config server.properties 2.建立主題 建立乙個主題 flume...

HDP 集群中flume與kafka的整合

首先保證flume與kafka正確安裝並啟動,這個比較簡單,直接在ambari中新增新服務即可,不多贅述。配置flume 新建一配置檔案kafka.conf,編輯,追加一下內容。掃瞄指定檔案配置 agent.sources s1 agent.channels c1 agent.sinks k1 ag...