flink 三 flink批處理

2021-10-24 02:24:36 字數 2950 閱讀 8928

datasource型別運算元

val environment: executionenvironment = executionenvironment.getexecutionenvironment

// 支援多種collection的具體型別

val datasource1: dataset[

string

]= environment.fromcollection(array(

"a",

"b",

"c",

"d")

)// 支援tuple,自定義物件等復合形式

val datasource2: dataset[

string

]= environment.fromelements(

"a",

"b",

"c",

"d")

// 基於迭代的sequence的dataset

val datasource3: dataset[

long

]= environment.generatesequence(1,

20)// 基於本地檔案的讀取

val datasource4: dataset[

string

]= environment.readtextfile(

"word.txt"

)// 讀取hdfs檔案

val datasource5: dataset[

string

]= environment.readtextfile(

"hdfs://node01:9000/word.txt"

)// 讀取csv檔案,可以直接指定泛型

class wordcount(word:

string

,num:

int)

val datasource6: dataset[wordcount]

= environment.readcsvfile[wordcount]

("word.csv"

)// 讀取壓縮檔案,flink會自動識別出壓縮型別,使用對應的方式進行解壓

val datasource7 = environment.readtextfile(

"word.tar.gz"

)

flink批處理transformation運算元

ransformation

說明map

將dataset中的每乙個元素轉換為另外乙個元素

flatmap

將dataset中的每乙個元素轉換為0…n個元素

將乙個分割槽中的元素轉換為另乙個元素

filter

過濾出來一些符合條件的元素

reduce

可以對乙個dataset或者乙個group來進行聚合計算,最終聚合成乙個元素

reducegroup

將乙個dataset或者乙個group聚合成乙個或多個元素

aggregate

按照內建的方式來進行聚合。例如:sum/min/max…

distinct

去重join

將兩個dataset按照一定條件連線到一起,形成新的dataset

union

將兩個dataset取並集,並不會去重

rebalance

讓每個分割槽的資料均勻分布,避免資料傾斜

partitionbyhash

按照指定的key進行hash分割槽

sortpartition

指定欄位對分割槽中的資料進行排序

sum運算元

def main(args: array[

string])

:unit

=

reduce運算元

def main(args: array[

string])

:unit

=

aggregate

def main(args: array[

string])

:unit

=

累加器accumulator

def main(args: array[

string])

:unit

=override

def map(in:

string):

string=}

) res.writeastext(

"data/wordcount"

,filesystem.writemode.overwrite)

val result: jobexecutionresult = environment.execute(

"wordcount"

) println(result.getaccumulatorresult(

"wordcounter"))

}

廣播變數broadcast

def main(args: array[

string])

:unit

=override

def open(parameters: configuration)

:unit=}

).withbroadcastset(score,

"score"

).print(

)}

分布式快取檔案

def main(args: array[

string])

:unit=}

override

def map(in:

string):

string=}

) result.print(

)}

Flink學習系列之二 Flink批處理

此時我們可以使用flink的批處理,我的data目錄下有a.txt檔案,輸入任意的單詞,然後我們開始統計。如下 public class batchhandler groupby 0 sum 1 filepath 檔案輸出結果檔案 n 以換行符作為每行結束條件 以空格分割單詞 setparallel...

flink批處理中的source以及sink介紹

flink在批處理中常見的source主要有兩大類 1.基於本地集合的source collection based source 2.基於檔案的source file based source 1.基於本地集合的source 在flink最常見的建立dataset方式有三種。1.使用env.fro...

flink學習 flink架構

flink結構 graph 2個併發度 source為1個併發度 的sockettextstreamwordcount四層執行圖的演變過程 jobgraph streamgraph經過優化後生成了 jobgraph,提交給 jobmanager 的資料結構。executiongraph jobman...