Hadoop框架 job提交流程 本地模式

2022-07-10 06:15:10 字數 3115 閱讀 9421

/**

* 主要是將任務提交到集群中去並等待完成

* boolean verbose:是否將進度列印給使用者看

* return 任務成功返回true

*/public boolean waitforcompletion(boolean verbose

) throws ioexception, interruptedexception,

classnotfoundexception

if (verbose) else catch (interruptedexception ie) }}

return issuccessful();

}

public void submit() 

throws ioexception, interruptedexception, classnotfoundexception

});state = jobstate.running;

log.info("the url to track the job: " + gettrackingurl());

}

private synchronized void connect()

throws ioexception, interruptedexception, classnotfoundexception

});}

} /**

* public cluster(configuration conf)

* 主要作用是獲取你在driver中configuration配置的檔案資訊,沒有配置使用預設

*/public cluster(configuration conf) throws ioexception

/*** 呼叫initialize(jobtrackaddr, conf)返回的值

* jobtrackaddr:local還是yarn

* conf:配置的資訊

*/public cluster(inetsocketaddress jobtrackaddr, configuration conf)

throws ioexception

/*** ...表示該方法中的校驗**

* jobtrackaddr:狀態

* conf:配置資訊

*/private void initialize(inetsocketaddress jobtrackaddr, configuration conf)throws ioexception else

...} /**

* yarn是否和driver中配置的conf.set("mapreduce.framework.name","yarn");

* 相同,相同的話就返回乙個yarnrunner物件,沒有配置的話就返回乙個null,而後進行

* 第二次判斷

*/public clientprotocol create(configuration conf) throws ioexception

/*** 之前返回為null的話就進行第二次判斷

*/public clientprotocol create(configuration conf) throws ioexception

conf.setint(jobcontext.num_maps, 1);

return new localjobrunner(conf);

} // --> submint() --> return submitter.submitjobinternal(job.this, cluster);

總結:connect方法最終要的地方就是,為我們建立了乙個關鍵的物件localjobrunner物件,這個物件為我們之後提交作業所用,很重要。

jobstatus submitjobinternal(job job, cluster cluster)throws classnotfoundexception, 		interruptedexception, ioexception {

/*校驗檔案輸出路徑是否在driver中配置,如果沒有配置丟擲invalidjobconfexception,如果檔案路徑存在丟擲 filealreadyexception

*/checkspecs(job);

// 獲取conf配置

configuration conf = job.getconfiguration();

addmrframeworktodistributedcache(conf);

path jobstagingarea = jobsubmissionfiles.getstagingdir(cluster, conf);

// 配置校驗資訊我就用...代替了,太多容易把眼睛看花,感興趣的朋友可以用debug邊跳邊看裡面的具體資訊。

...// 建立jobid 也就是8088埠中 任務的id

jobid jobid = submitclient.getnewjobid();

// 獲得jobid設定到job裡

job.setjobid(jobid);

// 將jobstagingarea和jobid拼在一起,拼成乙個提交資訊(配置資訊,切片資訊,jar包)的路徑

path submitjobdir = new path(jobstagingarea, jobid.tostring());

...// 拷貝jar包到集群(本地模式看不到,在向集群提交的時候才能看到jar包)

copyandconfigurefiles(job, submitjobdir);

// 會在submitjobdir目錄下建立乙個job.xml檔案

path submitjobfile = jobsubmissionfiles.getjobconfpath(submitjobdir);

log.debug("creating splits at " + jtfs.makequalified(submitjobdir));

/* 切片,具體怎麼切,再看切片原始碼的時候會提到,在執行完該方法後submitjobdir路徑中會多出split和crc 檔案

*/int maps = writesplits(job, submitjobdir);

Job提交流程原始碼

1.開始提交程式 boolean result job.waitforcompletion true 2.當job執行狀態為為define,提交job if state jobstate.define 3.確保job狀態 ensurestate jobstate.define 4.相容新舊api s...

Job提交流程原始碼解析

1.job.waitforcompletion true 在driver中提交job 1 sumbit 提交 1 connect 1 return new cluster getconfiguration initialize jobtrackaddr,conf 通過yarnclientprotoc...

Spark的Job提交流程以及相關知識

spark提交作業 呼叫action運算元 呼叫 rdd 類的runjob方法 呼叫 sparkcontext 類的 dagscheduler.runjob方法 dagscheduler.handlejobsubmitted 方法 生成 finalstage finalstage createres...