Importing Oracle data into Kafka via Flume

2021-08-20 12:09:47 · Word count: 2893 · Reads: 9615

Versions: Flume 1.6, Kafka 2.11

Step 2: Since I am using Oracle, put the Oracle JDBC driver jar into Flume's lib directory. I used ojdbc5.jar. (The same lib directory also needs the flume-ng-sql-source plugin jar, which provides the SQL source type configured below.)

Step 3: Write the Flume configuration file.

vi sql-kafka.conf — the full configuration is as follows:

agenttest.channels = channeltest
agenttest.sources = sourcetest
agenttest.sinks = sinktest

########### SQL source #################
# For each of the sources, the type is defined
agenttest.sources.sourcetest.type = org.keedio.flume.source.SQLSource
agenttest.sources.sourcetest.hibernate.connection.url = jdbc:oracle:thin:@192.168.200.8:1521/orcl

# Hibernate database connection properties
agenttest.sources.sourcetest.hibernate.connection.user = typpcits
agenttest.sources.sourcetest.hibernate.connection.password = typpcits
agenttest.sources.sourcetest.hibernate.connection.autocommit = true
agenttest.sources.sourcetest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agenttest.sources.sourcetest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agenttest.sources.sourcetest.run.query.delay = 1
agenttest.sources.sourcetest.status.file.path = /opt/flume
agenttest.sources.sourcetest.status.file.name = agenttest.sqlsource.status

# Custom query
agenttest.sources.sourcetest.custom.query = select * from its_base_area
agenttest.sources.sourcetest.batch.size = 6000
agenttest.sources.sourcetest.max.rows = 1000
agenttest.sources.sourcetest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agenttest.sources.sourcetest.hibernate.c3p0.min_size = 1
agenttest.sources.sourcetest.hibernate.c3p0.max_size = 10

########### memory channel #################
agenttest.channels.channeltest.type = memory
agenttest.channels.channeltest.capacity = 10000
agenttest.channels.channeltest.transactionCapacity = 10000
agenttest.channels.channeltest.byteCapacityBufferPercentage = 20
agenttest.channels.channeltest.byteCapacity = 1600000

########### Kafka sink #################
agenttest.sinks.sinktest.type = org.apache.flume.sink.kafka.KafkaSink
agenttest.sinks.sinktest.topic = testtopic
agenttest.sinks.sinktest.brokerList = 192.168.72.129:9092,192.168.72.130:9092,192.168.72.131:9092
agenttest.sinks.sinktest.requiredAcks = 1
agenttest.sinks.sinktest.batchSize = 20

# Wire the source and the sink to the channel
agenttest.sinks.sinktest.channel = channeltest
agenttest.sources.sourcetest.channels = channeltest

Don't forget to replace the values above (IPs, credentials, table name, topic) with your own.
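As written, the custom query reloads the whole table. If you only want new rows, the keedio SQL source's documentation describes a `$@$` placeholder in custom.query that is replaced by the last value saved in the status file; the incremental column should come first in the SELECT list. A sketch, assuming a hypothetical numeric area_id key column:

```
# Incremental load sketch: $@$ is substituted with the last saved index value
agenttest.sources.sourcetest.custom.query = select area_id, area_name from its_base_area where area_id > $@$ order by area_id
```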

Step 4: From the Flume installation directory, run:

./bin/flume-ng agent -n agenttest -c conf -f conf/sql-kafka.conf -Dflume.root.logger=INFO,console

Step 5: Check whether data is arriving on the Kafka topic testtopic. From Kafka's bin directory, run:

./kafka-console-consumer.sh --zookeeper 192.168.72.129:2181 --topic testtopic --from-beginning

If everything worked, you will now see the rows returned by your Oracle query. (On newer Kafka releases, the console consumer takes --bootstrap-server <broker:9092> instead of --zookeeper.)
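By default the keedio SQL source serializes each result row as one CSV line, so each Kafka message body should be a CSV-encoded row. A minimal sketch of decoding such a body in Python — the sample payload and the column names are made up for illustration, and in production the string would come from a real Kafka consumer rather than a literal:

```python
import csv
import io

# A sample event body as the SQL source would emit it: one row, CSV-encoded.
# In production this string would be the value of a consumed Kafka message.
sample_body = '"110000","Beijing","1"'

def parse_row(body, columns):
    """Parse one CSV-encoded Flume event body into a dict keyed by column name."""
    values = next(csv.reader(io.StringIO(body)))
    return dict(zip(columns, values))

# Hypothetical column names for its_base_area; use your table's real schema.
row = parse_row(sample_body, ["area_id", "area_name", "area_level"])
print(row["area_name"])  # -> Beijing
```

This keeps downstream consumers independent of column order changes only if you keep the column list in sync with the SELECT in custom.query.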
