Spark 中的累加器及廣播變數

2021-10-09 02:48:48 字數 2854 閱讀 8378

1、原理

累加器用來把 executor 端變數資訊聚合到 driver 端。在 driver 程式中定義的變數,在 executor 端的每個 task 都會得到這個變數的乙份新的副本,每個 task 更新這些副本的值後,傳回 driver 端進行 merge。

2、系統累加器

package spark.core.accumulator

import org.apache.spark.

/** * 系統自帶累加器

*/object spark_os_accumulator_study1

)// 獲取累加器的值

println(

"sum = "

+ sum.value)

}}

執行結果:

3、自定義累加器

package spark.core.accumulator

import org.apache.spark.rdd.rdd

import org.apache.spark.util.accumulatorv2

import org.apache.spark.

import scala.collection.mutable

/** * 自定義累加器

* 1、建立累加器

* 2、註冊累加器

*/object spark_user_accumulator_study1

)// 4、獲取累加器的值

println(acc.value)

}class wordcountaccumulator extends accumulatorv2[

string

, mutable.map[

string

,long]]

// 複製累加器

override

def copy(

): accumulatorv2[

string

, mutable.map[

string

,long]]

=// 重置累加器

override

def reset():

unit

=// 向累加器中增加資料(in)

override

def add(word :

string):

unit

=// 合併累加器

override

def merge(other: accumulatorv2[

string

, mutable.map[

string

,long]]

):unit=)

}// 返回累加器的結果(out)

override

def value: mutable.map[

string

,long

]= map

}}

1、原理

廣播變數用來高效分發較大的物件,向所有工作節點傳送乙個較大的唯讀值,以供乙個或多個 spark 操作使用。比如,如果你的應用需要向所有節點傳送乙個較大的唯讀查詢表,廣播變數用起來都很順手。在多個並行操作中使用同乙個變數,但是 spark 會為每個任務分別傳送。

2、**

package spark.core.accumulator

import org.apache.spark.rdd.rdd

import org.apache.spark.util.accumulatorv2

import org.apache.spark.

import scala.collection.mutable

/** * 自定義累加器

* 1、建立累加器

* 2、註冊累加器

*/object spark_user_accumulator_study1

)// 4、獲取累加器的值

println(acc.value)

}class wordcountaccumulator extends accumulatorv2[

string

, mutable.map[

string

,long]]

// 複製累加器

override

def copy(

): accumulatorv2[

string

, mutable.map[

string

,long]]

=// 重置累加器

override

def reset():

unit

=// 向累加器中增加資料(in)

override

def add(word :

string):

unit

=// 合併累加器

override

def merge(other: accumulatorv2[

string

, mutable.map[

string

,long]]

):unit=)

}// 返回累加器的結果(out)

override

def value: mutable.map[

string

,long

]= map

}}

執行結果:

Spark廣播變數與累加器

在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...

Spark累加器和廣播變數

累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...

spark 廣播變數與累加器

如何理解廣播變數?適用場景 大變數,比如100m以上的大集合。運算元函式中使用到外部變數時,預設情況下,spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有乙個變數副本。如果變數本身比較大的話 比如100m,甚至1g 那麼大量的變數副本在網路中傳輸的效能開銷,以及在各個節...