MapReduce 詳解Shuffle過程

2021-09-02 15:02:33 字數 2779 閱讀 4378

shuffle過程,也稱copy階段。reduce task從各個map task上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定的閥值,則寫到磁碟上,否則直接放到記憶體中。

注意:shuffle過程是貫穿於map和reduce兩個過程的!

hadoop的集群環境,大部分的map task和reduce

task是執行在不同的節點上的,那麼reduce就要取map的輸出結果。那麼集群中執行多個job時,task的正常執行會對集群內部的網路資源消耗嚴重。雖說這種消耗是正常的,是不可避免的,但是,我們可以採取措施盡可能的減少不必要的網路資源消耗。另一方面,每個節點的內部,相比於記憶體,磁碟io對job完成時間的影響相當的大。

所以:從以上分析,shuffle過程的基本要求:

1.完整地從map task端拉取資料到reduce task端

2.在拉取資料的過程中,盡可能地減少網路資源的消耗

3.盡可能地減少磁碟io對task執行效率的影響 那麼,shuffle的設計目的就要滿足以下條件:

1.保證拉取資料的完整性

2.盡可能地減少拉取資料的資料量

3.盡可能地使用節點的記憶體而不是磁碟

說明:map節點執行map task任務生成map的輸出結果。

這個記憶體緩衝區是有大小限制的,預設100mb。當map task的輸出結果很多時,就可能撐爆記憶體。需將緩衝區的資料臨時寫入磁碟,然後重新利用這塊緩衝區。

從記憶體往磁碟寫資料被稱為spill(溢寫),由單獨執行緒完成,不影響往緩衝區寫map結果的執行緒。溢寫比例:spill.percent(預設0.8)。

當緩衝區的資料達到閥值,溢寫執行緒啟動,鎖定這80mb的記憶體,執行溢寫過程。剩下的20mb繼續寫入map task的輸出結果。互不干涉!

當溢寫執行緒啟動後,需要對這80mb空間內的key做排序(sort)。排序是mapreduce模型的預設行為,也是對序列化的位元組做的排序。排序規則:字典排序!

map task的輸出結果寫入記憶體後,當溢寫執行緒未啟動時,對輸出結果並沒有做任何的合併。從官方圖可以看出,合併是體現在溢寫的臨時磁碟檔案上的,且這種合併是對不同的reduce端的數值做的合併。所以溢寫過程乙個很重要的細節在於,如果有很多個key/value對需要傳送到某個reduce端,那麼需要將這些鍵值對拼接到一塊,減少與partition相關的索引記錄。如果client設定過combiner,其會將有相同key的key/value對的value加起來,減少溢寫到磁碟的資料量。注意:這裡的合併並不能保證map結果中所有的相同的key值的鍵值對的value都合併了,它合併的範圍只是這80mb,它能保證的是在每個單獨的溢寫檔案中所有鍵值對的key值均不相同!

溢寫生成的臨時檔案的個數隨著map輸出結果的資料量變大而增多,當整個map task完成,記憶體中的資料也全部溢寫到磁碟的乙個溢寫檔案。

也就是說,不論任何情況下,溢寫過程生成的溢寫檔案至少有乙個!但是最終的檔案只能有乙個,需要將這些溢寫檔案歸併到一起,稱為merge。

merge是將所有的溢寫檔案歸併到乙個檔案,結合上面所描述的combiner的作用範圍,歸併得到的檔案內鍵值對有可能擁有相同的key,這個過程如果client設定過

combiner,也會合併相同的key值的鍵值對,如果沒有,merge得到的就是鍵值集合,如

注意:combiner的合理設定可以提高效率,但是如果使用不當會影響效率!

3. 至此,map端的所有工作都已經結束

當mapreduce任務提交後,reduce task就不斷通過rpc從jobtracker那裡獲取map

task是否完成的資訊,如果獲知某台tasktracker上的map

task執行完成,shuffle的後半段過程就開始啟動。其實呢,reduce task在執行之前的工作就是:不斷地拉取當前job裡每個map

task的最終結果,並對不同地方拉取過來的資料不斷地做merge,也最終形成乙個檔案作為reduce task的輸入檔案。

1.copy過程

簡單地拉取資料。reduce程序啟動一些資料copy執行緒(fether),通過http方式請求map task所在的tasktracker獲取map task的輸出檔案。因為map task早已結束,這些檔案就歸tasktracker管理在本地磁碟。

2.merge過程

這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy過來的數值。copy過來的資料會先放入記憶體緩衝區中,這裡緩衝區的大小要比map端的更為靈活,它是基於jvm的heap size設定,因為shuffler階段reducer不執行,所以應該把絕大部分的記憶體都給shuffle用。

merge的三種形式:

記憶體到記憶體、記憶體到磁碟、磁碟到磁碟

預設情況下,第一種形式不啟用。當記憶體中的資料量達到一定的閥值,就啟動記憶體到磁碟的merge。與map端類似,這也是溢寫過程,當然如果這裡設定了combiner,也是會啟動的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的那個檔案。

3.reducer的輸入檔案

不斷地merge後,最後會生成乙個「最終檔案」。這個最終檔案可能在磁碟中也可能在記憶體中。當然我們希望它在記憶體中,直接作為reducer的輸入,但預設情況下,這個檔案是存放於磁碟中的。當reducer的輸入檔案已定,整個shuffle才最終結束。然後就是reducer執行,把結果存放到hdfs上。

感謝博主:

shuf處理文字

在cu上面看到了乙個帖子,帖子的內容即要求是 請教一下,我需要頻繁不斷地聯接9臺伺服器執行某個相同的服務。但我有特殊的要求 1。每次都按不同的順序來訪問這9臺伺服器。例如 135987642,下次又是亂序依次訪問。2。希望這9臺伺服器的主機名通過乙個shell 指令碼整合不需要另外起乙個txt文件來...

MapReduce過程詳解

1.輸入分片 input split 在進行map計算之前,mapreduce會根據輸入檔案計算輸入分片 input split 每個輸入分片 input split 針對乙個map任務。2.map階段 就是我們寫的map函式,map函式效率相對好控制,而且一般map操作都是本地化操作也就是在資料儲...

詳解MapReduce過程

textinputformat的原始碼注釋為 檢視inputformat介面的原始碼注釋我們了解到這個介面的作用為 在inputformat的源 中有如下兩個方法 inputsplit getsplits jobconf job,int numsplits throws ioexception 獲取...