MapTask的工作機制

2021-10-09 03:01:17 字數 2769 閱讀 5463

maptask 主要包含四個階段: ①read階段 ②map階段 ③collect階段 ④溢寫階段 ⑤merge階段

maptask工作流程簡述:

public

class

extends

}

collect收集階段,就是將map() 輸出的資料採集,然後寫入到環形緩衝區。

當資料在使用者自己編寫的map()函式中處理完成以後,一般會呼叫outputcollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫partitioner),並寫入乙個環形記憶體緩衝區中。

環形緩衝區(底層是乙個陣列,左右兩邊同時寫)的預設大小是100m, 左邊存的是元資料,右邊存的是k-v

​ 元資料資訊包括: index partition keystart valstart

1)環形緩衝區:80%以後反向

當寫入資料達到環形緩衝區的80%以後, 會在剩餘的20%的區域找乙個節點,開始反向寫。在反向寫的同時,開始向磁碟溢寫這80%的資料。如果20%部分寫入的執行緒過快,還需要等待溢寫,防止記憶體不夠。

2)在環形緩衝區,這80%的資料會存為乙個檔案,在檔案內部分割槽,且分區內排序(排序方式是快排)所以是檔案分區內有序

3)之後這 80%資料的檔案會溢寫到磁碟中,每個檔案都是分割槽且區內有序。 因為乙個maptask可能會包含多個溢寫檔案所有當所有資料全部溢寫完以後,會對所有檔案進行歸併排序合成乙個檔案(分割槽,且區內有序)

這個過程是:所有溢寫檔案的分割槽合併 在分割槽歸併

merge階段:當所有資料處理完成後,maptask對所有臨時檔案進行一次合併,以確保最終只會生成乙個資料檔案。

因為乙個maptask可能會包含多個溢寫檔案。當所有資料全部溢寫完以後,會對所有檔案進行**歸併排序**合成乙個檔案(分割槽,且區內有序)
​ 當所有資料處理完後,maptask會將所有臨時檔案合併成乙個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。

​ 在進行檔案合併過程中,maptask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併mapreduce.task.io.sort.factor(預設10)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到乙個大檔案。

​ 讓每個maptask最終只生成乙個資料檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。

​ (1)read階段:maptask通過inputformat獲得的recordreader,從輸入inputsplit中解析出乙個個key/value。

​ (2)map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,並產生一系列新的key/value。

​ (3)collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫outputcollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫partitioner),並寫入乙個環形記憶體緩衝區中。

​ (4)spill階段:即「溢寫」,當環形緩衝區滿後,mapreduce會將資料寫到本地磁碟上,生成乙個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併、壓縮等操作。

​ 溢寫階段詳情:

​ 步驟1:利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分區內所有資料按照key有序。

​ 步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spilln.out(n表示當前溢寫次數)中。如果使用者設定了combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。

​ 步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構spillrecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1mb,則將記憶體索引寫到檔案output/spilln.out.index中。

​ (5)merge階段:當所有資料處理完成後,maptask對所有臨時檔案進行一次合併,以確保最終只會生成乙個資料檔案。

​ 當所有資料處理完後,maptask會將所有臨時檔案合併成乙個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。

​ 在進行檔案合併過程中,maptask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併mapreduce.task.io.sort.factor(預設10)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到乙個大檔案。

​ 讓每個maptask最終只生成乙個資料檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。

對檔案排序後,重複以上過程,直到最終得到乙個大檔案。

​ 讓每個maptask最終只生成乙個資料檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。

MapTask工作機制

階段 maptask 通過使用者編寫的 recordreader 從輸入 inputsplit 中解析出乙個個 key value。2 map 階段 該節點主要是將解析出的 key value 交給使用者編寫 map 函式處理,並產生一系列新的 key value。3 collect 收集階段 在使...

MapTask工作機制

1 並行度決定機制 1 問題引出 maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,maptask並行任務是否越多越好呢?這種說法是不對的,maptask多的話,消耗的資源就多 cpu,記憶體等等 maptask少的話,執行效率就會變低。2 maptask並...

MapTask工作機制

maptask並行度決定map階段的任務處理併發度,進而影響job的處理速度 maptask 並行度決定機制 乙個job的map階段並行度 個數 由客戶端提交job時的切片個數決定 乙個job的map階段並行度由客戶端在提交job時決定 每乙個split切片分配乙個maptask 預設 切片大小 b...