詳解Flink中的Window

2021-10-06 01:45:25 字數 4653 閱讀 1286

1.1 window概述

流式計算是一種用於處理無限資料集的資料處理引擎,而無線資料集是指一種不斷增長的無限的資料集,而window是一種將無限資料集切割為有限塊進行處理的手段。

window是無限資料流處理的核心,window將乙個無限的stream拆分成有限大小的「bucket」桶,方便我們在桶上做計算操作。

1.2 window型別

window可以分成兩類:

① timewindow:按照時間生成window。

② countwindow:按照指定的資料條數生成乙個window,與時間無關。

對於timewindow,可以根據視窗實現原理的不同分成三類:滾動視窗(tumbling window)、滑動視窗(sliding window)和會話視窗(session window)。

(1)滾動視窗

依據固定的視窗長度對資料進行切片。

特點:時間對齊,視窗長度固定,沒有重疊。

滾動視窗分配器將每個元素分配到乙個特定的視窗大小的視窗中,滾動視窗有乙個固定的大小,並且不會出現重疊。例如:如果你指定了乙個5分鐘大小的滾動視窗,視窗的建立如下圖所示:

圖1 滾動視窗

適用場景:適合做bi統計等。(做每個時間段的聚合計算)。

(2)滑動視窗

滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。

特點:時間對齊,視窗長度固定,可以有重疊。

滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另乙個視窗滑動引數控制滑動視窗滑動的步長。

若滑動視窗的滑動引數小於視窗大小,則視窗出現重疊。在這種情況下,元素被分配到多個視窗中。

例如:若你有10秒鐘的視窗和5秒鐘的滑動,則每個視窗中5秒鐘的視窗裡包含著上個10秒鐘產生的資料,如下圖:

圖2 滑動視窗

適用場景:對最近乙個時間段內的統計(根據某介面最近5分鐘的失敗率來決定是否報警。)

(3)會話視窗

由一系列事件組合乙個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新資料就會生成新的視窗。

特點:時間無對齊。

會話視窗分配器通過session活動來對元素進行分組,會話視窗跟滾動視窗和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況。相反,當它在乙個固定的時間週期內不再收到元素,即非活動間隔產生,那這個視窗就會關閉。乙個會話視窗通過乙個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session視窗中去。

圖3 會話視窗

2.1 timewindow

timewindow是將指定時間範圍內的所有資料組成乙個window,一次對乙個window裡面的所有資料進行計算。

(1)滾動視窗

flink預設的時間視窗根據processing time進行視窗的劃分,將flink獲取到的資料根據進入flink的時間劃分到不同的視窗中。

val mintempperwindow = datastream

.map(data =

>

(data.id, data.temperature)).

keyby

(_._1)

.timewindow

(time.

seconds(15

)).reduce

((d1,d2)

=>

(d1._1,d1._2.

min(d2._2)

))

時間間隔可以通過time.milliseconds(x),time.seconds(x),time.minutes(x)等其中的乙個來指定。

(2)滑動視窗

滑動視窗和滾動視窗的函式名是完全一致的,只是在傳引數時需要傳入兩個引數,乙個是window_size,乙個是sliding_size。

下面**中的sliding_size設定為了5s,也就是說,視窗每5s就計算一次,每一次計算的window範圍是15s內的所有元素。

val mintempperwindow = datastream

.map(data =

>

(data.id, data.temperature)).

keyby

(_._1)

.timewindow

(time.

seconds(15

),time.

seconds(5

)).reduce

((d1,d2)

=>

(d1._1,d1._2.

min(d2._2)

))

時間間隔可以通過time.milliseconds(x),time.seconds(x),time.minutes(x)等其中的乙個來指定。

(3)會話視窗

由於會話視窗是基於event time,故將會話視窗**放在了下一章flink時間語義和watermark中。

2.2 countwindow

countwindow根據視窗中相同key元素的數量來觸發執行,執行時只計算元素數量達到視窗大小的key對應的結果。

:countwindow的window_size指的是相同key的元素的個數,不是輸入的所有元素的總數。

(1)滾動視窗

預設的countwindow是乙個滾動的視窗,只需要指定視窗大小即可,當元素數量達到視窗大小時,就會觸發視窗的執行。

val mintempperwindow = datastream

.map(data =

>

(data.id, data.temperature)).

keyby

(_._1)

.countwindow(5

).reduce

((d1,d2)

=>

(d1._1,d1._2.

min(d2._2)

))

(2)滑動視窗滑動視窗和滾動視窗的函式名是完全一致的,只是在傳引數時需要傳入兩個引數,乙個是window_size,乙個是sliding_size。

下面**中的sliding_size設定為2,也就是說,每收到兩個相同key的資料就計算一次,每一次計算的window範圍是5個元素。

val mintempperwindow = datastream

.map(data =

>

(data.id, data.temperature)).

keyby

(_._1)

.countwindow(5

,2).

reduce

((d1,d2)

=>

(d1._1,d1._2.

min(d2._2)

))

2.3 window function

window function定義了要對視窗中收集的資料做的計算操作,主要可以分為兩類:

① 增量聚合函式:每條資料到來就進行計算,保持乙個簡單的狀態。典型的增量聚合函式有reducefunction,aggreatefunction。

② 全視窗函式:先把視窗所有資料收集起來,等到計算的時候會遍歷所有資料。典型的全視窗函式有processwindowfunction。

2.4 其他可選api

(1)觸發器——.trigger()

定義window什麼時候關閉,觸發計算並輸出結果。

:globalwindow預設的觸發器時nevertrigger,該觸發器從不出發,所以在使用globalwindow時必須自定義觸發器。

(2)移除器——.evitor()

定義移除某些資料的邏輯。evictor可以在觸發器觸發之後以及視窗函式被應用之前或之後可選擇地移除元素。使用evictor可以防止預聚合,因為視窗的所有元素都必須在應用計算邏輯之前先傳給evictor進行處理。

(3)允許處理遲到多久的資料——.allowedlateness()

(4)將遲到的資料放入側輸入流——.sideoutputlatedata()

(5)獲取側輸出流——.getsideoutput()

下面將這些api以圖的形式展示如下:

圖4 api介紹圖

Flink原理與實現 詳解Flink中的狀態管理

上面flink原理與實現的文章中,有引用word count的例子,但是都沒有包含狀態管理。也就是說,如果乙個task在處理過程中掛掉了,那麼它在記憶體中的狀態都會丟失,所有的資料都需要重新計算。從容錯和訊息處理的語義上 at least once,exactly once flink引入了stat...

Flink核心概念之window

計數視窗 1 windowall就是把所有資料弄到乙個slot處理,並行度始終為1 2 keyby會把資料分到不同的slot,keyby.window可以設定並行度 package com.fouth sink import org.apache.flink.streaming.api.functi...

window的location物件詳解

開啟google瀏覽器,調出除錯工具,在console一欄中輸入window.location,出現如圖所示 包含location的多個屬性 接下來以 為例子,介紹一下location的常用屬性 結果為 結果為 http 結果為 www.myurl.com 8866 結果為 www.myurl.co...