Job Trigger Flow: Principles and Source Code Analysis


A walkthrough of the flow using WordCount

val lines = sc.textFile(...)

textFile() is defined on SparkContext:

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // Build a HadoopRDD of (LongWritable, Text) records for the file, then map
  // each record to the String contents of the line.
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
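As a quick usage sketch (the HDFS path below is made up for illustration): minPartitions is only a lower bound on the number of input splits, so this asks Spark for at least 4 partitions.

// Hypothetical path; requests at least 4 partitions.
val lines = sc.textFile("hdfs://namenode:9000/user/demo/words.txt", minPartitions = 4)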

val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))

// RDD itself does not define reduceByKey(), so calling reduceByKey() on an RDD triggers a Scala implicit conversion: the compiler searches the enclosing implicit scope, finds the rddToPairRDDFunctions() conversion defined for RDDs, and uses it to wrap the RDD in a PairRDDFunctions.

// The reduceByKey() that actually runs is therefore the one on PairRDDFunctions. (A standalone sketch of this implicit-conversion pattern follows the next line.)

val counts = pairs.reduceByKey(_ + _)
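To make the mechanism concrete, here is a minimal, self-contained sketch of the same pattern. MiniRDD, MiniPairFunctions, and toPairFunctions are toy names invented for this illustration, not Spark APIs; they only mirror how rddToPairRDDFunctions makes reduceByKey() appear on a pair RDD.

import scala.language.implicitConversions

// Toy stand-in for RDD: it has no reduceByKey() of its own.
class MiniRDD[T](val data: Seq[T])

// Toy stand-in for PairRDDFunctions: this is where reduceByKey() lives.
class MiniPairFunctions[K, V](self: MiniRDD[(K, V)]) {
  def reduceByKey(f: (V, V) => V): Map[K, V] =
    self.data.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(f) }
}

object MiniRDD {
  // Defined in the companion object, so it is always in implicit scope,
  // analogous to rddToPairRDDFunctions in Spark's RDD companion object.
  implicit def toPairFunctions[K, V](rdd: MiniRDD[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(rdd)
}

object ImplicitDemo extends App {
  val pairs = new MiniRDD(Seq(("a", 1), ("b", 1), ("a", 1)))
  // Compiles because the compiler rewrites this call as
  // MiniRDD.toPairFunctions(pairs).reduceByKey(_ + _)
  println(pairs.reduceByKey(_ + _))  // Map(a -> 2, b -> 1)
}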

counts.foreach(count => println(count._1 + ": " + count._2))
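foreach() is an action, and actions are what actually trigger a job. Inside RDD.scala, foreach() does nothing but clean the closure and hand it to SparkContext.runJob(); the snippet below is quoted from memory of the 1.x/2.x source, so minor details may differ by version.

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  // Every action funnels into sc.runJob(), which is analyzed next.
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}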

The action lands in SparkContext.runJob():

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Call runJob() on the DAGScheduler that was created earlier, while the
  // SparkContext was being initialized.
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
    localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
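Other actions take exactly the same path. For comparison, collect() also funnels into sc.runJob(), differing only in the per-partition function and in how the results are assembled on the driver (again quoted from memory of the RDD.scala source):

def collect(): Array[T] = withScope {
  // Run a job that materializes every partition, then concatenate the
  // per-partition arrays on the driver.
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}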
