spark 學習筆記

2021-07-10 13:55:48 字數 3565 閱讀 5165

最近使用spark簡單的處理一些實際中的場景,感覺簡單實用,就記錄下來了。

部門使用者業績表(1000w測試資料)

使用者、部門、業績

資料載入:

val context = new sparkcontext(conf)

var data = context.textfile("data.txt")

場景1:求每個部門的總業績,並從大到小排序

data.map(_.split(",")).map(p => (p(1), p(2).toint)).reducebykey(_ + _).sortby(_._2, false).foreach(println) 

//取出部門和業績字段,對其進行按部門彙總,並進行排序即可。

場景2:求每個部門中業績最高的人

data.map(_.split(",")).map(p => (p(0), p(1), p(2).toint)).groupby(_._2)

.map(p => (p._2.map(p => (p._1,p._3)).toarray.maxby(_._2),p._1))

.sortby(p => p._1._2, false).map .foreach(println)

//step1:先對部門進行分組

//step2:找出部門中最高業績和人

//step3:排序,並去除格式

載入使用者訂單表使用者、月份、訂單

var data1 = context.textfile("/userdt.txt")
場景3:求每季度的訂單金額

data1.map(_.split(",")).map(p =>  else

if (p(1).toint > 3 && p(1).toint <= 6) else

if (p(1).toint > 6 && p(1).toint <= 9) else

}).reducebykey(_ + _).foreach(println)

//這裡用if else 對每季度進行了分組,應該可以用match匹配的,個人對match不熟。

//分完組,按組進行彙總即可

場景4:求使用者每季度訂單金額

data1.map(_.split(",")).map(p =>  else

if (p(1).toint > 3 && p(1).toint <= 6) else

if (p(1).toint > 6 && p(1).toint <= 9) else

}).groupby(p => (p._1, p._2)).map(p => (p._1, p._2.map(p => p._3).reduce(_ + _)))

.map(p => (p._1._1, p._1._2, p._2)).sortby(p => (p._1, p._2)).foreach(println)

//這裡只有月份,有年也是一樣,先用filter過濾出那一年,再按季度進行彙總

//根據使用者和季度進行分組,再使用者和季度的組對金額進行彙總

//格式化資料,並按使用者和季度排序

場景5:緯度關聯,求全國各省份2023年各季度銷售業績資料表城市資訊表(城市id,城市,省份)

訂單資訊表(使用者,年份,月份,城市id,業績)

//把城市id和省份轉化成map形式,方便獲取

var dd = context.textfile("dingdan.txt").mapelse

if(quarter>3 && quarter<=6)else

if(quarter>6 && quarter<=9)else

( sp(1),quarter, sp(3), sp(4))

}.filter(_._1.toint.equals(2015))

//對月份按季度進行分組,並過濾出2015的訂單

var data2 = dd.map(p => (p._1, p._2, cy.getorelse(p._3.toint,"其他"),p._4)).groupby(p=>(p._1,p._2,p._3))

.map(p=>(p._1,p._2.map(_._4.toint).reduce(_+_))).sortby(p=>(p._1._3,p._1._1,p._1._2))

.map

.foreach(println)

//step1:兩表關聯,訂單的城市id通過城市緯度表轉換成省份

//step2:對年份、季度、省份進行聯合分組

//step3:對訂單金額進行合計,並按省份、年份、季度進行排序,格式化後輸出

通過sqlcontext(隱式轉換的方式)來處理場景,這裡只舉乙個例子場景:求每個部門中業績最高的人

val sqlcontext=new sqlcontext(context)

import sqlcontext.implicits._ //隱式轉換

case

class

temptable

(id:string,dept:string,sar:int)

var sql=data.map(_.split(",")).map(p=>temptable(p(0),p(1),p(2).toint)).todf()

sql.registertemptable("deptinfo")

val jh= sqlcontext.sql("select a.id,a.sar,a.dept from deptinfo a inner join (select max(sar) aaa,dept from deptinfo group by dept)b on a.sar=b.aaa and a.dept=b.dept order by a.sar ")

jh.foreach(println)

//把內容轉換成臨時表,並通過sql直接編寫

spark學習筆記

1 缺省會寫成一堆小檔案,需要將其重新分割槽,直接指定幾個分割槽 spark.sql select row number over partition by depid order by salary rownum from emp repartition 2 write.parquet hdfs ...

Spark學習筆記

spark不僅僅支援mapreduce,還支援sql machine learning graph運算等,比起hadoop應用更靈活寬泛。spark 中的rdd 資料結構應對mapreduce中data replication disk io serialization引起的低效問題。rdd 類似於...

Spark學習筆記

hadoop中mapreduce計算框架是基於磁碟的,每次計算結果都會直接儲存到磁碟,下一次計算又要從磁碟中讀取,因而io消耗大,迭代計算效率很低,且模型單一,不能適應複雜需求。spark是一種基於記憶體的開源計算框架,迭代計算效率非常高。另外,mapreduce的計算是一步一步來的,而spark將...