spark之累加器和廣播變數

2021-09-28 17:20:50 字數 1522 閱讀 9747

spark的三大資料結構

rdd:分布式資料集

廣播變數:分布式唯讀共享變數

累加器:分布式只寫共享變數

1.累加器

預設累加器

例子:對乙個list中的所有值進行相加

首先上圖中紅色部分框出來的**,看上去邏輯沒有什麼大問題,但是輸出的結果sum=0。這是因為,sum在driver中被定義,在不同的executor中計算,每個executor得到值既不能彼此相加,也不能傳回driver輸出,所以導致driver中sum的值一直沒有變過。

這時候就可以採用累加器,因為driver和各個executor都需要使用這個資料,所以在這裡定義乙個只寫共享變數是合適的。累加器解決的問題就是資料原本不能從executor傳回driver的問題。

自定義累加器

例子:取出含有"u"的字串,累加

class wordaccumulator extends accumulatorv2[string,util.arraylist[string]]

//複製累加器物件

override def copy(): accumulatorv2[string, util.arraylist[string]] =

//重置累加器

override def reset(): unit =

//實現累加器的邏輯

override def add(v: string): unit =

} //合併累加器

override def merge(other: accumulatorv2[string, util.arraylist[string]]): unit =

//獲取累加器的結果

override def value: util.arraylist[string] = list

}

//建立spark上下文物件

val sc = new sparkcontext(config)

val value = sc.makerdd(list("us","tomoon","oneus","you","loc"))

val accumulator = new wordaccumulator

//需要註冊一下

sc.register(accumulator)

value.foreach

}println(accumulator.value)

sc.stop()

2.廣播變數

使用的時候只要把原本的資料通過broadcast()轉化成廣播變數,使用的時候通過broadcast.value使用即可。

Spark累加器和廣播變數

累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...

spark的廣播變數和累加器

廣播變數 廣播變數允許開發人員在每個節點快取唯讀的變數,而不是在任務之間傳遞這些變數。例如,使用廣播變數能夠高效地 在集群每個節點建立大資料的副本。同時,spark還使用高效的廣播演算法分發這些變數,從而減少通訊的開銷。spark應用程式作業的執行由一系列排程階段構成,而這些排程階段通過shuffl...

Spark廣播變數與累加器

在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...