MapReduce工作機制(二 Map端流程)

2021-08-22 19:16:18 字數 2874 閱讀 7800

四種map task:

job-setup task:作業執行時啟動的第乙個任務

job-cleanup task:作業執行時啟動的最後乙個任務

task-cleanup task:任務失敗或是被殺死後用於清理已寫入臨時目錄中資料的任務

map task: 處理資料,輸出結果存到本地磁碟

read階段

maptask通過使用者編寫的recordreader,從輸入inputsplit中解析出乙個個key、value

map階段

將解析出的key、value交給使用者編寫的map()函式處理,產生一系列新的key、value

collect階段

在使用者編寫的map()函式中,當資料處理完成後,一般會呼叫outputcollector.collect()輸出結果。

該函式內部會呼叫partitioner產生key、value分片,並寫入乙個環形記憶體緩衝區中。

spill階段

當環形緩衝區滿後,mapreduce會將資料寫到本地磁碟上,生成乙個臨時檔案。

資料寫入磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併壓縮等操作。

combine階段

對所有的臨時檔案進行合併,以確保最終只生成乙個檔案。

每個map任務都有乙個環形緩衝區,用於收集map結果,減少磁碟io的影響。

mapoutputbuffer,寫入環形緩衝區directmapoutputcollector-沒有reducetask時呼叫,直接寫入hdfs
mapoutputbuffer採用二級索引結構,涉及三個環形記憶體緩衝區,由io.sort.mb=100m控制

1. kvoffsets-鍵值對索引的偏移量(在kvindice中進行查詢)

2. kvindices-分割槽資訊,鍵值對索引(在kvbuffer中進行查詢)

3. kvbuffer-鍵值對具體的值

單生產者消費者模型

生產者->mapoutputbuffer.collect(),mapoutputbuffer.buffer.write()

消費者->spillthread

//生產者部分主要偽**

//取得下乙個可寫入的位置

spilllock.lock();

if(緩衝區使用率達到閾值)

if(緩衝區滿)

spilllock.lock();

//將資料寫入緩衝區

溢寫過程

以kvbuffer為例,令bufend=bufindex將緩衝區[bufstart,bufend)(留了最後乙個index)之間的資料寫出到磁碟。

達到soft buffer limit時,將會啟動spill執行緒,spill執行緒以bufstart為讀指標向bufend移動對磁碟進行寫入,此時map task可寫入;

當達到hard buffer limit時,緩衝區將會阻塞直到spill執行緒執行完畢。完成後,maptask才可以繼續向kvbuffer寫入資料。

最後bufstart到達bufend位置,等待新一輪溢寫。

不再將索引和記錄分放到不同的環形緩衝區中,而是讓它們共用乙個環形緩衝區。

引入乙個新指標equator,該指標界定了索引和資料的共同起始位置。從位置開始,索引和資料分別沿著相反的方向增長記憶體使用空間。(使得io.sort.record.percent可以被捨棄,減少溢寫次數)

maptask端會產生一次溢寫過程,reduce端也會產生一次溢寫過程。

sortandspill()

利用快速排序對緩衝區的區間內資料進行排序,先按分割槽編號partition排序,再按key排序(group by partition order by partition,key

按照分割槽編號從小到大將資料寫入臨時檔案output/spilln.out(n表示當前溢寫次數),如果設定了combiner,則寫入檔案之前,還會對每個分割槽進行一次聚集操作。

將分割槽資料的元資料寫到記憶體索引資料結構spillrecord中,元資料報括每個分割槽在spilln.out中的偏移量,壓縮前後資料大小。若索引大小超過1m,則寫到output/spilln.out.index中。

多輪遞迴合併

合併io.sort.factor=100個檔案,產生新檔案

將新檔案加入待合併列表中

對檔案列表重新排序

重複上述步驟

MapReduce工作機制

呼叫job的submit 方法執行mapreduce作業,也可以呼叫waitforcompletion 它用於提交以前沒有提交的作業並等待它的完成。job的submit 方法建立乙個內部的jobsummiter例項,並呼叫submitjobinternal 方法。提交作業後,waitforcompl...

MapReduce工作機制總結

總結從mapreduce程式中的jobclient.runjob conf 開始,給出了mapreduce執行的流程圖 如下 並分析了流程圖中的四個核心實體,結合實際 介紹了mapreduce執行的詳細流程。mapreduce的執行流程簡單概括如下 介紹完mapreduce作業的詳細流程後,還重點介...

MapReduce工作機制總結

總結從mapreduce程式中的jobclient.runjob conf 開始,給出了mapreduce執行的流程圖 如下 並分析了流程圖中的四個核心實體,結合實際 介紹了mapreduce執行的詳細流程。mapreduce的執行流程簡單概括如下 介紹完mapreduce作業的詳細流程後,還重點介...