Hadoop原始碼分析之檔案拆分

2021-10-04 21:17:51 字數 3483 閱讀 5650

當我們編寫mapreduce程式的時候,都會進行輸入格式的設定,方便hadoop可以根據設定得檔案格式正確的讀取資料進行處理,一般設定**如下:

job.

setinputformatclass

(textinputformat.

class

)

通過上面的**來保證輸入的檔案是按照我們想要的格式被讀取,所有的輸入格式都繼承於inputformat,這是乙個抽象類,其子類有專門用於讀取普通檔案的fileinputformatt,用於讀取資料庫檔案的dbinputfromat,用於讀取hbase的tableinputformat等等。下面是inputformat的定義:

public

abstract

class

inputformat

textinputformat繼承於fileinputformat,檔案切分沒有進行更改,對recordreader進行了定製。

org.apache.hadoop.mapreduce.lib.input.fileinputformat

org.apache.hadoop.mapreduce.lib.input.textinputformat

檔案切分原理
public

static

final string split_maxsize =

"mapreduce.input.fileinputformat.split.maxsize"

;public

static

final string split_minsize =

"mapreduce.input.fileinputformat.split.minsize"

;

fileinputformat提供了三個引數來共同控制分片的大小:

乙個檔案分片最小的有效位元組數:mapreduce.input.fileinputformat.split.minsize

乙個檔案分片最大有效位元組數: mapreduce.input.fileinputformat.split.maxsize

hdfs中塊的大小: dfs.blocksize

這三個引數按照公式splitsize = max(minimumsize, min(maximumsize, blocksize))來進行分片大小的確定,可以通過改變上述三個引數來調節最終的分片大小。

檔案切分原始碼分析

先舉乙個切分的例項,假設我們的輸入檔案是128m,dfs的大小是40m,則這個檔案應該是有5個塊,假設我們定義的最大切分大小是30m,則根據公式max(minimumsize, min(maximumsize, blocksize)),我們的分片的大小是30m,接下來我們分析如何切分產出inputsplit,主要是分析函式getsplits

首先獲取minimumsize和maximumsize

/** 最小map分片長度 min<1, "mapreduce.input.fileinputformat.split.minsize">**/

long minsize = math.

max(

getformatminsplitsize()

,getminsplitsize

(job));

/** 最大map分片長度 mapreduce.input.fileinputformat.split.maxsize **/

long maxsize =

getmaxsplitsize

(job)

;

獲取輸入目錄下所有的檔案狀態資訊

list

files =

liststatus

(job)

;

遍歷檔案進行每個檔案的切分

for

(filestatus file: files)

檔案長度不為0時候,首先獲取檔案的位置資訊;檔案長度為0,直接創造乙個空的host的陣列返回

blocklocation[

] blklocations;

/** 獲取改檔案所在的位置資訊 **/

if(file instanceof

locatedfilestatus

)else

然後如果檔案是可切分的,進行blocksize獲取,求出切分大小,按照切分大小進行切分。

/** 乙個檔案塊大小:預設為128m **/

long blocksize = file.

getblocksize()

;/** 根據檔案塊大小,map最小分片大小和最大分片大小確定分片的大小:

公式:max(minimumsize, min(maximumsize, blocksize))` **/

long splitsize =

computesplitsize

(blocksize, minsize, maxsize)

;long bytesremaining = length;

// 還剩餘的檔案長度

/** slpit_slop是1.1避免最後剩一點點檔案大小也劃分乙個map **/

while((

(double

) bytesremaining)

/splitsize > split_slop)

/** 為滿足切分條件,但是還剩下部分資料 **/

if(bytesremaining !=0)

獲取檔案塊的index是通過函式getblockindex進行的,我們根據開頭的例子進行具體分析

protected

intgetblockindex

(blocklocation[

] blklocations,

long offset)

} blocklocation last = blklocations[blklocations.length -1]

;long filelength = last.

getoffset()

+ last.

getlength()

-1;throw

newillegalargumentexception

("offset "

+ offset +

" is outside of file (0.."

+ filelength +

")")

;}

至此我們分析完了切分的過程,最終返回的是切分的檔案的元資訊,包含了檔案位置,要讀取得開始位置,讀取的長度,塊所在的host資訊等。

hadoop 原始碼分析一

inputformat inputsplit 繼承自writable介面,因此乙個inputsplit實則包含了四個介面函式,讀和寫 readfields和 write getlength能夠給出這個split中所記錄的資料大小,getlocations能夠得到這個split位於哪些主機之上 blk...

Hadoop原始碼之JobTracker

jobtracker是map reducer中任務排程的伺服器。1 有如下執行緒為其服務 1 提供兩組rpc服務 intertrackerprotocol jobsubmissionprotocol 的1個listener執行緒與預設10個handler執行緒 2 提供任務執 況查詢的一組web服務...

Hadoop 中 IPC 的原始碼分析

最近開始看 hadoop 的一些原始碼,展開hadoop的原始碼包,各個元件分得比較清楚,於是開始看一下 ipc 的一些原始碼。ipc模組,也就是程序間通訊模組,如果是在不同的機器上,那就可以理解為 rpc 了,也就是遠端呼叫。事實上,hadoop 中的 ipc 也就是基於 rpc 實現的。使用 s...