Spark作業排程階段分析

2022-02-10 10:49:20 字數 2574 閱讀 4908

spark作為分布式的大資料處理框架必然或涉及到大量的作業排程,如果能夠理解spark中的排程對我們編寫或優化spark程式都是有很大幫助的;

在spark中存在轉換操作(transformation operation)行動操作(action operation)兩種;而轉換操作只是會從乙個rdd中生成另乙個rdd且是lazy的,spark中只有行動操作(action operation)才會觸發作業的提交,從而引發作業排程;在乙個計算任務中可能會多次呼叫 轉換操作這些操作生成的rdd可能存在著依賴關係,而由於轉換都是lazy所以當行動操作(action operation )觸發時才會有真正的rdd生成,這一系列的rdd中就存在著依賴關係形成乙個dag(directed acyclc graph),在spark中dagscheuler是基於dag的頂層排程模組;

1.1 作業排程關係圖

這裡根據spark原始碼跟蹤觸發action操作時觸發的job提交流程,count()是rdd中的乙個action操作所以呼叫count時會觸發job提交;

在rdd原始碼count()呼叫sparkcontext的runjob,在runjob方法中根據partitions(分割槽)大小建立arrays存放返回結果;

rdd.scala

/*** return the number of elements in the rdd.

*/def count(): long = sc.runjob(this, utils.getiteratorsize _).sum

sparkcontext.scala

def runjob[t, u: classtag](

rdd: rdd[t],

func: (taskcontext, iterator[t]) => u,

partitions: seq[int],

resulthandler: (int, u) => unit): unit =

dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)

}

在sparkcontext中將呼叫dagscheduler的runjob方法提交作業,dagscheduler主要任務是計算作業與任務依賴關係,處理呼叫邏輯;dagscheduler提供了submitjob與runjob方法用於 提交作業,runjob方法會一直等待作業完成,submitjob則返回jobwaiter物件可以用於判斷作業執行結果;

在runjob方法中將呼叫submitjob,在submitjob中把提交操作放入到事件迴圈佇列(dagschedulereventprocessloop)中;

def submitjob[t, u](

rdd: rdd[t],

func: (taskcontext, iterator[t]) => u,

partitions: seq[int],

callsite: callsite,

resulthandler: (int, u) => unit,

properties: properties): jobwaiter[u] =

在事件迴圈佇列中將呼叫eventprocessloop的onreceive方法;

提交作業時dagscheduler會從rdd依賴鏈尾部開始,遍歷整個依賴鏈劃分排程階段;劃分階段以shuffledependency為依據,當沒有shuffledependency時整個job 只會有乙個stage;在事件迴圈佇列中將會呼叫dagscheduler的handlejobsubmitted方法,此方法會拆分stage、提交stage;

private[scheduler] def handlejobsubmitted(jobid: int,

finalrdd: rdd[_],

func: (taskcontext, iterator[_]) => _,

partitions: array[int],

callsite: callsite,

listener: joblistener,

properties: properties)

在提交stage時會先呼叫getmissingparentstages獲取父階段stage,迭代該階段所依賴的父排程階段如果存在則先提交該父階段的stage 當不存在父stage或父stage執行完成時會對當前stage進行提交;

private def submitstage(stage: stage)  else 

waitingstages += stage}}

} ......

}

Spark作業排程

spark在standalone模式下,預設是使用fifo的模式,我們可以使用spark.cores.max來設定它的最大核心數,使用spark.executor.memory 來設定它的記憶體。在yarn模式下,使用 num workers設定worker的數量,使用 worker memory設...

Spark作業排程流程

spark首先會對job進行一系列的rdd轉換操作,並通過rdd之間的依賴關係構建dag 有向無環圖 然後根據rdd依賴關係將rdd劃分到不同的stage中,每個stage按照partition的數量建立多個task,最後將這些task提交到集群的work節點上執行。具體流程如下圖所示 通過rdd構...

spark作業提交失敗分析

提交乙個spark作業,報錯 error spark.sparkcontext error inilializing sparkcontext.再提交乙個yarn作業,hadoop jar opt cloudera parcels cdh 6.1 jars hadoop examples.jar p...