SpringBoot项目接入MQTT协议

mqtt是一种类似于mq的通讯技术

1、mqtt服务端搭建

创建docker网络

docker network create --driver bridge --subnet 172.18.0.0/24 --gateway 172.18.0.1 emqx-net

创建容器

docker run -d \--name emqx1 \-e "EMQX_NODE_NAME=emqx@172.18.0.2" \--network emqx-net \--ip 172.18.0.2 \--network-alias 172.18.0.2 \-p 1883:1883 \-p 8083:8083 \-p 8084:8084 \-p 8883:8883 \-p 18083:18083 \emqx/emqx:5.4.1docker run -d \--name emqx2 \--ip 172.18.0.3 \-e "EMQX_NODE_NAME=emqx@172.18.0.3" \--network emqx-net \--network-alias 172.18.0.3 \emqx/emqx:5.4.1docker run -d \--name emqx3 \--ip 172.18.0.4 \-e "EMQX_NODE_NAME=emqx@172.18.0.4" \--network emqx-net \--network-alias 172.18.0.4 \emqx/emqx:5.4.1

服务节点注册

docker exec -it emqx2 \
emqx ctl cluster join emqx@172.18.0.2docker exec -it emqx3 \
emqx ctl cluster join emqx@172.18.0.2

2、创建springboot项目,并增加mqtt依赖

依赖引入

<dependency><groupId>com.hivemq</groupId><artifactId>hivemq-mqtt-client</artifactId><version>1.3.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

创建连接器

@Configuration
public class VideoConfig implements MqttClientDisconnectedListener, MqttClientConnectedListener {static Logger logger = LoggerFactory.getLogger(VideoConfig.class);@Value("${mqtt.server.url:192.168.31.47}")private String serverUrl;//mqtt地址@Value("${mqtt.server.port:1883}")private Integer serverPort;//mqtt地址@Value("${mqtt.server.username}")private String serverUserName;//mqtt账号(测试默认没有)@Value("${mqtt.server.password}")private String serverPassWord;//mqtt密码(测试默认没有)@Autowiredprivate DefautMqttConsumerListener defautMqttConsumerListener;@Beanpublic Mqtt3AsyncClient mqtt3AsyncClient(){String clientId = String.format("%d%s", TimeUtil.getCurrentInMillis(), RandomUtil.getRandomStr(10));Mqtt3ClientBuilder mqttClientBuilder = Mqtt3Client.builder();Mqtt3AsyncClient mqttClient = mqttClientBuilder.identifier(clientId).serverHost(serverUrl).serverPort(serverPort).addConnectedListener(this).addDisconnectedListener(this).build().toAsync();mqttClient.connect();mqttClient.connectWith().keepAlive(60).willPublish().topic("/").applyWillPublish().simpleAuth().username(serverUserName).password(serverPassWord.getBytes()).applySimpleAuth().send().whenCompleteAsync((connAck, throwable) -> {Mqtt3ConnAckReturnCode returnCode = connAck.getReturnCode();logger.info("mqtt connect result: {}", returnCode);if (throwable != null) {logger.error("connectWith error , throwable :"+throwable);}});return mqttClient;}/***连接成功回调后 监听mqtt消息**/@Overridepublic void onConnected(MqttClientConnectedContext mqttClientConnectedContext) {String subscribedTopic ="+/reply";//+表示匹配一个信息mqtt3AsyncClient().subscribeWith().topicFilter(subscribedTopic).callback(defautMqttConsumerListener).send().whenComplete((subAck, throwable) -> {if (throwable != null) {logger.error("Handle failure to subscribe", throwable);} else {logger.info("successful subscription: " + subscribedTopic);}});}/***连接关闭回调重新创建连接**/@Overridepublic void onDisconnected(MqttClientDisconnectedContext mqttClientDisconnectedContext) {final Mqtt3ClientDisconnectedContext context = (Mqtt3ClientDisconnectedContext) mqttClientDisconnectedContext;try {context.getReconnector().connectWith().simpleAuth().username(serverUserName).password(serverPassWord.getBytes()).applySimpleAuth().applyConnect().reconnect(true).delay(new Random().nextInt(100), TimeUnit.MILLISECONDS);} catch (Exception e) {logger.error("reconnect:" + e.getMessage(), e);}}
}

监听器

@Component
public class DefautMqttConsumerListener implements Consumer<Mqtt3Publish> {private static final Logger logger = LoggerFactory.getLogger(DefautMqttConsumerListener.class);@Overridepublic void accept(Mqtt3Publish mqttPublish) {String topic = mqttPublish.getTopic().toString();byte[] msg = mqttPublish.getPayloadAsBytes();String msgJson = new String(msg);logger.info("mqtt listener topic :{} ,msg:{}" ,topic, msgJson);}}

消息发送

public void sendMqttMsg(String topic, MqttQos qos, String msg){mqtt3AsyncClient.publishWith().topic(topic).payload(msg.getBytes()).qos(Optional.ofNullable(qos).orElse(MqttQos.AT_LEAST_ONCE)).retain(false).send().whenComplete((result, throwable) -> {logger.info("sendMqttMsg to video, topic : {} , body : {}",topic,body);if (throwable != null) {logger.error("transfer failed , throwable :{}",throwable);}});}

