Spark的任務排程流程

2021-10-07 06:13:35 字數 2251 閱讀 5569

client 提交應用,master節點啟動driver

sparkcontext向clustermanager申請executor資源,worker會先例項化executorrunner物件,在executorrunner啟動中會建立程序生成器processbuilder,然後由該生成器建立coarsegrainedexecutorbackend物件。

spark應用程式中會有各種轉換操作,會通過行動操作觸發job。job提交之後,會依據rdd之間的依賴關係構建dag圖。dag圖構建好之後,會交給dagscheduler進行解析。

dagscheduler是面向排程階段的高層次排程器。他會把dag拆分成相互依賴的排程階段(stage),stage是以rdd的依賴是否為寬依賴。當遇到寬依賴,就劃分為新的排程階段,每個排程階段包含乙個或者多個任務,這些任務形成任務集(task set)。dagscheduler會記錄哪些rdd被存入磁碟等物化操作,同時還會尋求任務的最優化排程(資料本地性)。dagscheduler會監控排程階段的執行過程,如果某個階段執行失敗,就會重新提交該階段。

dagscheduler會將taskset提交給taskscheduler。每個taskscheduler只為乙個sparkcontext服務,taskscheduler接收來自dagscheduler傳送來的任務集,taskscheduler接收到任務集後,會把任務集中以任務的形式乙個個分發到集群worker節點的executor中執行。如果任務執行失敗,taskscheduler要負責重試。如果某個任務一直執行不完,就可能啟動多個節點執行同乙個任務。

worker中的executor收到taskscheduler傳送過來的任務後,以多執行緒的方式執行。每個執行緒負責乙個任務。任務結束後需要返回給taskscheduler,不同型別的任務,返回的方式也不同。shufflemap task 返回的是乙個map status物件,而不是結果本身。result task會返回結果。

在返回結果時,對於executor的計算結果

1. 生成結果大小在(∞,

1gb)

(\infty, 1gb)

(∞,1gb

):直接丟棄。該配置項可以通過spark.driver.maxresultsize進行設定

2. 生成結果大小在(1g

b,

128mb−

200kb)

(1gb, 128mb-200kb)

(1gb,1

28mb

−200

kb):會把結果所在的taskid儲存至blockmanager中,然後將該編號通過netty傳輸給driver終端點。該閾值是netty框架的傳輸最大值spark.akka.framesize(預設是128mb)和netty的預留空間reservedsizebytes(200kb)的差值。

3. 生成結果大小在(

128mb−

200kb,

0)

(128mb-200kb, 0)

(128mb

−200

kb,0

):通過netty直接將結果傳送到driver終端點。

同時,taskrunner將任務的執行結果傳送給driverendpoint。該終端點會轉給taskschedulerimpl的stateupdate進行處理。

如果返回狀態是taskstate.finished,那麼呼叫taskresultgetter的enqueuesuccessfultask方法進行處理。如果是indirecttaskresult,就會通過blockid進行獲取:sparkenv.blockmanager.getremotebytes(blockid);如果是directresult,則可以直接獲取結果。

如果返回狀態時taskstate.failed、taskstate.killed或者是taskstate.lost,呼叫taskresultgetter的enqueuefailedtask進行處理。對於taskstate.lost,還需要將其所在的executor標記為failed,並根據更新後的executor重新進行排程。

shufflemap task還會涉及到shuffle過程。

standalone模式下,clustermanager即為master。在yarn下,clustermanager為資源管理器

driver program可以在master上執行,此時driver就在master節點上。如果是yarn集群,那麼driver可能被排程到worker node上執行。為了防止driver和executor間通訊過慢,一般原則上要使它們分布在同乙個區域網中

result task過後,若作業已完成,則標記已完成。

Spark作業排程流程

spark首先會對job進行一系列的rdd轉換操作,並通過rdd之間的依賴關係構建dag 有向無環圖 然後根據rdd依賴關係將rdd劃分到不同的stage中,每個stage按照partition的數量建立多個task,最後將這些task提交到集群的work節點上執行。具體流程如下圖所示 通過rdd構...

spark資源排程和任務排程

資源排程 1 executor預設在集群中分散啟動,可通過引數配置集中在某個work啟動,不過分散啟動有利於資料本地化。2 如果spark submit提交任務時,如果不指定 executor cores,則spark會在每個work中啟動乙個executor並消耗掉work中的所有core和1g的...

Spark的執行基本流程以及任務排程機制

cluster 模式用於監控和排程的 driver 模組啟動在 yarn 集群中執行,一般用於生產環境當中。excutor 程序啟動後會向 driver 進行反向註冊 內部通訊時 excutorbackend 向 excutor 全部註冊完成後driver 開始執行 main 函式 之後執行到 ac...