flink 不設定水印 flink中的兩種水印

2021-10-13 02:33:12 字數 3888 閱讀 4181

在flink中的時間視窗中有個重要概念,就是watermark,也就是我們經常談論的水印,這裡我們不對水印的概念和使用方式進行介紹,這裡從原始碼的角度來看,如何不斷的生成水印。

在flink中,有兩種水印timestampsandpunctuatedwatermarksoperator

timestampsandperiodicwatermarksoperator

我們編寫原因水印的**如下:

//抽取timestamp和生成watermarkdatastream> watermarkstream =

inputmap.assigntimestampsandwatermarks(

new assignerwithperiodicwatermarks>() {

long currentmaxtimestamp = 0l;

final long maxoutoforderness = 10000l; // 最大允許的亂序時間是10s

@nullable

@override

public watermark getcurrentwatermark() {

return new watermark(currentmaxtimestamp - maxoutoforderness);

//定義如何提取timestamp@override

public long extracttimestamp(tuple2 element, long previouselementtimestamp) {

long timestamp = element.f1;

return timestamp;

timestampsandpunctuatedwatermarksoperator

是乙個流運算子,生成水印是根據輸入的元素,沒輸出乙個元素,就會輸出乙個水印,如果不想輸出水印,那麼就輸出乙個null,核心**

public void processelement(streamrecord element) throws exception {

final t value = element.getvalue();

// 通過使用者的**獲取到事件時間,注入到element裡面就直接往下個opeartor傳送final long newtimestamp = userfunction.extracttimestamp(value,

element.hastimestamp() ? element.gettimestamp() : long.min_value);

output.collect(element.replace(element.getvalue(), newtimestamp));

//通過使用者**獲取水印,這裡會判斷水印是否為null//不為null的就直接往下游emit 了final watermark nextwatermark = userfunction.checkandgetnextwatermark(value, newtimestamp);

if (nextwatermark != null && nextwatermark.gettimestamp() > currentwatermark) {

currentwatermark = nextwatermark.gettimestamp();

output.emitwatermark(nextwatermark);

上面的方法中,我們每乙個元素的處理,都會呼叫processelement 方法,引數就是處理的滅乙個元素,方便內部主要做下面幾件事:

1、從使用者的**中獲取事件時間,然後注入到element中,然後傳送到下乙個operator中

2、通過使用者的**獲取定義的水印,如果水印不為null,那麼就emit到下游

根據上面的分析,我們可知,如果存在水印,那麼每乙個元素後就會輸出乙個水印。

timestampsandperiodicwatermarksoperator

這也是乙個流操作,定義水印的生成方式,從類名字中的periodic,我們可以猜測這是乙個週期性生成水印的操作,我們從類中看核心**:

public void open() throws exception {

super.open();

currentwatermark = long.min_value;

// 獲取週期性生成水印的間隔watermarkinterval = getexecutionconfig().getautowatermarkinterval();

// 週期性水印,是通過處理時間來實現的,一開始會獲取當前的真實時間+我們設定的水印間隔 來作為乙個定時觸發器if (watermarkinterval > 0) {

// 獲取當前的處理時間long now = getprocessingtimeservice().getcurrentprocessingtime();

getprocessingtimeservice().registertimer(now + watermarkinterval, this);

open 方法時這個類的初始化方法,我們可以從上面的**中看到,在open方法中,先從我們的環境配置中獲取週期生成水印的時間間隔watermarkinterval ,如果時間間隔大於0,那麼就獲取當前的時間,然後註冊乙個process定時器,下次觸發的時間是now+watermarkinterval ,從這裡我們可以看到,這個類生成水印是需要借助processtime服務的。

// 到了一定的間隔時間 會觸發onprocessingtime 這個方法裡面的內容@override

public void onprocessingtime(long timestamp) throws exception {

// register next timerwatermark newwatermark = userfunction.getcurrentwatermark();

if (newwatermark != null && newwatermark.gettimestamp() > currentwatermark) {

currentwatermark = newwatermark.gettimestamp();

// emit watermark 傳送乙個水印output.emitwatermark(newwatermark);

// 繼續註冊乙個以當前時間+間隔,作為乙個定時器 ,這樣乙個週期性觸發水印往下游傳送的實現就完成了long now = getprocessingtimeservice().getcurrentprocessingtime();

getprocessingtimeservice().registertimer(now + watermarkinterval, this);

onprocessingtime 這個方法時到了定時器觸發的時候,會呼叫這個方法。這個方法主要作用如下:

1、從使用者的**中獲取watermark,如果存在watermark,並且時間大於currentwatermark,那麼就emit乙個水印到下游。

2、獲取當前時間now,然後用now+watermarkinterval 繼續註冊乙個process定時器。

public void processelement(streamrecord element) throws exception {

// 獲取事件時間,然後傳送出去final long newtimestamp = userfunction.extracttimestamp(element.getvalue(),

element.hastimestamp() ? element.gettimestamp() : long.min_value);

output.collect(element.replace(element.getvalue(), newtimestamp));

上面的processelement 方法,就是從使用者**中獲取時間,然後註冊到element中,輸出到下游。

上面我們就分析了flink中的兩種水印。

flink水印的產生方式

assignerwithpunctuatedwatermarks 每乙個event到來的時候,就會提取一次watermark assignerwithperiodicwatermarks 可以定義乙個最大允許亂序的時間,生成水印的間隔 每n毫秒 使用 executionconfig.setautow...

Flink水印機制(watermark)

flink流處理時間方式 設定flink流處理的時間型別 env.setstreamtimecharacteristic timecharacteristic.eventtime 問題 1.使用時間視窗來統計10分鐘內的使用者流量 2.有乙個時間視窗 3.有乙個資料,因為網路延遲 4.時間視窗並沒有...

Flink 事件 水印 計算的關係

看了好久的對水印的介紹,總結出以下的關係。水印 用於衡量事件時間進度的機制 為了解決亂序事件輸出正確的結果。事件 水印 計算的關係 事件生成水印的策略 1 遞增式的水印生成,適合遞增的資料,如果有不遞增的資料,那麼會被認為壞資料處理 2 週期計算,每次生成通過 週期大小,比如設定的週期是10s,那麼...