基於MQTT的訊息發布訂閱python實現

2021-09-11 05:55:53 字數 4817 閱讀 8992

mqtt 全稱為 message queuing telemetry transport(訊息佇列遙測傳輸)是一種基於發布/訂閱正規化的「輕量級」訊息協議。該協議構建於tcp/ip協議上。mqtt協議是輕量、簡單、開放和易於實現的,這些特點使它適用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(m2m)通訊和物聯網(iot)。其在,通過衛星鏈路通訊感測器、偶爾撥號的醫療裝置、智慧型家居、及一些小型化裝置中已廣泛使用。

1、使用發布/訂閱訊息模式,提供一對多的訊息發布,解除應用程式耦合。該協議需要客戶端和服務端,而協議中主要有三種身份:發布者(publisher)、**(broker,伺服器)、訂閱者(subscriber)。其中,訊息的發布者和訂閱者都是客戶端,訊息**是伺服器,而訊息發布者可以同時是訂閱者,實現了生產者與消費者的脫耦;

2、對負載內容遮蔽的訊息傳輸;

3、使用 tcp/ip 提供網路連線;

4、有三種訊息發布服務質量:

5、小型傳輸,開銷很小(固定長度的頭部是 2 位元組),協議交換最小化,以降低網路流量;

6、使用 last will 和 testament 特性通知有關各方客戶端異常中斷的機制。

實現mqtt協議需要客戶端和伺服器端通訊完成,在通訊過程中,mqtt協議中有三種身份:發布者(publish)、**(broker)(伺服器)、訂閱者(subscribe)。其中,訊息的發布者和訂閱者都是客戶端,訊息**是伺服器,訊息發布者可以同時是訂閱者。

mqtt傳輸的訊息分為:主題(topic)和負載(payload)兩部分:

mqtt會構建底層網路傳輸:它將建立客戶端到伺服器的連線,提供兩者之間的乙個有序的、無損的、基於位元組流的雙向傳輸。

當應用資料通過mqtt網路傳送時,mqtt會把與之相關的服務質量(qos)和主題名(topic)相關連。

乙個使用mqtt協議的應用程式或者裝置,它總是建立到伺服器的網路連線。客戶端可以:

mqtt伺服器以稱為"訊息**"(broker),可以是乙個應用程式或一台裝置。它是位於訊息發布者和訂閱者之間,它可以:

一、訂閱(subscription)

訂閱包含主題篩選器(topic filter)和最大服務質量(qos)。訂閱會與乙個會話(session)關聯。乙個會話可以包含多個訂閱。每乙個會話中的每個訂閱都有乙個不同的主題篩選器。

二、會話(session)

每個客戶端與伺服器建立連線後就是乙個會話,客戶端和伺服器之間有狀態互動。會話存在於乙個網路之間,也可能在客戶端和伺服器之間跨越多個連續的網路連線。

三、主題名(topic name)

連線到乙個應用程式訊息的標籤,該標籤與伺服器的訂閱相匹配。伺服器會將訊息傳送給訂閱所匹配標籤的每個客戶端。

四、主題篩選器(topic filter)

乙個對主題名萬用字元篩選器,在訂閱表示式中使用,表示訂閱所匹配到的多個主題。

五、負載(payload)

訊息訂閱者所具體接收的內容。

mqtt協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的資料或動態生成資料,這取決於伺服器的實現。通常來說,資源指伺服器上的檔案或輸出。主要方法有:

更多介紹 1, 更多介紹2

安裝:pip install paho-mqtt

#!/usr/bin/env python  

# encoding: utf-8

"""

@version: v1.0

@author: w_h_j

@license: apache licence

@contact: [email protected]

@software: pycharm

@file: clicentmqtttest.py

@time: 2019/2/22 14:19

@describe: mqtt客戶端

"""import json

import sys

import os

import paho.mqtt.client as mqtt

import time

