flink sql 新增函式

2021-10-19 18:07:15 字數 2531 閱讀 6874

修改 flink-table-common專案

org.apache.flink.table.functions.builtinfunctiondefinitions 類

增加乙個成員:

public static final builtinfunctiondefinition nvl =

new builtinfunctiondefinition.builder()

.name("nvl")

.kind(scalar)

.outputtypestrategy(typestrategies.missing)

.build();

flink獲取函式列表,會使用放射獲取這個類的全部成員。所以這一步註冊是必須的。

新增乙個類,證明這個函式

case class nvl(child: plannerexpression, value: plannerexpression) extends plannerexpression with inputtypespec
org.apache.flink.table.planner.expressions.plannerexpressionconverter#translatecall

增加一行模式匹配

case nvl =>

assert(args.size == 2)

nvl(args.head, args(1))

org.apache.flink.table.planner.functions.sql.flinksqloperatortable

增加成員:

public static final  sqlfunction nvl = new sqlfunction(

"nvl",

sqlkind.other_function,

returntypes.cascade(

new firstoperandreturntypeinference(),

sqltypetransforms.to_nullable),

null,

new repeatfamilyoperandtypechecker(sqltypefamily.character),

sqlfunctioncategory.string

);

firstoperandreturntypeinference,表示使用函式的第乙個引數作為返回值型別。

/**

* function中,以第乙個引數的型別作為返回值型別。

*/public class firstoperandreturntypeinference implements sqlreturntypeinference

@override

public reldatatype inferreturntype(sqloperatorbinding opbinding)

}

org.apache.flink.table.planner.expressions.rexnodeconverter

增加乙個行:

// 註冊

conversionsofbuiltinfunc

.put(builtinfunctiondefinitions.nvl, exprs -> convert(flinksqloperatortable.nvl, exprs));

org.apache.flink.table.planner.codegen.calls.stringcallgen#generatecallexpression

增加一行匹配:

case nvl =>

generatenvl(ctx, operands)

generatenvl 方法**如下:

val paic_util: string = classname[paicfunctionutils]

// 這裡就是**生成的邏輯

def generatenvl(

ctx: codegeneratorcontext,

operands: seq[generatedexpression]): generatedexpression = )"

}}

paicfunctionutils **如下:

package org.apache.flink.table.util;

import org.apache.flink.table.dataformat.binarystring;

public class paicfunctionutils

public static final integer nvl(integer left, integer right)

public static final double nvl(double left, double right)

}

Flink SQL中Timestamp使用的坑

flink版本為1.10。flink sql消費kafka訊息,表定義為 create table start log source mid id varchar user id int,13位的時間戳 1587975971431 interval 5 second 在ts上定義5 秒延遲的 wat...

Flink SQL 如何實現列轉行

在 sql 任務裡面經常會遇到一列轉多行的需求,今天就來總結一下在 flink sql 裡面如何實現列轉行的,先來看下面的乙個具體案例.需求原始資料格式如下 name data jasonlee data 格式化 現在希望得到的資料格式是這樣的 name content type urljasonl...

Flink SQL 實時計算UV指標

用乙個接地氣的案例來介紹如何實時計算 uv 資料。大家都知道,在 toc 的網際網路公司,uv 是乙個很重要的指標,對於老闆 商務 運營的及時決策會產生很大的影響,筆者在電商公司,目前主要的工作就是計算 uv 銷售等各類實時資料,體驗就特別深刻,因此就用乙個簡單demo 演示如何用 flink sq...