Flink WaterMark原理與實現

2021-10-07 07:45:51 字數 3126 閱讀 4179

在使用 eventtime 處理 stream 資料的時候會遇到資料亂序的問題,流處理從 event(事 件)產生,流經 source,再到 operator,這中間需要一定的時間。雖然大部分情況下,傳輸到 operator 的資料都是按照事件產生的時間順序來的,但是也不排除由於網路延遲等原因而導致亂序的產生,特別是使用 kafka 的時候,多個分割槽之間的資料無法保證有序。因此, 在進行 window 計算的時候,不能無限期地等下去,必須要有個機制來保證在特定的時間後, 必須觸發 window 進行計算,這個特別的機制就是 watermark(水位線)。watermark 是用於 處理亂序事件的。

在 flink 的視窗處理過程中,如果確定全部資料到達,就可以對 window 的所有資料做 視窗計算操作(如彙總、分組等),如果資料沒有全部到達,則繼續等待該視窗中的資料全 部到達才開始處理。這種情況下就需要用到水位線(watermarks)機制,它能夠衡量資料處 理進度(表達資料到達的完整性),保證事件資料(全部)到達 flink 系統,或者在亂序及 延遲到達時,也能夠像預期一樣計算出正確並且連續的結果。當任何 event 進入到 flink 系統時,會根據當前最大事件時間產生 watermarks 時間戳。

如何計算watermark的值?

watermark = 進入 flink 的最大的事件時間(mxteventtime)— 指定的延遲時間(t)

有watermark 的 window 是怎麼觸發視窗函式?

如果有視窗的停止時間等於或者小於maxeventtime – t(當時的 warkmark),那麼 這個視窗被觸發執行

watermark 的使用存在三種情況:

1. 本來有序的 stream 中的 watermark

如果資料元素的事件時間是有序的,watermark 時間戳會隨著資料元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(因為既然是有序的時間,就不需要設定延 遲了,那麼 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態下的水位 線。當 watermark 時間大於 windows 結束時間就會觸發對 windows 的資料計算,以此類推,下乙個 window 也是一樣。

2.亂序事件中的 watermark

現實情況下資料元素往往並不是按照其產生順序接入到 flink 系統中進行處理,而頻繁出現亂序或遲到的情況,這種情況就需要使用 watermarks 來應對。比如下圖,設定延遲時 間t為2

3.並行資料流中的 watermark

在多並行度的情況下,watermark 會有乙個對齊機制,這個對齊機制會取所有 channel中最小的 watermark。

1.有序的watermark

object watermark1 ).assignascendingtimestamps(_.calltime) // 資料有序的公升序watermark

.filter(_.calltype.equals("success"))

.keyby(_.sid)

.timewindow(time.seconds(10), time.seconds(5))

.reduce(new myreducefunction(), new returnmaxtimewindowfunction)

env.execute("assignascendingtimestampsdemo")

}

2.無序的watermark
object watermark2 )

// 資料是亂序的,延遲時間為3秒,週期性watermark

/*** 第一種實現

*/val ds = stream.assigntimestampsandwatermarks(new boundedoutofordernesstimestampextractor[log](time.seconds(3))

})/**

* 第二種實現

*/val ds2 = stream.assigntimestampsandwatermarks(new assignerwithperiodicwatermarks[log]

// 設定eventtime是哪個屬性

override def extracttimestamp(element: log, previouselementtimestamp: long): long =

})env.execute("assigntimestampsandwatermarksdemo")

}

with punctuated(間斷性的) watermark
val env = streamexecutionenvironment.getexecutionenvironment

// 使用eventtime

env.setstreamtimecharacteristic(timecharacteristic.eventtime)

//讀取檔案資料

val data = env.sockettextstream("flink101",8888)

.map(line=>)

// 生成watermark

data.assigntimestampsandwatermarks(

new mycustomerpunctuatedwatermarks(3000l)) //自定義延遲

}class mycustomerpunctuatedwatermarks(delay:long) extends assignerwithpunctuatedwatermarks[stationlog]else }

override def extracttimestamp(element: stationlog, previouselementtimestamp: long): long =

}

以上三種watermark的實現,根據資料的事件時間是否有延遲和業務需求選擇相應的生成watermark的方法。

flink watermark原理總結

通過視窗對input按照eventtime進行聚合,使得大體按照event time 發生的順序去處理資料,同時利用watermark來觸發視窗。watermark window機制 watermark是flink為了處理eventtime時間型別 其他時間型別不考慮亂序問題 的視窗計算提出的一種機...

Flink WaterMark機制白話分析

最近遇見乙個流處理的資料嚴重遲到亂序的場景,基於saprk streaming開發的統計使用者頁面停留時間。使用的思想是 遲到資料的時間補償機制。由於spark不支援亂序的支援,所以自行實現了乙個容器儲存一定量的歷史資料,最後對遲到的資料插到歷史容器中,對插入資料的位置進行區域性計算求補償時間最後新...

Flink WaterMark 的了解 更新

我來講下 我理解的watermark 一起 如果要去了解乙個東西 我認為需要從以下幾個點去分析 1.是什麼 概念一點 2.解決了什麼問題以及是如何解決的 3.如果去使用 如何下手 4.他會牽扯的一些問題 好了 讓我們去 這個東西 1.首先 watermark 是乙個全域性標籤,本身是乙個時間戳 2....