大資料開發 Spark 共享變數之累加器和廣播變數

2021-10-16 09:23:00 字數 2280 閱讀 9604

在 spark 中,提供了兩種型別的共享變數:累加器 (accumulator) 與廣播變數 (broadcast variable):

這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的**進行計算,會發現執行結果並非預期:

var counter = 0

val data = array(1, 2, 3, 4, 5)

sc.parallelize(data).foreach(x => counter += x)

println(counter)

counter 最後的結果是 0,導致這個問題的主要原因是閉包。

1. scala 中閉包的概念

這裡先介紹一下 scala 中關於閉包的概念:

var more = 10

val addmore = (x: int) => x + more

如上函式addmore中有兩個變數 x 和 more:

按照定義:在建立函式時,如果需要捕獲自由變數,那麼包含指向**獲變數的引用的函式就被稱為閉包函式。

2. spark 中的閉包

也可以參考:

在實際計算時,spark 會將對 rdd 操作分解為 task,task 執行在 worker node 上。在執行之前,spark 會對任務進行閉包,如果閉包內涉及到自由變數,則程式會進行拷貝,並將副本變數放在閉包中,之後閉包被序列化並傳送給每個執行者。因此,當在 foreach 函式中引用counter時,它將不再是 driver 節點上的counter,而是閉包中的副本counter,預設情況下,副本counter更新後的值不會回傳到 driver,所以counter的最終值仍然為零。

需要注意的是:在 local 模式下,有可能執行foreach的 worker node 與 diver 處在相同的 jvm,並引用相同的原始counter,這時候更新可能是正確的,但是在集群模式下一定不正確。所以在遇到此類問題時應優先使用累加器。

累加器的原理實際上很簡單:就是將每個副本變數的最終值傳回 driver,由 driver 聚合後得到最終值,並更新原始變數。

sparkcontext中定義了所有建立累加器的方法,需要注意的是:被中橫線劃掉的累加器方法在 spark 2.0.0 之後被標識為廢棄。

使用示例和執行結果分別如下:

val data = array(1, 2, 3, 4, 5)

// 定義累加器

val accum = sc.longaccumulator("my accumulator")

sc.parallelize(data).foreach(x => accum.add(x))

// 獲取累加器的值

accum.value

在上面介紹中閉包的過程中我們說道每個 task 任務的閉包都會持有自由變數的副本,如果變數很大且 task 任務很多的情況下,這必然會對網路 io 造成壓力,為了解決這個情況,spark 提供了廣播變數。

廣播變數的做法很簡單:就是不把副本變數分發到每個 task 中,而是將其分發到每個 executor,executor 中的所有 task 共享乙個副本變數。

// 把乙個陣列定義為乙個廣播變數

val broadcastvar = sc.broadcast(array(1, 2, 3, 4, 5))

// 之後用到該陣列時應優先使用廣播變數,而不是原值

sc.parallelize(broadcastvar.value).map(_ * 10).collect()

建立的accumulator變數的值能夠在spark web ui上看到,在建立時應該盡量為其命名,下面**如何在spark web ui上檢視累加器的值

大資料開發 Spark 共享變數之累加器和廣播變數

在 spark 中,提供了兩種型別的共享變數 累加器 accumulator 與廣播變數 broadcast variable 這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的 進行計算,會發現執行結果並非預期 var counter 0 val data array 1,2,...

Spark特性之共享變數

spark乙個非常重要的特性就是共享變數。預設情況下,如果在乙個運算元的函式中使用到了某個外部的變數,那麼這個變數的值會被拷貝到每個task中。此時每個task只能操作自己的那份變數副本。如果多個task想要共享某個變數,那麼這種方式是做不到的。spark為此提供了兩種共享變數,一種是broadca...

Spark共享變數

預設情況下,如果在乙個運算元的函式中使用到了某個外部的變數,那麼這個變數的值會被拷貝到每個task中。此時每個task只能操作自己的那份變數副本。如果多個task想要共享某個變數,那麼這種方式是做不到的。spark為此提供了兩種共享變數,一種是broadcast variable 廣播變數 另一種是...