Spark的Job提交流程以及相關知識

2021-10-01 03:57:47 字數 1844 閱讀 5320

spark提交作業 呼叫action運算元 --> 呼叫 rdd 類的runjob方法  --> 呼叫 sparkcontext 類的 dagscheduler.runjob方法

--> dagscheduler.handlejobsubmitted 方法

生成 finalstage 

finalstage = createresultstage()

submitstage(finalstage)   // 在這個方法裡面體現出乙個分割槽對應乙個task。核心**是 partitionstocompute.map

-->createresultstage() 方法中 getorcreateparentstage(),這個方法裡面會算出每乙個shuffledependices,就是寬依賴,對每乙個寬依賴呼叫map運算元,建立乙個stage。就是說按照shuffle切分stage  

(1)	rdd.foreach()

(2) rdd}

(3) sparkcontext //類

runjob(rdd, func, 0 until rdd.partitions.length)

runjob(rdd, (ctx: taskcontext, it: iterator[t]) => cleanedfunc(it), partitions)

runjob[t, u](rdd, func, partitions, (index, res) => results(index) = res)

dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)

(4) dagscheduler

val waiter = submitjob(rdd, func, partitions, callsite, resulthandler, properties)

submitjob

private def doonreceive(event: dagschedulerevent): unit = event match

private[scheduler] def handlejobsubmitted

// 建立job

val job = new activejob(jobid, finalstage, callsite, listener, properties)

// 提交stage

submitstage(finalstage) }

private def createresultstage: resultstage =

private def getorcreateparentstages(rdd: rdd[_], firstjobid: int): list[stage] = .tolist

}private[scheduler] def getshuffledependencies(

rdd: rdd[_]): hashset[shuffledependency[_, _, _]] = }}

// 最終返回所有依賴

parents

} private def submitstage(stage: stage)

case stage: resultstage =>

partitionstocompute.map }}

......

// 提交任務

taskscheduler.submittasks(new taskset(tasks.toarray, stage.id, stage.latestinfo.attemptid, jobid, properties))

}}

Job提交流程原始碼

1.開始提交程式 boolean result job.waitforcompletion true 2.當job執行狀態為為define,提交job if state jobstate.define 3.確保job狀態 ensurestate jobstate.define 4.相容新舊api s...

Spark任務提交流程

spark任務提交流程挺複雜的,下面給乙個相對簡單的任務提交流程 driver程序啟動以後,首先構建sparkcontext,sparkcontext主要包含兩部分 dagscheduler和taskscheduler master接受到任務註冊資訊之後,根據自身資源呼叫演算法在spark集群的wo...

Spark任務提交流程

建立sparkcontext物件,其中包含dagscheduler和taskscheduler executor內部會建立執行task的執行緒池,然後把啟動的executor反向註冊給driver dagscheduler負責把spark作業轉化成stage的dag,根據寬窄依賴切分stage,然後...