3、MQTT工具

下载地址:https://mqttx.app/zh/downloads
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

QOS

分为3级,0表示发一次,意味着消息可能会丢失;1表示至少发一次,意味着消息可能会收到多次;2表示保证一次,但是越高性能越低,可以根据自己业务进行选择

topic

类似于rocketmq的topic,也类似于rabbitmq的routingKey,mqtt的topic同样也是消息收发的引导,监听时 + 号,表示匹配 任意 / 号中间的任何数据,# 号表示多个数据 包含了 / 号后边多个

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/438620.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【LeetCode每日一题】56. 合并区间插入区间

一、判断区间是否重叠 力扣 252. 会议室 给定一个会议时间安排的数组 intervals &#xff0c;每个会议时间都会包括开始和结束的时间 intervals[i] [starti, endi] &#xff0c;请你判断一个人是否能够参加这里面的全部会议。 思路分析 因为一个人在同一时刻只能参加一个会…

Zygote的启动流程

在zygote进程对应的文件是app_main.cpp文件&#xff0c;在app_main.cpp文件的main()方法中先解析了init.rc中配置的参数并根据配置的参数设置zygote的状态。 在状态设置阶段主要做了&#xff1a; 设置进程名称为zygote通过startSystemServer true标示启动的是systemServer调…

SpringSecurity笔记

SpringSecurity 本笔记来自三更草堂&#xff1a;https://www.bilibili.com/video/BV1mm4y1X7Hc/?spm_id_from333.337.search-card.all.click&#xff0c;仅供个人学习使用 简介 Spring Security是Spring家族中的一个安全管理框架。相比与另外一个安全框架Shiro&#xff0c;…

消息中间件RabbitMQ介绍

一、基础知识 1. 什么是RabbitMQ RabbitMQ是2007年发布&#xff0c;是一个在AMQP(高级消息队列协议)基础上完成的&#xff0c;简称MQ全称为Message Queue, 消息队列&#xff08;MQ&#xff09;是一种应用程序对应用程序的通信方法&#xff0c;由Erlang&#xff08;专门针对于大…

What is Rust? Why Rust?

why Rust&#xff1f; 目前&#xff0c;Rust 变得越来越流行。然而&#xff0c;仍然有很多人&#xff08;和公司&#xff01;&#xff09;误解了 Rust 的主张价值是什么&#xff0c;甚至误解了它是什么。在本文中&#xff0c;我们将讨论 Rust 是什么以及为什么它是一种可以增强…

基于springboot+vue的医院管理系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 研究背景…

###C语言程序设计-----C语言学习(6)#

前言&#xff1a;感谢老铁的浏览&#xff0c;希望老铁可以一键三连加个关注&#xff0c;您的支持和鼓励是我前进的动力&#xff0c;后续会分享更多学习编程的内容。 一. 主干知识的学习 1. while语句 除了for语句以外&#xff0c;while语句也用于实现循环&#xff0c;而且它…

Kotlin快速入门系列7

Kotlin的数据类、密封类、枚举类 数据类 与Java等语言不同&#xff0c;kotlin创建只包含数据的类时&#xff0c;需要在类名前声明关键字&#xff1a;data。 data class KotlinBean (val brand : String) 在kotlin中&#xff0c;编译器会自动的从主构造函数中根据所有声明的…

Linux提权:Docker组挂载 Rsync未授权 Sudo-CVE Polkit-CVE

目录 Rsync未授权访问 docker组挂载 Sudo-CVE漏洞 Polkit-CVE漏洞 这里的提权手法是需要有一个普通用户的权限&#xff0c;一般情况下取得的webshell权限可能不够 Rsync未授权访问 Rsync是linux下一款数据备份工具&#xff0c;默认开启873端口 https://vulhub.org/#/envir…

【数据分析】numpy基础第一天

文章目录 前言本文代码&#xff1a;使用jupyter notebook打开本文的代码操作示例步骤1.打开Anaconda Powershell Prompt步骤2.复制代码文件地址步骤3.在Anaconda Powershell Prompt中打开jupyter notebook步骤3.5.解决一个可能的问题步骤4.在浏览器中查看ipynb文件步骤5.运行代…

【STC8A8K64D4开发板】第2-10讲:定时器/计数器

第2-10讲&#xff1a;定时器/计数器 学习目的了解定时器/计数器的概念和区别。掌握STC8A8K64D4定时器/计数器的应用流程及程序设计。 Timer原理 定时器几乎是每个单片机必有的重要外设之一&#xff0c;可用于定时、精确延时、计数等等&#xff0c;在检测、控制领域有广泛应用。…

JAVA多线程并发补充

AQS 是一个抽象父类 全称是 AbstractQueuedSynchronizer&#xff0c;是阻塞式锁和相关的同步器工具的框架。 用 state 属性来表示资源的状态&#xff08;分独占模式和共享模式&#xff09;&#xff0c;子类需要定义如何维护这个状态&#xff0c;控制如何获取锁和释放锁 getSt…