MapTask的工作機制原始碼解析

2021-10-08 15:02:59 字數 2462 閱讀 4513

1. 從job提交流程的(2)--><9> 進去

job job = new job(jobid.downgrade(jobid), jobsubmitdir); 構造真正執行的job , localjobrunnber$job

2. localjobrunnber$job 的run()方法

1) tasksplitmetainfo tasksplitmetainfos =

splitmetainforeader.readsplitmetainfo(jobid, localfs, conf, systemjobdir);

// 讀取job.splitmetainfo

2) int numreducetasks = job.getnumreducetasks(); // 獲取reducetask個數

3) listmaprunnables = getmaptaskrunnables(

tasksplitmetainfos, jobid, mapoutputfiles);

// 根據切片的個數, 建立執行maptask的 maptaskrunnable

4) executorservice mapservice = createmapexecutor(); // 建立執行緒池

5) runtasks(maprunnables, mapservice, "map"); //執行 maptaskrunnable

6) 因為runnable提交給執行緒池執行,接下來會執行maptaskrunnable的run方法。

7) 執行 localjobrunner$job$maptaskrunnable 的run()方法.

(1) maptask map = new maptask(systemjobfile.tostring(), mapid, taskid,

info.getsplitindex(), 1); //建立maptask物件

(2) map.run(localconf, job.this); //執行maptask中的run方法

① org.apache.hadoop.mapreduce.taskattemptcontext taskcontext = jobcontextimpl

③ org.apache.hadoop.mapreduce.inputformatinputformat = textinputformat

④ split = getsplitdetails(new path(splitindex.getsplitlocation()),

splitindex.getstartoffset()); // 重構切片物件

切片物件的資訊 : file:/d:/input/inputword/janeeyre.txt:0+36306679

⑤ org.apache.hadoop.mapreduce.recordreaderinput = maptask$nettrackingrecordreader

⑥ output = new newoutputcollector(taskcontext, job, umbilical, reporter); //構造緩衝區物件

[1] collector = createsortingcollector(job, reporter); //獲取緩衝區物件

maptask$mapoutputbuffer

. collector.init(context); //初始化緩衝區物件

1>>.final float spillper =

job.getfloat(jobcontext.map_sort_spill_percent, (float)0.8);

// 溢寫百分比 0.8

2>>.final int sortmb = job.getint(mrjobconfig.io_sort_mb,

mrjobconfig.default_io_sort_mb);

// 緩衝區大小 100m

3>>.sorter = reflectionutils.newinstance(job.getclass(

mrjobconfig.map_sort_class, quicksort.class,

indexedsorter.class), job);

// 排序物件

// 排序使用的是快排,並且基於索引排序。

4>> . // k/v serialization // kv序列化

5>> . // output counters // 計數器

6>> . // compression // 壓縮

7>> . // combiner // combiner

map(context.getcurrentkey(), context.getcurrentvalue(), context);

context.write(outk,outv);

MapTask工作機制

階段 maptask 通過使用者編寫的 recordreader 從輸入 inputsplit 中解析出乙個個 key value。2 map 階段 該節點主要是將解析出的 key value 交給使用者編寫 map 函式處理,並產生一系列新的 key value。3 collect 收集階段 在使...

MapTask工作機制

1 並行度決定機制 1 問題引出 maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,maptask並行任務是否越多越好呢?這種說法是不對的,maptask多的話,消耗的資源就多 cpu,記憶體等等 maptask少的話,執行效率就會變低。2 maptask並...

MapTask工作機制

maptask並行度決定map階段的任務處理併發度,進而影響job的處理速度 maptask 並行度決定機制 乙個job的map階段並行度 個數 由客戶端提交job時的切片個數決定 乙個job的map階段並行度由客戶端在提交job時決定 每乙個split切片分配乙個maptask 預設 切片大小 b...