Flink 雙流合併之connect Demo2

2022-01-22 06:20:49 字數 4198 閱讀 2270

1、主類

package

towstream

/*** @program: demo

* @description: $

* @author

: yang

* @create: 2020-12-31 11:39 */

import

org.apache.flink.api.common.state.

import

org.apache.flink.api.scala.typeutils.types

import

org.apache.flink.streaming.api.timecharacteristic

import

org.apache.flink.streaming.api.functions.co.keyedcoprocessfunction

import

org.apache.flink.streaming.api.functions.source.sourcefunction

import

org.apache.flink.streaming.api.scala._

import

org.apache.flink.util.collector

import

scala.util.random

object twostreamjoindemo

}override def cancel(): unit = false

}).assignascendingtimestamps(_.eventtime).keyby(_.orderid)

//自定義資料來源2

val pays: keyedstream[payevent, string] = env.addsource(new

sourcefunction[payevent]

}override def cancel(): unit = false

}).assignascendingtimestamps(_.eventtime).keyby(_.orderid)

val processed = orders.connect(pays).process(new

matchfunction)

processed.print()

processed.getsideoutput(unmatchedorders).print()

processed.getsideoutput(unmatchedpays).print()

env.execute()

} //進入同一條流中的資料肯定是同乙個key,即orderid

class matchfunction extends

keyedcoprocessfunction[string, orderevent, payevent, string]

else

}override def processelement2(value: _root_.towstream.twostreamjoindemo.payevent, ctx: _root_.org.apache.flink.streaming.api.functions.co.keyedcoprocessfunction[_root_.scala.predef.string, _root_.towstream.twostreamjoindemo.orderevent, _root_.towstream.twostreamjoindemo.payevent, _root_.scala.predef.string]#context, out: _root_.org.apache.flink.util.collector[_root_.scala.predef.string]): unit =

else

}override def ontimer(timestamp: long, ctx: keyedcoprocessfunction[string, orderevent, payevent, string]#ontimercontext, out: collector[string]): unit = 的兩條流沒有對賬成功!")

orderstate.clear()

}if (paystate.value() != null

) 的兩條流沒有對賬成功!")

paystate.clear()}}

}}

2、結果

(source1:order_3:pay,1609753528069)

(source2:order_3:weixin,1609753528069)

處理器2:訂單id為 orderevent(order_3,pay,1609753528069)==payevent(order_3,weixin,1609753528069) 的兩條流對賬成功!

(source2:order_1:weixin,1609753529085)

(source1:order_0:pay,1609753529085)

(source1:order_3:pay,1609753530086)

(source2:order_2:weixin,1609753530086)

(source1:order_1:pay,1609753531087)

(source2:order_0:weixin,1609753531087)

處理器1:訂單id為 payevent(order_1,weixin,1609753529085)==orderevent(order_1,pay,1609753531087) 的兩條流對賬成功!

處理器2:訂單id為 orderevent(order_0,pay,1609753529085)==payevent(order_0,weixin,1609753531087) 的兩條流對賬成功!

(source2:order_0:weixin,1609753532087)

(source1:order_1:pay,1609753532087)

(source2:order_1:weixin,1609753533088)

(source1:order_1:pay,1609753533088)

處理器2:訂單id為 orderevent(order_1,pay,1609753533088)==payevent(order_1,weixin,1609753533088) 的兩條流對賬成功!

訂單id為 order_3 的兩條流沒有對賬成功!

(source1:order_0:pay,1609753534088)

(source2:order_3:weixin,1609753534088)

處理器1:訂單id為 payevent(order_0,weixin,1609753532087)==orderevent(order_0,pay,1609753534088) 的兩條流對賬成功!

(source2:order_3:weixin,1609753535089)

(source1:order_0:pay,1609753535089)

訂單id為 order_3 的兩條流沒有對賬成功!

訂單id為 order_2 的兩條流沒有對賬成功!

(source2:order_2:weixin,1609753536089)

(source1:order_2:pay,1609753536089)

處理器2:訂單id為 orderevent(order_2,pay,1609753536089)==payevent(order_2,weixin,1609753536089) 的兩條流對賬成功!

(source2:order_0:weixin,1609753537090)

(source1:order_1:pay,1609753537090)

處理器2:訂單id為 orderevent(order_0,pay,1609753535089)==payevent(order_0,weixin,1609753537090) 的兩條流對賬成功!

訂單id為 order_1 的兩條流沒有對賬成功!

(source1:order_2:pay,1609753538091)

(source2:order_1:weixin,1609753538091)

訂單id為 order_1 的兩條流沒有對賬成功!

(source1:order_2:pay,1609753539091)

(source2:order_3:weixin,1609753539091)

訂單id為 order_3 的兩條流沒有對賬成功!

Flink 雙流合併之connect Demo1

1 主類 package towstream program demo description author yang create 2020 12 31 11 39 import org.apache.flink.api.common.state.import org.apache.flink.a...

Flink 實現 雙流Join

將五分鐘之內的訂單資訊和支付資訊進行對賬,對不上的發出警告 import org.apache.flink.api.common.state.import org.apache.flink.api.scala.typeutils.types import org.apache.flink.strea...

Flink實時對賬 雙流join

import org.apache.flink.api.common.state.valuestatedescriptor import org.apache.flink.api.scala.typeutils.types import org.apache.flink.streaming.api....