Spark的累加器

2021-10-03 07:00:17 字數 1329 閱讀 6066

val conf =

newsparkconf()

.("jk").

setmaster

("local"

) val sc =

newsparkcontext

(conf)

val accumulator = sc.longaccumulator

//傳入array集合,指定兩個分片

val unit = sc.

makerdd

(array(1

,5,3

,4),

2)unit.

foreach

(x=>

)//返回值,驅動端獲取最終的值

println

("sun ="

+accumulator.value)

package com.uu.exe2

import org.apache.spark.util.accumulatorv2

/** * 1.繼承accumulatorv2類

* 2.實現相關的方法

*/class

myaddaccumulator extends accumulatorv2[long, long]

else

}//copy累加器,每個節點與驅動都包含累加器

override def copy()

: accumulatorv2[long, long]

=//重置:分片的累加器返回值後,進行重置

override def reset()

: unit =

//將具體的內容傳入自定義累加器,型別為繼承類的第乙個型別

override def add

(v: long)

: unit =

//合併各個分割槽的值

override def merge

(other: accumulatorv2[long, long]

): unit =

//累加器的返回值型別

override def value: long = i

}

實現了與自身累加器同樣的功能

package com.uu.exe2

import org.apache.spark.

/** * created by ibm on 2020/2/27.

*/object add

)println

("sun ="

+accumulator1.value)

}}

Spark累加器 Accumulator 使用詳解

def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...

Spark廣播變數與累加器

在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...

Spark累加器和廣播變數

累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...