整理 map reduce工作流程

2021-09-30 10:24:32 字數 3256 閱讀 7905

shuffle階段:

shuffle是指從map 產生輸出開始,包括系統執行排序以及傳送map 輸出到reducer 作為輸入的過程。

首先從map 端開始分析。當map 開始產生輸出時,它並不是簡單的把資料寫到磁碟,因為頻繁的磁碟操作會導致效能嚴重下降。它的處理過程更複雜,資料首先是寫到記憶體中的乙個緩衝區,並做了一些預排序,以提公升效率。

每個map 任務都有乙個用來寫入輸出資料的迴圈記憶體緩衝區。這個緩衝區預設大小是100mb,可以通過io.sort.mb 屬性來設定具體大小。當緩衝區中的資料量達到乙個特定閥值時,系統將會啟動乙個後台執行緒把緩衝區中的內容spill 到磁碟。在spill 過程中,map 的輸出將會繼續寫入到緩衝區,但如果緩衝區已滿,map 就會被阻塞直到spill 完成。spill 執行緒在把緩衝區的資料寫到磁碟前,會對它進行乙個二次快速排序,首先根據資料所屬的partition 排序,然後每個partition 中再按key 排序。輸出包括乙個索引檔案和資料檔案。如果設定了combiner,將在排序輸出的基礎上執行。combiner 就是乙個mini reducer,它在執行map 任務的節點本身執行,先對map 的輸出做一次簡單reduce,使得map 的輸出更緊湊,更少的資料會被寫入磁碟和傳送到reducer。

(一定要注意區別combinner與spill做的資料寫入磁碟前做的預排序的區別:combinner是對該磁碟資料做的一次reducer,而預排序是按key進行排序而已)spill 檔案儲存在由mapred.local.dir指定的目錄中,map 任務結束後刪除。

每當記憶體中的資料達到spill 閥值的時候,都會產生乙個新的spill 檔案,所以在map任務寫完它的最後乙個輸出記錄時,可能會有多個spill 檔案。在map 任務完成前,所有的spill 檔案將會被歸併排序為乙個索引檔案和資料檔案。這是乙個多路歸併過程,最大歸併路數由io.sort.factor 控制(預設是10)。如果設定了combiner,並且spill檔案的數量至少是3(由min.num.spills.for.combine 屬性控制),那麼combiner 將在輸出檔案被寫入磁碟前執行以壓縮資料。

對寫入到磁碟的資料進行壓縮(這種壓縮同combiner 的壓縮不一樣)通常是乙個很好的方法,因為這樣做使得資料寫入磁碟的速度更快,節省磁碟空間,並減少需要傳送到reducer 的資料量。預設輸出是不被壓縮的, 但可以很簡單的設定mapred.compress.map.output 為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec 來設定,

當spill 檔案歸併完畢後,map 將刪除所有的臨時spill 檔案,並告知tasktracker 任務已完成。reducers 通過http 來獲取對應的資料。用來傳輸partitions 資料的工作執行緒數由tasktracker.http.threads 控制,這個設定是針對每乙個tasktracker 的,並不是單個map,預設值為40,在執行大作業的大集群上可以增大以提公升資料傳輸速率。

map 的輸出檔案放置在執行map 任務的tasktracker 的本地磁碟上(注意:map 輸出總是寫到本地磁碟,但reduce 輸出不是,一般是寫到hdfs),它是執行reduce 任務的tasktracker 所需要的輸入資料。reduce 任務的輸入資料分布在集群內的多個map 任務的輸出中,map 任務可能會在不同的時間內完成,只要有其中的乙個map 任務完成,reduce 任務就開始拷貝它的輸出。這個階段稱之為拷貝階段。reduce 任務擁有多個拷貝執行緒, 可以並行的獲取map 輸出。可以通過設定mapred.reduce.parallel.copies 來改變執行緒數,預設是5。

reducer 是怎麼知道從哪些tasktrackers 中獲取map 的輸出呢?當map 任務完成之後,會通知它們的父tasktracker,告知狀態更新,然後tasktracker 再轉告jobtracker。這些通知資訊是通過

