MapTask與ReduceTask深入分析與調優

2021-06-16 19:58:50 字數 3876 閱讀 6663

當map task開始運算,並產生中間資料時,其產生的中間結果並非直接就簡單的寫入磁碟。這中間的過程比較複雜,並且利用到了記憶體buffer來進行已經產生的 部分結果的快取,並在記憶體buffer中進行一些預排序來優化整個map的效能。如上圖所示,每乙個map都會對應存在乙個記憶體 buffer(mapoutputbuffer,即上圖的buffer in memory),map會將已經產生的部分結果先寫入到該buffer中,這個buffer預設是100mb大小,但是這個大小是可以根據job提交時的 引數設定來調整的,該引數即為:io.sort.mb。當map的產生資料非常大時,並且把io.sort.mb調 大,那麼map在整個計算過程中spill的次數就勢必會降低,map task對磁碟的操作就會變少,如果map tasks的瓶頸在磁碟上,這樣調整就會大大提高map的計算效能。map做sort和spill的記憶體結構如下如所示:

map在執行過程中,不停的向該buffer中寫入已有的計算結果,但是該buffer並不一定能將全部的map輸出快取下來,當map輸出超出一 定閾值(比如100m),那麼map就必須將該buffer中的資料寫入到磁碟中去,這個過程在mapreduce中叫做spill。map並不是要等到 將該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的情況。所 以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。這個閾值也是由乙個job的配置引數來控制,即io.sort.spill.percent,預設為0.80或80%。這個引數同樣也是影響spill頻繁程度,進而影響map task執行週期對磁碟的讀寫頻率的。但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對使用者來說更加方便。

當map task的計算部分全部完成後,如果map有輸出,就會生成乙個或者多個spill檔案,這些檔案就是map的輸出結果。map在正常退出之前,需要將這 些spill合併(merge)成乙個,所以map在結束之前還有乙個merge的過程。merge的過程中,有乙個引數可以調整這個過程的行為,該引數 為:io.sort.factor。該引數預設為10。它表示當merge spill檔案時,最多能有多少並行的stream向merge檔案中寫入。比如如果map產生的資料非常的大,產生的spill檔案大於10,而 io.sort.factor使用的是預設的10,那麼當map計算完成做merge時,就沒有辦法一次將所有的spill檔案merge成乙個,而是會 分多次,每次最多10個stream。這也就是說,當map的中間結果非常大,調大io.sort.factor,有利於減少merge次數,進而減少 map對磁碟的讀寫頻率,有可能達到優化作業的目的。

當job指定了combiner的時候,我們都知道map介紹後會在map端根據combiner定義的函式將map結果進行合併。執行combiner函式的時機有可能會是merge完成之前,或者之後,這個時機可以由乙個引數控制,即min.num.spill.for.combine(default 3),當job中設定了combiner,並且spill數最少有3個的時候,那麼combiner函式就會在merge產生結果檔案之前執行。通過這樣 的方式,就可以在spill非常多需要merge,並且很多資料需要做conbine的時候,減少寫入到磁碟檔案的資料數量,同樣是為了減少對磁碟的讀寫 頻率,有可能達到優化作業的目的。

減少中間結果讀寫進出磁碟的方法不止這些,還有就是壓縮。也就是說map的中間,無論是spill的時候,還是最後merge產生的結果檔案,都是 可以壓縮的。壓縮的好處在於,通過壓縮減少寫入讀出磁碟的資料量。對中間結果非常大,磁碟速度成為map執行瓶頸的job,尤其有用。控制map中間結果 是否使用壓縮的引數為:mapred.compress.map.output(true/false)。將這個引數 設定為true時,那麼map在寫中間結果時,就會將資料壓縮後再寫入磁碟,讀結果時也會採用先解壓後讀取資料。這樣做的後果就是:寫入磁碟的中間結果數 據量會變少,但是cpu會消耗一些用來壓縮和解壓。所以這種方式通常適合job中間結果非常大,瓶頸不在cpu,而是在磁碟的讀寫的情況。說的直白一些就 是用cpu換io。根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常複雜。所以對中間結果採用壓縮通常來說是有收益的。以下是乙個 wordcount中間結果採用壓縮和不採用壓縮產生的map中間結果本地磁碟讀寫的資料量對比:

map中間結果不壓縮:

map中間結果壓縮:

可以看出,同樣的job,同樣的資料,在採用壓縮的情況下,map中間結果能縮小將近10倍,如果map的瓶頸在磁碟,那麼job的效能提公升將會非常可觀。

當採用map中間結果壓縮的情況下,使用者還可以選擇壓縮時採用哪種壓縮格式進行壓縮,現在hadoop支援的壓縮格式 有:gzipcodec,lzocodec,bzip2codec,lzmacodec等壓縮格式。通常來說,想要達到比較平衡的cpu和磁碟壓縮 比,lzocodec比較適合。但也要取決於job的具體情況。使用者若想要自行選擇中間結果的壓縮演算法,可以設定配置引數:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.defaultcodec或者其他使用者自行選擇的壓縮方式。

選項

型別

預設值

描述

io.sort.mb

int100

快取map中間結果的buffer大小(in mb)

io.sort.record.percent

float

0.05

io.sort.mb中用來儲存map output記錄邊界的百分比,其他快取用來儲存資料

io.sort.spill.percent

float

0.80

map開始做spill操作的閾值

io.sort.factor

int10

做merge操作時同時操作的stream數上限。

min.num.spill.for.combine

int3

combiner函式執行的最小spill數

mapred.compress.map.output

boolean

false

map中間結果是否採用壓縮

mapred.map.output.compression.codec

class name

org.apache.hadoop.io.

compress.defaultcodec

map中間結果的壓縮格式

選項

型別

預設值

描述

mapred.reduce.parallel.copies

int5

mapred.reduce.copy.backoff

int300

io.sort.factor

int10

同上mapred.job.shuffle.input.buffer.percent

float

0.7用來快取shuffle資料的reduce task heap百分比

mapred.job.shuffle.merge.percent

float

0.66

快取的記憶體中多少百分比後開始做merge操作

mapred.job.reduce.input.buffer.percent

float

0.0sort完成後reduce計算階段用來快取資料的百分比

Hadoop資料切片與MapTask並行度決定機制

資料塊 block 是hdfs 物理上把資料分成一塊一塊。資料切片 資料切片只是在邏輯上對輸入進行分片,並不會在磁碟上將其切分成片進行儲存。假設切片大小設定成100m 1 乙個job的map階段並行度由客戶端在提交job時的切片數決定 2 每乙個split切片分配乙個maptask並行例項處理 3 ...

MapTask工作機制

階段 maptask 通過使用者編寫的 recordreader 從輸入 inputsplit 中解析出乙個個 key value。2 map 階段 該節點主要是將解析出的 key value 交給使用者編寫 map 函式處理,並產生一系列新的 key value。3 collect 收集階段 在使...

MapTask工作機制

1 並行度決定機制 1 問題引出 maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,maptask並行任務是否越多越好呢?這種說法是不對的,maptask多的話,消耗的資源就多 cpu,記憶體等等 maptask少的話,執行效率就會變低。2 maptask並...