PyFlink基礎應用之kafka

2021-10-08 20:12:50 字數 2493 閱讀 1773

pyflink基礎應用之kafka

執行環境

pyflink需要特定的python版本(3.5、3.6或3.7)。執行一下命令,以確保python版本滿足要求。

$ python -v

pyflink已經發布到pypi,可以直接安裝:

$ python -m pip install apache-flink

拷貝三個jar包到flink_home/lib下。

flink-connector-kafka_2.11-1.11.0.jar

flink-sql-connector-kafka_2.11-1.11.0.jar

kafka-clients-2.4.1.jar

作者的執行環境為python3.7.6、flink1.11.0、kafka2.11(broker1.0.0)。預研的時候遇到最多的問題是缺少jar包和jar包衝突,多看執行輸出的日誌,根據日誌錯誤提示補充相應的jar包。

參考資料有:

執行kafka

建立主題

bin/kafka-topics.sh --zookeeper 192.168.113.11:2181/kafka --create --replication-factor 1 --partitions 1 --topic flink_test2

啟動生產者-發出測試資料

bin/kafka-console-producer.sh --broker-list 192.168.113.11:9092 --topic flink_test2

測試資料格式為:

啟動消費者-檢測是否接受到資料

例項**

本應用採用pyflink+sql方式編寫**。

#!/usr/bin/python3.7

from pyflink.datastream import streamexecutionenvironment, checkpointingmode

from pyflink.table import streamtableenvironment, tableconfig, datatypes, csvtablesink, writemode, sqldialect

s_env = streamexecutionenvironment.get_execution_environment()

s_env.set_parallelism(1)

#必須開啟checkpoint,時間間隔為毫秒,否則不能輸出資料

s_env.enable_checkpointing(3000)

st_env = streamtableenvironment.create(s_env, tableconfig())

st_env.use_catalog(「default_catalog」)

st_env.use_database(「default_database」)

sourcekafkaddl = 「」"

create table sourcekafka(

id int comment 『序號』,

name varchar comment 『姓名』

)comment 『從kafka中源源不斷獲取資料』

with(

『connector』 = 『kafka』,

『topic』 = 『flink_test2』,

『properties.bootstrap.servers』 = 『192.168.113.11:9092』,

『scan.startup.mode』 = 『earliest-offset』,

『format』 = 『json』

)「」"

st_env.execute_sql(sourcekafkaddl)

fieldnames = [「id」, 「name」]

fieldtypes = [datatypes.int(), datatypes.string()]

csvsink = csvtablesink(fieldnames, fieldtypes, 「/root/tiamaes/result.csv」, 「,」, 1, writemode.overwrite)

st_env.register_table_sink(「csvtablesink」, csvsink)

resultquery = st_env.sql_query(「select * from sourcekafka」)

resultquery.insert_into(「csvtablesink」)

st_env.execute(「pyflink-kafka-v2」)

儲存檔案為pyflink_kafka.py
**執行

採用local-single部署模式執行:

python pyflink_kafka.py

持續檢查result.cvs的內容:

tail –f result.cvs

執行時沒有錯誤日誌時,在result.cvs能持續看到通過kafka生產者發生的資料。

WEB應用之httpd基礎入門(五)

前文我們聊到了httpd的啟動使用者和相關許可權的說明,資源壓縮配置 https的實現,回顧請參考今天我們來說說httpd的重定向 hsts 反向 的配置 首先來了解下重定向吧,什麼意思呢?假如我們訪問乙個資源在伺服器上不存在或者不在我們對應訪問url下,而使用者又不知道我們新的url的情況下,我們...

DelegateAndEvent應用之回馬槍

應用 delegate 和event 實現函式的 在實際的開發中非常有用。它實現的實際上是一種依賴通知的效果。通常可以用在 子窗體資訊更新的結果反饋至母窗體 類的屬性值和 ui控制項值依賴時的相互 通知 等。現通過簡單的例子 demo 這兩種應用場景 一 類的屬性值發生變化時,反饋到和它關聯的控制項...

Docker應用之倉庫

倉庫是存放映象的地方 註冊伺服器是管理倉庫的具體伺服器,每個伺服器上可以有多個倉庫,每個倉庫也可以有多個映象 如 dl.dockerpool.com ubuntu dl.dockerpool.com就是註冊伺服器位址,ubuntu是倉庫名 一 docker hub公共映象市場 docker hub是...