HDFS原始碼分析DataXceiver之讀資料塊

2021-09-22 22:17:03 字數 4610 閱讀 4150

/** process op by the corresponding method. */

protected final void processop(op op) throws ioexception

}

那麼今天,我們首先來看下第一種資料讀寫請求--讀資料塊read_block,它是通過呼叫opreadblock()方法完成的,我們先看下這個方法的**:

/** receive op_read_block */

private void opreadblock() throws ioexception finally

}

整個處理流程非常簡單。首先,解析輸入流,得到讀資料塊訊息協議opreadblockproto,即proto,並建立tracescope型別的tracescope;然後從讀資料塊訊息協議proto中解析出讀資料塊的各種引數,比如需要讀取的資料塊block、訪問令牌blocktoken、客戶端名稱clientname、資料塊讀取的起始偏移量blockoffset、資料塊讀取的長度length、是否傳送塊校驗sendchecksum、快取策略cachingstrategy型別的cachingstrategy等,利用這些引數呼叫子類dataxceiver執行緒的readblock()方法,進行讀資料塊的處理,最終關閉tracescope,整個資料塊讀取過程完畢。

我們再來看下其中涉及的部分細節。首先,在我們要概括性的講解讀資料塊訊息協議opreadblockproto前,我們先看下對於輸入流是怎麼處理的,答案就在類pbhelper中的vintprefixed()方法中,其**如下:

public static inputstream vintprefixed(final inputstream input)

throws ioexception

// codedinputstream用來讀取和解碼協議訊息字段。

// varint是一種數值壓縮儲存方法

// readrawvarint32()方法從輸入流中讀取乙個原始的varint,並且,如果高於32位,丟棄之。

// firstbyte是為了告訴codedinputstream已經從輸入流input中讀取了1個位元組

// 返回結果為int型別的訊息大小

int size = codedinputstream.readrawvarint32(firstbyte, input);

// 確保訊息大小必須大於0

assert size >= 0;

// 將輸入流input包裝成exactsizeinputstream,從該輸入流中只能讀取size大小的資料

// exactsizeinputstream是一種從其他輸入流中讀取固定大小資料的輸入流。

return new exactsizeinputstream(input, size);

}

首先呢,從輸入流input中讀入第乙個位元組byte,然後呼叫codedinputstream的readrawvarint32()方法,獲取請求內容的大小。codedinputstream用來讀取和解碼協議訊息字段。varint是一種數值壓縮儲存方法。readrawvarint32()方法從輸入流中讀取乙個原始的varint,並且,如果高於32位,丟棄之。firstbyte是為了告訴codedinputstream已經從輸入流input中讀取了1個位元組,返回結果為int型別的訊息大小,同時確保訊息大小必須大於0。最後,將輸入流input包裝成exactsizeinputstream,從該輸入流中只能讀取size大小的資料,exactsizeinputstream是一種從其他輸入流中讀取固定大小資料的輸入流。

接下來,我們再說下解析輸入流,得到讀資料塊訊息協議opreadblockproto。這個opreadblockproto是什麼呢?它是谷歌開源的protobuf在hdfs中定義的進行資料傳輸時的一種訊息協議,其訊息格式的定義在檔案datatransfer.proto中,內容如下:

message opreadblockproto
其中,header、offset、len為必須的,因為它們使用了關鍵字required,而剩餘兩個sendchecksums、cachingstrategy則由於使用了關鍵字optional,所以為可選的。並且,header為clientoperationheaderproto型別,而clientoperationheaderproto也是一種訊息格式,定義如下:

message clientoperationheaderproto
其中,baseheader還是protobuf定義的一種訊息格式,其名稱為baseheaderproto,其定義如下:

message baseheaderproto
它包含了資料塊block,即extendedblockproto,所以,在獲得讀資料塊訊息協議opreadblockproto之後,呼叫readblock()方法之前,我們可以使用如下語句:

pbhelper.convert(proto.getheader().getbaseheader().getblock()
來獲得readblock()方法需要使用的引數extendedblock。讀資料塊訊息協議中的其他字段不再多一一介紹,讀者可自行分析。

最後,我們來看下讀取資料塊的readblock()方法,其**如下:

@override

public void readblock(final extendedblock block,

final tokenblocktoken,

final string clientname,

final long blockoffset,

final long length,

final boolean sendchecksum,

final cachingstrategy cachingstrategy) throws ioexception catch(ioexception e)

// send op status

// 傳送操縱狀態

writesuccesswithchecksuminfo(blocksender, new dataoutputstream(getoutputstream()));

// 呼叫資料塊傳送器blocksender的sendblock()方法,傳送資料塊

long read = blocksender.sendblock(out, basestream, null); // send data

if (blocksender.didsendentirebyterange())

} catch (ioexception ioe)

} else

// 資料節點datanode記錄相關系統效能指標的增長,這裡是讀取的位元組數、讀取的塊數

datanode.metrics.incrbytesread((int) read);

datanode.metrics.incrblocksread();

} catch ( socketexception ignored )

// its ok for remote side to close the connection anytime.

datanode.metrics.incrblocksread();

ioutils.closestream(out);

} catch ( ioexception ioe ) finally

//update metrics

datanode.metrics.addreadblockop(elapsed());

datanode.metrics.incrreadsfromclient(peer.islocal());

}

readblock()方法大體處理流程如下:

1、將請求中的客戶端名稱clientname賦值給previousopclientname;

2、獲取輸出流basestream,即socketout;

3、將輸出流basestream依次包裝成bufferedoutputstream、dataoutputstream,其緩衝區大小取引數io.file.buffer.size的一半,引數未配置的話預設為512,且最大也不能超過512;

4、呼叫checkaccess()方法進行訪問許可權檢查;

5、傳送資料塊:

5.1、 獲取資料節點註冊資訊datanoderegistration;

5.2、更新當前執行緒名稱:sending block...;

5.3、構造資料塊傳送器blocksender物件blocksender,構造時,需要對應資料塊block、資料在塊中的起始位置blockoffset、讀取資料的長度length等資訊;

5.4、呼叫writesuccesswithchecksuminfo()方法傳送操作狀態;

5.5、呼叫資料塊傳送器blocksender的sendblock()方法,傳送資料塊;

5.6、資料節點datanode記錄相關系統效能指標的增長,這裡是讀取的位元組數、讀取的塊數;

5.7、關閉資料塊傳送器。

大體處理流程就是這個樣子。而關於blocksender及其構造、如何定位資料以及如何傳送資料等,我們將會在專門的文章中進行分析,敬請期待!

HDFS原始碼分析 RPC Client實現

通俗來講rpc remote procedure call 就是呼叫遠端的過程或者方法,既然涉及到遠端,必然會有 c s架構,即 client 和server 下面首先來看一下 client 端的實現。為實現遠端方法呼叫,最重要的就是跟遠端伺服器進行連線,然後不斷的傳輸客戶端想要呼叫的方法,包括方法...

HDFS的DataNode原始碼分析

1.大致流程 datanode.main 入口函式 securemain args,null createdatanode args,null,resources 建立datanode instantiatedatanode args,conf,resources getstoragelocatio...

client讀寫hdfs的原始碼分析總結

週末花了一天的時間仔細了重溫了一下client對hdfs檔案的讀寫過程,總結如下 每次讀寫都是以乙個資料塊的形式來進行的,並且包括資料內容和資料的校驗值。另外,到 namenode 上獲取相應的資訊都是用 rpc來通訊的,而到 datanode 獲取真正的資料塊內容是由 socket 的網路流來進行...