Spark系列 八 Worker工作原理

2021-07-25 04:00:36 字數 3861 閱讀 2788

工作原理圖

源**分析

包名:org.apache.spark.deploy.worker

啟動driver入口點:registerwithmaster方法中的case launchdriver

1case

launchdriver(driverid,

driverdesc)

=>

driverrunner

管理乙個driver的執行,包括失敗時自動重啟driver,這種方式僅僅適用於standalone集群部署模式

driverrunner類中start方法實現

1  defstart()

=}"

=>

workerurl

13 

case"}"

=>

localjarfilename

14 

case

other

=>

other

15 

}16 

17 

//todo:

ifwe

addability

tosubmit

multiple

jars

they

should

also

beadded

here

18 

//構建processbuilder物件,傳入啟動driver命令(所需記憶體大小)

19 

valbuilder

=commandutils

.buildprocessbuilder(driverdesc

.command,

driverdesc

.mem,

20 

sparkhome

.getabsolutepath,

substitutevariables)

21 

//啟動driver程序

22 

launchdriver(builder,

driverdir,

driverdesc

.supervise)

23 

}24 

catch

27 

28 

//driver退出狀態處理

29 

valstate

=30 

if

(killed)

elseif

(finalexception

.isdefined)

else

39 

}40 

41 

finalstate

=some(state)

42 

//向driver所屬worker傳送driverstatechanged訊息

43 

worker

!driverstatechanged(driverid,

state,

finalexception)

44 

}45  }.

start()

46  }

launchexecutor

管理launchexecutor的啟動

1case

launchexecutor(masterurl,

execid,

cores_,

memory_)

=>

2if

(masterurl!=

activemasterurl)

else

14 

15 

//create

local

dirs

forthe

executor.

these

arepassed

tothe

executor

viathe

16 

//spark_local_dirs

environment

variable,

anddeleted

bythe

worker

when

the17 

//finishes.

18 

val=..

getorelse

.toseq

22 

}23 

=24 

//建立executorrunner物件

25 

valmanager

=new

executorrunner(

26 

27 

execid,

28 

.copy(command

=worker..

command,

conf)),

29 

cores_,

30 

memory_,

31 

self,

32 

workerid,

33 

host,

34 

webui

.boundport,

35 

publicaddress,

36 

sparkhome,

37 

executordir,

38 

akkaurl,

39 

conf,

40 

executorstate

.loading)

41 

//executor加入本地快取

42 

+"/"

+execid)

=manager

43 

manager

.start()

44 

//增加worker已使用core

45 

coresused+=

cores_

46 

//增加worker已使用memory

47 

memoryused+=

memory_

48 

//通知master傳送executorstatechanged訊息

49 

master

!execid,

manager

.state,

none,

none)

50 

}51 

//異常情況處理,通知master傳送executorstatechanged

failed訊息

52 

catch

59 

master

!execid,

executorstate

.failed,

60 

some(e

.tostring),

none)

61 

}62 

}63  }

總結

Q A pytorch中的worker如何工作的

目錄一直很迷,在給dataloader設定worker數量 num worker 時,到底設定多少合適?這個worker到底怎麼工作的?如果將num worker設為0 也是預設值 就沒有worker了嗎?worker的使用場景 from torch.utils.data import datalo...

spark原始碼 worker啟動原理和原始碼

worker啟動一般包含兩大部分 driverrunner和excetorrunner。worker啟動driver的幾個基本原理,最核心的是。worker內部會啟動乙個執行緒,這個執行緒可以理解為driverrunner。然後driverrunner會去負責啟動driver程序,並在之後對driv...

Spark入門系列

讀完spark官方文件後,在研究別人的原始碼以及spark的原始碼之前進行一番入門學習,這個系列不錯。spark系列 除此之外,databricks也是乙個非常不錯的 上面可以使用免費的spark集群進行 提交與測試,在youtube以及spark大會中都有其發布教程以及spark應用部署的相關細節...