SpringBoot集成MQTT协议

简介

MQTT 可以被解释为一种低开销,低带宽占用的即时通讯协议,可以用较少的代码和带宽为远程设备连接提供实时可靠的消息服务,它适用于硬件性能低下的远程设备以及网络状况糟糕的环境下,因此 MQTT 协议在 IoT(Internet of things,物联网),小型设备应用,移动应用等方面有较广泛的应用。

优点:代码量少,开销低,带宽占用小,即时通讯协议。

MQTT原理

实现mqtt协议需要客户端和服务器端通讯完成,在通讯中,mqtt协议中有三种身份:发布者(publish),代理(broker)(服务器),订阅者(subscribe)。其中,消息的发布者和订阅者都是客户端。消息代理是服务器,消息发布者可以同时是订阅者,传输过程如下如所示。

image.png

有别于传统的客户端/服务器通讯协议,MQTT协议并不是端到端的,消息传递通过代理,包括会话(session)也不是建立在发布者和订阅者之间,而是建立在端和代理之间。代理解除了发布者和订阅者之间的耦合。

除了发布者和订阅者之间传递普通消息,代理还可以为发布者处理保留消息和遗愿消息,并可以更改服务质量(QoS)等级。

MQTT主题

1、主题层级分隔符“/”

用于分割主题的每个层级,为主题名提供一个分层结构。如主题:

china/anhui
china/anhui/hefei

2、多层通配符“#”

用于匹配主题中任意层级的通配符。如主题:china/#

china/anhui
china/anhui/hefei
china/anhui/hefei/shushan

3、单层通配符“+”

加号是只能用于单个主题层级匹配的通配符。如主题:

china/+      只能匹配 china/anhui
china/+/+/shushan     能匹配china/anhui/hefei/shushan

4、通配符“$”

通配符“$”表示匹配一个字符,只要不是放在主题的最开头,即:

$xx
/$xx
/xx$

实战应用

对接协议部分内容

image.png

SpringBoot集成Mqtt协议

本篇文章就不重点介绍MQTT相关的内容了,主要学会怎么运用到实际开发工作中。

  1. 添加依赖
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
  1. 添加配置类
@Configuration
@ConfigurationProperties(prefix="spring.mqtt")
public class MqttProperties {private String url;private String username;private String password;private String clientId;public String getUrl() {return url;}public void setUrl(String url) {this.url = url;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}
}
@Configuration
@ConditionalOnBean(MqttProperties.class)
public class MqttConfiguration {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate MqttMessageHandler mqttMessageHandler;/*** 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。** @return factory*/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();// 设置代理端的URL地址,可以是多个options.setServerURIs(new String[]{mqttProperties.getUrl()});options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setKeepAliveInterval(120);factory.setConnectionOptions(options);return factory;}/*** 入站通道*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 入站*/@Beanpublic MessageProducer inbound() {// Paho客户端消息驱动通道适配器,主要用来订阅主题MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId() + "-consumer",mqttClientFactory(),MqttConstants.UP_DEVICE_STATE,MqttConstants.UP_DEVICE_STATUS,MqttConstants.UP_DEVICE_EVENT,MqttConstants.UP_DEVICE_SET_ACK,MqttConstants.UP_DEVICE_GET_ACK,MqttConstants.UP_DEVICE_CAPTURE_REPORT,MqttConstants.UP_DEVICE_CONTROL_QUERY_ACK,MqttConstants.UP_DEVICE_CONTROL_ACK);// Paho消息转换器DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();adapter.setConverter(defaultPahoMessageConverter);adapter.setCompletionTimeout(5000);// 设置QoSadapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。** @return*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return mqttMessageHandler;}/*** 出站通道*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** 出站*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler outbound() {// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactoryMqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "-producer", mqttClientFactory());// 如果设置成true,即异步,发送消息时将不会阻塞。mqttPahoMessageHandler.setAsync(true);// 设置默认QoSmqttPahoMessageHandler.setDefaultQos(1);// Paho消息转换器DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();mqttPahoMessageHandler.setConverter(defaultPahoMessageConverter);return mqttPahoMessageHandler;}
}
  1. 消息处理类
