若依前后分离版框架下Springboot java引入Mqtt接受发送消息

**这只是其中一种而且是粗浅的接、发消息。
同步机制还要跟搞物联网的同事沟通确认去看看能不能实现 或者是设备比较多的情况下 不会去使用同步机制
首先pom文件 引入依赖
**

        <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

其次配置文件mqtt配置,我这是yml,其他配置文件写法需要改动下

mqtt:username: ****** # 用户名password: ****** # 密码hostUrl: tcp://******:1883 # tcp://ip:端口clientId: clientId # 客户端iddefaultTopic: electric/#,test # 订阅主题  electric/#表示以electric/开头的主题都可以接受到timeout: 100 # 超时时间 (单位:秒)keepalive: 60 # 心跳 (单位:秒)enabled: true # 是否使用mqtt功能

**接下来到了代码层面了
先创建一个yml文件的实体类 MqttConfig
prefix = 这里地址看你自己的配置
@ConfigurationProperties(prefix = “mqtt”)
**

import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** mqtt功能使能*/private boolean enabled;private boolean retained;/*** qos*/private int qos;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}public int getQos() {return qos;}public void setQos(int qos) {this.qos = qos;}@Beanpublic MqttPushClient getMqttPushClient() {if(enabled == true){String mqtt_topic[] = StringUtils.split(defaultTopic, ",");mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接for(int i=0; i<mqtt_topic.length; i++){mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题}}return mqttPushClient;}
}

**这里在创建 MqttPushClient 文件
去链接客户端、发消息、订阅主题 功能都在这里
**

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布消息** @param pubTopic 主题* @param message 内容* @param qos   连接方式*/public  static void publishMessage(String pubTopic, String message, int qos) {System.out.println("发布消息   "+client.isConnected());System.out.println("id:"+client.getClientId());MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setPayload(message.getBytes());MqttTopic topic = client.getTopic(pubTopic);if(null != topic) {try {MqttDeliveryToken publish = topic.publish(mqttMessage);if(!publish.isComplete()) {logger.info("发布消息成功");}} catch (MqttException e) {e.printStackTrace();}}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public static void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}

**再创建一个继承回调方法的接口 PushCallback
**

package com.ruoyi.util.mqttUtil;import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));_topic = topic;_qos = mqttMessage.getQos()+"";_msg = new String(mqttMessage.getPayload());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("发布消息成功");//发布消息成功之后 才会调用这里 大家可以仔细看看token里面 后续同步机制也是利用这个token去完成logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}//别的Controller层会调用这个方法来  获取  接收到的硬件数据public String receive() {JSONObject jsonObject = new JSONObject();jsonObject.put("topic", _topic);jsonObject.put("qos", _qos);jsonObject.put("msg", _msg);return jsonObject.toString();}}

到这就需要去下载个 MQTTX 跟服务器直接互相接发消息了

下图红框内的随意填写 服务器地址、端口、用户名、密码使用java代码配置文件里面的
在这里插入图片描述

在这里插入图片描述

往下就是连接上 添加一个订阅,记得 这个订阅要在你在配置文件里面哦 什么名字都ok /#相当于模糊查询
在这里插入图片描述
好了 这里可以启动项目了 控制台会打印咱们订阅的主题的,也就是说这些主题给咱们发消息 会直接被咱们接受的
在这里插入图片描述

启动项目 由于咱们的配置文件里订阅了test这个主题 我在mqttx里面直接给 test这个主题发送信息

框住的地方是什么就是给那个主题发消息

