建立訊息佇列(Kafka)源表

2021-09-19 21:37:24 字數 3872 閱讀 7325

本頁目錄

kafka源表的實現**於自社群的kafka版本實現。

注意:本文件只適合獨享模式下使用。

kafka需要定義的ddl如下。

create

table kafka_stream

(

messagekey varbinary

,

`message`

varbinary

,

topic varchar

,

`partition`

int,

`offset`

bigint

)

with

(

type

='kafka010'

,

topic

='***'

,

`group.id`

='***'

,

bootstrap

.servers

='ip:埠,ip:埠,ip:埠'

);

注意:以上表中的五個字段順序務必保持一致。

引數

注釋說明

備註type

kafka對應版本

推薦使用kafka010

topic

讀取的單個topic

topic名稱

(1)kafka08必選配置:

引數注釋說明

備註group.id

無消費組id

zookeeper.connect

zk鏈結位址

zk連線id

(2)kafka09/kafka010/kafka011必選配置:

引數注釋說明

備註group.id

無消費組id

bootstrap.servers

kafka集群位址

kafka集群位址

如果您的kafka是阿里雲商業版,請參考kafka商業版準備配置文件。

如果您的kafka是阿里雲公測版,請參考kafka公測版準備配置文件。

"consumer.id"

,"socket.timeout.ms"

,"fetch.message.max.bytes"

,"num.consumer.fetchers"

,"auto.commit.enable"

,"auto.commit.interval.ms"

,"queued.max.message.chunks"

,"rebalance.max.retries"

,"fetch.min.bytes"

,"fetch.wait.max.ms"

,"rebalance.backoff.ms"

,"refresh.leader.backoff.ms"

,"auto.offset.reset"

,"consumer.timeout.ms"

,"exclude.internal.topics"

,"partition.assignment.strategy"

,"client.id"

,"zookeeper.session.timeout.ms"

,"zookeeper.connection.timeout.ms"

,"zookeeper.sync.time.ms"

,"offsets.storage"

,"offsets.channel.backoff.ms"

,"offsets.channel.socket.timeout.ms"

,"offsets.commit.max.retries"

,"dual.commit.enabled"

,"partition.assignment.strategy"

,"socket.receive.buffer.bytes"

,"fetch.min.bytes"

注意:其它可選配置項參考kafka官方文件:

kafka09

kafka010

kafka011

type

kafka 版本

kafka08

0.8.22

kafka09

0.9.0.1

kafka010

0.10.2.1

kafka011

0.11.0.2

預設kafka讀到的訊息:

messagekey varbianry

,

message varbianry

,

topic varchar

,

partition

int,

offset bigint

這樣乙個五元組,如果您希望在source階段把資料parser成特定的其它格式,可以按照下面實踐進行。

引數注釋說明

備註parserudtf

自定**析函式

用於解析從kafka讀到的訊息對映到ddl具體對應的型別

如何寫乙個parserudtf參見自定義錶值函式(udtf)。

與阿里雲kafka訊息佇列一樣,ddl定義相同。

create

table kafka_stream

(

messagekey varbinary

,

`message`

varbinary

,

topic varchar

,

`partition`

int,

`offset`

bigint

)

with

(

type

='kafka011'

,

topic

='kafka_01'

,

`group.id`

='cid_blink'

,

bootstrap

.servers

='192.168.0.251:9092'

);

關於自建kafka的with引數,請參考本文件kafka建立時ddl的with引數說明。需要注意的是bootstrap.servers引數需要填寫自建的位址和埠號。

注意:無論是阿里雲kafka還是自建kafka,目前實時計算均無tps、rps等指標資訊。在作業上線之後,運維介面暫時不支援顯示指標資訊。

相關文件

相關產品

本文**實時計算——

建立訊息佇列(kafka)源表

訊息佇列 訊息佇列 kafka

kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...

訊息佇列 Kafka學習

kafka是乙個分布式的訊息佇列,學習見apache kafka文件,中文翻譯見kafka分享,乙個簡單的入門例子見kafka 入門例項。本文只針對自己感興趣的點記錄下。producer consumer 訊息的生成者和使用者。broker kafka server充當broker角色,起到訊息佇列...

訊息佇列 Kafka學習

kafka是乙個分布式的訊息佇列,學習見apache kafka文件,中文翻譯見kafka分享,乙個簡單的入門例子見kafka 入門例項。本文只針對自己感興趣的點記錄下。producer consumer 訊息的生成者和使用者。broker kafka server充當broker角色,起到訊息佇列...