pyflink的local模式實驗記錄

2021-10-07 02:06:32 字數 2002 閱讀 2744

python和flink相關的網上例子比較少

先溫固下shell中這些環境變數吧。

python

flink shell變數

型別scalaflink shell變數型別

s_env

pyflink.datastream.stream_execution_environment.streamexecutionenvironment

senv

streamexecutionenvironment

st_env

pyflink.table.table_environment.streamtableenvironment

stenv

streamtableenvironmentimpl

b_env

class 'pyflink.dataset.execution_environment.executionenvironment

benv

executionenvironment

bt_env

pyflink.table.table_environment.batchtableenvironment

btenv

batchtableenvironmentimpl

注:scala獲取型別舉例:

scala> senv.getclass.get******name

res2: string = streamexecutionenvironment

啟動命令:

pyflink-shell.sh local

互動模式中輸入的**(python datastream api):

import tempfile

import os

import shutil

sink_path = tempfile.gettempdir() + '/streaming.csv'

if os.path.exists(sink_path):

if os.path.isfile(sink_path):

os.remove(sink_path)

else:

shutil.rmtree(sink_path)

s_env.set_parallelism(1)

t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])

st_env.connect(filesystem().path(sink_path)).with_format(oldcsv()

.field_delimiter(',')

.field("a", datatypes.bigint())

.field("b", datatypes.string())

.field("c", datatypes.string())).with_schema(schema()

.field("a", datatypes.bigint())

.field("b", datatypes.string())

.field("c", datatypes.string())).register_table_sink("stream_sink")

t.select("a + 1, b, c").insert_into("stream_sink")

st_env.execute("stream_job")

實驗結果:/tmp/streaming.csv

開啟後如下:

Spark部署模式(一) Local模式

目錄 1.官方求pi案例 直接執行已打成依賴jar包中的指定class 2.編寫scala語言實現功能 3.整個spark運算的流程 4.spark中的driver和executor 5.總結spark中各種組成部分的關係 local模式就是spark執行在單節點的模式,通常用於在本機上練手和測試,...

Nutch的local和deploy模式

local模式 1.將hbase安裝目錄下lib 下面的所有 jar 複製到nutch runtime local lib下2.nutch runtime local 下先建立urls目錄mkdir urls,目錄下建立seed.txt touch seed.txt,如果能正常執行,則萬事大吉,你會...

Spark的local模式環境搭建

簡介 部署模式 執行模式 spark可以在那些情況下執行,spark 框架編寫的應用程式可以執行在本地模式 local mode 集群模式 cluster mode 和雲服務 cloud 方便開發測試和生產部署。spark本地模式的安裝 1.上傳安裝包解壓安裝包 解壓軟體包 tar zxvf spa...