MQTT服务搭建及python使用示例

1、MQTT协议

1.1、MQTT介绍

        MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。

        在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。

        优点:代码量少,开销低,带宽占用小,即时通讯协议。

1.2、MQTT概念

        订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

        会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

        主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:

        单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。

        多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。

        主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

        消息订阅:消息订阅者所具体接收的内容。

1.3、MQTT中的角色

        Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。

MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :

        1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。

        2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。

        3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。

客户端:

        Publisher 和 Subscriber 都属于客户端。

        发布应用消息给其它相关的客户端。

        订阅以请求接受相关的应用消息。

        取消订阅以移除接受应用消息的请求。

        从服务端断开连接

服务器:

        服务器端即所谓的 MQTT Broker 服务器。

        接受来自客户端的网络连接。

        接受客户端发布的应用消息。

        处理客户端的订阅和取消订阅请求。

        转发应用消息给符合条件的已订阅客户端。

MQTT 提供的公共服务器端( Broker )有:

test.mosquitto.orgbroker.hivemq.comiot.eclipse.org

2、基于公共服务示例

        在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)

Broker: broker.emqx.io 
TCP Port: 1883 
Websocket Port: 8083

python库:pip install paho-mqtt=="1.6.1"

消息发布代码,pub.py

import time
import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'def connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef publish(client):msg_count = 0while True:time.sleep(1)msg = f"messages: {msg_count}"result = client.publish(topic, msg)# result: [0, 1]status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")msg_count += 1def run():client = connect_mqtt()client.loop_start()publish(client)if __name__ == '__main__':run()

运行打印结果:

消息订阅代码,sub.py:

import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'def connect_mqtt() -> mqtt_client:def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef subscribe(client: mqtt_client):def on_message(client, userdata, msg):print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")client.subscribe(topic)client.on_message = on_messagedef run():client = connect_mqtt()subscribe(client)client.loop_forever()if __name__ == '__main__':run()

运行打印结果:

        可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!

3、基于Apollo服务示例

3.1、安装Apollo服务

        服务器端搭建:

Index of /dist/activemq/activemq-apollo/1.7.1

解压并打开目标如下:

        Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo

        若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。

        然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:

        服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680

3.2、示例代码

本地服务基础信息:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

python库:pip install paho-mqtt=="1.6.1"

发布主题,publish.py

import sys
import time
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):print("Connected with result code " + str(rc))def on_subscribe(client, userdata, mid, granted_qos):print("消息发送成功")client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)i = 0
while True:try:# 发布MQTT信息sensor_data = "test" + str(i)client.publish(topic="public", payload=sensor_data, qos=0)time.sleep(5)i += 1except KeyboardInterrupt:print("EXIT")client.disconnect()sys.exit(0)

订阅主题,subscribe.py

import time
import paho.mqtt.client as mqtt# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):if rc == 0:print("连接成功")print("Connected with result code " + str(rc))def on_message(client, userdata, msg):print(msg.topic + " " + str(msg.payload))client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60)  # 订阅频道
time.sleep(1)# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)])  # 订阅
client.loop_forever()

执行脚本后,正确大小消息。浏览器可见连接记录。

4、EMQX客户端工具

下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。icon-default.png?t=N7T8https://mqttx.app/zh/downloads

4.1、公共服务测试消息接收

创建连接:

Host:为代码中定义好的 broker.emqx.io 
Port:为代码中定义好的 1883 
用户名、密码根据需要添加

添加订阅:

主题为:/flask/mqtt

在MQTTX中发布消息:

测试成功。

4.2、自建服务测试消息接收

MQTT版本选择3.1.1,以下参数,与服务保持一致:

host="127.0.0.1" 
port = 61613 
用户名:admin 
密码:password

其它测试步骤,一样。(创建主题,测试)

5、EMQX代理服务器

        windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads

        下载后解压,进入目录bin文件下,执行:emqx start 

        打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:

至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!

附录:

java1.8.1下载,地址:Java Downloads | Oracleicon-default.png?t=N7T8https://www.oracle.com/java/technologies/downloads/#java8-windows

参考:

1、嵌入式QT- QT使用MQTT

嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客

2、Python实现通信协议(mqtt)5星,mqtt flask

【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3022055.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Linux的虚拟机操作

一、linux系统 我们知道的系统用到的大多数是Windows系统。 Windows个人用到的有:win7 win10 win11 winxp 服务器用到的有:windows server 2003、2008、2013...........等等 linux也有系统。 centos 7 有5、6、8等等 redhat ubuntu kail 二、了…

C语言 循环语句 (3) for 循环语句

接下来 我们来看第三个 for语句 基本语句是 for关键字 然后小括号 括号中三个表达式 然后它对表达式2进行判断 如果表达式2条件成立 则走进循环体 执行完循环体 会回来执行表达式3 然后再返回来 继续对表达式2进行判断 如果表达式2 还是成立 这继续循环往复 直到表达式2的条件…

JAVA队列相关习题4

1. 用队列实现栈。 225. 用队列实现栈 - 力扣(LeetCode) 一个队列无法实现栈 尝试使用两个队列 1)push元素的时候应当放在那里?哪个队列不为空就放在哪里 2)出栈的时候,出不为空的队列size-1元素,剩余元…

关于Anaconda常用的命令

常用命令 查看当前环境下的环境:conda env list查看当前conda的版本;conda --version conda create -n your_env_name pythonX.X(2.7、3.6等)命令创建python版本为X.X。名字为your_env_name的虚拟环境。your_env_name文件可以在Anaconda安装…

