spark中運算元詳解 aggregateByKey

2021-08-16 07:04:46 字數 1867 閱讀 9189

**:

通過scala集合以並行化方式建立乙個rdd

scala> val

pairrdd = sc.parallelize(list(("cat"

,2),("cat"

,5),("mouse"

,4),("cat"

,12),("dog"

,12),("mouse"

,2)),2

)

pairrdd 這個rdd有兩個區,乙個區中存放的是:

("cat"

,2),("cat"

,5),("mouse"

,4)

另乙個分割槽中存放的是:

("cat"

,12),("dog"

,12),("mouse"

,2)

然後,執行下面的語句

scala > pairrdd.aggregatebykey

(100

)(math.max

(_ , _), _ + _ ).collect

結果:

res0: array

[(string

,int

)] = array

((dog,100

),(cat,200

),(mouse,200

))

下面是以上語句執行的原理詳解:

aggregatebykey的意思是:按照key進行聚合

第一步:將每個分區內key相同資料放到一起

分割槽一

("cat"

,(2,5

)),("mouse",4)

分割槽二("cat"

,12),("dog"

,12),("mouse"

,2)

第二步:區域性求最大值

對每個分割槽應用傳入的第乙個函式,math.max(_ , _),這個函式的功能是求每個分割槽中每個key的最大值

這個時候要特別注意,aggregatebyke(100)(math.max(_ , _),_+_)裡面的那個100,其實是個初始值

在分割槽一中求最大值的時候,100會被加到每個key的值中,這個時候每個分割槽就會變成下面的樣子

分割槽一

("cat"

,(2,5

,100

)),("mouse"

,(4,100

))然後求最大值後變成:

("cat"

,100

), ("mouse"

,100

)分割槽二

("cat"

,(12

,100

)),("dog"

,(12.100

)),("mouse"

,(2,100

))求最大值後變成:

("cat"

,100

),("dog"

,100

),("mouse"

,100

)

第三步:整體聚合

將上一步的結果進一步的合成,這個時候100不會再參與進來

最後結果就是:

(dog,100

),(cat,200

),(mouse

,200

)

spark中運算元詳解 aggregateByKey

通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...

Spark運算元詳解

目錄 spark常用運算元詳解 3.getnumpartitions 4.partitions 5.foreachpartition 6.coalesce 7.repartition 8.union,zip,join 9.zipwithindex,zipwithuniqueid 未完待續.本文主要介...

spark常用運算元詳解

1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....