大資料開發 Spark 閉包的理解

2022-01-10 04:14:06 字數 1705 閱讀 5844

閉包是乙個函式,返回值依賴於宣告在函式外部的乙個或多個變數。閉包通常來講可以簡單的認為是可以訪問乙個函式裡面區域性變數的另外乙個函式。

如下面這段匿名的函式:

val multiplier = (i:int) => i * 10
函式體內有乙個變數 i,它作為函式的乙個引數。如下面的另一段**:

val multiplier = (i:int) => i * factor
multiplier中有兩個變數:i 和 factor。其中的乙個 i 是函式的形式引數,在multiplier函式被呼叫時,i 被賦予乙個新的值。然而,factor不是形式引數,而是自由變數,考慮下面**:

var factor = 3  val multiplier = (i:int) => i * factor
這裡我們引入乙個自由變數factor,這個變數定義在函式外面。

這樣定義的函式變數multiplier成為乙個"閉包",因為它引用到函式外面定義的變數,定義這個函式的過程是將這個自由變數捕獲而構成乙個封閉的函式

完整的例子:

object test   

var factor = 3

val multiplier = (i:int) => i * factor

}

先來看下面一段**:

val data=array(1, 2, 3, 4, 5)

var counter = 0

var rdd = sc.parallelize(data)

// ???? 這樣做會怎麼樣

rdd.foreach(x => counter += x)

println("counter value: " + counter)

首先肯定的是上面輸出的結果是0,park將rdd操作的處理分解為tasks,每個task由executor執行。在執行之前,spark會計算task的閉包。閉包是executor在rdd上進行計算的時候必須可見的那些變數和方法(在這種情況下是foreach())。閉包會被序列化並傳送給每個executor,但是傳送給executor的是副本,所以在driver上輸出的依然是counter本身,如果想對全域性的進行更新,用累加器,在spark-streaming裡面使用updatestatebykey來更新公共的狀態。

另外在spark中的閉包還有別的作用,

1.清除driver傳送到executor上的無用的全域性變數等,只複製有用的變數資訊給executor

2.保證傳送到executor上的是序列化以後的資料

比如在使用dataset時候 case class的定義必須在類下,而不能是方法內,即使語法上沒問題,如果使用過json4s來序列化,implicit val formats = defaultformats的引入最好放在類下,否則要單獨將這個format序列化,即使你沒有使用到它別的東西。

閉包在spark的整個生命週期中處處可見,就比如從driver上拷貝的所有資料都需要序列化 + 閉包的方式到executor上的。

Spark中閉包的理解

概念的理解 函式可以訪問函式外面的變數,但是函式內對變數的修改,在函式外是不可見的。rdd相關操作都需要傳入自定義閉包函式 closure 如果這個函式需要訪問外部變數,那麼需要遵循一定得規則,否則會丟擲執行時異常。閉包函式傳入到節點時,需要經過下面的步驟 注意 外部變數在閉包內的修改不會被反饋到驅...

理解spark中的閉包問題

理解spark中的閉包 spark官方文件 spark.apachecn.org 解釋 什麼叫閉包 跨作用域 即在work節點訪問driver節點 訪問函式變數。又指的乙個擁有許多變數和繫結了這些變數的環境的表示式 通常是乙個函式 因而這些變數也是該表示式的一部分。展示 def main args ...

閉包的理解

閉包 是指有權訪問另乙個函式作用域中的變數的函式。建立閉包的常見方式就是在乙個函式內部建立另乙個函式 在函式createcomparisonfunction 中返回了乙個匿名函式,建立了乙個閉包。當匿名函式被返回時,其作用域鏈包含外部函式createcomparisonfunction的作用域鏈,這...