mosquitto原始碼分析(五)

2021-08-27 14:09:23 字數 4151 閱讀 3994

2023年03月18日 16:02:52

更多個人分類:

linux

計算機網路

mqtt-mosquitto

c/c++

所屬專欄:

mqtt協議及其應用

計算機網路

本文由逍遙子撰寫,**請標註原址:

3.2.1、poll機制簡介

poll機制是一種i/o多路轉接(i/o multiplexing)技術,這種技術先構造乙個有關描述符的表,然後呼叫乙個函式,知道這些描述符中的乙個已準備就緒好進行i/o時,該函式才返回,該函式返回之後它告訴程序那些描述符已經準備好進行i/o)。其工作過程為:

(1)首先構造乙個pollfd結構陣列,每個陣列元素指定乙個描述符編號以及對其所關心的狀態。

(2)為每乙個需要監聽狀態的裝置描述符建立乙個pollfd結構體,並將之放入pollfd結構陣列中。

(3)呼叫函式poll監控pollfd結構陣列中每個結構體的狀態。poll函式的原型如下:

int poll(struct pollfdfdarray, nfds_t nfds, int timeout)

其中引數fdarray就是pollfd結構體陣列,nfds是陣列的元素數,timeout是等待超時限定,如果超過此時間還沒有描述符準備好i/o,就返回;pollfd結構體的定義為:

struct pollfd{

int fd;

short event;

short revent;

其中,int fd是檔案描述符,在mosquitto的中主要指socket描述符(在linux一切i/o皆檔案的工作模式下,socket描述符與普通檔案描述被同等對待);events成員用於告訴核心,我們對描述符fd關心什麼;revents用以說明該描述符fd發生了什麼事情。

另外,使用poll是需要注意,每次呼叫poll函式之前都要重新設定一下各描述符的狀態。

3.2.2、mosquitto的訊息機制

mosquito在工作過程中用poll機制檢測監聽所有socket以判斷其是否有資料傳送或接收,mosquitto所監聽的socket按照其功能分為監聽socket和業務socket,監聽socket主要負責監聽各客戶端的連線;業務socket主要負責mosquitto伺服器與各客戶端之間的資料收發;一旦mosquitto伺服器從監聽socket中接收到乙個客戶端連線進來,將立即為該客戶端建立乙個業務socket,負責後續mosquitto伺服器與該客戶端的所有資料收發。

mosquitto中對poll操作主要在/ mosquitto-1.2/src loop.c檔案的mosquitto_main_loop函式中,在該函式中將按照以下步驟迴圈處理所有的socket:

1)   建立pollfd結構體陣列pollfds

2)   將要監聽的socket放入pollfds

3)   使用poll函式查詢pollfds陣列中各描述符的狀態;

4)   poll函式返回之後,遍歷pollfds陣列對其中就緒的socket描述符進行處理。

mosquitto_main_loop函式不僅涉及對poll函式的相關處理,它也是整個訊息處理架構的實現之處,該函式對訊息處理的流程如下圖3-11

圖3-11 mosquitto函式的訊息業務處理流程

在上述訊息處理過程中,poll函式返回之後,pollfds陣列中socket描述的pollfd結構的revents成員就會置為相應的就緒狀態,因此只要迴圈掃瞄該陣列,即可根據對應socket完成相應的讀寫操作。

在mosquitto程式中,poll函式所監聽的socket有兩種型別:監聽socket和業務socket,其對應的結果處理函式就分兩個過程來完成。

監聽socket處理過程主要在mosquitto_main_loop函式中直接完成,它主要完成的工作是處理每個監聽socket,如果有客戶端連線進入時,為之建立乙個業務socket和乙個對應的context結構體,context結構體描述了該連線的所有資訊,因此在mosquitto程式中,乙個context就表示乙個客戶端連線。

業務socket的處理主要通過loop_handle_reads_writes函式完成,在該函式中將迴圈檢查所有的context,如果該context對應的socket有資料寫入則呼叫函式_mosquitto_packet_write進行寫操作;如果對應的socket有資料要讀取,則呼叫函式_mosquitto_packet_read完成socket的讀取和處理,其中讀取訊息的處理在函式mqtt3_packet_handle中完成,在該函式中將根據不同的訊息型別,呼叫不同的處理函式,業務socket的訊息讀取及其處理的函式呼叫關係如圖3-12所示。

圖3-12 訊息讀取和處理函式呼叫關係

3.2.3、mosquitto的訊息收發

