Spark自定義累加器的使用例項詳解

2022-09-25 09:39:11 字數 2045 閱讀 6046

累加器(accumulator)是spark中提供的一種分布式的變數機制,其原理類似於mapreduce,即分布式的改變,然後聚合這些改變。累加器的乙個常見用途是在除錯時對作業執行過程中的事件進行計數。

累加器簡單使用

spark內建的提供了long和double型別的累加器。下面是乙個簡單的使用示例,在這個例子中我們在過濾掉rdd中奇數的同時進行計數,最後計算剩下整數的和。

val sparkconf = new sparkconf().setappname("test").setmaster("local[2]")

val sc = new sparkcontext(sparkconf)

val accum = sc.longaccumulator("longaccum") //統計奇數的個數

val sum = s程式設計客棧c.parallelize(array(1,2,3,4,5,6,7,8,9),2).filter(n=>).reduce(_+_)

println("sum: "+sum)

println("accum: "+accum.value)

sc.stop()

結果為:

sum: 20

accum: 5

這是結果正常的情況,但是在使用累加器的過程中如果對於spark的執行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。

自定義累加器

自定義累加器型別的功能在1.x版本中就已經提供了,但是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,而且官方還提供了乙個新的抽象類:accumulatorv2來提供更加友好的自定義型別累加器的實現方式。官方同時給出了乙個實現的示例:collectionaccumulator類,這個類允許以集合的形式收集spark應用執行過程中的一些資訊。例如,我們可以用這個類收集spark處理資料時的一些細節,當然,由於累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的資訊的規模要加以控制,不宜過大。

繼承accumulatorv2類,並複寫它的所有方法

package spark

import constant.constant

import org.apache.spark.util.accumulatorv2

import util.getfieldfromconcatstring

import util.setfieldfromconcatstring

open class sessionaccmulator : accumulatorv2www.cppcns.comtring>()

/*** 合併資料

*/override fun merge(other: accumulatorv2?)

//問題就在於這裡,自定義沒有寫錯,合併錯了

newresult = newresult.setfieldfromconcatstring("|", it, newvalue.tostring())}}

result = newresult}}

} override fun copy(): accumulatorv2

override fun add(p0: string?) else

result = newresult

}} override fun reset()

override fun iszero(): boolean

}方法介紹

value方法:獲取累加器中的值

merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進行合併的方法(下面介紹執行流程中將要用到)

iszero方法:判斷是否為初始值

reset方法:重置累加器中的值

copy方法:拷貝累加器

spark中累加器的執行流程:

首先有幾個task,spark engine就呼叫copy方法拷貝幾個累加器(不註冊的),然後在各個task中進行累加(注意在此過程式設計客棧程中,被最初註冊的累加器的值是不變的),執行最後將調程式設計客棧用merge方法和各個task的結果累計器進行合併(此時被註冊的累加器是初始值)

總結

Spark中自定義累加器

通過繼承accumulatorv2可以實現自定義累加器。官方案例可參考 下面是我自己寫的乙個統計卡種數量的案例。package com.shuai7boy.myscalacode import org.apache.spark.import org.apache.spark.util.accumul...

Spark的累加器

val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...

Spark累加器的作用和使用

不經過shuffle,實現詞頻統計 bject spark06 accumulator 累加器的tostring方法 println sumacc 取出累加器中的值 println sumacc.value sc.stop 不經過shuffle,計算以h開頭的單詞出現的次數。object spark...