Importing data from Kafka to HDFS with Flume


Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and moving large volumes of log data, originally developed at Cloudera. It supports pluggable, customizable data senders for collecting data in a logging pipeline, and it can apply simple processing to the data and write it out to a variety of (customizable) data receivers.

Here we use Flume to import data from Kafka into HDFS.

The configuration file is as follows:

flumetohdfs_agent.sources = source_from_kafka
flumetohdfs_agent.channels = mem_channel
flumetohdfs_agent.sinks = hdfs_sink

#auto.commit.enable = true

## kerberos config ##
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.kerberosPrincipal = flume/[email protected]
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.kerberosKeytab = /root/apache-flume-1.6.0-bin/conf/flume.keytab

# For each one of the sources, the type is defined
flumetohdfs_agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumetohdfs_agent.sources.source_from_kafka.zookeeperConnect = 10.129.142.46:2181,10.166.141.46:2181,10.166.141.47:2181/testkafka
flumetohdfs_agent.sources.source_from_kafka.topic = itil_topic_4097
#flumetohdfs_agent.sources.source_from_kafka.batchSize = 10000
flumetohdfs_agent.sources.source_from_kafka.groupId = flume4097
flumetohdfs_agent.sources.source_from_kafka.channels = mem_channel

# The sink can be defined as follows.
flumetohdfs_agent.sinks.hdfs_sink.type = hdfs
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.filePrefix = %
flumetohdfs_agent.sinks.hdfs_sink.hdfs.path = hdfs:
## roll every hour (after gz)
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollSize = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollCount = 0
flumetohdfs_agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
flumetohdfs_agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 300
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
#flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = DataStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.writeFormat = Text

# Specify the channel the sink should use
flumetohdfs_agent.sinks.hdfs_sink.channel = mem_channel

# Each channel's type is defined.
flumetohdfs_agent.channels.mem_channel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
flumetohdfs_agent.channels.mem_channel.capacity = 100000
flumetohdfs_agent.channels.mem_channel.transactionCapacity = 10000
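With the agent configured, a quick smoke test is to publish a message to the source topic and then list the sink directory after the roll interval. This is a minimal sketch: the broker address and the HDFS path are placeholders, assuming the standard Kafka and Hadoop CLI tools are available.

# Publish a test event to the topic the Kafka source reads from
# (broker host:port is a placeholder for your cluster)
echo "hello flume" | ./kafka-console-producer.sh --broker-list 10.129.142.46:9092 --topic itil_topic_4097

# After the roll interval (or agent shutdown), the event should appear under the sink path
# (/path/to/sink/dir is a placeholder for the hdfs.path configured above)
hdfs dfs -ls /path/to/sink/dir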

Start the agent:

./flume-ng agent --conf ../conf/ -n flumetohdfs_agent -f ../conf/flume-conf-4097.properties
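When bringing the agent up for the first time, it helps to run it in the foreground with console logging so that source and sink errors show up immediately; -Dflume.root.logger is a standard log4j override accepted by the flume-ng script:

./flume-ng agent --conf ../conf/ -n flumetohdfs_agent -f ../conf/flume-conf-4097.properties -Dflume.root.logger=INFO,console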

The agent name (-n flumetohdfs_agent) must match the agent name used in the configuration file. By default the HDFS sink writes SequenceFile output, which cannot be opened and browsed directly; the output format can be set to text instead:

flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = DataStream
flumetohdfs_agent.sinks.hdfs_sink.hdfs.writeFormat = Text

Compressed output can also be configured:

flumetohdfs_agent.sinks.hdfs_sink.hdfs.codeC = gzip
flumetohdfs_agent.sinks.hdfs_sink.hdfs.fileType = CompressedStream
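To inspect the results, plain-text DataStream files can be read directly with hdfs dfs -cat, while hdfs dfs -text also decompresses files written with a known codec such as gzip. The paths and timestamp suffixes below are placeholders (FlumeData is the HDFS sink's default file prefix):

# Plain-text output (fileType = DataStream)
hdfs dfs -cat /path/to/sink/dir/FlumeData.1626998091000
# Gzip output (fileType = CompressedStream); -text decompresses known codecs
hdfs dfs -text /path/to/sink/dir/FlumeData.1626998091000.gz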

From Kafka to Hive:
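A minimal sketch of the sink side, assuming Flume 1.6+ and a Hive table already set up for streaming ingest (transactional, bucketed, stored as ORC); the metastore URI, database, table, and field names below are all placeholders:

# Register a hive sink in place of (or alongside) the hdfs sink
flumetohdfs_agent.sinks = hive_sink
flumetohdfs_agent.sinks.hive_sink.type = hive
flumetohdfs_agent.sinks.hive_sink.channel = mem_channel
# Placeholder metastore URI, database, and table
flumetohdfs_agent.sinks.hive_sink.hive.metastore = thrift://127.0.0.1:9083
flumetohdfs_agent.sinks.hive_sink.hive.database = logsdb
flumetohdfs_agent.sinks.hive_sink.hive.table = kafka_events
# Parse each Kafka event as delimited text mapped onto the table columns
flumetohdfs_agent.sinks.hive_sink.serializer = DELIMITED
flumetohdfs_agent.sinks.hive_sink.serializer.delimiter = ","
flumetohdfs_agent.sinks.hive_sink.serializer.fieldnames = id,msg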
