Spark中Accumulator的使用

2021-09-21 01:12:17 字數 1414 閱讀 7699

accumulator簡介

accumulator是spark提供的累加器,顧名思義,該變數只能夠增加。 

只有driver能獲取到accumulator的值(使用value方法),task只能對其做增加操作(使用 +=)。你也可以在為accumulator命名(不支援python),這樣就會在spark web ui中顯示,可以幫助你了解程式執行的情況。

accumulator使用

使用示例

舉個最簡單的accumulator的使用例子:

//在driver中定義

val accum = sc.accumulator(0, "example accumulator")

//在task中進行累加

sc.parallelize(1 to 10).foreach(x=> accum += 1)

//在driver中輸出

accum.value

//結果將返回10

res: 10

累加器的錯誤用法

val accum= sc.accumulator(0, "error accumulator")

val data = sc.parallelize(1 to 10)

//用accumulator統計偶數出現的次數,同時偶數返回0,奇數返回1

val newdata = data.mapelse 1

}}

看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結果的準確性。

事實上,還是有解決方案的,只要將任務之間的依賴關係切斷就可以了。什麼方法有這種功能呢?你們肯定都想到了,cache,persist。呼叫這個方法的時候會將之前的依賴切除,後續的累加器就不會再被之前的transfrom操作影響到了。

//

val accum= sc.accumulator(0, "error accumulator")

val data = sc.parallelize(1 to 10)

//**和上方相同

val newdata = data.map}

//使用cache快取資料,切斷依賴。

newdata.cache.count

//此時accum的值為5

accum.value

newdata.foreach(println)

//此時的accum依舊是5

accum.value

總結

使用accumulator時,為了保證準確性,只使用一次action操作。如果需要使用多次則使用cache或persist操作切斷依賴。

Spark中executor memory引數詳解

我們知道,spark執行的時候,可以通過 executor memory 來設定executor執行時所需的memory。但如果設定的過大,程式是會報錯的,如下 555.png 那麼這個值最大能設定多少呢?本文來分析一下。文中安裝的是spark1.6.1,安裝在hadoop2.7上。1 相關的2個引...

Spark基礎(三)Spark中的任務執行

容錯機制 spark的架構特點 根據客戶端提交的jar包劃分出來乙個個的rdd,根據rdd之間的lineage關係劃分dag。劃分dag的目的是為了劃分stage。2 dag通過dagscheller劃分為stage 再劃分為taskset 根據劃分出來的dag,將dag送個dagscheduler...

spark更改分割槽 Spark中的分割槽方法詳解

一 spark資料分割槽方式簡要 在spark中,rdd resilient distributed dataset 是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣...