Flink的Aggregate運算元的用法

2021-10-10 07:04:11 字數 2874 閱讀 8588

如果定義了window assigner 之後,下一步就可以定義視窗內資料的計算邏輯,這也就是 window function 的定義。

flink 中提供了四種型別的 window function , 分別為reducefunction、aggregatefunction 以及 processwindowfunction,(sum 和 max)等。

前三種型別的 window fucntion 按照計算原理的不同可以分為兩大類:

一類是增量聚合函式:對應有 reducefunction、aggregatefunction;

另一類是全量視窗函式,對應有 processwindowfunction(還有 windowfunction)。增量聚合函式計算效能較高,占用儲存空間少,主要因為基於中間狀態的計算結果,視窗中只維護中間結果狀態值,不需要快取原始資料。而全量視窗函式使用的代價相對較高,   效能比較弱,主要因為此時運算元需要對所有屬於該視窗的接入資料進行快取,然後等到視窗觸發的時候,對所有的原始資料進行彙總計算。

下面是aggregatefunction一種用法

package com.jh.windows

import org.apache.flink.api.common.functions.aggregatefunction

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.scala.function.windowfunction

import org.apache.flink.streaming.api.windowing.assigners.slidingprocessingtimewindows

import org.apache.flink.streaming.api.windowing.time.time

import org.apache.flink.streaming.api.windowing.windows.timewindow

import org.apache.flink.util.collector

object testaggregatefuntionwindow

) stream.map(log => (log.sid, 1.tolong))

.keyby(_._1)//分組

.window(slidingprocessingtimewindows.of(time.seconds(5), time.seconds(3)))//開窗

.aggregate(new myaggregate, new mywindows)//增量聚合

.print()

env.execute("start --->")

} /* windowfunction 需要傳入4個引數:

* in:是輸入的型別 (他的輸入就是aggregate方法的輸出)

* out:是輸出的型別

* key:key的型別

* w <: window :視窗的型別

* */

class mywindows extends windowfunction[long, (string, long), string, timewindow]

}/*aggregate(new myaggregate): 傳入乙個引數的時候返回值是這樣為何?

1> 4

3> 3

3> 5

2> 3

1> 9

因為在aggregate的方法中

override def getresult(accumulator: long): long = accumulator

返回的就是乙個 累加的數值。

這樣我們就沒法知道是哪個key的資料。

(preaggregator: aggregatefunction[t, acc, v],windowfunction: windowfunction[v, r, k, w])

然後我們使用windowfuntion來處理這種問題,傳入兩個引數 ,把aggregate的資料給windowfuntion處理。 */

/*裡面的add方法, 是來一條資料執行一次 ,getresult,在從視窗結束的時候執行一次*/

class myaggregate extends aggregatefunction[(string, long), long, long] }

/* sid: string : 基站的id

callout: string 主叫號碼

callin: string 被叫號碼

calltype: string 呼叫型別

calltime 呼叫時間

duration 通話時長

資料的樣式

station_9,18600005798,18900002238,busy,1577080455129,0

station_4,18600008825,18900008585,busy,1577080457129,0

station_6,18600005404,18900000558,success,1577080457129,5

station_2,18600002658,18900002018,busy,1577080457129,0

station_2,18600004925,18900001911,busy,1577080457129,0

station_5,18600003713,18900000824,busy,1577080457129,0

*/case class stationlog(sid: string, callout: string,callin: string,calltype: string,calltime:long,duration:long)

mongodb中的aggregate 聚合查詢

aggregate類似於pipe.拆分結果然後對結果進行分析求值然後再返回新結果.mongodb聚合 官方api mongodb aggregate 運用篇 個人總結 fycayy 案例一案例二 案例三 那麼aggregate有什麼作用呢?舉個例子 testname文件中有如下幾個集合 集合一 集合...

spark中aggregate函式的應用與問題

aggregate是rdd中比較常用的乙個方法,其功能是使用者傳入的函式進行運算得出結果,屬於action動作。先看下此方法宣告,def aggregate u classtag zerovalue u seqop u,t u,combop u,u u u從原始碼可以看出,aggregate定義的是...

flink學習 flink架構

flink結構 graph 2個併發度 source為1個併發度 的sockettextstreamwordcount四層執行圖的演變過程 jobgraph streamgraph經過優化後生成了 jobgraph,提交給 jobmanager 的資料結構。executiongraph jobman...