Mqtt協議無法接收到離線訊息

2021-10-08 02:37:30 字數 3513 閱讀 1529

但在用c#實現的過程中,連線已經正常了,qos也設定為2了,就是收不到離線的訊息。後來發現,問題不是出現在協議上,而是在資料的接收上。在接收資料時,用socket.receive來接收,在連線的時候,會返回資料,而接收的buff開的較大,則會在接收到服務端對connect回應的資料報外,也會把離線的資料也一起收起來。這時就要按協議來作解包處理了。如果沒有作解包處理,則對收到的connect的conack命令處理,那後面的離線資料就會丟失了。

namespace mqttsdk

/// /// 標識。佔第乙個位元組的0-3bit

/// 包型別為publish時,對於mqtt 3.1.1,bit 0:retain3,bit 1:qos2,bit 2:qos2,bit 3:dup1。

/// 包型別為pubrel、subscribe、unsubscribe時,bit 1:1,bit 0、bit 2、bit 3均為0。

/// 其他包型別,bit 0、bit 1、bit 2、bit 3均為0。

///

public int flags

/// /// 剩餘長度。可變頭部和載荷的位元組數之和。

///

public int remaininglength

public byte firstbyte

/// /// 剩餘長度對應的位元組

///

public listremaininglengthbytes

#endregion

#region 可變頭和載荷的資料

/// /// 可變頭和載荷的資料

///

public listbytesofvariableheaderandpayload

#endregion

#endregion

#region read

/// /// 從socket讀取資料

///

///

public imqttpacket read(socket socket)

var byteone = new byte[1];

var len = socket.receive(byteone);

if (len <= 0)

bytesofvariableheaderandpayload = new list();

remaininglengthbytes = new list();

//讀取第1個位元組

firstbyte = byteone[0];

flags = firstbyte & 0b00001111;

packettype = (packettypeenum)(firstbyte >> 4);

//讀取第2個位元組

byteone[0] = 0;

len = socket.receive(byteone);

if (len <= 0)

var second_byte = byteone[0];

//===計算【剩餘長度】===

//【剩餘長度】取7bit來計算,最高位的bit7作為標識,

// bit7為1則說明下乙個位元組也是剩餘長度的一部分,

// bit7為0則讀取剩餘長度完畢。

int remaining_length = second_byte & 0b01111111;

var flag_bit = (second_byte & 0b10000000);

int multiplier = 1;

while (true)

len = socket.receive(byteone);

if (len <= 0)

var next_byte = byteone[0];

flag_bit = next_byte & 0b10000000;

remaining_length += (next_byte & 0b01111111) * multiplier;

remaininglengthbytes.add(next_byte);

}remaininglength = remaining_length;

//讀取可變頭和載荷的資料

var blocksize = 4096;

var buff = new byte[blocksize];

int readsize = blocksize;

while (remaining_length > 0)

else

len = socket.receive(buff, 0, readsize, socketflags.none);

if (len <= 0)

remaining_length -= len;

var tempbuff = new byte[len];

array.copy(buff, tempbuff, len);

bytesofvariableheaderandpayload.addrange(tempbuff);

}var packet = createpacket();

return packet;

}#endregion

#region createpacket

private imqttpacket createpacket()

case packettypeenum.suback:

case packettypeenum.puback:

case packettypeenum.pubrec:

case packettypeenum.pubrel:

case packettypeenum.pubcomp:

case packettypeenum.publish:

case packettypeenum.unsuback:

case packettypeenum.pingresp:

}return packet;

}#endregion

#region gettotalbytes

/// /// 獲取所有位元組

///

///

public byte gettotalbytes()

if (!listhelper.isempty(bytesofvariableheaderandpayload))

var buff = list.toarray();

return buff;

}#endregion}}

其中mqtt***packet是對每個命令包的實現,可以參照協議格式自行封裝。

namespace mqttsdk

}

測試:先用publish端發布訊息。

測試:再開始subcribe端接收訊息(離線訊息)

無法收到redis訂閱訊息

現網程式執行一段時間後,經常發現收不到redis訂閱訊息。輸入client list查詢redis連線資訊,輸出如下資訊 id 2375018 addr 120.15.207.135 9159 fd 663 name subarea age 3324 idle 563 flags n db 0 su...

android訊息推送 mqtt協議

對與訊息推送是什麼個概念,在此就不贅述啦。google自帶的c2md服務,可以幫助我們實現該功能,可以該伺服器在國外,所以鑑於網速等各種條件限制,我們也沒法實現。為解決該問題,在讀了大量的部落格等質料之後,終於見到啦陽光。第三個是由ibm提供的mqtt協議的實現,就相當於乙個 開啟1883埠,在se...

MQTT斷線重連訂閱無法接收

mqtt客戶端是用的 paho 採用以下配置 connopts new mqttconnectoptions connopts.setcleansession true connopts.setconnectiontimeout 10 connopts.setkeepaliveinterval 90...