MapReduce JOB 的輸出與輸出筆記。

2021-10-10 11:27:09 字數 3519 閱讀 6340

提高 mapreduce 價值,自定義輸入和輸出。

比如 跳過儲存到 hdfs 中這個耗時的布置。 而只是從

原始資料源接受資料,或者

直接將資料傳送給某些處理程式。 這些處理

程式在 mapreduce 作業完成後使用這些資料。 有時由檔案塊和輸入 split 組成的基礎 hadoop 正規化並不能滿足需求。 此時自定義 inputformat 和 outputformat 。

三種處理輸入的模式:

1 生成資料(generating data)

2 外部源輸入(external source input)

3 分割槽裁剪(partition pruning)

map 特性 在拿到

輸入的輸入對之前完全不知道這個複雜的事情是如何發生的。

一種處理輸出的模式

1 外部源輸出。

在 hadoop 中自定義輸入和輸出

hadoop 允許使用者

修改從磁碟載入資料的方式: 

配置如何根據 hdfs 的塊生成連續的輸入分塊;

配置記錄在 map 階段如何出現;

hadoop 允許使用者通過類似的方式修改資料的儲存方式:通過outputfirnat 和 recordwriter 類。

inputformat  hadoop 依賴作業的輸入格式完成三個任務。

1)驗證作業的輸入配置(檢查資料是否存在

2)按 inputsplit 型別將輸入塊和檔案拆分成邏輯分塊,並將每個資料分塊分配給乙個 map 任務處理。

outputformat hadoop 依賴作業的輸出格式完成以下兩個主要任務:

1)驗證作業輸入配置

2)建立 recordwriter 的實現,他負責寫作業的輸出

inputformat 介面

hadoop 中最常見的輸入格式都是 fileinputformat 的子類。 hadoop 預設的輸入格式是 textinputformat。輸入格式首先驗證作業的輸入,確保所有的輸入路徑都存在。 然後根據檔案的總位元組大小邏輯的拆分每個輸入檔案

並將塊大小作為邏輯 split 的上界。

舉個栗子: hdfs 塊大小設定為 64m 。 現在又乙個 200mb 的檔案將生成 4個輸入 split ,  他的位元組範圍是:0~64mb、64~128mb、128~172mb、 172b~200mb。每個 map 任務只能分配到乙個輸入 split ,然後 recordreader 負責將其分配到所有位元組生成鍵值對。

recordreader 還有乙個額外的固定邊界作用。因為輸入 split 的邊界是任意的,並且很可能落不到記錄的邊界上。 例如:textinputformat 使用 linerecordreader 讀取文字檔案,為每個 map 任務的每行文字(換行符分割)建立鍵值對。見識到目前為止從檔案中讀取的位元組數,值是每行的換行符之前所有字元組成的字串。由於每個輸入 split 的位元組塊不太可能加號和換行符對齊,因此為確保讀取到完整的一行資料, linerecordreader 將度過其給定的「結尾」。這一小部分資料來自乙個不同的資料庫,因此沒有儲存在同乙個節點上,所以他是通過流式方式從儲存該快的 datanode 上獲取。改資料流是由 fsdataimputstream 類的乙個例項處理,我們不需要關心這些塊在**。 不用怕你自己格式的 split 邊界,只要進行了完整的測試,就不會再重複或丟失任何資料

inputformat 抽象類包含兩個抽象方法:

1) getsplits

getsplits 在實現時,通常是通過給定 jobcontext 

的 物件獲取配置的輸入並返回乙個 inputsplit 物件的 list。 輸入 split 有乙個方法, 能夠

返回與資料在集群中儲存位置相關的機器組成的陣列

,這為框架決定那個 tasktracker 應該執行那個 map 任務提供了依據。因為該方法也在前端使用(即作業提交到 jobtracker 之前),所以也很適合驗證作業配置並丟擲任何必要的異常。

2) createrecordreader

該方法用於在後端生成乙個 recordreader 的實現.  record reader 有乙個會被框架呼叫的 initialize 的方法

recordreader 抽象類

reducereader 使用建立輸入 split 時產生的邊界內的資料生成鍵值對, 讀取資料」開始"的位置是檔案中 recordreader 能夠開始生成鍵值對的地方。 而讀取資料的結束的位置是檔案中 recordreader 應該停止讀取的地方。

必須過載的一些方法

initialize

初始化 record reader 基於檔案輸入格式,適合在這裡尋找檔案開始讀取的位元組位置。

getcurrentkey 和 getcurrentvalue

nextkeyvalue

方法讀取了乙個鍵值對後返回 true 直到所有資料消費完。

與 inputformat 同

。getprogerss

框架使用該方法收集度量資訊。

與 inputformat 同

close

木有鍵值後,框架用它做清理工作。

outputformat

hadoop 預設輸出格式是 textoutputformat, 他將所有的鍵值對寫入到配置檔案的預設 hdfs 輸出目錄,並且鍵和值用 tab 分隔,每個 reduce 任務將單獨的輸出乙個 part 檔案 並寫入到配置的輸出目錄中。

textoutputformat 使用 linerecordwriter 為每個 map 任務或 reduce 任務寫鍵值對, 使用 tostring 方法將每個鍵值對序列化到 hdfs 上輸出的 part 檔案中, 並且鍵值用 tab 分隔。 鍵值tab可以通過作業配置修改。

outputformat 抽象類

checkoutputspecs

確保輸出目錄不存在,否則輸出目錄將會被覆蓋。

getrecordwriter

負責將鍵值對序列化到乙個輸出(通常是乙個filesystem 物件)

getoutputcommiter

初始化時,作業的提交者設定每個任務,當成功完成任務時,並且當每個任務結束時(不管成功或失敗)後執行清理, 基於檔案輸出的 fileoutputcommiter 可以用於處理這些繁重的事情。

2.5 版本 

getbaserecordwriter

必須重寫。  否則就沒人給你輸出啦。

recordwriter抽象類

負責將鍵值對寫入到檔案系統, recordwriter 不包含初始化階段,  但在需要時建構函式既能夠用於完成 record writer 的設定。

writer

在寫每個鍵值對時,框架將會呼叫該方法。該方法實現更多打的依賴於你的使用場景。 可以寫入乙個外部的記憶體 鍵值對中(如 riduce)

close

沒有鍵值對時呼叫此方法

來自為知筆記(wiz)

從HiveQL到MapReduce job過程簡析

hiveql是一種宣告式語言,使用者提交查詢,而hive會將其轉換成mapreduce job,如下圖。一般來說大部分時間可以無視這個執行過程的內部邏輯,但是如果能了解這些底層實現細節,在調優的時候就會更得心應手。將hiveql轉化為mapreduce任務,整個編譯過程主要分為六個階段 1 antl...

大資料學習 之MapReduce Job

1 先把yarn服務停了 hadoop hadoop001 hadoop 2.6.0 cdh5.7.0 sbin stop yarn.sh 2 把之前的資訊刪了 hadoop hadoop001 hadoop 2.6.0 cdh5.7.0 hdfs dfs rm r f user 3 再建立hdfs...

mapreduce job提交原始碼流程

waitforcompletion submit 1建立連線 connect 1 建立提交job的 newcluster getconfiguration 1 判斷是本地yarn還是遠端 initialize jobtrackaddr,conf 2 提交job submitter.submitjob...