spark 常用操作

2021-09-22 19:47:47 字數 1625 閱讀 4480

該文章記錄使用的spark的基本操作

import breeze.numerics.pow

import org.apache.spark.sparkconf

import org.apache.spark.sql.sparksession

object template

val dot_udf = udf((rateing: int, rateing_v: int) => rateing * rateing_v)

// 資料庫可以儲存集合(使用udf的返回集合)

// 使用udf一般結合withcolumn使用

val dot = udata.withcolumn("dot", kismet_udf(col("rating"), col("rating")))

//*************************===細節操作******************************

// 列轉行使用explode(其中col("itemsimrating")為陣列或列表)

val useritemscore = dot.select(dot("user_id"), explode(dot("dot"))as("dot_list"))

// 倒排使用

useritemscore.orderby(col("sum_score").desc)

// map(x=(x(0),x(1))可以直接tomap

// lit給統一的值要引入(增加一行lable值為1

useritemscore.withcolumn("label", lit(1))

// 啟動執行緒數,至少是兩個。乙個執行緒用於監聽資料來源,其他執行緒用於消費或列印。至少是2個

// rdd的排序中這是false即為倒敘

udata.rdd.map(x => (x(0).tostring, x(2).tostring)).map(x => (x._1, x._2.todouble)).sortby(_._2,false)

// 取出單行資料

udata.head().getas[string]("user_id")

//******************************=join相關*************************=

// join不同欄位時

val udata_v = udata.selectexpr("user_id as user_v")

udata.join(udata_v,udata("user_id")===udata_v("user_v"))

//多欄位join

val traindata = udata_v.join(udata, seq("user_id", "item_id"), "outer").na.fill(0)

//增加自增序列col("id")

val urdd = udata.rdd.zipwithindex()

val rowrdd = dfrdd.map(tp => row.merge(tp._1,row(tp._2)))

spark.createdataframe(rowrdd,udata.schema.add(structfield("id",longtype)))

}}

Spark操作 控制操作

cache和persist操作都是對rdd進行持久化,其中cache是persist採用memory only儲存級別時的乙個特例,scala var rdd sc.textfile users lyf desktop data.txt scala rdd.cache 第一次計算行數,這裡只能從本地...

Spark 常用運算元

官網rdd操作指南 2 key value資料型別的transfromation運算元 三 連線 3 action運算元 val list list 1 2,3 sc.parallelize list map 10 foreach println 輸出結果 10 20 30 這裡為了節省篇幅去掉了換...

Spark入門 常用Spark監控Tab

最近用spark做任務,中間來回配置集群環境,檢視配置後的效果,以及監測程式執行過程中的執行進度等,需要頻繁檢視webui的幾個tab。各個tab功能不一,從不同方面顯示了spark的各方面效能引數和執行進度。特意記錄一下,方便以後用得到的時候能夠快速回顧知識點。第乙個tab是在配置好hadoop之...