Job的資料輸入格式化器 InputFormat

2021-06-04 04:35:20 字數 3289 閱讀 9965

hadoop被設計用來處理海量資料,這種資料可以是結構化的,半結構化的,甚至是一些無結構化的文字資料(這些資料可能儲存在hdfs檔案中,也可能存放在db中)。它處理資料的核心就是map-reduce模型,但是,無論是map還是reduce,它們的輸入輸出資料都是key-value對的形式,這種key-value對的形式我們可以看做是結構化的資料。同時,對於reduce的輸入,當然就是map的輸出,而reduce、map的輸出又直接可以在map和reduce處理函式中定義,那麼這就只剩下map的輸出了,也就是說,hadoop如何把輸入檔案包裝成key-value對的形式交給map來處理,同時hadoop又是如何切割作業的輸入檔案來結果不同的tasktracker同時來處理的呢?這兩個問題就是本文將要重點講述的內容——作業的輸入檔案格式化器(inputformat)。

在hadoop對map-reduce實現設計中,

作業的輸入檔案格式化器包括兩個元件:檔案讀取器(recordreader)

和檔案切割器(spliter)。其中,檔案切割器用來對作業的所有輸入資料進行分片切割,最後有多少個切片就有多少個map任務,檔案讀取器用來讀取切片中的資料,並按照一定的格式把讀取的資料報裝成乙個個key-value對。而在具體的對應實現中這個輸入檔案格式化器被定義了乙個抽先類,這樣它把如何切割輸入資料以及如何讀取資料並把資料報裝成key-value對交給了使用者來實現,因為只有使用者才知道輸入的資料是如何組織的,map函式需要什麼樣的key-value值作為輸入值。這個

輸入檔案格式化器對應的是org.apache.hadoop.mapreduce.inputformat類:

public abstract class inputformat
顯然,在inputsplit類中,getsplit()方法是讓使用者定義如何對作業的輸入資料進行切割分的,createrecordreader方法是定義如何讀取輸入資料,幷包裝成乙個若干個key-value對的,即定義乙個記錄讀取器。另外,對於乙個輸入資料切片資訊(資料的長度、資料儲存在哪些datanode節點上)被儲存在乙個對應的inputsplit物件中。順帶需要提一下的是,jobclient在呼叫inputformat的getsplit()方法時,對返回的inputsplit陣列又使用jobclient.rawsplit進行了一次封裝,並將其序列化到檔案中。

下面就來看看hadoop在其內部有哪些相關的預設實現的。

從上面的類圖可以看出,hadoop在抽象類fileinputformat中實現了乙個基於檔案的資料分片切割器,所以在這裡我先主要談談它是如何實現的。先來看原始碼:

protected long getformatminsplitsize() 

public static long getminsplitsize(jobcontext job)

public static long getmaxsplitsize(jobcontext context)

protected long computesplitsize(long blocksize, long minsize,long maxsize)

public listgetsplits(jobcontext job) throws ioexception

if (bytesremaining != 0)

} else if (length != 0) else

}log.debug("total # of splits in job["+job.getjobname()+"]'s input files: " + splits.size());

return splits;

}/*是否允許對乙個檔案進行切片*/

protected boolean issplitable(jobcontext context, path filename)

上面的輸入資料切割器是支援多輸入檔案的,而且還要著重注意的是這個輸入資料切割器是如何計算乙個資料切片大小的,因為在很多情況下,切片的大小對乙個作業的執行效能有著至關重要的影響,應為至少切片的數量決定了map任務的數量。

試想一下,如果3個資料塊被切成兩個資料片和被切成三個資料塊,哪一種情況下耗費的網路i/o時間要多一些呢?在作業沒有配置資料切割器的情況下,預設的是textinputformat,對應的配置檔案的設定項為:mapreduce.inputformat.class。

最後,以linerecordreader為例來簡單的講解一下記錄讀取器的實現,這個記錄讀取器是按文字檔案中的行來讀取資料的,它的key-value中為:行號一行文字。

public class linerecordreader extends recordreader else 

in = new linereader(filein, job);

}if (skipfirstline)

this.pos = start;

} public boolean nextkeyvalue() throws ioexception

key.set(pos);

if (value == null)

int newsize = 0;

while (pos < end)

pos += newsize;

if (newsize < maxlinelength)

// line too long. try again

log.debug("skipped this line because the line is too long: linelength["+newsize+"]>maxlinelength["+maxlinelength+"] at position[" + (pos - newsize)+"].");

}if (newsize == 0)

else

} @override

public longwritable getcurrentkey()

@override

public text getcurrentvalue()

/*** get the progress within the split

*/public float getprogress() else

} public synchronized void close() throws ioexception

}}

在記錄讀取器中,getprogress()被用來報告當前讀取輸入檔案的進度,因為hadoop為客戶端檢視當前作業執行進度的api。另外,由於linerecordreader是按照行來讀取的,由於切割器的分割,可能使得某一行在兩個資料片中,所以在初始化的時候有乙個是否跳過第一行的操作。

scanf格式化輸入

scanf 函式的功能是從計算機預設的輸入裝置 一般指鍵盤 向計算機主機輸入資料。呼叫scanf 函式的一般格式如下 scanf 格式字串 輸入項位址表 例如 scanf d f i,f 其中,格式字串 可以包含三種型別的字元 格式指示符 空白字元 空格 跳格鍵 回車鍵 和非空白字元 又稱為普通字元...

scanf 格式化輸入

scanf 有種帶的格式化輸出方式 此格式控制符的基本格式為 scanfset includeint main 以下來自網路 scanfset 有兩種形式 一種是以非 字元開頭的 scanset 表示在讀入字串時將匹配所有在 scanfset 現的字元,遇到非scanfset 中的字元時輸入就結束 ...

UItextfield輸入資料的格式化判斷

在此之前 看到網上寫的方法如下 bool textfield uitextfield textfield shouldchangecharactersinrange nsrange range replacementstring nsstring string nsstring astring te...