SparkCore stage劃分演算法原始碼分析

2021-09-26 18:46:42 字數 4585 閱讀 3156

在之前的文章中,已經分析了stage的劃分演算法,這裡我們到原始碼裡面去看劃分演算法是怎麼實現的。

首先找到提交job的入口(從action操作開始,找到action操作的runjob -> dagscheduler.runjob -> submitjob -> eventprocessloop.jobsubmitted -> handlejobsubmitted)handlejobsubmitted()這個方法,下面我們具體分析原始碼

private

[scheduler] def handlejobsubmitted

(jobid: int,

finalrdd: rdd[_]

, func:

(taskcontext, iterator[_])=

> _,

partitions: array[int]

, callsite: callsite,

listener: joblistener,

properties: properties)

catch

// 用finalstage建立乙個job,裡面封裝了job的一些資訊(比如partition的數量,resultstage和shufflemapstage是不一樣的,這裡在效能調優的時候再講)

val job =

newactivejob

(jobid, finalstage, callsite, listener, properties)

// 清除rdd快取

clearcachelocs()

loginfo

("got job %s (%s) with %d output partitions"

.format

( job.jobid, callsite.shortform, partitions.length)

)loginfo

("final stage: "

+ finalstage +

" ("

+ finalstage.name +

")")

loginfo

("parents of final stage: "

+ finalstage.parents)

loginfo

("missing parents: "

+getmissingparentstages

(finalstage)

) val jobsubmissiontime = clock.

gettimemillis()

// 將job加入記憶體快取中

jobidtoactivejob

(jobid)

= job

activejobs += job

finalstage.

setactivejob

(job)

val stageids =

jobidtostageids

(jobid)

.toarray

val stageinfos = stageids.

flatmap

(id =

> stageidtostage.

get(id)

.map

(_.latestinfo)

) listenerbus.

post

(sparklistenerjobstart

(job.jobid, jobsubmissiontime, stageinfos, properties)

)// 提交finalstage

// 這個方法會導致第乙個stage被提交,並且其他stage,都放入了waitingstages裡了。

submitstage

(finalstage)

// 提交完第乙個stage0後,剩餘的stage,通過這個函式提交

submitwaitingstages()

}

上面的**中,首先使用觸發job的最後乙個rdd,建立乙個finalstage,這個stage是resultstage(乙個job裡面只有最後乙個stage是resultstage,其餘的都是shufflemapstage),然後建立job(裡面包含了job的一些資訊),並將job的相關資訊放入快取中,接著就建立了比較重要的submitstage(finalstage)方法,這個方法裡面就包含了stage的劃分和提交;而submitwaitingstages()則是提交剩餘的stage,下面我們分析一下submitstage(finalstage)方法。

private def submitstage

(stage: stage)

else

// 並且將當前stage,放入waitingstages等待佇列中

waitingstages += stage

}}}else

}

下面簡單說是如何進行stage的劃分的,看注釋,首先就是根據當前這個stage找到它的父stage,假如父stage的rdd與當前stage的rdd是寬依賴的關係,那麼就用這個寬依賴的rdd建立乙個shufflemapstage,並返回;假如不存在寬依賴,那麼就一直遍歷下去,直到第乙個rdd為止。我們看一下getmissingparentstages()方法。

private def getmissingparentstages

(stage: stage)

: list[stage]

=// 如果是窄依賴,那麼將rdd放入棧中

case narrowdep: narrowdependency[_]

=>

waitingforvisit.

push

(narrowdep.rdd)}}

}}}// 首先往棧中推入stage最後的乙個rdd

waitingforvisit.

push

(stage.rdd)

// 進行while迴圈

while

(waitingforvisit.nonempty)

missing.tolist

}

可以看到這個方法也是乙個遞迴呼叫,為了防止棧溢位,使用了乙個stack結構waitingforvisit,我們看內部函式visit()方法,它會遍歷當前rdd的依賴,假如存在shuffle依賴,那麼就建立乙個shufflemapstage,並返回,否則就一直執行下去,直到沒有父rdd為止,這也就意味著整個job只建立了乙個stage(resultstage)。

下面接著看submitstage的原始碼,通過getmissingparentstages()獲取當前stage的父stage,假如存在,就執行到下面這塊**,這塊**就是stage劃分演算法的推動者:

if

(missing.isempty)

else

// 並且將當前stage,放入waitingstages等待佇列中

waitingstages += stage

}

如果存在父stage,那麼遍歷父stage,並遞迴呼叫submitstage();我們以下面這幅經典的圖來說明:

如上面圖所示,首先以最後乙個rddg建立finalstage(stage3),接著通過**getmissingparentstages()**找它的父rdd,它有兩個父rdd,分別是rddf和rddb。先看rddb,它與rddg是窄依賴,接著遍歷rddb的父rdd,也即rdda,它們之間是groupbykey操作(發生了shuffle),是寬依賴,因此建立乙個stage1,這個遍歷結束;接著看rddf,它與rddg之間的操作是join,也發生了shuffle操作,因此建立了stage2;在遍歷完兩個父rdd之後,就返回getmissingparentstages()函式。這時候missing列表裡面包含了stage1和stage2。

下面判斷missing是否為空,由於包含了stage1和stage2,因此不為空,就接著遍歷。先遍歷stage1,由於stage1沒有依賴,因此它的missing是空,那麼這裡就呼叫submitmissingtasks()提交stage1;接著遍歷到stage2,stage2的rddf的父rdd中沒有寬依賴,因此它的missing列表也為空,提交stage2。

接著執行到waitingstages,它將finalstage,也就stage3加入這個等待佇列中。到這裡整個submitstage()就執行完成,只有最後乙個stage被加入了等待佇列中。

submitstage()執行完成之後,接著執行submitwaitingstages(),提交加入等待佇列的stage。

整個stage的劃分以及提交就結束了。以上就是stage的劃分以及提交過程,主要分析了一下流程,至於細節這塊,以後再慢慢研究,更新部落格。

總結一下,stage的劃分是以shuffle為界,也即寬依賴,如果rdd之間發生了shuffle,那麼就會以shuffle為界建立新的stage,依次內推。而stage的提交是遞迴提交,最先建立的stage,會最後提交,這剛好符合rdd的處理流程的先後順序。

IP位址歸劃

不管是學習網路還是上網,ip位址都是出現頻率非常高的詞。windows系統中設定ip位址的介面如圖1所示,圖中出現了ip位址 子網掩碼 預設閘道器和dns伺服器這幾個需要設定的地方,只有正確設定,網路才能通,那這些名詞都是什麼意思呢?學習ip位址的相關知識時還會遇到網路位址 廣播位址 子網等概念,這...

Ubuntu 劃詞翻譯

在 ubuntu 下可以自己寫指令碼實現乙個簡陋的版本。步驟如下 然後把以下 複製進乙個 notify translate.sh 檔案中,usr bin env bash need installed.need xsel or xclip installed.se xsel b n o tr n t...

劃重點 Python xlrd簡介

import xlrd data xlrd.open workbook r c users asus desktop 重新開始 python獲取excel資料 user1.xlsx print data.sheet names 獲取excel檔案所有sheet名字 table data.sheet ...