spark中的廣播變數broadcast

2021-08-13 18:21:17 字數 2690 閱讀 9135

首先先來看一看broadcast的使用**:

val values = list[int](1,2,3)

val broadcastvalues = sparkcontext.broadcast(values)

broadcastvalues.getvalue.foreach(println) })

在上面的**中,首先生成了乙個集合變數,把這個變數通過sparkcontext的broadcast函式進行廣播,

最後在rdd的每乙個partition的迭代時,使用這個廣播變數.

接下來看看廣播變數的生成與資料的讀取實現部分:

defbroadcast[

t: classtag](value: 

t): broadcast[

t] = 

通過broadcastmanager中的newbroadcast函式來進行廣播.

val 

bc = env.broadcastmanager.newbroadcast[

t](value

, islocal)

val 

callsite = getcallsite

loginfo(

"created broadcast " 

+ bc.id + 

" from " 

+ callsite.shortform)

cleaner.foreach(_.registerbroadcastforcleanup(bc))bc}

在broadcastmanager中生成廣播變數的函式,這個函式直接使用的broadcastfactory的對應函式.

broadcastfactory的例項通過配置

spark.broadcast.factory,

預設是torrentbroadcastfactory.

def 

newbroadcast[

t: classtag](value_ : t, 

islocal: 

boolean

): broadcast[

t] = 

在torrentbroadcastfactory中生成廣播變數的函式:

在這裡面,直接生成了乙個torrentbroadcast的例項.

override def 

newbroadcast[

t: classtag](value_ : t, 

islocal: 

boolean, 

id: 

long)

: broadcast[

t] = 

torrentbroadcast例項生成時的處理流程:

這裡主要的**部分是直接寫入這個要廣播的變數,返回的值是這個變數所占用的block的個數.

broadcast的block的大小通過

spark.broadcast.blocksize

配置.預設是4mb,

broadcast的壓縮是否通過

spark.broadcast.compress

private val 

broadcastid 

= broadcastblockid

(id)

/** total number of blocks this broadcast variable contains. */

private val 

numblocks

: int 

= writeblocks(obj)

接下來生成乙個lazy的屬性,這個屬性只有在具體的使用時,才會執行,在例項生成時不執行(上面的示例中的getvalue.foreach時執行).

@transient 

private lazy val 

_value

: t 

= readbroadcastblock()

override protected def 

getvalue() = 

看看例項生成時的writeblocks的函式:

private def 

writeblocks(value: 

t): 

int = 

這個函式的返回值是乙個int型別的值,這個值就是序列化壓縮儲存後block的個數.

blocks.length}

在我們的示例中,使用getvalue時,會執行例項初始化時定義的lazy的函式readbroadcastblock:

private def 

readbroadcastblock(): 

t = utils.

tryorioexception }}

最後再看看readblocks函式的處理流程:

private def 

readblocks(): array[bytebuffer] = 

val 

block: bytebuffer = getlocal.orelse(getremote).getorelse(

throw new 

sparkexception(

s"failed to get 

$pieceid

of $

broadcastid"))

blocks(pid) = block

}blocks

}

spark中的廣播變數broadcast

首先先來看一看broadcast的使用 val values list int 1,2,3 val broadcastvalues sparkcontext.broadcast values broadcastvalues.getvalue.foreach println 在上面的 中,首先生成了乙...

Spark的廣播變數

將每個task都要計算的資料設定成廣播變數,如果不設定成廣播變數,則每個 task 都會儲存乙份這樣一模一樣的資料,這樣就占用了很多的儲存空間,將共享變數設定成廣播變數後只在乙個 excuter 裡面儲存這樣乙份 共享資料,這個 excuter 裡面的task 計算的時候共享這樣乙份資料,這樣就減少...

spark的廣播變數

廣播變數 頻繁使用 而且資料大 分發到每乙個excutor節點 每個task從本地拿取使用 當在excutor端使用了driver變數,不使用廣播變數,在每個excutor中有多少的task就有多少個driver端變數副本 導致的問題 占用了網路io,速度慢 如果使用廣播變數在每乙個excutor端...