我理解的RocketMQ 網路層的通訊協議

2021-10-16 08:55:32 字數 3629 閱讀 4029

如下圖所示,業務層與netty客戶端之間用remotingcommand進行互動,即業務層呼叫netty傳送訊息時,會將訊息封裝在remotingcommand物件裡面,而netty接收到外部訊息的時候會給業務層返回remotingcommand的物件例項。

netty與外部世界通過位元組流進行傳輸。netty在傳送訊息的時候,對remotingcommand進行編碼(物件–>位元組流);在接收到外部訊息的時候會對位元組流進行解碼(位元組流–>物件)。

業務層與netty之間互動方式的偽**:

--

--經過業務層構建處理的訊息體header/body--

--header =..

.;body =..

.;----準備remotingcommand--

--remotingcommand =

newremotingcommand()

;remotingcommand.

setcustomerheader

(header)

;remotingcommand.

setbody

(body);--

--呼叫netty傳送remotingcommand--

--netty.

send

(remotingcommand)

;

netty會用nettyencoder對物件進行編碼。該類的原始碼如下,它繼承了netty中的messagetobyteencoder。覆寫了encode()方法。它的泛型變數為remotingcommand表示它只處理傳遞過來的remotingcommand的物件。

它裡面的編碼邏輯,首先對訊息頭進行編碼,然後編碼訊息體。訊息體其實已經是位元組流了,直接寫入bytebuf中即可。而需要對訊息頭進行編碼。對訊息頭編碼時呼叫的是remotingcommandencodeheader()方法。

public

class

nettyencoder

extends

messagetobyteencoder

}catch

(exception e)

remotingutil.

closechannel

(ctx.

channel()

);}}

}

如下是nettydecoder的源**,它繼承了lengthfieldbasedframedecoder,表明它解決tcp粘包和拆包的方式為長度基礎方式。

裡面的邏輯很簡單,從網路通道讀取到資料,然後呼叫remotingcommand的靜態解碼方法decode進行解碼,返回乙個remotingcommand物件。

public

class

nettydecoder

extends

lengthfieldbasedframedecoder

@override

public object decode

(channelhandlercontext ctx, bytebuf in)

throws exception

bytebuffer bytebuffer = frame.

niobuffer()

;// 直接呼叫remotingcommand進行解碼

return remotingcommand.

decode

(bytebuffer);}

catch

(exception e)

finally

}return null;

}}

remotingcommand的成員屬性。其中customeheader表示訊息頭,body表示訊息體,它是不進行序列化。

public

class

remotingcommand

最終結果就是將remotingcommand物件進行序列化,而customeheader的屬性以及值會被放到remotincommand的extfields之中,之後隨remotingcommand一起序列化成位元組碼。

首先確定訊息體的長度。

public bytebuffer encodeheader()

public bytebuffer encodeheader

(final

int bodylength)

首先將customeheader中的屬性放到remotingcommand的屬性extfields之中,是乙個map

然後根據設定序列的方式採用不同的編碼方式,其實就是序列化remotingcommand物件本身。主要是rocketmq自定義的協議,或者採用json的方式。

private

byte

headerencode()

else

}

最終結果是就是將位元組碼還原為乙個remotingcommand物件。並設定customeheaderbody的值

public

static remotingcommand decode

(final

byte

array)

public

static remotingcommand decode

(final bytebuffer bytebuffer)

cmd.body = bodydata;

return cmd;

}

private

static remotingcommand headerdecode

(byte

headerdata, serializetype type)

return null;

}

在傳遞訊息的時候會建立乙個協議頭customeheader,不同場景下會使用不同的協議頭。

協議頭都實現了乙個公共的介面commandcustomheader。它的實現者有很多。

我理解的RocketMQ 消費者負載均衡的實現

抽象類中有乙個重要的方法dorebalance 這個方法是對外使用的。即使用的方式就是建立乙個負載均衡實現類的例項,然後呼叫它的dorebalance 方法即可進行。幹的核心事情 主要流程說明 public void dorebalance final boolean isorder this tr...

兩層網路 三層網路的理解

對於搞it的同行而言,大部分人都不會直接和網路打交道,因此除非從事網路開發,否則對網路內部機制也不會太關心,但是明白網路資料是怎麼走的,這對每個it工程師應該是很重要的基礎知識。網路資料報如何在網路上遊蕩,長久以來也困擾了我很長時間,現在把這部分內容總結分享一下。說起網路,大家不約而同會想起大學課本...

兩層網路,三層網路的理解

對於搞it的同行而言,大部分人都不會直接和網路打交道,因此除非從事網路開發,否則對網路內部機制也不會太關心,但是明白網路資料是怎麼走的,這對每個it工程師應該是很重要的基礎知識。網路資料報如何在網路上遊蕩,長久以來也困擾了我很長時間,現在把這部分內容總結分享一下。說起網路,大家不約而同會想起大學課本...