MapReduce的資料流程 執行流程

2021-12-30 02:56:01 字數 2632 閱讀 2534

由可以看到mapreduce執行下來主要包含這樣幾個步驟

1.首先對輸入資料來源進行切片

2.master排程worker執行map任務

3.worker讀取輸入源片段

4.worker執行map任務,將任務輸出儲存在本地

5.master排程worker執行reduce任務,reduce worker讀取map任務的輸出檔案

6.執行reduce任務,將任務輸出儲存到hdfs

若對流程細節進行深究,可以得到這樣一張流程圖

角色描述:

jobclient:執行任務的客戶端

jobtracker:任務排程器

tasktracker:任務***

task:具體的任務(map or reduce)

從生命週期的角度來看,mapreduce流程大概經歷這樣幾個階段:初始化、分配、執行、反饋、成功與失敗的後續處理

每個階段所做的事情大致如下

1.jobclient對資料來源進行切片

切片資訊由inputsplit物件封裝,介面定義如下:

public inte***ce inputsplit extends writable 可以看到split並不包含具體的資料資訊,而只是包含資料的引用,map任務會根據引用位址去載入資料

inputsplit是由inputformat來負責建立的

public inte***ce inputformat jobclient通過getsplits方法來計算切片資訊,切片預設大小和hdfs的塊大小相同(64m),這樣有利於map任務的本地化執行,無需通過網路傳遞資料

切片成功後,jobclient會將切片資訊傳送至jobtracker

2.通過jobtracker生成jobid

jobtracker.getnewjobid()

3.檢查輸出目錄和輸入資料來源是否存在

輸出目錄已存在,系統丟擲異常

輸入源目錄不存在,系統丟擲異常

4.拷貝任務資源到jobtracker機器上(封裝任務的jar包、集群配置檔案、輸入源切片資訊)

任務分配

jobtracker遍歷每乙個inputsplit,根據其記錄的引用位址選擇距離最近的tasktracker去執行,理想情況下切片資訊就在tasktracker的本地,這樣節省了網路資料傳輸的時間

jobtracker和tasktracker之間是有心跳通訊的邏輯的,通過彼此間不停的通訊,jobtracker可以判斷出哪些tasktracker正在執行任務,哪些tasktracker處於空閒狀態,以此來合理分配任務

任務執行

tasktracker接到任務後開始執行如下操作:

1.將任務jar包從hdfs拷貝到本地並進行解壓

2.建立乙個新的jvm來執行具體的任務,這樣做的好處是即使所執行的任務出現了異常,也不會影響tasktracker的執行使用

如果所執行的任務是map任務,則處理流程大致如下:

首先載入inputsplit記錄的資料來源切片,通過inputformat的getrecordreader()方法

獲取到reader後,執行如下操作:

k key = reader.createkey();

v value = reader.createvalue();

while (reader.next(key, value))

執行反饋

mapreduce的執行是乙個漫長的過程,執行期間會將任務的進度反饋給使用者

任務結束後,控制台會列印counter資訊,方便使用者以全域性的視角來審查任務

執行成功

清理mapreduce本地儲存(mapred.local.dir屬性指定的目錄)

清理map任務的輸出檔案

執行失敗

1.如果task出現問題(map或者reduce)

錯誤可能原因:使用者**出現異常;任務超過mapred.task.timeout指定的時間依然沒有返回

錯誤處理:

首先將錯誤資訊寫入日誌

然後jobtracker會排程其他tasktracker來重新執行次任務,如果失敗次數超過4次(通過mapred.map.max.attempts和mapred.reduce.max.attempts屬性來設定,預設為4),則job以失敗告終

如果系統不想以這種方式結束退出,而是想通過task成功數的百分比來決定job是否通過,則可以指定如下兩個屬性

mapred.max.map.failures.percent map任務最大失敗率

mapred.max.reduce.failures.percent reduce任務最大失敗率

如果失敗比率超過指定的值,則job以失敗告終

2.如果是tasktracker出現問題

判斷問題的依據:和jobtracker不再心跳通訊

jobtracker將該tasktracker從資源池中移除,以後不在排程它

3.jobtracker出現問題

jobtracker作為系統的單點如果出現問題也是最為嚴重的問題,系統將處於癱瘓。

MapReduce資料流(一)

在 作業由那些基本元件組成,從高層來看,所有的元件在一起工作時如下圖所示 圖4.4高層mapreduce工作流水線 近距離觀察 圖4.5細節化的hadoop mapreduce資料流 圖4.5展示了流線水中的更多機制。雖然只有2個節點,但相同的流水線可以複製到跨越大量節點的系統上。下去的幾個段落會詳...

MapReduce資料流(二)

輸入塊 inputsplit 乙個輸入塊描述了構成mapreduce程式中單個map任務的乙個單元。把乙個mapreduce程式應用到乙個資料集上,即是指乙個作業,會由幾個 也可能幾百個 任務組成。map任務可能會讀取整個檔案,但一般是讀取檔案的一部分。預設情況下,fileinputformat及其...

Hadoop HDFS的資料流程

hdfs資料寫入流程 客戶端通過distributed filesystem模組向namenode請求上傳檔案,namenode檢查目標檔案是否已存在,父目錄是否存在。namenode返回是否可以上傳。客戶端請求第乙個 block上傳到哪幾個datanode伺服器上。namenode返回3個data...