Map Reduce過程概述

2021-06-02 05:23:39 字數 3265 閱讀 3004

map-reduce的過程首先是由客戶端提交乙個任務開始的。

提交任務主要是通過jobclient.runjob(jobconf)靜態函式實現的:

public static runningjob runjob(jobconf job) throws ioexception finally finally finally

//建立reduce task

this.reduces = new taskinprogress[numreducetasks];

for (int i = 0; i < numreducetasks; i++) else if (action instanceof committaskaction) else else else finally else else finally finally while (kvfull);

} finally catch (mapbuffertoosmallexception e) else {

combinecollector.setwriter(writer);

combineandspill(kviter, combineinputcounter);

reducetask的run函式如下:

public void run(jobconf job, final taskumbilicalprotocol umbilical)

throws ioexception {

job.setboolean("mapred.skip.on", isskipping());

//對於reduce,則包含三個步驟:拷貝,排序,reduce

if (ismaporreduce()) {

copyphase = getprogress().addphase("copy");

sortphase  = getprogress().addphase("sort");

reducephase = getprogress().addphase("reduce");

startcommunicationthread(umbilical);

final reporter reporter = getreporter(umbilical);

initialize(job, reporter);

//copy階段,主要使用reducecopier的fetchoutputs函式獲得map的輸出。建立多個執行緒mapoutputcopier,其中copyoutput進行拷貝。

boolean islocal = "local".equals(job.get("mapred.job.tracker", "local"));

if (!islocal) {

reducecopier = new reducecopier(umbilical, job);

if (!reducecopier.fetchoutputs()) {

copyphase.complete();

//sort階段,將得到的map輸出合併,直到檔案數小於io.sort.factor時停止,返回乙個iterator用於訪問key-value

setphase(taskstatus.phase.sort);

statusupdate(umbilical);

final filesystem rfs = filesystem.getlocal(job).getraw();

rawkeyvalueiterator riter = islocal

? merger.merge(job, rfs, job.getmapoutputkeyclass(),

job.getmapoutputvalueclass(), codec, getmapfiles(rfs, true),

!conf.getkeepfailedtaskfiles(), job.getint("io.sort.factor", 100),

new path(gettaskid().tostring()), job.getoutputkeycomparator(),

reporter)

: reducecopier.createkviterator(job, rfs, reporter);

mapoutputfilesondisk.clear();

sortphase.complete();

//reduce階段

setphase(taskstatus.phase.reduce);

reducer reducer = reflectionutils.newinstance(job.getreducerclass(), job);

class keyclass = job.getmapoutputkeyclass();

class valclass = job.getmapoutputvalueclass();

reducevaluesiterator values = isskipping() ?

new skippingreducevaluesiterator(riter,

job.getoutputvaluegroupingcomparator(), keyclass, valclass,

job, reporter, umbilical) :

new reducevaluesiterator(riter,

job.getoutputvaluegroupingcomparator(), keyclass, valclass,

job, reporter);

//逐個讀出key-value list,然後呼叫reducer的reduce函式

while (values.more()) {

reduceinputkeycounter.increment(1);

reducer.reduce(values.getkey(), values, collector, reporter);

values.nextkey();

values.informreduceprogress();

reducer.close();

out.close(reporter);

done(umbilical);

map-reduce的過程總結如下圖:

mapreduce文件概述

1.mapreduce 教程 文件簡介 這個文件描述所有使用者認識hadoop mapreduce 框架和服務 英文 2.mapreduce命令指南 文件作用 所有的mapreduce命令通過 bin mapred指令碼呼叫。執行mapred指令碼沒有任何引數列印所有命令的描述。英文 3.遷移從ha...

MapReduce 程式設計模型概述

可以帶著下面問題來閱讀 mapreduce的過程都包含什麼操作?map處理完後,tasktracer會完成什麼任務?ruducer的作用是什麼?map中經過誰的處理之後,變為reduce輸入?1.首先,我們能確定我們有乙份輸入,而且他的資料量會很大 2.通過split之後,他變成了若干的分片,每個分...

MapReduce 程式設計模型概述

mapreduce 程式設計模型給出了其分布式程式設計方法,共分 5 個步驟 1 迭代 iteration 遍歷輸入資料,並將之解析成 key value 對。2 將輸入 key value 對對映 map 成另外一些 key value 對。3 依據 key 對中間資料進行分組 grouping ...