SpringBoot集成MQTT

官网配置说明

MQTT Supporticon-default.png?t=N658https://docs.spring.io/spring-integration/reference/html/mqtt.html#mqtt

Spring integration交互逻辑

对于发布者:

  • 消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
  • DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

  • 通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
  • 同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

添加maven依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId><version>5.5.5</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.5</version></dependency>

yaml配置文件

mqtt.username=admin
mqtt.password=123456
mqtt.url=tcp://127.0.0.1:1883
mqtt.client.id=${random.value}
mqtt.default.topic=topic,mqtt/test/#
mqtt.completionTimeout=5000

mqtt生产者消费者配置类

package com.xinghe.tech.user.infrastructure.mqtt;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;import java.util.Arrays;
import java.util.List;@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {private static final byte[] WILL_DATA;static {WILL_DATA = "offline".getBytes();}@Autowiredprivate MqttReceiveHandle mqttReceiveHandle;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.url}")private String hostUrl;@Value("${mqtt.client.id}")private String clientId;@Value("${mqtt.default.topic}")private String defaultTopic;@Value("${mqtt.completionTimeout}")private int completionTimeout;   //连接超时/*** MQTT连接器选项**/@Bean(value = "getMqttConnectOptions")public MqttConnectOptions getMqttConnectOptions1() {MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接mqttConnectOptions.setCleanSession(true);// 设置超时时间 单位为秒mqttConnectOptions.setConnectionTimeout(10);mqttConnectOptions.setAutomaticReconnect(true);mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{hostUrl});// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(10);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。//mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);return mqttConnectOptions;}/*** MQTT工厂**/@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions1());return factory;}/*** MQTT信息通道(生产者)**/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** MQTT消息处理器(生产者)**/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调//设置转换器 发送bytes数据DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);return messageHandler;}/*** 配置client,监听的topic* MQTT消息订阅绑定(消费者)**/@Beanpublic MessageProducer inbound() {List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));String[] topics = new String[topicList.size()];topicList.toArray(topics);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);adapter.setCompletionTimeout(completionTimeout);DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT信息通道(消费者)**/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT消息处理器(消费者)**/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {//处理接收消息mqttReceiveHandle.handle(message);}};}
}

消息处理类

package com.xinghe.tech.user.infrastructure.mqtt;import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** mqtt客户端消息处理类**/
@Slf4j
@Component
public class MqttReceiveHandle {public void handle(Message<?> message) {log.info("收到订阅消息: {}", message);String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();log.info("消息主题:{}", topic);}
}

mqtt发送接口

package com.xinghe.tech.user.infrastructure.mqtt;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** mqtt发送消息* (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)* **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {/*** 发送信息到MQTT服务器** @param*/void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param qos 对消息处理的几种机制。* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。* 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);/*** 发送信息到MQTT服务器** @param topic 主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}

mqtt事件监听类

package com.xinghe.tech.user.infrastructure.mqtt;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MqttListener {/*** 连接失败的事件通知* @param mqttConnectionFailedEvent*/@EventListener(classes = MqttConnectionFailedEvent.class)public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {log.info("连接失败的事件通知");}/*** 已发送的事件通知* @param mqttMessageSentEvent*/@EventListener(classes = MqttMessageSentEvent.class)public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {log.info("已发送的事件通知");}/*** 已传输完成的事件通知* 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执* 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知* 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知* @param mqttMessageDeliveredEvent*/@EventListener(classes = MqttMessageDeliveredEvent.class)public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {log.info("已传输完成的事件通知");}/*** 消息订阅的事件通知* @param mqttSubscribedEvent*/@EventListener(classes = MqttSubscribedEvent.class)public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {log.info("消息订阅的事件通知");}
}

接口调用

MqttGateway mqttGateway;String sendData = "{\"mac\":\"02-00-00-01"\",\"method\":\"switch\",\"data\":{\"channel\":0,\"switch\" :1},\"version\":\"V1.0\"}";
String topic = "iot/gateway/01-00-00-02/ctrl";
mqttGateway.sendToMqtt(topic, sendData);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/20336.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自动驾驶与智能网联场地测试一体化装备应用

自动化驾驶层级与结构 L1:能够辅助驾驶员玩车某些驾驶任务制动防抱死系统 (ABS),车身电子稳定系统 (ESP)等,这些配置就是L1级别的运用。 L2:部分自动化,在L2的级别里,必须要具备的是自适应巡航系统,主动车道保持系统自动刹车辅助系统以及自动泊车系统等系统。 L3:有条件…

【Android】从零搭建组件化项目

