Flink遲到資料處理

2021-10-07 15:04:37 字數 2052 閱讀 7243

event time語義下我們使用watermark來判斷資料是否遲到。乙個遲到元素是指元素到達視窗運算元時,該元素本該被分配到某個視窗,但由於延遲,視窗已經觸發計算。目前flink有三種處理遲到資料的方式:

(1)直接將遲到資料丟棄:會造成資料丟失

(2)將遲到資料傳送到另乙個流:輸出到側輸出流,保證資料的完整性,可更新計算結果

(3)重新執行一次計算,將遲到資料考慮進來,更新計算結果:資料準確率高,保證資料完整性

import org.apache.flink.api.common.functions.

import org.apache.flink.streaming.api.timecharacteristic

import org.apache.flink.streaming.api.functions.timestamps.boundedoutofordernesstimestampextractor

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.scala.function.windowfunction

import org.apache.flink.streaming.api.windowing.time.time

import org.apache.flink.streaming.api.windowing.windows.timewindow

import org.apache.flink.util.collector

object allowedlatenessdemo )

val ds = stream.assigntimestampsandwatermarks(new boundedoutofordernesstimestampextractor[log](time.seconds(2))

})// 定義乙個側輸出流的標籤

val latetag = new outputtag[log]("late")

// 分組,開窗

val result = ds.keyby(_.sid)

.timewindow(time.seconds(10), time.seconds(5))

// 設定遲到的資料超過了2秒的情況下,交給allowedlateness處理

// 也分兩種情況,第一種:允許資料遲到5秒(遲到2-5秒),再次遲到觸發視窗函式,觸發的條件是 watermark < end-of-window + allowedlateness

// 第二種:遲到的資料在5秒以上,輸出到側流中

.allowedlateness(time.seconds(5)) // 執行資料遲到5秒,還可以觸發視窗

.sideoutputlatedata(latetag)

.aggregate(new myaggregatecountfunction, new outputresultwindowfunction) // 視窗聚合函式

// 正常的結果資料

result.print("normal data")

result.getsideoutput(latetag).print("late data") // 遲到時間超過5秒的資料,根據業務做處理,如果正常資料儲存到mysql中,遲到的資料需要進行update

env.execute("allowedlatenessdemo")

} // 統計通話的次數

class myaggregatecountfunction extends aggregatefunction[log, long, long]

// aggregatefunction 輸出是這個函式的輸入

class outputresultwindowfunction extends windowfunction[long, string, string, timewindow]

}}

allowedlateness設定視窗結束後還要等待長為lateness的時間,某個遲到元素的event time大於視窗結束時間但是小於結束時間+lateness,該元素仍然會被加入到該視窗中。

資料處理 流資料處理利器

流處理 stream processing 是一種計算機程式設計正規化,其允許給定乙個資料序列 流處理資料來源 一系列資料操作 函式 被應用到流中的每個元素。同時流處理工具可以顯著提高程式設計師的開發效率,允許他們編寫有效 乾淨和簡潔的 流資料處理在我們的日常工作中非常常見,舉個例子,我們在業務開發...

爬蟲 資料處理 pandas資料處理

使用duplicated 函式檢測重複的行,返回元素為布林型別的series物件,每個元素對應一行,如果該行不是第一次出現,則元素為true keep引數 指定保留哪一重複的行資料 dataframe替換操作 使用df.std 函式可以求得dataframe物件每一列的標準差 資料清洗清洗重複值 清...

資料處理 pandas資料處理優化方法小結

資料處理時使用最多的就是pandas庫,pandas在資料處理方面很強大,整合了資料處理和資料視覺化。pandas的視覺化使用的是matplotlib。回到主題 計算資料的某個欄位的所有值,對其欄位所有值進行運算 處理的字段資料為時間戳,需要計算該時間戳距離現在的時間,單位為天。一般方法 使用現在的...