DAGScheduler的原理剖析以及原始碼解析

2021-08-14 21:28:20 字數 3083 閱讀 3410

dagscheduler的stage劃分演算法:會從觸發的action操作的那個rdd開始往前倒推,首先會為最後乙個rdd建立乙個stage,然後往前倒推的時候,如果發現對某個rdd是寬依賴,那麼就會將寬依賴的那個rdd建立乙個新的stage,那個rdd就是對新的stage的最後乙個rdd,然後依次類推,繼續往前倒推,根據寬窄依賴,進行stage的劃分,直到所有的rdd全部遍歷完了為之。
在**執行了運算元之後,比如count(),**依次如下

def

count

(): long = sc.runjob(this, utils.getiteratorsize _).sum

def runjob[t, u: classtag](rdd: rdd[t], func: iterator[t] => u): array[u] =
def

runjob[t, u: classtag](

rdd: rdd[t],

func: iterator[t] => u,

partitions: seq[int]): array[u] =

def

runjob[t, u: classtag](

rdd: rdd[t],

func: (taskcontext, iterator[t]) => u,

partitions: seq[int]): array[u] =

def

runjob[t, u: classtag](

rdd: rdd[t],

func: (taskcontext, iterator[t]) => u,

partitions: seq[int],

resulthandler: (int, u) => unit): unit =

val callsite = getcallsite

val cleanedfunc = clean(func)

loginfo("starting job: " + callsite.shortform)

if (conf.getboolean("spark.loglineage", false))

// 呼叫sparkcontext,之前初始化建立的dagscheduler的runjob()方法

dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)

progressbar.foreach(_.finishall())

rdd.docheckpoint()

}

經過一系列的runjob呼叫,最後走到了具體功能實現的函式,

這個函式中最重要的就是dagscheduler.runjob()方法,接著進入dagscheduler的runjob函式,然後會呼叫submitjob() 函式,進入submitjob函式,dagschedulereventprocessloop 會post jobsubmitted的訊息。

private def

doonreceive

(event: dagschedulerevent): unit = event match catch

第一步:使用觸發job的最後乙個rdd,建立finalstage , 並且將stage漸入dagscheduler內部的記憶體快取區

finalstage

val job = new activejob(jobid, finalstage, callsite, listener, properties)

第二步,用finalstage建立乙個job, 就是說,這個job的最後乙個stage,當然就是我們的finalstage

jobidtoactivejob(jobid) =job
第三部,將job加入記憶體快取中

submitstage

(finalstage)

第四部,使用submitstage方法提交finalstage,這個方法的呼叫,其實會導致第乙個stage提交,並且導致其他所有的stage,都給放入waitingstage佇列裡。接下來我們看一下submitstage函式

// 其實就是stage劃分演算法的入口

// 但是,stage的劃分,其實就是由submitstage方法與getmissingparentstages方法共同組成的

private def submitstage(stage: stage) else

// 並且將當前stage,放入waitingstage等待執行的stage的佇列中

waitingstages += stage}}

} else

}

其中比較重要的函式是getmissingparentstages,進入函式內部

// 獲取某個stage的父stage

// 對乙個stage,如果它的最後乙個rdd的所有依賴都是窄依賴,那麼就不會建立任何新的stage

// 但是,只要發現這個stage的rdd寬依賴了某個rdd,那麼就用寬依賴的那個rdd,建立乙個新的stage

// 然後立即將新的stage返回

private

def getmissingparentstages(stage: stage): list[stage] =

// 如果是窄依賴,那麼將依賴的rdd放入棧

case narrowdep: narrowdependency[_] =>

waitingforvisit.push(narrowdep.rdd)}}

}}

}// 首先往棧中推入了stage最後乙個rdd

waitingforvisit.push(stage.rdd)

while (waitingforvisit.nonempty)

missing.tolist

}

ogg mysql的原理 OGG原理

ogg的資料整合技術實施主要含3程序 資料抽取程序 傳輸程序 應用程序 2個檔案 源資料庫 目標資料庫 1.出庫 投遞 入庫 啟動ogg程序 2.資料庫啟動歸檔模式sqlplus assysdbaarchiveloglist 3.建立gg使用者 4.oracle配置增量日誌 alterdatabas...

mvcc原理 Innodb的MVCC原理

該文章是 innodb的mvcc簡介 中的細節作出解釋。在mvcc出現之前的資料庫,為了實現一致性讀,如sqlserver,db2均採用鎖定讀技術,寫操作往往會阻塞讀操作,導致資料庫併發效能不高。oracle與postgre相繼推出自己的多版本併發控制技術,這一技術的核心是在發生讀寫衝突時候,讀操作...

ogg mysql的原理 OGG工作原理

一.goldengate介紹 ogg 是一種基於日誌的結構化資料複製軟體 ogg 能夠實現大量交易資料的實時捕捉,變換和投遞,實現源資料庫與目標資料庫的資料同步,保持最少10ms的資料延遲 二.工作原理 三.相關元件 1.manager 負責ogg 整體的監控和管理 1 trail檔案的生成和刪除 ...