控制台自动打印 订阅的test主题信息
在这里插入图片描述
**到这里的话 接受消息就完事了 就要搞下发消息了
随便找个controller弄个请求搞一下 **

    @RequestMapping("/send")@ResponseBodyprivate ResponseEntity<String> send() throws MqttException {System.out.println("我是springboot发送的数据");//三个参数 第一个是什么主题,第二个发送内容,第三个是MqttPushClient.publishMessage("clientId1","-===============",1);return new ResponseEntity<>("OK", HttpStatus.OK);}

在这里插入图片描述

在这里插入图片描述

已分享完毕,只是很基础的应用 另过几天如果项目有需求会在这继续完善同步mqtt请求的后续 如果接受不到消息 一定要看看订阅的主题对应起来没

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/177612.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全方位移动机器人 Stanley 轨迹跟踪 Gazebo 仿真

全方位移动机器人 Stanley 轨迹跟踪 Gazebo 仿真 本来打算今天出去跑一下 GPS&#xff0c;但是下雨&#xff0c;作罢 添加参考轨迹信息 以下三个功能包不需要修改&#xff1a; mrobot&#xff1a;在 Rviz 和 Gazebo 中仿真机器人cmd_to_mrobot&#xff1a;运动学解算&#…

最佳实践-使用Github Actions来构建跨平台容器镜像

公众号「架构成长指南」&#xff0c;专注于生产实践、云原生、分布式系统、大数据技术分享。 前言 最近在写K8s的相关系列文章&#xff0c;因为有涉及到镜像构建&#xff0c;发现在Mac m1的Arm架构下构建的部分镜像&#xff0c;没法在X86架构下使用&#xff0c;不兼容。 尝试…

Xocde 升级15 或者 iOS17报错:

错误&#xff1a; Assertion failed: (false && "compact unwind compressed function offset doesnt fit in 24 bits"), function operator(), file Layout.cpp, line 5758. 翻译&#xff1a; 断言失败&#xff1a;&#xff08;false&&“压缩展开…

【仿真】ruckig在线轨迹生成器示例

该场景说明了使用 CoppeliaSim 中提供的 Ruckig 在线轨迹生成功能的各种方法&#xff1a; 1. 在线程脚本内使用单个阻塞函数&#xff08;红色&#xff09; 2. 在线程脚本中使用多个非阻塞函数&#xff08;黄色&#xff09; 3. 在非线程脚本中使用多个非阻塞函数&#xff08;…

基于闪电搜索算法优化概率神经网络PNN的分类预测 - 附代码

基于闪电搜索算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于闪电搜索算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于闪电搜索优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

PyG(torch_geometric)的MessagePassing详解

1. 提出MessagePassing的目的 MessagePassing是图神经网络&#xff08;Graph Neural Networks&#xff0c;GNNs&#xff09;的一个基础组件&#xff0c;它被设计用来处理图形数据的问题。在图形数据中&#xff0c;数据点&#xff08;节点&#xff09;之间的关系&#xff08;边…

【工具流】WSL2安装

一些废话 最近看到了PKU出品的cs自学指南&#xff0c;想要跟着里面的自学路径学国外的优质课程&#xff0c;无奈大多数pre教程里面都是直接Linux环境下的操作&#xff0c;并且我在CSwiki看到了那个熟悉的上学期学了一点的missing-semester课。 上学期自学missing-semester的时候…

Windows系统Mysql数据库、文件夹自动备份

一、批处理bat文件编写 批处理命令如下&#xff0c;使用时需要将相关参数修改为实际参数 echo off color 0a chcp 65001::数据库备份文件及模型文件备份的根路径 SET BACKUP_DIRZ:\backup ::**************************************配置MySQL数据库备份相关参数*************…

谷歌提出AGI的6大原则,和5大能力等级

随着ChatGPT等大模型的出现,AGI概念正在从哲学层面快速转向实际应用落地&#xff0c;并且ChatGPT已经展示出了初级AGI的功能&#xff08;如AutoGPT&#xff09;,有不少专家认为&#xff0c;AGI时代可能在10年内到来。 因此&#xff0c;需要一个明确的技术框架来讨论和衡量不同…

深度学习+opencv+python实现车道线检测 - 自动驾驶 计算机竞赛

文章目录 0 前言1 课题背景2 实现效果3 卷积神经网络3.1卷积层3.2 池化层3.3 激活函数&#xff1a;3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 YOLOV56 数据集处理7 模型训练8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &am…

学好Python-新手小白如何做?

新手小白如何学好Python?有哪些参考方法吗?这是一个老生常谈的话题了。今天为大家带来两位前辈的分享&#xff0c;他们给出了非常实用的方法和思路&#xff0c;希望对你有所帮助。 1、多练&#xff0c;两个字&#xff1a;多练 如果真的要说方法可以参考如下&#xff1a; ①…

Acrobat Pro DC 2023 中文版

Acrobat Pro DC 2023是PDF编辑和管理软件&#xff0c;具有以下优点&#xff1a; 更好的安全性&#xff1a;Acrobat Pro DC 2023采用了新的安全功能&#xff0c;包括加密、数字签名等&#xff0c;可以更好地保护PDF文件的安全性。 更高的速度和性能&#xff1a;Acrobat Pro DC …