@Service
public class MqttMessageHandler implements MessageHandler {private final Logger logger = LoggerFactory.getLogger(MqttMessageHandler.class);@Overridepublic void handleMessage(Message<?> message) throws MessagingException {try {String payload = message.getPayload().toString();String topic = message.getHeaders().get("mqtt_receivedTopic").toString();logger.info("接受来自mqtt的订阅信息,topic:{}", topic);//离线上报if (topic.matches(".+/offline")) {statusReport(payload);}//状态上报else if (topic.matches(".+/status")) {deviceInfoReport(payload);} //事件上报else if (topic.matches(".+/eventReport")) {eventReport(payload);} else {logger.info("主题topic:{},负载payload:{}", topic, payload);}} catch (Exception e) {logger.error("handleMessage 接受mqtt订阅消息异常:", e);}}/*** 设备状态上报** @param payload*/private void statusReport(String payload) {OfflineReport offlineReport = JSONUtil.toBean(payload, OfflineReport.class);logger.info("收到设备状态信息上报:{}", offlineReport);//...省略}//...省略}
/*** 消息发送*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {/*** 定义重载方法,用于消息发送** @param payload*/void sendToMqtt(String payload);/*** 指定topic进行消息发送** @param topic* @param payload*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

源码地址:https://gitee.com/jiangwang001/springboot/tree/master/cy-cabinet-adapter

小结

本文简单介绍了一下MQTT协议的基本知识,在实际工作中,MQTT通常应用于物联网、智能家居等设备和应用程序之间的通信。在嵌入式领域,MQTT已经占据着无法替代的分量,因为大多数的嵌入式设备,都需要这样的协议进行数据交互。

2024一起加油~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/319661.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

‘react-native‘ 不是内部或外部命令,也不是可运行的程序或批处理文件。

原因&#xff1a;没有下载react-native 解决下载react-native npm i -g react-native-cli

B样条曲线

零次 B 样条 F i &#xff0c; 0 ( t ) { 1 t i ≤ t < t i 1 0 o t h e r s \bm{F}_{i&#xff0c;0}(t) \begin{cases} 1 & t_i \leq t <t_{i1} \\ 0 & others \end{cases} Fi&#xff0c;0​(t){10​ti​≤t<ti1​others​ 2. 一次 B 样条&#xff0c;…

Green Sock | GSAP 动画库

1.什么是“GSAP”&#xff1f; GreenSock Animation Platform&#xff08;GSAP&#xff09; 是一个业界知名的动画工具套件&#xff0c;在超过1100万个网站上使用&#xff0c;其中包括大量获奖网站&#xff01; 您可以使用GSAP在任何框架中制作几乎任何JavaScript可以触及的动…

从第一步开始

从新建文件到开始写代码 新建文件 创建项目步骤 选择C++ 选择存放路径,给项目起个名字 最后选择编译器 进入写代码环节 写代码 #include <iostream>using

地震烈度速报与预警工程成功案例的经验分享 | TDengine 技术培训班第一期成功落地

近日&#xff0c;涛思数据在成都开设了“国家地震烈度速报与预警工程数据库 TDengine、消息中间件 TMQ 技术培训班”&#xff0c;这次培训活动共分为三期&#xff0c;而本次活动是第一期。其目标是帮助参与者深入了解 TDengine 和 TMQ 的技术特点和应用场景&#xff0c;并学习如…

学习Vue 03-03 为TypeScript使用defineComponent支持

03 为TypeScript使用defineComponent支持 The defineComponent() method is a wrapper function that accepts an object of configurations and returns the same thing with type inference for defining a component. defineComponent() 方法是一个封装函数&#xff0c;它…

来自云仓酒庄分享为什么同一种葡萄会使用不同的名称?

如果你只是刚刚走进葡萄酒世界&#xff0c;走在葡萄酒通道上可能会令人生畏&#xff0c;因为有不同的国家、地区和生产商&#xff0c;除此之外还有数千酿酒葡萄品种。更令人困惑的是&#xff0c;有些地方对同一种葡萄使用不同的名称&#xff01;一个著名的例子是西拉和它澳大利…

S32K312软件看门狗之Software Watchdog Timer (SWT)

S32DS的SDK中提供了Wdg&#xff0c;是属于MCAL层的&#xff0c;配置有点复杂&#xff0c;还需要以来Gpt、Mcu和Platform框架里的东西&#xff0c;配置到已经开发好的工程中还容易出现配置问题。本文主要讲解Software Watchdog Timer (SWT)的软件看门狗配置和使用示例&#xff0…

Linux配置Acado

如果需要使用acado的matlab接口&#xff0c;请移步&#xff1a;Linux Matlab配置Acado 首先&#xff0c;安装必要的软件包&#xff1a; sudo apt-get install gcc g cmake git gnuplot doxygen graphviz在自定义目录下&#xff0c;下载源码 git clone https://github.com/ac…

业务中台IT内部拉通会分享

在我们这个项目中&#xff0c;各个产品之间是通过扁平化的方式进行管理。在前期规划阶段&#xff0c;由于项目计划和模块负责人已经提前确认&#xff0c;各小组都能专注于自己的工作&#xff0c;一切井然有序。 然而&#xff0c;到了UAT阶段&#xff0c;我们发现扁平化的管理方…

Mysql锁机制与优化

欢迎大家关注我的微信公众号&#xff1a; 传送门&#xff1a;Mysql事务原理与优化 目录 概述 锁分类 锁等待分析 锁优化实践 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。 在数据库中&#xff0c;除了传统的计算资源&#xff08;如CPU、RAM、I/O等&…

提升Windows系统安全性的一些有效的策略

假设一个杀猪的机器人感染了病毒&#xff0c;把人识别成了猪&#xff0c;&#xff0c;&#xff0c;&#xff0c;&#xff0c; 1&#xff1a;我偶然发现的&#xff1a;把所有向外的UDP都禁止&#xff0c;但是要开放53号端口&#xff0c;因为这是DNS通讯端口&#xff0c;没有这个…