SpringBoot集成Mqtt发送消息

1. MQTT简介

MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。

相关概念

三种身份:

在这里插入图片描述

  • 客户端(Client):MQTT 客户端是发送接收消息的应用程序。
  • 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
  • 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。

MQTT 消息

MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分

  • 主题,可以理解为消息的类型
  • 负载,可以理解为消息的内容

消息服务质量QoS(Quality of Service)

Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级

  • 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
  • 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
  • 2 消息仅传送一次,确保消息到达一次

2. SpringBoot集成Mqtt

Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version>
</dependency>

服务端配置

public class MqttSendMsgService {private static String clientId = "test";private static String username = "admin";private static String password = "xxxxxx";private static String broker = "tcp://xxxxx:1883";public ReturnT<String> mqttSend(String param) {MqttClient client;try {client = new MqttClient(broker, clientId, new MemoryPersistence());client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("Message arrived: " + mqttMessage.getPayload());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("Delivery complete");}});MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName(username);connOpts.setPassword(password.toCharArray());client.connect(connOpts);log.info("Connected to MQTT Broker!");//主题String topic="test/simple";//消息String content="发送测试";MqttMessage message = new MqttMessage();message.setQos(1);message.setRetained(false);message.setPayload(content.getBytes());//消息发送client.publish(topic,message);} catch (MqttException e) {e.printStackTrace();}return ReturnT.SUCCESS;}
}

上面这种使用起来比较简单,生产环境使用最多的还是下面这种

第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version>
</dependency>

添加yaml配置

mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024

添加对应的属性配置类

@Component
public class MqttConfigProperties {@Value("${mqtt.url}")private String url;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.defaultTopic}")private String defaultTopic;@Value("${mqtt.keepAliveInterval}")private Integer keepAliveInterval;@Value("${mqtt.automaticReconnect}")private Boolean automaticReconnect;@Value("${mqtt.cleanSession}")private Boolean cleanSession;@Value("${mqtt.connectionTimeout}")private Integer connectionTimeout;@Value("${mqtt.maxInflight}")private Integer maxInflight;
}

创建客户端配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigProperties.getUsername());options.setPassword(mqttConfigProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());options.setCleanSession(mqttConfigProperties.getCleanSession());options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());options.setMaxInflight(mqttConfigProperties.getMaxInflight());return options;}@Beanpublic MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions);return factory;}// 推送通道@Beanpublic MessageChannel mqttOutputChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutputChannel")public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);messageHandler.setAsync(true);messageHandler.setDefaultQos(1);messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());log.info("初始化mqttOutputChannel...");return messageHandler;}}

发送网关接口

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {/*** 发送消息** @param topic* @param data*/void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}

这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了

mqttGateway.send(topic, JSONObject.toJSONString(msg));

参考:
https://mqtt.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/484416.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pandas/geopandas 笔记:判断地点在不在路网上 不在路网的点和路网的距离

0 导入库 import osimport pandas as pd pd.set_option(display.max_rows,5)import osmnx as oximport geopandas as gpd from shapely.geometry import Point 1 读取数据 假设我们有 如下的数据&#xff1a; 1.1 新加坡室外基站位置数据 cell_stationpd.read_csv(outdoor…

面试答疑03

1、登录鉴权怎么做的&#xff1f;为什么采用jwt的方式&#xff1f;有什么好处&#xff1f; Java登录鉴权常见的实现方式包括**CookieSession、HTTP Basic Authentication、ServletJDBC**等。 在Java的Web应用中&#xff0c;登录鉴权是确认用户身份的关键环节。一个常用的传统…

《探索网校 App 的魅力世界:知识与科技的完美结合》

在数字化时代&#xff0c;教育也在经历着深刻的变革。网校 App 的出现&#xff0c;正为学习者们开启了一扇通往知识的新大门&#xff0c;它融合了科技的力量&#xff0c;让学习变得更加便捷、高效和有趣。 一、功能与优势 网校 App 提供了丰富多样的学习功能&#xff0c;满足了…

Vue 中 onclick和@click区别

文章目录 一、直接上结论二、验证代码&#xff0c;可直接运行三、点击结果 一、直接上结论 onclick 只能触发 js的原生方法&#xff0c;不能触发vue的封装方法click 只能触发vue的封装方法&#xff0c;不能触发js的原生方法 二、验证代码&#xff0c;可直接运行 <!DOCTYP…

11-树-二叉树的前序遍历

这是树的第11篇算法&#xff0c;力扣链接。 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,2,3] 做了这么久的树问题&#xff0c;现在开始回忆三种遍历方法&#xff0c;这篇文章…

【嵌入式学习】IO进程线程day02.22

一、思维导图 二、习题 1> 将互斥机制的代码实现 #include <myhead.h>//定义一个全局变量 char str[128]"我是一个全局字符串数组"; //1、创建一个互斥锁变量 pthread_mutex_t mutex;//线程1 void *pth1(void *arg) {//上锁pthread_mutex_lock(&mutex…

为新固态硬盘安装操作系统

目录 背景方案具体步骤1 为新硬盘进行分区2 下载Dism3 下载win10的iso文件4 通过Dism 重装系统5 从biso调整启动顺序5 遗留问题 参考资料 背景 情况是这样的&#xff0c;我的电脑本来就有一块sata的固态硬盘&#xff0c;作为c盘&#xff0c;装载的是win10系统。 一方面只有500…

matlab代码--基于matlabLDPC-和积译码系统

LDPC编码 一个码长为n、信息位个数为k的线性分组码&#xff08;n,k&#xff09;可以由一个生成矩阵 来定义&#xff0c;信息序列 通过G被映射到码字XS.G。线性分组码也可以由一个校验矩阵 来描述。所以码字均满足 。校验矩阵的每一行表示一个校验约束 &#xff0c;其中所有的非…

springboot大学生体质测试管理系统源码和论文

大学生体质测试管理系统提供给用户一个简单方便体质测试管理信息&#xff0c;通过留言区互动更方便。本系统采用了B/S体系的结构&#xff0c;使用了java技术以及MYSQL作为后台数据库进行开发。系统主要分为系统管理员、教师和用户三个部分&#xff0c;系统管理员主要功能包括首…

(全注解开发)学习Spring-MVC的第三天

全注解开发 第一部分 : 1.1 消除spring-mvc.xml 这些是原来spring-mvc.xml配置文件的内容 <!--1、组件扫描, 使Controller可以被扫描到--><context:component-scan base-package"com.itheima.controller"/><!--2、非自定义的Bean, 文件上传解析器--&…

尝试一下最新的联合办公利器ONLYOffice

下载下来一起试试吧 桌面安装版下载地址&#xff1a;https://www.onlyoffice.com/zh/download-desktop.aspx) 官网地址&#xff1a;https://www.onlyoffice.com 普通Office对联合办公的局限性 普通Office软件&#xff08;如Microsoft Office、Google Docs等&#xff09;在面对…

程序员可以做哪些副业?

如果你经常玩知乎、看公众号&#xff08;软件、工具、互联网这几类的&#xff09;你就会发现&#xff0c;好多资源连接都变成了夸克网盘、迅雷网盘的资源链接。 例如&#xff1a;天涯神贴&#xff0c;基本上全是夸克、UC、迅雷网盘的资源链接。 有资源的前提下&#xff0c;迅雷…