Spark (原始碼) 總結 雜

2021-08-19 19:38:26 字數 2433 閱讀 3565

1.spark-submit 指令碼,在指令碼裡呼叫了org.apache.spark.deloy.sparksubmit 類

2.sparksubmit.scala  main方法

override def main(args: array[string]): unit = 

}}

private def submit(args: sparksubmitarguments): unit =  catch  was not a rest server. " +

"falling back to legacy submission gateway instead.")

args.userest = false

submit(args)

}// in all other modes, just run the main class as prepared

} else

}

4.sparksubmit.scala  runmain()

該方法主要是 確定mainclass,使用classfromname,獲取類物件,然後採用對映呼叫main方法

5.client  main方法

在main方法中,new clientendpoint 物件建立過程中,會向master傳送registerdriver訊息。

object client 

// scalastyle:on println

val conf = new sparkconf()

val driverargs = new clientarguments(args)

if (!conf.contains("spark.rpc.asktimeout"))

logger.getrootlogger.setlevel(driverargs.loglevel)

val rpcenv =

rpcenv.create("driverclient", utils.localhostname(), 0, conf, new securitymanager(conf))

val masterendpoints = driverargs.masters.map(rpcaddress.fromsparkurl).

map(rpcenv.setupendpointref(_, master.endpoint_name))

rpcenv.setupendpoint("client", new clientendpoint(rpcenv, driverargs, masterendpoints, conf))

rpcenv.awaittermination()

}}

6. 之後就是driver 啟動,sparkcontext初始化的過程了

2.worker 在啟動executor的時候,先new executorrunner,runner不是程序也不是執行緒,只是乙個物件,在runner.start()中,使用執行緒非同步 啟動了乙個執行緒,該執行緒用於啟動executorbackend.

3.executor執行結束之後,使用backend.updatestatus() 向schedulerbackend 傳送訊息,schedulerbackend 的receive中,會把結果交給taskschedular進行處理,然後按照處理的結果在進行相關操作,比如,如果執行成功不需要重試,那麼schedulerbackend,就會把cores加到freecores中,然後呼叫makeoffers() 重新進行task的資源分配,看有沒有滿足資源條件的task可以執行。

job提交之後,呼叫runjob,到最終task被分配到executor之前所涉及到的排程相關

1.首先涉及到的排程是job  stage 劃分和提交過程,也就是submitstage方法,所有又依賴的satge,也就是說有父satge的子stage,子stage呼叫submitsatge的時候,會將子satge新增到watingsatge佇列中,換句話說,如果乙個stage有父依賴,那麼他就不能被subnitmissingsatge  submit,會被加入到watingsatge,只有沒有依賴的satge才會被提交。

沒有依賴的stage提交,會將satge轉換成tasksetmanager,提交給taskscheduar

2.taskschedular在初始化的時候,方法位於sparkcontext中,初始化的時候初始化了乙個佇列,這個佇列有兩個選擇:fifo/fair,

tasksetmanager提交給taskschedular的時候就會加入到該佇列中,比如fifo佇列,有兩層排序,一層是根據jobid,jobid越小的優先順序越高,同一job內部,存在第二層排序,stageid,stageid越小的優先順序越高

值得足以的一點就是:stage提交的時候,有依賴,就不會新增到佇列中,會加入到watingsatge中,等待某乙個stage完成之後,會檢查watingsatge提交已經沒有依賴的stage

Spark原始碼分析 Spark整體架構

術語 描述使用者編寫的程式。driver端的sparkcontext sparkconf和執行在executors上使用者編寫的業務邏輯 即map reduce reducebykey等 driver 執行使用者編寫應用程式的main 方法並建立sparkcontext worker 具體執行應用程...

spark原始碼之TaskScheduler解讀

1 spark任務的真正的執行時由action運算元進行乙個觸發,最終呼叫sc.runjob方法,在driver端會初始化2個重要的組建dagscheduler和taskscheduler,a taskscheduler的主要職責 a.1負責將dagscheduler傳送過來的的taskset放入到...

spark原始碼剖析 RDD相關原始碼閱讀筆記

最好的原始碼閱讀方法就是除錯,沒有之一 之前其實有閱讀過rdd相關的原始碼,最近學習過程中發現在之前原本閱讀過的模組中有一些 關節 並沒有打通,所以想通過除錯的方式來更細緻得學習原始碼。本文為編寫測試用例並除錯rdd相關模組的筆記,並沒有列出具體的除錯過程,僅列出結論以做備忘,特別是那些比較容易忽略...