大資料基礎 Spark累加器與廣播變數

2022-01-14 09:25:52 字數 1978 閱讀 8717

在 spark 中,提供了兩種型別的共享變數:累加器 (accumulator) 與廣播變數 (broadcast variable):

這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的**進行計算,會發現執行結果並非預期:

var counter = 0

val data = array(1, 2, 3, 4, 5)

sc.parallelize(data).foreach(x => counter += x)

println(counter)

counter 最後的結果是 0,導致這個問題的主要原因是閉包。

1. scala 中閉包的概念

這裡先介紹一下 scala 中關於閉包的概念:

var more = 10

val addmore = (x: int) => x + more

如上函式addmore中有兩個變數 x 和 more:

按照定義:在建立函式時,如果需要捕獲自由變數,那麼包含指向**獲變數的引用的函式就被稱為閉包函式。

2. spark 中的閉包

在實際計算時,spark 會將對 rdd 操作分解為 task,task 執行在 worker node 上。在執行之前,spark 會對任務進行閉包,如果閉包內涉及到自由變數,則程式會進行拷貝,並將副本變數放在閉包中,之後閉包被序列化並傳送給每個執行者。因此,當在 foreach 函式中引用counter時,它將不再是 driver 節點上的counter,而是閉包中的副本counter,預設情況下,副本counter更新後的值不會回傳到 driver,所以counter的最終值仍然為零。

需要注意的是:在 local 模式下,有可能執行foreach的 worker node 與 diver 處在相同的 jvm,並引用相同的原始counter,這時候更新可能是正確的,但是在集群模式下一定不正確。所以在遇到此類問題時應優先使用累加器。

累加器的原理實際上很簡單:就是將每個副本變數的最終值傳回 driver,由 driver 聚合後得到最終值,並更新原始變數。

sparkcontext中定義了所有建立累加器的方法,需要注意的是:被中橫線劃掉的累加器方法在 spark 2.0.0 之後被標識為廢棄。

使用示例和執行結果分別如下:

val data = array(1, 2, 3, 4, 5)

// 定義累加器

val accum = sc.longaccumulator("my accumulator")

sc.parallelize(data).foreach(x => accum.add(x))

// 獲取累加器的值

accum.value

在上面介紹中閉包的過程中我們說道每個 task 任務的閉包都會持有自由變數的副本,如果變數很大且 task 任務很多的情況下,這必然會對網路 io 造成壓力,為了解決這個情況,spark 提供了廣播變數。

廣播變數的做法很簡單:就是不把副本變數分發到每個 task 中,而是將其分發到每個 executor,executor 中的所有 task 共享乙個副本變數。

// 把乙個陣列定義為乙個廣播變數

val broadcastvar = sc.broadcast(array(1, 2, 3, 4, 5))

// 之後用到該陣列時應優先使用廣播變數,而不是原值

sc.parallelize(broadcastvar.value).map(_ * 10).collect()

rdd programming guide

系列傳送門

大資料開發 Spark 共享變數之累加器和廣播變數

在 spark 中,提供了兩種型別的共享變數 累加器 accumulator 與廣播變數 broadcast variable 這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的 進行計算,會發現執行結果並非預期 var counter 0 val data array 1,2,...

大資料開發 Spark 共享變數之累加器和廣播變數

在 spark 中,提供了兩種型別的共享變數 累加器 accumulator 與廣播變數 broadcast variable 這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的 進行計算,會發現執行結果並非預期 var counter 0 val data array 1,2,...

Spark的累加器

val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...