Spark操作 行動操作 一

2021-10-06 10:19:56 字數 3709 閱讀 7085

scala> var rdd = sc.makerdd(array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))

rdd: org.apache.spark.rdd.rdd[(string, int)] = parallelcollectionrdd[60] at makerdd at :24

scala> rdd.collect

res50: array[(string, int)] = array((a,1), (a,2), (a,3), (b,4), (b,5), (c,6), (c,7), (c,8), (c,9), (d,10))

scala> rdd.count()

res46: long = 10

scala> rdd.first()

res45: (string, int) = (a,1)

scala> rdd.reduce((x, y) => (x._1 + y._1, x._2 + y._2))

res49: (string, int) = (aaccabbccd,55)

scala> rdd.take(2)

res51: array[(string, int)] = array((a,1), (a,2))

scala> rdd.top(1)

res54: array[(string, int)] = array((d,10))

scala> rdd.takeordered(1)

res56: array[(string, int)] = array((a,1))

scala> rdd.takeordered(2)

res57: array[(string, int)] = array((a,1), (a,2))

聚合rdd中的元素,先使用seqop將rdd中每個分割槽中的t型別元素聚合成u型別,再使用combop將之前每個分割槽聚合後的u型別聚合成u型別,需要注意的是seqop和combop都會使用到zerovalue的值

// 定義rdd,設定第乙個分割槽中包含1,2,3,4,5,第二個分割槽中包含6,7,8,9,10

scala> var rdd = sc.makerdd(1 to 10, 2)

rdd: org.apache.spark.rdd.rdd[int] = parallelcollectionrdd[65] at makerdd at :24

| (partidx, iter) =>

| else

| }

| }

| part_map.iterator

| }

| }.collect

res59: array[(string, list[int])] = array((part_0,list(5, 4, 3, 2, 1)), (part_1,list(10, 9, 8, 7, 6)))

// aggregate的最後結果是58,原因是先在每個分割槽中迭代執行(x: int, y: int) => x + y,並且使用zerovalue的值1,

// 即part_0中計算過程為 1+1+2+3+4+5=16,part_1中計算過程為1+6+7+8+9+10=41

// 再將兩個分割槽中的結果執行(a: int, b: int) => a + b,並應用zerovalue的值,結果為1+16+41=58

scala> rdd.aggregate(1)(

| ,

|

| )res61: int = 58

fold操作與aggregate操作功能類似,區別在於seqop和combop是統一個函式

scala> rdd.fold(1)(

| (x, y) => x + y

| )res63: int = 58

該操作應用於(k, v)形式的rdd,返回指定k所對應的所以v值

scala> rdd.fold(1)(

| (x, y) => x + y

| )res63: int = 58

統計rdd[k, v]中每個k的數量

scala> var rdd = sc.makerdd(array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))

rdd: org.apache.spark.rdd.rdd[(string, int)] = parallelcollectionrdd[67] at makerdd at :24

scala> rdd.countbykey()

res65: scala.collection.map[string,long] = map(d -> 1, a -> 3, b -> 2, c -> 4)

foreach遍歷rdd中的每個元素,並應用函式f。foreachpartition與foreach型別,區別在於前者對針對每個分割槽。

scala> var rdd = sc.makerdd(array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))

rdd: org.apache.spark.rdd.rdd[(string, int)] = parallelcollectionrdd[67] at makerdd at :24

scala> rdd.foreach(println)

(a,1)

(a,3)

(c,8)

(c,6)

(c,9)

(b,4)

(a,2)

(b,5)

(d,10)

(c,7)

根據指定的排序函式f對k進行排序

scala> var rdd = sc.makerdd(array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))

rdd: org.apache.spark.rdd.rdd[(string, int)] = parallelcollectionrdd[67] at makerdd at :24

scala> rdd.sortby(x => x).collect

res68: array[(string, int)] = array((a,1), (a,2), (a,3), (b,4), (b,5), (c,6), (c,7), (c,8), (c,9), (d,10))

scala> rdd.sortby(x => x, false).collect

res70: array[(string, int)] = array((d,10), (c,9), (c,8), (c,7), (c,6), (b,5), (b,4), (a,3), (a,2), (a,1))

參考:

[1] 郭景瞻. **spark:核心技術與案例實戰[m]. 北京:電子工業出版社, 2017.

Spark操作 轉換操作 一

基礎轉換操作 鍵值轉換操作 對rdd中的每個元素都應用乙個指定的函式,以此產生乙個新的rdd scala var rdd sc.textfile users lyf desktop test data1.txt scala rdd.map line line.split collect res16 ...

RDD行動操作

行動操作是第二種型別的rdd操作,它們會把最終求得的結果返回到驅動器程式中,或者寫入外部儲存系統中。常見的rdd行動操作 1.reduce 它接收乙個函式作為引數,這個函式要操作兩個相同的元素型別的rdd資料並返回乙個同樣型別的新元素。乙個簡單的例子就是函式 可以用它來對我們的rdd進行累加。使用r...

Spark操作 控制操作

cache和persist操作都是對rdd進行持久化,其中cache是persist採用memory only儲存級別時的乙個特例,scala var rdd sc.textfile users lyf desktop data.txt scala rdd.cache 第一次計算行數,這裡只能從本地...