抖音小店怎么运营?最全的运营攻略来了?

大家好,我是电商糖果 很多开好店铺的小伙伴,都会遇到一个难题,那就是不会运营店铺。 可能好几个月才出十几单,甚至体验分都没有弄出来。 说实话,这种情况糖果见多了。 糖果做抖音小店也有四年多了,也开…

Java快速入门系列-11(项目实战与最佳实践)

第十一章:项目实战与最佳实践 11.1 项目规划与需求分析项目规划需求分析实例代码 11.2 系统设计考虑实例代码 11.3 代码实现与重构实例代码 11.4 性能优化与监控实例代码 11.5 部署与持续集成/持续部署(CI/CD)实例代码 11.1 项目规划与需求分析 在进行任何软件开发…

《编译原理》阅读笔记:p1-p3

《编译原理》学习第 1 天,p1-p3总结,总计 3 页。 一、技术总结 1.compiler(编译器) p1, But, before a program can be run, it first must be translated into a form in which it can be executed by a computer. The software systems that do thi…

动态规划day.2

62.不同路径 链接:. - 力扣(LeetCode) 题目描述: 一个机器人位于一个 m x n 网格的左上角 (起始点在下图中标记为 “Start” )。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#…

Google Earth Engine谷歌地球引擎计算遥感影像在每个8天间隔内的多年平均值

本文介绍在谷歌地球引擎(Google Earth Engine,GEE)中,求取多年时间中,遥感影像在每1个8天时间间隔内的多年平均值的方法。 本文是谷歌地球引擎(Google Earth Engine,GEE)系列教学文章…

【linux软件基础知识】-死锁问题

死锁问题 当两个或多个线程由于每个线程都在等待另一个线程持有的资源而无法继续时,就会发生死锁 如下图所示, 在线程 1 中,代码持有了 L1 上的锁,然后尝试获取 L2 上的锁。 在线程 2 中,代码持有了 L2 上的锁,然后尝试获取 L1 上的锁。 在这种情况下,线程 1 已获取 L…

CSS---Emmet(二)

一、Emmet语法 Emmet语法是一种用于快速编写HTML和CSS的缩写技术。它允许开发者通过简洁的表达式快速生成复杂的代码结构,极大地提高了编码效率。使用Emmet,你只需要写出一些简短的缩写符号和操作符,然后通过快捷键(通常是Tab键&…

PPP点对点协议

概述 Point-to-Point Protocol,点到点协议,工作于数据链路层,在链路层上传输网络层协议前验证链路的对端,主要用于在全双工的同异步链路上进行点到点的数据传输。 PPP主要是用来通过拨号或专线方式在两个网络节点之间建立连接、…

C语言例题35、反向输出字符串(指针方式),例如:输入abcde,输出edcba

#include <stdio.h>void reverse(char *p) {int len 0;while (*p ! \0) { //取得字符串长度p;len;}while (len > 0) { //反向打印到终端printf("%c", *--p);len--;} }int main() {char s[255];printf("请输入一个字符串&#xff1a;");gets(s)…

泛型编程四:容器

文章目录 前言一、序列容器verctor 总结 前言 STL有六大部件&#xff0c;容器、算法、仿函数、迭代器、适配器和分配器。除了算法是函数模板&#xff0c;其他都是类模板。容器可以分为序列容器和关联容器。常见的序列容器有vector、array、deque、list、forward-list&#xff…

各种流量包特征

[CVE-2013-1966] Apache Struts2 远程命令执行漏洞 要执行的命令在exec里面&#xff0c;而且回显数据包里面有明显执行结果回显 [CVE-2017-8046] Spring Data Rest 远程命令执行漏洞 回显不明显&#xff0c;考试提供的解码工具不能解密&#xff0c; [CVE-2017-12149] JBOSS…

基于Springboot的校园健康驿站管理系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的校园健康驿站管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系…

【stm-4】PWM驱动LED呼吸灯 PWM驱动舵机PWM驱动直流电机

1.PWM驱动LED呼吸灯 void TIM_OC1Init(TIM_TypeDef* TIMx, TIM_OCInitTypeDef* TIM_OCInitStruct); //结构体初始化输出比较单元 void TIM_OC2Init(TIM_TypeDef* TIMx, TIM_OCInitTypeDef* TIM_OCInitStruct); void TIM_OC3Init(TIM_TypeDef* TIMx, TIM_OCInitTypeDef*…

力扣HOT100 - 131. 分割回文串

解题思路&#xff1a; class Solution {List<List<String>> res new ArrayList<>();List<String> pathnew ArrayList<>();public List<List<String>> partition(String s) {backtrack(s,0);return res;}public void backtrack(Str…

创新指南|创新组合管理的7个陷阱以及如何避免它们

进入未知领域的第一步可能具有挑战性。尽管创新会犯错误&#xff0c;但在将 IPM 作为公司实践实施时&#xff0c;您可以准备好并避免一些常见的陷阱。在这篇文章中&#xff0c;我们将讨论组织在实施创新组合管理时遇到的最常见的陷阱。 01. 在映射中包含日常业务任务 映射中的…

Github的使用教程(下载和上传项目)

根据『教程』一看就懂&#xff01;Github基础教程_哔哩哔哩_bilibili 整理。 1.项目下载 1&#xff09;直接登录到源码链接页或者通过如下图的搜索 通过编程语言对搜索结果进一步筛选。 2&#xff09;红框区为项目的源代码&#xff0c;README.md &#xff08;markdown格式&…