组件化系列文章介绍的内容稍微多了点&#xff0c;本着研究透这玩意的精神&#xff0c;从组件化的简介开始说起。 目录 简介组件化、模块化与插件化开始创建配置共享文件打包模式配置APT与JavaPoet 简介 什么是组件化&#xff1f; 将多个功能模板拆分、重组的过程。 为什么要使…

基于单片机智能衣柜 智能衣橱 换气除湿制系统紫外线消毒的设计与实现

功能介绍 以51单片机作为主控系统&#xff1b;液晶显示当前衣柜温湿度和柜门开启关闭状态&#xff1b;按键设置当前衣柜湿度上限值、衣柜门打开和关闭&#xff0c;杀菌消毒&#xff1b;当湿度超过设置上限&#xff0c;继电器闭合开启风扇进行除湿&#xff1b;进行杀菌消毒时&am…

量子近似优化算法(QAOA)入门(1):从量子绝热算法(QAA)角度的直观理解

文章目录 前言&#xff1a;量子计算的本质是测量一、基于量子逻辑电路的常用算法1.NISQ&#xff1a;Noisy Intermediate-Scale Quantum&#xff08;含噪声中等规模量子&#xff09; 二、量子绝热算法&#xff08;QAA&#xff1a;Quantum Adiabatic Algorithm&#xff09;1.QAA的…

爬虫入门07——requests中携带cookie信息

爬虫入门07——requests中携带cookie信息 对于需要登陆的网站如果不携带cookie是无法获取我们所需内容的就以查看我在CSDN中的订单为例&#xff0c;在登陆后可以查看到订单信息 而当我们使用Python代码发出请求时&#xff0c;是不携带cookie&#xff0c;因此无法拿到订单相关信…

查看某个三方依赖jar包是在哪个pom引入的(springboot+idea)

项目springboot1升级2版本&#xff0c;日志框架使用的是log4j&#xff0c;升级到springboot2版本某些依赖引入了logback依赖包&#xff0c;然后项目启动报错&#xff1a; 查看这个SLF4JLoggerContextFactory这个类是在哪个jar包下 使用idea的maven依赖图查看功能寻找是哪个p…

page_dewarp实现弯曲文本矫正

朋友们&#xff0c;如果你使用ocr&#xff0c;再识别的时候会遇到文本扭曲的问题&#xff0c;为了解决这个问题&#xff0c;需要进行弯曲文本矫正&#xff0c;这里推荐一个开源项目&#xff0c;可以使用上面的功能进行矫正&#xff0c;不过里面可能需要改动一些代码&#xff0c…

Go []uint8和string的爱恨情仇

先上代码&#xff1a; package mainimport "fmt"func main() {byteSlice : []uint8{52, 44, 51} // 示例字节切片str : string(byteSlice)fmt.Printf("byteSlice:%v\r\n", str) }// 执行-输出 byteSlice:4,3 干货&#xff1a; 在Go语言中&#xff0c;[]u…

【逻辑回归实例】

逻辑回归&#xff1a;从理论到实践 在本文中&#xff0c;我们将介绍一种被广泛用于二分类问题的机器学习模型——逻辑回归。我们将通过一个实例&#xff0c;深入解析如何在 Python 环境中实现逻辑回归。 源数据下载链接 1. 什么是逻辑回归&#xff1f; 逻辑回归是一种用于解…

怎么给pdf文件加密?pdf文档如何加密

在数字化时代&#xff0c;保护个人和机密信息的重要性越来越受到关注。PDF&#xff08;Portable Document Format&#xff09;是一种广泛使用的文件格式&#xff0c;用于共享和存储各种类型的文档。然而&#xff0c;由于其易于编辑和复制的特性&#xff0c;保护PDF文件中的敏感…

Django_自定义文件存储类并将图片上传到FastDFS

目录 将图片到FastDFS和浏览的流程图 使用自定义文件存储类 1、定义存储类 2、创建FastDFS的配置文件 3、修改settings.py配置 4、上传图片进行验证 4.1 定义一个包含ImageField字段的模型类 4.2 登录django的admin后台 4.3 上传图片 4.4 查看图片 源码等资料获取方…

如何在WordPress网站中嵌入TikTok视频(3个简单方法)

您想轻松地将 TikTok 视频嵌入您的 WordPress 网站吗&#xff1f; 如果您已经创建了成功的、病毒式传播的 TikTok&#xff0c;那么将这些视频发布到您的网站也是有意义的。通过这种方式&#xff0c;您只需很少的额外努力就能获得更多的观看次数、参与度和社交媒体关注者。 在…