Flink flink cep對於複雜事件的處理

2021-10-01 01:34:31 字數 1913 閱讀 3217

flink cep簡單理解就是使用**中的定義的規則去匹配流式資料,找出能成功匹配的資料

先理一下flink cep的**流程

先定義pattern

pattern.begin[x]("start").where(...).next("middle").where(...)
通過cep.pattern()方法將datastream轉化為patternstream
val cepresult: patternstream[event] = cep.pattern(inputdatastream, pattern)
將符合pattern的資料呼叫select方法對資料進行處理

cepresult.select(new patternselectfunction[x, string] 

}

具體的**示例
object cepfun01 

println("water-mark:", checkandgetnextwatermark(element, 0l))

timestamp

}override def checkandgetnextwatermark(lastelement: (event, long), extractedtimestamp: long): watermark =

new watermark(lastemittedwatermark)

}}).map(_._1)

/*** 1、首先要定義pattern,start的條件為id=3,next的條件為score>=3,結束條件為score>=5

* 意思是只要符合以id為3開頭,並且接下來的第一條資料的score大於等於3,第二條資料大於等於5即滿足pattern

*/val pattern = pattern.begin[event]("start").where(event => event.id == 3)

.next("middle").where(event => event.score >= 3)

.followedby("end").where(event => event.score >= 5)

/*** 2、通過cep.pattern()方法將datastream轉化為patternstream

*/val cepresult: patternstream[event] = cep.pattern(input, pattern)

input.print()

/*** 3、將符合pattern的資料呼叫select方法對資料進行處理

*/cepresult.select(new patternselectfunction[event, string]

res}

}).print()

senv.execute(this.getclass.getname)

}}// 使用了lombok依賴,方便列印的時候檢視具體資料

@data

case class event(id: int, name: string, score: double)

執行上述**,會得到如下輸出:

start:【event(3,third,3.0)】 ->middle: 【event(4,forth,4.0)】 ->end: 【event(6,fifth,6.0)】

按照event-time進行cep匹配,id-5的資料為遲到的資料,所以是:3-4-6

具體**可在github上檢視:github**位址

對於物件查詢

結論 對於物件查詢 1 使用list的時候會將物件全部取出,而使用iterate則只先將物件主鍵取出,然後在使用的時候再乙個個取出。2 list第二使用的時候會繼續重新資料庫中取出,而iterate則會先成快取中查詢,如果沒找到再去資料庫中取出。對於屬性查詢 條件 查詢快取關閉 兩者沒什麼差別,根據...

對於物件查詢

對於物件查詢 1 使用list的時候會將物件全部取出,而使用iterate則只先將物件主鍵取出,然後在使用的時候再乙個個取出。2 list第二使用的時候會繼續重新資料庫中取出,而iterate則會先成快取中查詢,如果沒找到再去資料庫中取出。對於屬性查詢 條件 查詢快取關閉 兩者沒什麼差別,根據查詢物...

對於order by子句

order by子句指定排序順序 select username from user order by username 依據username的字母順序對於查詢出來的username進行排序,預設是公升序 a z 也可以進行降序排序,必須指定desc關鍵字 在上面的sql語句變為 select us...