心跳通訊機制

傳輸的。因此針對乙個特定的作業,jobtracker 知道map 輸出與tasktrackers 的對映關係。reducer 中有乙個執行緒會間歇的向

jobtracker 

如果map 輸出足夠小,它們會被拷貝到reduce tasktracker 的記憶體中;如果緩衝區空間不足,會被拷貝到磁碟上。當記憶體中的緩衝區用量達到一定比例閥值(由mapred.job.shuffle.merge.threshold 控制),或者達到了map 輸出的閥值大小(由mapred.inmem.merge.threshold 控制),緩衝區中的資料將會被歸併然後spill 到磁碟。

拷貝來的資料疊加在磁碟上,有乙個後台執行緒會將它們歸併為更大的排序檔案,這樣做節省了後期歸併的時間。對於經過壓縮的map 輸出,系統會自動把它們解壓到記憶體方便對其執行歸併。

當所有的map 輸出都被拷貝後,reduce 任務進入排序階段(更恰當的說應該是歸併階段,因為排序在map 端就已經完成),這個階段會對所有的map 輸出進行歸併排序,這個工作會重複多次才能完成。

假設這裡有50 個map 輸出(可能有儲存在記憶體中的),並且歸併因子是10(由io.sort.factor 控制,就像map 端的merge 一樣),那最終需要5 次歸併。每次歸併會把10個檔案歸併為乙個,最終生成5 個中間檔案。在這一步之後,系統不再把5 個中間檔案歸併壓縮格式工具演算法副檔名支援分卷是否可分割成乙個,而是排序後直接「喂」給reduce 函式,省去向磁碟寫資料這一步。最終歸併的資料可以是混合資料,既有記憶體上的也有磁碟上的。由於歸併的目的是歸併最少的檔案數目,使得在最後一次歸併時總檔案個數達到歸併因子的數目,所以每次操作所涉及的檔案個數在實際中會更微妙些。譬如,如果有40 個檔案,並不是每次都歸併10 個最終得到4 個檔案,相反第一次只歸併4 個檔案,然後再實現三次歸併,每次10 個,最終得到4 個歸併好的檔案和6 個未歸併的檔案。要注意,這種做法並沒有改變歸併的次數,只是最小化寫入磁碟的資料優化措施,因為最後一次歸併的資料總是直接送到reduce 函式那裡。

在reduce 階段,reduce 函式會作用在排序輸出的每乙個key 上。這個階段的輸出被直接寫到輸出檔案系統,一般是hdfs。在hdfs 中,因為tasktracker 節點也執行著乙個datanode 程序,所以第乙個塊備份會直接寫到本地磁碟。

備註:目前主要有以下幾個壓縮格式:

deflate 無deflate .deflate 不支援不可以

gzip gzip deflate .gz 不支援不可以

zip zip deflate .zip 支援可以

bzip2 bzip2 bzip2 .bz2 不支援可以

lzo lzop lzo .lzo 不支援不可以

文章出處:

MapReduce工作流程

1.流程示意圖 mapreduce詳細工作流程 一 mapreduce詳細工作流程 二 流程詳解 上面是整個mapreduce最全工作流程,但是shuffle過程知識從第7步開始到第16步結束,具體shuffle過程詳解 1 maptask收集我們的map 方法輸出的kv對,放到記憶體緩衝區中 2 ...

詳解MapReduce工作流程

這個階段要完成以下工作 public inte ce inputsplit extends writable 我們看到inputsplit中記錄了原始資料的長度length,而location則有多個 是乙個陣列 location只記錄了主機名,它用於在指派map task的時候,讓map task...

map reduce的工作流程

mapreduce工作流程 wordcount 3.map shuffle 對map結果的key根據reducer的個數進行hash寫入緩衝區 key,value,partition 當緩衝區的大小占用了80 左右,將緩衝區的資料寫入磁碟,並根據partition key進行排序,生成乙個 多個溢寫...