Spark 傳遞函式與閉包

2022-08-24 23:51:12 字數 2558 閱讀 7263

在scala中,你可以在任何作用於內定義函式,在函式體內,可以訪問相應作用域內的任何變數;還不止,你的函式還可以在變數不再處於作用於內的時候被呼叫,這就是閉包的最基本的理解。

一、transform、action運算元的函式引數

在spark集群中,spark應用由負責執行使用者編寫的main函式,以及在集群上執行的各種並行操作的驅動器程式(driver)和並行執行在集群各節點的工作程序(executor)共同組成。action運算元會觸發spark提交job,在提交job的過程中,transform運算元和action運算元中的func會被封裝成閉包,然後傳送到各個worker節點上去執行(資料就近原則)。

顯然,閉包是有狀態的,主要表現為那些自由變數,以及自由變數依賴到的其他變數,所以,在將乙個簡單的函式或者一段**片段傳遞給運算元作為引數前,spark會檢測閉包內所有涉及的變數,然後序列化變數,再傳給worker節點,再反序列化執行。(檢測——序列化——傳遞變數——反序列化)

函式引數表示為:

val f:(double)=>double = 2*_

f的型別是(double)=> double,傳入乙個double型別引數,返回乙個double型別的值。spark的transform和action運算元都用到了函式引數,這其中閉包的運用最頻繁。

val f(x:int) = (x:int) => 2*x

val rdd = sc.parallelize(1 to 10)

val rdd1 = rdd.map(x => f(x))

結果rdd1的值為array(2,4,6,8,10,12,14,16,18,20),這似乎沒有涉及到什麼閉包的知識點,不要著急,這裡先介紹transform、action運算元是怎樣呼叫函式引數的。

二、閉包的理解

def mulby (factor : double) = (x:double) => factor *x

val triple = mulby(3)

val half = mulby(0.5)

println(s"$, $")

定義了乙個函式mulby,型別是 double,值為(x:double) => factor * x;

首先,mulby的首次被呼叫,將引數3傳給(x:double) => factor * x,factor=3,該變數在mulby被引用,並將函式引數存入triple。然後引數變數factor從執行時的棧上被彈出;

然後,mulby再次被呼叫,factor的值被設定為0.5,同樣的,新的引數函式存入half中,引數變數factor從執行時的棧上被彈出;

因為每次呼叫mulby函式後,都將其值存入到乙個變數中(如上面的triple和half),當使用triple函式或half函式時factor相當於是作用域外,這就是「閉包」,閉包由**和**用到的任何非區域性變數定義構成。因此,輸出結果為:42,7

雖然表象上triple和half的呼叫,仍然使用factor變數,但可以理解為,triple和half函式的factor並不是乙個變數,而是真實的、不變的乙個常量值3和0.5。

三、閉包進一步理解:spark本地模式 vs 集群模式

通過上面可以理解spark運算元怎樣遍歷呼叫乙個函式,函式涉及的變數如何到達worker節點,以及閉包的概念。當乙個集群上執行**時,變數和方法的範圍以及生命週期,是spark比較難理解的地方。

var counter = 0var rdd =sc.parallelize(data)

rdd.foreach(x=>counter +=x)

println(s"counter value : $counter")

對於單純的rdd元素總和,根據是否執行在同乙個虛擬機器上,他們表現的行為完全不同。

在本地模式下,在某些情況下,驅動程式會執行在同乙個jvm內,即各個程式操作的counter屬於同乙個,從而可以得到「預期」的rdd元素總和結果。

在集群模式下,為了執行作業,spark將rdd分拆成多個task,每個task由乙個執行器(executor,即乙個task只能被乙個executor消化,乙個executor可以消化多個task)執行操作。在執行前,spark計算閉包(檢測閉包變數和方法,上述**指的是counter和foreach),這個閉包會被序列化,並分發給每乙個執行器。換句話說,每個執行器得到各自的counter,對counter進行修改時,也只是修改自己的counter,而驅動器(driver)上的counter並沒有被修改,所以最終的counter輸出結果沒有達到預期,輸出為0。這個可以理解為driver的counter變數是全域性變數,executor的counter是區域性變數。

所以spark為了應對這種由於閉包產生的影響,支援定義使用全域性共享變數,廣播(broadcast)變數,用來將乙個值快取到所有節點的記憶體中。對於累加操作,還可以使用累加器(accumulator)。

var accum = sc.accumulator(0)

val value = sc.parallelize(array(1,2,3,4)).foreach(x => accum+=x).value

println(s"accum = $accum")

//accum的輸出結果為10

如何傳遞函式

摘自 非同步 庫提供了一些函式,這些函式使您可以在元件之間傳遞訊息。這些訊息傳遞函式與各種訊息塊型別一起使用。有關併發執行時所定義的訊息塊型別的更多資訊,請參見非同步訊息塊。各節內容 本主題描述以下訊息傳遞函式 send 和 asend concurrency send 函式一條訊息傳送到指定目標同...

c 傳遞函式引數

傳遞普通函式和類的成員函式方式不同,原因是傳遞函式引數實際傳的是函式的位址,但是普通函式和成員函式的位址獲取方式不太相同。普通函式只要傳遞乙個函式名稱即可,但是成員函式在類的內部中沒有位址,選擇乙個成員函式就意味著得知道該函式在類中的偏移量,因此需要知道該物件和對應的偏移量,才能得到真實的位址。當然...

flord 傳遞閉包

傳遞指對於乙個節點i,如果j能到i,i能到k,那麼j就能到k。傳遞閉包,就是把圖中所有滿足這樣傳遞性的節點都弄出來,計算完成後,我們也就知道任意兩個節點之間是否相連。break 指跳出一層迴圈 continue 結束本次迴圈,跳過本次判斷語句 每只奶牛的技能獨一無二,如果給奶牛們排序的話,能確定自己...