Dubbo 多執行緒通訊原理

2021-09-22 20:43:44 字數 3217 閱讀 3946

本文**摘錄的時候,將一些與本流程無關的內容去掉了,如有需要請看原始碼。

如果大家對dubbo rpc原理原理感興趣,可以看我之前寫過的另外一篇部落格《

dubbo rpc

碼解讀》。

併發情況下,dubbo的rpc模型如下圖所示:

如圖所示,consumer端可能同時有多個執行緒呼叫provider的服務,此時provider會啟動多個執行緒來分別處理這些併發呼叫,處理完以後將資料返回。在這種多執行緒環境中,dubbo是如何做到consumer thread a不會拿到其他執行緒的請求結果?

其實這是乙個非常普遍的問題,例如我們的伺服器與mq伺服器、資料庫、快取等等第三方伺服器通訊時,都需要確保併發環境下資料不會錯亂,並且要盡可能降低伺服器的效能損耗。

lâ â 

**dubbo rpc多執行緒通訊原理。

lâ â 

寫乙個簡單的示例,實現多執行緒資料交換

如上圖所示,消費端多執行緒並行消費服務的場景,主要流程如下

dubboinvoker#doinvoke

方法中,在exchangeclient#request(inv, timeout)呼叫時,返回乙個defaultfuture物件,接著會呼叫defaultfuture.get()方法(等待返回結果)。

對於consumer端而言,伺服器會為每乙個請求建立乙個執行緒,因為rpc操作是乙個慢動作,為了節省資源,當執行緒傳送rpc請求後,需要讓當前執行緒釋放資源、進入等待佇列,當獲取到返回結果以後,再喚醒這個執行緒。

rpc請求的過程為:每乙個rpc請求都有乙個唯一的id,rpc請求的時候,會將此id也傳送給provider;provider處理完請求後會將此id和返回結果一同返回給consumer;consumer收到返回資訊以後解析出id,然後從futures中找到相對應的defaultfuture,並通過defaultfuture.done#signal()喚醒之前等待執行緒。

下面根據原始碼詳細討論一下多執行緒情況下rpc請求的細節,即dubbo多執行緒模型的實現。

1) defaultfuture#field

這裡列出了與多執行緒相關的幾個重要的屬性

private final lock                            lock = new reentrantlock();

private final condition done = lock.newcondition();

private static final mapfutures = new concurrenthashmap();

2) 

defaultfuture#

建構函式

建立好defaultfuture物件以後,將defaultfuture存入了futures中。其實每一次請求,多會生成乙個唯一的id,即對於每個伺服器而言,id唯一。

public defaultfuture(channel channel, request request, int timeout)
3) defaultfuture#get

主要邏輯是:獲取鎖,呼叫

await

方法,此時當前執行緒進入等待佇列,此執行緒會有兩種結果過:要麼超時,要麼被喚醒;如果被喚醒,則返回

rpc的結果。

public object get(int timeout) throws remotingexception 

if (! isdone())

}} catch (interruptedexception e) finally

if (! isdone())

}return returnfromresponse();

}

4) defaultfuture#received

收到返回結果時,呼叫此方法。首先從

futures

中根據id

獲取defaultfuture

,如果不存在,列印一條日誌;如果存在則通過

signal

釋放乙個喚醒訊號,將執行緒從等待佇列中喚醒。

public static void received(channel channel, response response)  else 

} finally

}private void doreceived(response res)

} finally

if (callback != null)

}

5) defaultfuture#received

以下**是用來從

futures

清理rpc

請求超時的

defaultfuture

private static class remotinginvocationtimeoutscan implements runnable 

if (system.currenttimemillis() - future.getstarttimestamp() > future.gettimeout())

}thread.sleep(30);

} catch (throwable e) }}

}static

6) headerexchangehandler#handlerequest

建立response

物件時,帶上請求id。

response handlerequest(exchangechannel channel, request req) throws remotingexception 

// find handler by message class.

object msg = req.getdata();

try catch (throwable e)

return res;

}

根據dubbo

的實現方案,我寫了乙個簡單的示例,以方便理解,見中concurrentrwtester**

多執行緒 多執行緒原理

我們首先要知道什麼是多執行緒,說白了就是多個執行緒,執行緒是什麼呢,其實就是程序執行的途徑,那麼說道這裡我們又引入了乙個新的名字,就是程序,那麼我們來看看什麼是程序,其實我們自己也能看到,啟動電腦的任務管理器,我們就可以看到程序選項,裡面是我們電腦所有的程序,我們會發現有很多的程序.簡單地說就是程序...

執行緒通訊,多執行緒

多執行緒 thread handler thread處理一些複雜的業務邏輯 耗時的事情 handler在主線程中接收訊息的乙個物件 mhandler.sendmessage msg 傳送乙個訊息物件 mhandler.sendemptymessage what 傳送空訊息,只有what沒有obj m...

多執行緒 執行緒通訊

總結 今天小鹹兒來講解乙個好玩的事,那就是執行緒之間該如何通訊,執行緒通訊之後又會出現什麼問題?先來一張導圖來看看執行緒通訊的分布?疑問 如果想要執行緒按照使用者自定義的順序執行的話,那該如何操作呢?思考 如果能夠讓執行緒等待先執行的執行緒執行完,再執行不就能達到效果了嗎!果然出現問題之後,就會有對...