Spark常用運算元 action

2021-08-21 17:59:27 字數 2755 閱讀 4504

spark action常用運算元型別如下:

1.collectasmap():map[k, v]:二元組rdd轉為map資料型別

countbykey(): map[k, long]:統計rdd中每個key出現的次數,還回map型別表示每個key出現了幾次

countbyvalue(): map[t, long]:統計rdd中每個元素出現的次數,還回map型別表示每個元素出現了幾次

val rdd2: rdd[(string, int)] = sc.parallelize(list(("a", 21), ("b", 2), ("c", 3), ("a", 3), ("d", 21), ("e", 21)), 2)

// action: collectasmap

val kv: map[string, int] = rdd2.collectasmap()

println(kv)

// countbykey

val keycount = rdd2.countbykey()

println(keycount)

// countbyvalue

val valuecount = rdd2.countbyvalue()

println(valuecount)

/**map(e -> 21, b -> 2, d -> 21, a -> 3, c -> 3)

map(e -> 1, a -> 2, b -> 1, c -> 1, d -> 1)

map((c,3) -> 1, (b,2) -> 1, (e,21) -> 1, (a,3) -> 1, (a,21) -> 1, (d,21) -> 1)

*/

2.foreach(f: t => unit): unit:迴圈遍歷rdd中每乙個元素

foreachpartition(f: iterator[t] => unit): unit:該運算元與foreach運算元類似,遍歷處理元素資料時可共享分區內資源,當需要額外物件資料時foreachpartition運算元比foreach效率高。

val rdd = sc.parallelize(list(1, 2, 3, 4, 5, 6), 2)

rdd.foreach(println(_))

rdd.foreachpartition

println(value+iterator.reduce(_*_))

}

3.aggregate[u: classtag](zerovalue: u)(seqop: (u, t) => u, combop: (u, u) => u): u:運算元中引數zerovalue是初始值;seqop是分區內task的執行邏輯,在分區內zerovalue與第乙個元素按業務邏輯聚集再依次與剩餘元素聚集,還回與zerovalue同樣資料型別;combop是聚集各分割槽的業務邏輯操作,起始也是由zerovalue與第乙個分割槽聚集結果按業務邏輯聚集,再依次與剩餘分割槽聚集結果依次聚集,輸入與輸出資料型別都與zerovalue一致。

/**

* 分區內初始值zerovalue與每個元素依次聚集

** @param zerovalue

* @param value

* @return

*/def seqop(zerovalue: arraybuffer[int], value: int): arraybuffer[int] =

/*** 各個分割槽聚集後的結果再依次聚集

** @param a

* @param b

* @return

*/def combop(a: arraybuffer[int], b: arraybuffer[int]): arraybuffer[int] =

val rdd = sc.parallelize(list(1, 2, 3, 4, 5, 6), 2)

val result = rdd.aggregate(arraybuffer[int](88))(seqop, combop)

println("------------------------------------")

println(result)

/**結果如下:

zerovalue:arraybuffer(88) value:1

zerovalue:arraybuffer(88, 1) value:2

zerovalue:arraybuffer(88, 1, 2) value:3

zerovalue:arraybuffer(88) value:4

zerovalue:arraybuffer(88, 4) value:5

zerovalue:arraybuffer(88, 4, 5) value:6

a:arraybuffer(88) b:arraybuffer(88, 1, 2, 3)

a:arraybuffer(88, 88, 1, 2, 3) b:arraybuffer(88, 4, 5, 6)

------------------------------------

arraybuffer(88, 88, 1, 2, 3, 88, 4, 5, 6)

說明:zerovalue先在每個分割槽與元素依次聚集,zerovalue再與各個分割槽結果依次聚集,故2個分割槽結果有3個88

*/

未完待續

spark運算元 五 action運算元

collect package com.doit.spark.demoday05 import org.apache.spark.sparkcontext author 向陽木 date 2020 09 22 22 19 description 將資料以陣列形式收集回driver端,資料按照分割槽編...

Spark 常用運算元

官網rdd操作指南 2 key value資料型別的transfromation運算元 三 連線 3 action運算元 val list list 1 2,3 sc.parallelize list map 10 foreach println 輸出結果 10 20 30 這裡為了節省篇幅去掉了換...

Spark常用運算元練習

package cn.allengao.exercise import org.apache.spark.class name package describe sparkrdd運算元練習 creat user allen gao creat date 2018 1 25 creat time 10...