Extracting MySQL Data into Hive with Flume


1.2. Environment Configuration

Copy the following 4 jar packages from the hcatalog directory under the Hive root,

/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/hive-hcatalog/share/hcatalog

into flume_home/lib:

hive-hcatalog-core-2.3.0.jar
hive-hcatalog-pig-adapter-2.3.0.jar
hive-hcatalog-server-extensions-2.3.0.jar
hive-hcatalog-streaming-2.3.0.jar

Then copy all jars under hive_home/lib into flume_home/lib as well; both copy steps are sketched below.
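A minimal shell sketch of the two copy steps, assuming FLUME_HOME and HIVE_HOME point at your actual installation directories (the parcel path and jar versions are taken from this cluster and may differ on yours):

# location of the hcatalog jars in this CDH parcel (adjust to your cluster)
HCAT=/opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/hive-hcatalog/share/hcatalog

# copy the 4 hcatalog jars into flume's lib directory
cp "$HCAT"/hive-hcatalog-core-*.jar \
   "$HCAT"/hive-hcatalog-pig-adapter-*.jar \
   "$HCAT"/hive-hcatalog-server-extensions-*.jar \
   "$HCAT"/hive-hcatalog-streaming-*.jar \
   "$FLUME_HOME/lib/"

# copy everything under hive's lib directory as well
cp "$HIVE_HOME"/lib/*.jar "$FLUME_HOME/lib/"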

1.3. Creating the Table on the Hive Side

Note: the table must be bucketed and stored as a transactional ORC table. The Flume Hive sink writes through Hive's streaming ingest API, which only accepts bucketed, transactional ORC tables.

create table flume_user(
    user_id string,
    user_name string,
    age string
)
clustered by (user_id) into 2 buckets
stored as orc
tblproperties('transactional'='true');

-- enable concurrency and transaction support in Hive
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
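As a quick sanity check before starting the agent, you can confirm the transactional property actually took effect (a sketch using the hive CLI; beeline works just as well):

# the transactional parameter should appear under Table Parameters
hive -e "describe formatted flume_user;" | grep -i transactional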

Note: the netcat-flume-hive approach

#netcat-flume-hive (send data through a port to test the hive sink)
#flume-ng agent --conf-file sink_hive.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console
a1.sources=r1
a1.channels=c1
a1.sinks=s1

a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop3
a1.sources.r1.port=44444

a1.sinks.s1.type=hive
a1.sinks.s1.hive.metastore=thrift://hadoop3:9083
a1.sinks.s1.hive.database=default
a1.sinks.s1.hive.table=flume_user
a1.sinks.s1.serializer=delimited
#a1.sinks.s1.hive.partition=%y-%m-%d
#a1.sinks.s1.autoCreatePartitions=false
#a1.sinks.s1.useLocalTimeStamp=false
a1.sinks.s1.serializer.delimiter="\t"
a1.sinks.s1.serializer.serdeSeparator='\t'
a1.sinks.s1.serializer.fieldnames=user_id,user_name,age

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
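A hypothetical smoke test for this agent: start it, push one tab-delimited record through the port, and check that it lands in the table (hostname and row values are placeholders):

flume-ng agent --conf-file sink_hive.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console &

# send one tab-separated record matching fieldnames user_id,user_name,age
printf '1\talice\t30\n' | nc hadoop3 44444

# verify on the hive side
hive -e "select * from flume_user;"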

Note: the mysql-flume-hive approach

#mysql-flume-hive (test)
#flume-ng agent --conf-file sink_hive.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console
a1.sources=sqlsource
a1.channels=c1
a1.sinks=s1

#declare the source type
a1.sources.sqlsource.type=org.keedio.flume.source.SQLSource
a1.sources.sqlsource.hibernate.connection.url=jdbc:mysql://xx-xx-xx-xx:3306/test?useSSL=false
a1.sources.sqlsource.hibernate.connection.user=***x
a1.sources.sqlsource.hibernate.connection.password=***x
#this parameter matters: it defaults to false, and if left false the source will not poll automatically
a1.sources.sqlsource.hibernate.connection.autocommit=true
#hibernate dialect for mysql
a1.sources.sqlsource.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
#mysql jdbc driver
a1.sources.sqlsource.hibernate.connection.driver_class=com.mysql.jdbc.Driver
#poll interval, in milliseconds
a1.sources.sqlsource.run.query.delay=10000
#directory where flume stores its status file
a1.sources.sqlsource.status.file.path=/var/lib/flume
a1.sources.sqlsource.status.file.name=test_mysql_hive.status
#start querying from the first row
a1.sources.sqlsource.start.from=0
a1.sources.sqlsource.custom.query=select user_id,user_name,age from flume_hive_test where user_id > $@$ order by user_id asc
#a1.sources.sqlsource.columns.to.select = *
#a1.sources.sqlsource.incremental.column.name = user_id
#a1.sources.sqlsource.incremental.value = 0
#batching parameters
a1.sources.sqlsource.batch.size=1000
a1.sources.sqlsource.max.rows=1000
#c3p0 connection pool parameters
a1.sources.sqlsource.hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider
a1.sources.sqlsource.hibernate.c3p0.min_size=1
a1.sources.sqlsource.hibernate.c3p0.max_size=10

a1.sinks.s1.type=hive
a1.sinks.s1.hive.metastore=thrift://hadoop3:9083
a1.sinks.s1.hive.database=default
a1.sinks.s1.hive.table=flume_user
a1.sinks.s1.serializer=delimited
#a1.sinks.s1.hive.partition=%y-%m-%d
#a1.sinks.s1.autoCreatePartitions=false
#a1.sinks.s1.useLocalTimeStamp=true
a1.sinks.s1.round=true
a1.sinks.s1.roundValue=10
a1.sinks.s1.roundUnit=minute
a1.sinks.s1.serializer.delimiter="\t"
a1.sinks.s1.serializer.serdeSeparator='\t'
a1.sinks.s1.serializer.fieldnames=user_id,user_name,age

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.sources.sqlsource.channels=c1
a1.sinks.s1.channel=c1
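To exercise the SQL source end to end, the MySQL side needs the flume_hive_test table that the custom query reads. A sketch of seeding a couple of rows; the host, credentials, column types, and values are all placeholders (the schema is an assumption based on the query above):

mysql -h xx-xx-xx-xx -u USER -p test <<'SQL'
-- hypothetical schema matching the columns the custom query selects
create table if not exists flume_hive_test (
    user_id   int primary key,
    user_name varchar(64),
    age       int
);
insert into flume_hive_test values (1,'alice',30),(2,'bob',25);
SQL

# once run.query.delay (10s) has elapsed, the rows should show up in hive
hive -e "select * from flume_user;"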