1、   訊息接收過程是mosquitto伺服器接收到客戶端向某個主體發布一條訊息。

2、   訊息傳送過程是指mosquito伺服器將訊息傳送給訂閱客戶端。

在mosquitto的程式實現中,上述兩個過程是分開實現和操作的:

訊息接收過程,mosquitto伺服器端在收到客戶端向某個主體傳送的訊息之後,會遍歷訂閱樹,找到該主體的訂閱列表,然後將訊息掛到訂閱列表中每個訂閱者的訊息佇列中。需注意的是,此時訊息並實際傳送給訂閱客戶端,只是被掛在了mosquitto服務程式中該客戶端對應的context結構中所定義的訊息佇列中;上述工作過程涉及到的主要函式及其呼叫過程可參照圖3-9所示。

mosquitto的訊息的組裝傳送過程集中在函式mqtt3_db_message_write中完成,其函式呼叫關係如下圖3-13所示。

圖3-13 訊息組裝和傳送函式呼叫關係

3.3、mosquitto的ping/pong功能

1、為什麼mosquitto要引人ping/pong的操作

根據tcp/ip協議的描述,tcp連線建立之後,如果雙方沒有通訊,連線可以一直儲存下去,假如中間路由器崩潰或者中間的某條線路斷開,只要兩端的主機沒有被重啟,連線就一直被保持著。儘管tcp協議規範中未做次要求,但是在很多tcp協議的實際實現中,卻提供了保活定時器的功能,保活定時器一般配置的時間是2小時。在實際的伺服器程式開發過程中,2個小時的連線斷開的時間太長。因此,很多伺服器程式都在上層自己提供保活功能,也就是伺服器程式開發過程中經常提到的:心跳連線或ping/pong訊息等功能。

2、mosquitto的ping/pong功能描述

在mosquitto中,提供了ping/pong功能來判斷連線異常斷開的情況,並通過keepalive的引數來控制檢查時間,一般客戶端需要在keepalive時間內向伺服器傳送一條訊息,表明自己還存在,伺服器會週期檢查與客戶端建立起的每乙個連線,如果某個連線在keepalive*1.5的時間內沒有收到過訊息,則認為這個連線就失效了,於是伺服器將主動斷開這個超時的連線。

mosquitto中,每個客戶端所對應的context中有兩個變數last_msg_out和last_msg_in,分別用於記錄該context上次傳送和接收訊息的時間,然後在mosquitto_main_loop(位於檔案loop.c)中每次迴圈都對每個context的所記錄的訊息收發時間進行檢查,如果超過設定的keepalive時間的1.5倍則斷開此客戶端的連線,因此,如果mosquitto客戶端在keepalive時間內與mosquitto伺服器之間存在任何通訊(無論是普通資訊還是ping/pong訊息,都是如此),mosquitto就認為該客戶端是連線狀態良好的。

3、ping/pong功能對mosquitto效能產生的潛在影響

mosquitto以keepalive*1.5時間作為判斷客戶端連線是否異常斷開的時間界限,這裡keepalive的值對mosquitto的效能會產生較大影響,此值過大,可能無法及時判斷處異常的發生;此值過小,不僅浪費網路頻寬,還可能造成誤判,例如客戶端與伺服器之間tcp連線上的某個伺服器異常重啟,可能會被伺服器誤判為tcp連線斷開了。此值需根據實際情況分析後確定。

mosquitto原始碼分析(五)

本文由逍遙子撰寫,請標註原址 3.2.1 poll機制簡介 poll機制是一種i o多路轉接 i o multiplexing 技術,這種技術先構造乙個有關描述符的表,然後呼叫乙個函式,知道這些描述符中的乙個已準備就緒好進行i o時,該函式才返回,該函式返回之後它告訴程序那些描述符已經準備好進行i ...

uC OS II原始碼分析(五)

每個任務被賦予不同的優先順序等級,從0 級到最低優先順序os lowest pr1o,包括0 和 os lowest pr1o 在內。當 c os 初始化的時候,最低優先順序os lowest pr1o 總是被賦給空閒任務idle task 注意,最多工數目os max tasks 和最低優先順序數...

uC OS II原始碼分析(五)

每個任務被賦予不同的優先順序等級,從0 級到最低優先順序os lowest pr1o,包括0 和 os lowest pr1o 在內。當 c os 初始化的時候,最低優先順序os lowest pr1o 總是被賦給空閒任務idle task 注意,最多工數目os max tasks 和最低優先順序數...