Flink Operators (4)


Ways to create a Table

# Implicit conversion from a DataSet or DataStream (case class)

# Sink table (see the sketch below)

# Source table (see the sketch below)

# ExternalCatalog table
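Registering source and sink tables is only listed above, so here is a minimal sketch, assuming the legacy Flink 1.x Table API (CsvTableSource/CsvTableSink) and the `tableEnv` used in the other examples; the file paths, table names, and fields are hypothetical.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

// Source table: expose a CSV file to the Table API / SQL as "people".
val csvSource = CsvTableSource.builder()
  .path("/tmp/people.csv")        // hypothetical path
  .field("name", Types.STRING)
  .field("age", Types.INT)
  .build()
tableEnv.registerTableSource("people", csvSource)

// Sink table: register a target that query results can be written into.
val fieldNames = Array("name", "age")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
tableEnv.registerTableSink("peopleOut", fieldNames, fieldTypes, new CsvTableSink("/tmp/out.csv", ","))
```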

# Table API usage

    val orders = tableEnv.scan("Orders")
    val revenue = orders
      .filter('cCountry === "FRANCE")
      .groupBy('cID, 'cName)
      .select('cID, 'cName, 'revenue.sum as 'revSum)

# SQL usage

    tableEnv.sqlQuery("select name, sum(age) from t_table group by name")
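The query above assumes a table named t_table has already been registered; a hypothetical setup, assuming the same `env`/`tableEnv` as the surrounding examples (the stream and its fields are made up for illustration):

```scala
// a hypothetical DataStream[(String, Int)] of (name, age) pairs
val people: DataStream[(String, Int)] = env.fromElements(("tom", 20), ("amy", 30))
// register it so SQL can refer to it as "t_table"
tableEnv.registerDataStream("t_table", people, 'name, 'age)
```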

Note: the conversion paths between DataSet, DataStream, Table API, and SQL query are summarized below (a sketch of the implicit toTable path follows the table).

| From \ To  | DataSet                  | DataStream      | Table API                          | SQL query                               |
|------------|--------------------------|-----------------|------------------------------------|-----------------------------------------|
| DataSet    | -                        | -               | toTable (implicit), fromDataSet    | registerDataSet (registers a table)     |
| DataStream | -                        | -               | toTable (implicit), fromDataStream | registerDataStream (registers a table)  |
| Table API  | toDataSet[Row]           | -               | -                                  | registerTable                           |
| SQL query  | toDataSet[(String, Int)] | toRetractStream | scan                               | -                                       |
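The "toTable (implicit)" entries rely on the implicits in org.apache.flink.table.api.scala; a minimal sketch, assuming the legacy Flink 1.x Scala API (the stream and field names are hypothetical):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._   // brings the toTable implicit into scope

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = env.fromElements((1L, "hello"), (2L, "world"))
// implicit conversion: DataStream -> Table, naming the columns explicitly
val table = stream.toTable(tableEnv, 'myLong, 'myString)
```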

# Register a DataSet or DataStream as a table for SQL queries

    // register the DataStream as table "myTable" with fields "f0", "f1"
    tableEnv.registerDataStream("myTable", stream)

    // register the DataStream as table "myTable2" with fields "myLong", "myString"
    tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
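Once registered, the table can be referenced by name from SQL; a small hypothetical follow-up (the query itself is made up):

```scala
// query the registered table "myTable2" by name
val result = tableEnv.sqlQuery("SELECT myString, SUM(myLong) FROM myTable2 GROUP BY myString")
```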

# Convert a DataSet or DataStream into a Table

    // convert the DataStream into a Table with default fields '_1, '_2
    val table1: Table = tableEnv.fromDataStream(stream)

    // convert the DataStream into a Table with fields 'myLong, 'myString
    val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

# Convert a Table into a DataSet or DataStream

    // convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

    // convert the Table into an append DataStream of Row
    val streamRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

    // convert the Table into an append DataStream of Tuple2[String, Int]
    val streamTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)
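The conversion table above also lists toRetractStream, which is needed when a query updates rows it has already emitted (for example a non-windowed GROUP BY on a stream); a minimal sketch under the same legacy-API assumptions:

```scala
import org.apache.flink.types.Row

// each element is (isAdd, row): true adds/updates a row, false retracts an earlier one
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
```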

# Converting between registered tables and the Table API

    // SQL to Table API
    tableEnv.registerTable("table1", ...)
    val tapiResult = tableEnv.scan("table1").select(...)

    // from Table API to SQL
    val projTable: Table = tableEnv.scan("X").select(...)
    tableEnv.registerTable("projectedTable", projTable)
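Since the snippets above elide their arguments, here is a hypothetical end-to-end round trip, reusing the `stream` and `tableEnv` from earlier; the table and field names are made up:

```scala
// register a Table built from a stream, scan it back, and re-register the projection
val source: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
tableEnv.registerTable("table1", source)
val tapiResult = tableEnv.scan("table1").select('myString)
tableEnv.registerTable("projectedTable", tapiResult)
val sqlResult = tableEnv.sqlQuery("SELECT myString FROM projectedTable")
```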

Test case

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object TableApiTest {
      case class Order(user: Long, product: String, amount: Int)
    }
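The snippet above only declares the case class; a minimal sketch of how such a test might continue, assuming the legacy Flink 1.x API used throughout this post (the sample data and query are hypothetical):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TableApiTest {
  case class Order(user: Long, product: String, amount: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // hypothetical sample data
    val orders: DataStream[Order] = env.fromElements(
      Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))

    // case class fields become the column names automatically
    val table = tableEnv.fromDataStream(orders)
    val result = table.groupBy('user).select('user, 'amount.sum as 'total)

    // the aggregation updates earlier results, so a retract stream is required
    tableEnv.toRetractStream[Row](result).print()
    env.execute("TableApiTest")
  }
}
```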
