Flink的WaterMark,及demo例項

2022-08-24 23:48:13 字數 2215 閱讀 5845

實際生產中,由於各種原因,導致事件建立時間與處理時間不一致,收集的規定對實時推薦有較大的影響。所以一般情況時選取建立時間,然後事先建立flink的時間視窗。但是問題來了,如何保證這個視窗的時間內所有事件都到齊了?這個時候就可以設定水位線(watermark)。

概念:支援基於時間視窗操作,由於事件的時間**於源頭系統,很多時候由於網路延遲、分布式處理,以及源頭系統等各種原因導致源頭資料的事件時間可能亂序。這時可以設定乙個時間閾值,或者說水位線(watermark),其作用定義乙個最大亂序時間,比如某條日誌時間為2019-01-01 08:00:10,如果亂序最大允許時間為10s,那麼就認為2019-01-01 08:00:00之前產生的所有事件都到齊了,可以進行計算。

時間視窗:指定乙個固定時間間隔的視窗

一、滑動視窗

1、slidingeventtimewindows.of(time.second(4), time.seconds(3)):表示滑動視窗大小為4秒,滑動步長是3 秒,同時,每3秒才滑動一次;

2、每條資料存活的時間為滑動視窗的大小;

3、如果滑動視窗超過之前的視窗,那麼後面來的屬於前面視窗的資料會丟失;

4、來了一條資料,邊移動邊計算滑動視窗的資料(乙個視窗停留,計算一次,不移動,不計算 ),直至視窗到達指定位置。

計算某位置時間的公式:   

//

n:時間戳;size視窗大小;slide:滑動長度

//根據等差公式推導

an = a1 + (x-1)*s

a1 = size - slide -1x = [n - (size-slide)]/slide //

除數後再乘以slide

s =slide //

當來了一條時間戳為n的事件,就認為指定位置時間之前的所有事件都到齊了

指定位置 = (size-slide-1) + [(n-watermark) - (size-slide)]/slide * slide

二、翻滾視窗

基於時間視窗,對連續資料進行迭代計算時,不會重疊。翻滾視窗是乙個特殊的滑動視窗,當視窗的長度等於滑動的長度時,滑動視窗就是翻滾視窗。

計算某位置時間的公式:

指定位置 = -1 + (n-watermark)/size * size     //

除數後再乘以size,size為視窗大小,n為時間戳

三、會話視窗

時間間隔達到一定時間長度時才進行統計計算。

##測試**(需要集群telnet乙個producer):

package

com.cjs

import

org.apache.flink.streaming.api.timecharacteristic

import

org.apache.flink.streaming.api.functions.timestamps.boundedoutofordernesstimestampextractor

import

org.apache.flink.streaming.api.scala.streamexecutionenvironment

import

org.apache.flink.streaming.api.windowing.time.time

import

org.apache.flink.api.scala._

import

org.apache.flink.streaming.api.windowing.assigners.

object watermarktest

})

//提取時間戳之後,該資料流是帶有時間的,用於事件視窗

.map(x=>(x.split(" ")(1),1l)).keyby(0)

//設定使用事件時間,因為watermark是基於事件時間

senv.setstreamtimecharacteristic(timecharacteristic.eventtime)

//定義翻滾視窗

//直接輸出,沒有用到事件時間視窗,flink預設是累計統計,來乙個,統計乙個

//定義滑動視窗

stream.window(slidingeventtimewindows.of(time.seconds(4),time.seconds(2))).sum(1).print()

senv.execute("watermark")

} }

Flink學習筆記之WaterMark

event time 業務系統中事件發生的事件。通常因為各種原因會有部分延遲到達系統,所以需要進行亂序處理。ingestion time 到達流處理系統的事件,因為是在入口的地方賦值,具有流中統一不變的特性。processing time 流處理器的本地事件,因為flink是併發執行,各個處理器的本...

Flink水印機制(watermark)

flink流處理時間方式 設定flink流處理的時間型別 env.setstreamtimecharacteristic timecharacteristic.eventtime 問題 1.使用時間視窗來統計10分鐘內的使用者流量 2.有乙個時間視窗 3.有乙個資料,因為網路延遲 4.時間視窗並沒有...

資料水印 watermark

外發資料建立水印 產品通過對外發資料進行新增資料標記 自動生成水印 資料來源追溯等功能,避免了內部人員外發資料洩露無法對事件追溯,提高了資料傳遞的安全性和可追溯能力。資料水印系統 資料安全管理工具 安華金和 加密資料解密演算法 介面如果涉及敏感資料 如wx.getuserinfo當中的 openid...