Spark之使用者自定義 聚合 函式

2021-08-14 21:26:12 字數 3012 閱讀 8569

packagecom.lyzx.reviewday30

importorg.apache.spark.sql.types._

importorg.apache.spark.sql.

importorg.apache.spark.

classt1 )

resultdf.write.parquet("./person")

} /**

* 讀取

parquet

格式的檔案

*@param sc

*/deff2(sc:sparkcontext): unit =

/*** 簡單的案例

* 2017-11-12,270,11733

*

統計每個月的的銷售額要帶有資料清洗功能

*/deff3(sc:sparkcontext): unit =

/*** udf

* user defined function

*/deff4(sc:sparkcontext):unit =)

sqlctx.sql("select name,strlen(name) as len from names").show()

} /**

* udaf

*/deff5(sc:sparkcontext): unit =

valstructtype = structtype(array(structfield("name"

, stringtype,

true)))

valnamesdf = sqlcontext.createdataframe(namesrowrdd, structtype)

namesdf.registertemptable("names")

sqlcontext.udf

.register("strcount"

,newstringcount)

// 使用自定義函式

sqlcontext.sql("select name,strcount(name) from names group by name")

.collect()

.foreach(println)

}deff6(sc:sparkcontext): unit =

for(v <- itr)yieldv

}).collect()

valschema = structtype(array(structfield("w"

,stringtype,

false)))

valdf = sqlctx.createdataframe(data,schema)

df.registertemptable("words")

sqlctx.udf

.register("mycount"

,newmyudaf)

sqlctx.sql("select w,mycount(w) from words group by w").show()

}}objectt1

}

packagecom.lyzx.reviewday30

importorg.apache.spark.sql.row

importorg.apache.spark.sql.expressions.

importorg.apache.spark.sql.types._

/***

這個聚合函式實現

count

函式的功能

*/classmyudafextendsuserdefinedaggregatefunction

//聚合過程中的中間結果集型別

override defbufferschema: structtype =

override defdatatype: datatype =

override defdeterministic: boolean =

//初始值override definitialize(buffer: mutableaggregationbuffer): unit =

//相當於map

的combiner

,buffer

裡面存放著累計的執行結果,

input

是當前的執行結果

override defupdate(buffer:mutableaggregationbuffer, input: row): unit =

//相當於reduce

端的合併

override defmerge(buffer1: mutableaggregationbuffer, buffer2: row): unit =

override defevaluate(buffer: row): any =

}

SQL之使用者自定義函式

使用者自定義函式 user defined functions 是sql server 的資料庫物件,它 不能用於執行一系列改變資料庫狀態的操作,但它可以像系統函式一樣在查詢或儲存過程等的程式段中使用,也可以像儲存過程一樣通過 execute 命令來執行。使用者自定義函式中儲存了乙個 transac...

自定義聚合函式

新建database project 新建concatenate class using system using system.data using microsoft.sqlserver.server using system.data.sqltypes using system.io usin...

自定義聚合函式

create or replace type string sum obj as object 聚合函式的實質就是乙個物件 sum string varchar2 4000 static function odciaggregateinitialize v self in out string su...