task_topic = 'test' # 客戶端發布訊息主題

client_id = time.strftime('%y%m%d%h%m%s', time.localtime(time.time()))

"""client_id是連線到**。如果client_id的長度為零或為零,則行為為由使用的協議版本定義。如果使用mqtt v3.1.1,

那麼乙個零長度的客戶機id將被傳送到**,**將被傳送為客戶端生成乙個隨機變數。如果使用mqtt v3.1,那麼id將是

隨機生成的。在這兩種情況下,clean_session都必須為true。如果這在這種情況下不會產生valueerror。

注意:一般情況下如果客戶端服務端啟用兩個監聽那麼客戶端client_id 不能與伺服器相同,如這裡用時間"20190222142358"作為它的id,

如果與伺服器id相同,則無法接收到訊息

"""client = mqtt.client(client_id, transport='tcp')

client.connect("127.0.0.1", 1883, 60) # 此處埠預設為1883,通訊埠期keepalive預設60

client.loop_start()

def clicent_main(message: str):

"""客戶端發布訊息

:param message: 訊息主體

:return:

"""time_now = time.strftime('%y-%m-%d %h-%m-%s', time.localtime(time.time()))

payload =

# publish(主題:topic; 訊息內容)

client.publish(task_topic, json.dumps(payload, ensure_ascii=false))

print("successful send message!")

return true

if __name__ == '__main__':

msg = "我是一條測試資料!"

clicent_main(msg)

#!/usr/bin/env python  

# encoding: utf-8

"""

@version: v1.0

@author: w_h_j

@license: apache licence

@contact: [email protected]

@software: pycharm

@file: servermqtttest.py

@time: 2019/2/22 14:35

@describe: mqtt 服務端

"""import json

import sys

import os

import time

import paho.mqtt.client as mqtt

report_topic = 'test' # 主題

def on_connect(client, userdata, flags, rc):

print('connected to mqtt with resurt code ', rc)

client.subscribe(report_topic) # 訂閱主題

def on_message(client, userdata, msg):

"""接收客戶端傳送的訊息

:param client: 連線資訊

:param userdata:

:param msg: 客戶端返回的訊息

:return:

"""print("start server!")

payload = json.loads(msg.payload.decode('utf-8'))

print(payload)

def server_conenet(client):

client.on_connect = on_connect # 啟用訂閱模式

client.on_message = on_message # 接收訊息

client.connect("127.0.0.1", 1883, 60) # 鏈結

# client.loop_start() # 以start方式執行,需要啟動乙個守護執行緒,讓服務端執行,否則會隨主線程死亡

client.loop_forever() # 以forever方式阻塞執行。

def server_stop(client):

client.loop_stop() # 停止服務端

sys.exit(0)

def server_main():

client_id = time.strftime('%y%m%d%h%m%s', time.localtime(time.time()))

client = mqtt.client(client_id, transport='tcp')

server_conenet(client)

if __name__ == '__main__':

# 啟動監聽

server_main()

官網

安裝教程

C 整合MQTT 發布 訂閱

服務端 服務端是乙個控制台應用 using mqttnet using mqttnet.core.adapter using mqttnet.core.diagnostics using mqttnet.core.protocol using mqttnet.core.server using sy...

mysql訊息訂閱與發布 發布 訂閱配置

public ibus bus public void handle mymessage message bus.publish e e.someproperty xyz 公共汽車 屬性將自動由基礎設施。這就是所謂的 依賴注入 所有開發完成了nservicebus利用這些模式。技術作為nservic...

mosquitto 基於MQTT訊息推送

mosquitto note 基於mqtt訊息推送 要解決的問題是 給android手機做訊息推送 自已搭建訊息推送後台 端 中轉訊息,守護程序 mosquitto 傳送訊息客戶端 mosquitto pub 接收訊息客戶端 mosquitto sub 管理密碼檔案 mosquitto passwd...