Spark SQL中自定義函式詳解

2021-10-11 16:36:34 字數 2202 閱讀 4073

資料來源:

初始化sparksession

package com.kfk.spark.common

import org.apache.spark.sql.sparksession

/** * @author : 蔡政潔

* @email :[email protected]

* @date : 2020/12/2

* @time : 10:02 下午

*/object commsparksessionscala

}

自定義udf將內容改為大寫:

package com.kfk.spark.sql

import com.kfk.spark.common.

/** * @author : 蔡政潔

* @email :[email protected]

* @date : 2020/12/8

* @time : 4:46 下午

*/object udfscala

}

package com.kfk.spark.sql

import org.apache.spark.sql.row

import org.apache.spark.sql.expressions.

import org.apache.spark.sql.types.

/** * @author : 蔡政潔

* @email :[email protected]

* @date : 2020/12/8

* @time : 7:13 下午

*/class mycount extends userdefinedaggregatefunction

/** * 中間進行聚合的時候所處理的資料型別

* @return

*/override

def bufferschema: structtype =

/** * 返回型別

* @return

*/override

def datatype: datatype =

/** * 校驗返回值

* @return

*/override

def deterministic:

boolean

=/**

* 為每個分組的資料執行初始化操作

* @param buffer

*/override

def initialize(buffer: mutableaggregationbuffer)

:unit

=/**

* 每個分組有新的資料進來的時候,如何進行分組對應的聚合值的計算

* @param buffer

* @param input

*/override

def update(buffer: mutableaggregationbuffer, input: row)

:unit

=/**

* 在每乙個節點上的集合值要進行最後的merge

* @param buffer1

* @param buffer2

*/override

def merge(buffer1: mutableaggregationbuffer, buffer2: row)

:unit

=/**

* 返回最終結果

* @param buffer

* @return

*/override

def evaluate(buffer: row)

:any

=}

自定義聚合函式udaf,count

package com.kfk.spark.sql

import com.kfk.spark.common.

/** * @author : 蔡政潔

* @email :[email protected]

* @date : 2020/12/8

* @time : 8:06 下午

*/object udafscala

}

Spark Sql之UDAF自定義聚合函式

udaf user defined aggregate function。使用者自定義聚合函式 我們可能下意識的認為udaf是需要和group by一起使用的,實際上udaf可以跟group by一起使用,也可以不跟group by一起使用,這個其實比較好理解,聯想到mysql中的max min等函...

Spark SQL自定義函式 第五章

1.自定義函式分類 類似於hive當中的自定義函式,spark同樣可以使用自定義函式來實現新的功能。spark中的自定義函式有如下3類 1.udf user defined function 輸入一行,輸出一行 2.udaf user defined aggregation funcation 輸入...

Spark sql 自定義讀取資料源

通常在乙個流式計算的主流程裡,會用到很多對映資料,比較常見的是text文件,但是文件讀進來之後還要匹配相應的schema,本文通過自定義textsource資料來源,自動讀取預設的schema。defaultsource.scala package com.wxx.bigdata.sql custo...