spark中使用自定義UDAF

2022-08-21 00:52:49 字數 1295 閱讀 7771

hive中的自定義函式udaf

udaf(user- defined aggregation funcation),使用者自定義弱型別聚合函式

所有的udaf函式在記憶體裡都是一塊buffer(緩衝區),這個換成區被分成了多個塊,每個塊有乙個index,從0開始。聚合乙個資料時,會占用編號為0的塊。

遍歷表中的每一行資料,然後扔到udaf中做聚合,先把buffer中已存的資料拿出來和新的資料做合併,然後再扔到buffer中,下次在拿乙個新的資料做相同的過程。

二 自定義去重udaf

import org.apache.spark.sql.row

import org.apache.spark.sql.expressions.

import org.apache.spark.sql.types.

class groupconcatdistinct extends userdefinedaggregatefunction

override def update(buffer: mutableaggregationbuffer, input: row): unit = else

//對buffer進行更新,更新的塊是0,更新的資料是buffercityinfo

buffer.update(0, buffercityinfo)}}

//將兩個自定義udaf的值彙總到一起

override def merge(buffer1: mutableaggregationbuffer, buffer2: row): unit = else}}

buffer1.update(0, buffercityinfo1)

}//獲取最後的值

override def evaluate(buffer: row): any =

}三 udaf在spark中的註冊使用

註冊:(1)sparksession.udf.register("concat_long_string", (v1:long, v2:string, split:string) =>)

(2)sparksession.udf.register("group_concat_distinct", new groupconcatdistinct)

使用:val sql = "select area,pid,count(*) click_count,"+"group_concat_distinct(concat_long_string(city_id,city_name,':')) " +"city_infos" +" from tmp_area_basic_info group by area,pid "

Spark Sql之UDAF自定義聚合函式

udaf user defined aggregate function。使用者自定義聚合函式 我們可能下意識的認為udaf是需要和group by一起使用的,實際上udaf可以跟group by一起使用,也可以不跟group by一起使用,這個其實比較好理解,聯想到mysql中的max min等函...

Spark自定義排序

在這之前,我們先準備一些資料,使用rdd存放 獲得sparkcontext val conf sparkconf newsparkconf setmaster local 2 val sc newsparkcontext conf val rdd sc.parallelize list 公尺家雷射投...

Spark自定義排序

spark支援我們自定義,只需要繼承相應的類就可以了,我在下面準備了乙個用身高和年齡做二次排序的例子,希望可以幫到大家 首先寫乙個排序類 名字 年齡 身高 class people val name string val age int,val hight int extends ordered p...