Spark RDD運算元介紹

2021-07-26 23:29:08 字數 4562 閱讀 6564

spark學習筆記總結

spark可以用於批處理、互動式查詢(spark sql)、實時流處理(spark streaming)、機器學習(spark mllib)和圖計算(graphx)。

spark是mapreduce的替代方案,而且相容hdfs、hive,可融入hadoop的生態系統,以彌補mapreduce的不足。

spark-shell是spark自帶的互動式shell程式,使用者可以在該命令列下用scala編寫spark程式。

直接啟動spark-shell,實質是spark的local模式,在master:8080中並未顯示客戶端連線。

集群模式:

/usr/local/spark/bin/spark-shell \

--master spark: \

--executor-memory 2g \

--total-executor-cores 2

spark-shell中編寫wordcount

sc.textfile("hdfs:").flatmap(.split(" ")).map((,1)).reducebykey(+).sortby(_._2,false).collect

1. 介紹

rdd(resilient distributed dataset)叫做分布式資料集,是spark中最基本的資料抽象,它代表乙個不可變(建立了內容不可變)、可分割槽、裡面的元素可平行計算的集合。

2. 屬性:

由多個分割槽組成。對於rdd來說,每個分片都會被乙個計算任務處理,並決定平行計算的粒度。

乙個計算函式用於每個分割槽。spark中rdd的計算是以分片為單位的,每個rdd都會實現compute函式以達到這個目的。

rdd之間的依賴關係。rdd的每次轉換都會生成乙個新的rdd,所以rdd之間就會形成類似於流水線一樣的前後依賴關係。資料丟失時,根據依賴重新計算丟失的分割槽而不是整個分割槽。

乙個partitioner,即rdd的分片函式。預設是hashpartition

分割槽資料的最佳位置去計算。就是將計算任務分配到其所要處理資料塊的儲存位置。資料本地化。

3. 建立方式:

可通過並行化scala集合建立rdd

val rdd1 = sc.parallelize(array(1,2,3,4,5,6,7,8))

通過hdfs支援的檔案系統建立,rdd裡沒有真的資料,只是記錄了元資料

val rdd2 = sc.textfile("hdfs:")

檢視該rdd的分割槽數量

rdd1.partitions.length

rdd中兩種運算元:

transformation轉換,是延遲載入的

常用的transformation:

(1)map、flatmap、filter

(2)intersection求交集、union求並集:注意型別要一致

distinct:去重

(3)join:型別為(k,v)和(k,w)的rdd上呼叫,返回乙個相同key對應的所有元素對在一起的(k,(v,w))的rdd

(4)groupbykey:在乙個(k,v)的rdd上呼叫,返回乙個(k, iterator[v])的rdd

但是效率reducebykey較高,因為有乙個本地combiner的過程。

(5)cartesian笛卡爾積

常用的action

(1)collect()、count()

(2)reduce:通過func函式聚集rdd中的所有元素

(3)take(n):取前n個;top(2):排序取前兩個

(4)takeordered(n),排完序後取前n個

參考《

val rdd1 = sc.parallelize(list(1,2,3,4,5,6,7,8,9), 2)

val func = (index: int, iter: iterator[(int)]) =>

(2)aggregate

action操作,

第乙個引數是初始值,

第二個引數:是2個函式[每個函式都是2個引數(第乙個引數:先對個個分割槽進行的操作, 第二個:對個個分割槽合併後的結果再進行合併), 輸出乙個引數]

例子:

rdd1.aggregate(0)(_+_, _+_)

//前乙個是對每乙個分割槽進行的操作,第二個是對各分割槽結果進行的結果

rdd1.aggregate(5)(math.max(_, _), _ + _)

//結果:5 + (5+9) = 19

val rdd3 = sc.parallelize(list("12","23","345","4567"),2)

rdd3.aggregate("")((x,y) => math.max(x.length, y.length).tostring, (x,y) => x + y)

//結果:24或者42

val rdd4 = sc.parallelize(list("12","23","345",""),2)

rdd4.aggregate("")((x,y) => math.min(x.length, y.length).tostring, (x,y) => x + y)

//結果01或者10

(3)aggregatebykey

將key值相同的,先區域性操作,再整體操作。。和reducebykey內部實現差不多

val pairrdd = sc.parallelize(list( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

pairrdd.aggregatebykey(0)(math.max(_, _), _ + _).collect

//結果:array((dog,12), (cat,17), (mouse,6))

ps:

和reducebykey(+)呼叫的都是同乙個方法,只是aggregatebykey要底層一些,可以先區域性再整體操作。

(4)combinebykey

和reducebykey是相同的效果,是reducebykey的底層。

第乙個引數x:原封不動取出來, 第二個引數:是函式, 區域性運算, 第三個:是函式, 對區域性運算後的結果再做運算

每個分割槽中每個key中value中的第乙個值,

val rdd1 = sc.textfile("hdfs://master:9000/wordcount/input/").flatmap(_.split(" ")).map((_, 1))

val rdd2 = rdd1.combinebykey(x => x, (a: int, b: int) => a + b, (m: int, n: int) => m + n)

rdd2.collect

第乙個引數的含義:

每個分割槽中相同的key中value中的第乙個值

如:(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相當於hello的第乙個1, good中的1

val rdd3 = rdd1.combinebykey(x => x + 10, (a: int, b: int) => a + b, (m: int, n: int) => m + n)

rdd3.collect

//每個會多加3個10

val rdd4 = sc.parallelize(list("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)

val rdd5 = sc.parallelize(list(1,1,2,2,2,1,2,2,2), 3)

val rdd6 = rdd5.zip(rdd4)

val rdd7 = rdd6.combinebykey(list(_), (x: list[string], y: string) => x :+ y, (m: list[string], n: list[string]) => m ++ n)

//將key相同的資料,放入乙個集合中

(5)collectasmap

action

map(b -> 2, a -> 1)//將array的元祖轉換成map,以後可以通過key取值

val rdd = sc.parallelize(list(("a", 1), ("b", 2)))

rdd.collectasmap

//可以下一步使用

(6)countbykey

根據key計算key的數量

action

val rdd1 = sc.parallelize(list(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))

rdd1.countbykey

rdd1.countbyvalue//將("a", 1)當做乙個元素,統計其出現的次數

(7)flatmapvalues

對每乙個value進行操作後壓平

Spark RDD運算元介紹

spark學習筆記總結 spark可以用於批處理 互動式查詢 spark sql 實時流處理 spark streaming 機器學習 spark mllib 和圖計算 graphx spark是mapreduce的替代方案,而且相容hdfs hive,可融入hadoop的生態系統,以彌補mapre...

Spark RDD運算元介紹

spark學習筆記總結 spark可以用於批處理 互動式查詢 spark sql 實時流處理 spark streaming 機器學習 spark mllib 和圖計算 graphx spark是mapreduce的替代方案,而且相容hdfs hive,可融入hadoop的生態系統,以彌補mapre...

SparkRDD運算元 sample運算元

val newrdd oldrdd.sample withreplacement,fraction,seed withreplacement表示是抽出的資料是否放回,true為有放回的抽樣,false為無放回的抽樣 fraction表示隨機抽樣的資料數量 seed用於指定隨機數生成器種子 def s...