1. Main class
package towstream

/**
 * @program: demo
 * @description: $
 * @author: yang
 * @create: 2020-12-31 11:39
 */
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object TwoStreamJoinDemo {
  // ...

  // Records that enter the same keyed stream always share one key, namely the orderId
  class MatchFunction extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {
    // ... else ...

    override def processElement2(value: PayEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
      // ... else ...
    }

    override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
      if (orderState.value() != null) {
        out.collect("The two streams for order id " + ctx.getCurrentKey + " were not reconciled successfully!")
        orderState.clear()
      }
      if (payState.value() != null) {
        out.collect("The two streams for order id " + ctx.getCurrentKey + " were not reconciled successfully!")
        payState.clear()
      }
    }
  }
}
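Only fragments of MatchFunction and none of the job wiring appear in the listing above, so here is a minimal sketch of how such a connect-based reconciliation job is commonly written with the Flink 1.x Scala API. It is not the author's exact code. The case class field names, the `WaitMs` delay, the `matched`/`unmatched` helpers, the "Processor 1" label, and the timestamps and source streams of order_3 and order_4 are all assumptions made for illustration; only the order_1 and order_2 values come from the result section.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TwoStreamJoinSketch {
  // Field names are assumptions; the source only shows the printed values.
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)
  case class PayEvent(orderId: String, channel: String, eventTime: Long)

  // Hypothetical wait (event time, ms) before an unmatched event is reported.
  val WaitMs: Long = 5000L

  def matched(processor: Int, order: OrderEvent, pay: PayEvent): String =
    s"Processor $processor: the two streams for order id $order==$pay were reconciled successfully!"

  def unmatched(orderId: String): String =
    s"The two streams for order id $orderId were not reconciled successfully!"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Needed before Flink 1.12; deprecated afterwards, where event time is the default.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val orders = env
      .fromElements(
        OrderEvent("order_1", "pay", 2000L),
        OrderEvent("order_2", "pay", 5000L),
        OrderEvent("order_3", "pay", 6000L)) // hypothetical unmatched order
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    val pays = env
      .fromElements(
        PayEvent("order_1", "weixin", 7000L),
        PayEvent("order_2", "weixin", 8000L),
        PayEvent("order_4", "weixin", 9000L)) // hypothetical unmatched payment
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    // Both inputs are keyed by orderId, so each key sees its own order and pay state.
    orders.connect(pays).process(new MatchFunction).print()
    env.execute("two-stream-join-sketch")
  }

  class MatchFunction extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {
    private lazy val orderState: ValueState[OrderEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("order", Types.of[OrderEvent]))
    private lazy val payState: ValueState[PayEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[PayEvent]("pay", Types.of[PayEvent]))

    override def processElement1(value: OrderEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
      val pay = payState.value()
      if (pay != null) {
        // The payment arrived first: emit the match and drop the stored payment.
        out.collect(matched(1, value, pay))
        payState.clear()
      } else {
        // Remember the order and wait for the payment until the timer fires.
        orderState.update(value)
        ctx.timerService().registerEventTimeTimer(value.eventTime + WaitMs)
      }
    }

    override def processElement2(value: PayEvent, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context, out: Collector[String]): Unit = {
      val order = orderState.value()
      if (order != null) {
        // The order arrived first: emit the match and drop the stored order.
        out.collect(matched(2, order, value))
        orderState.clear()
      } else {
        payState.update(value)
        ctx.timerService().registerEventTimeTimer(value.eventTime + WaitMs)
      }
    }

    override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
      // State that is still present when the timer fires never found its counterpart.
      if (orderState.value() != null) {
        out.collect(unmatched(ctx.getCurrentKey))
        orderState.clear()
      }
      if (payState.value() != null) {
        out.collect(unmatched(ctx.getCurrentKey))
        payState.clear()
      }
    }
  }
}
```

Because the inputs are bounded, Flink emits a final watermark when they end, which fires any timers still pending. Which processor reports a match depends on which side of the pair arrives first, so the "Processor 1"/"Processor 2" prefix can differ between runs.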
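2. Result
Processor 2: the two streams for order id OrderEvent(order_1,pay,2000)==PayEvent(order_1,weixin,7000) were reconciled successfully!
Processor 2: the two streams for order id OrderEvent(order_2,pay,5000)==PayEvent(order_2,weixin,8000) were reconciled successfully!
The two streams for order id order_3 were not reconciled successfully!
The two streams for order id order_4 were not reconciled successfully!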
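The shape of this output can be reproduced without a Flink cluster. The following is a plain-Scala simulation of the per-key matching logic, not Flink code: two maps stand in for the keyed order and pay state, and the end of the input stands in for the timers firing. The object name `ReconcileSimulation`, the case class field names, and the "Processor 1" label are assumptions.

```scala
import scala.collection.mutable

object ReconcileSimulation {
  // Field names are assumptions; only the printed values appear in the result.
  final case class OrderEvent(orderId: String, eventType: String, eventTime: Long)
  final case class PayEvent(orderId: String, channel: String, eventTime: Long)

  /** Replays events in arrival order and returns the messages a keyed two-stream match would emit. */
  def reconcile(events: Seq[Either[OrderEvent, PayEvent]]): Seq[String] = {
    val pendingOrders = mutable.LinkedHashMap.empty[String, OrderEvent] // stands in for the order state
    val pendingPays = mutable.LinkedHashMap.empty[String, PayEvent]     // stands in for the pay state
    val out = mutable.ArrayBuffer.empty[String]

    events.foreach {
      case Left(order) =>
        // An order looks for a waiting payment with the same key.
        pendingPays.remove(order.orderId) match {
          case Some(pay) =>
            out += s"Processor 1: the two streams for order id $order==$pay were reconciled successfully!"
          case None =>
            pendingOrders.update(order.orderId, order)
        }
      case Right(pay) =>
        // A payment looks for a waiting order with the same key.
        pendingOrders.remove(pay.orderId) match {
          case Some(order) =>
            out += s"Processor 2: the two streams for order id $order==$pay were reconciled successfully!"
          case None =>
            pendingPays.update(pay.orderId, pay)
        }
    }

    // Whatever is still pending at the end corresponds to timers firing on unmatched state.
    val leftovers = (pendingOrders.keys ++ pendingPays.keys).toSeq.sorted
    out ++= leftovers.map(id => s"The two streams for order id $id were not reconciled successfully!")
    out.toSeq
  }
}
```

Feeding it the orders for order_1, order_2 and order_3 followed by the payments for order_1, order_2 and order_4 yields two "Processor 2" matches and two unmatched messages, the same pattern as the result above.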