Mqtt学习使用

news/2025/3/4 21:26:29/文章来源:https://www.cnblogs.com/sansWL/p/18751503

1. 依赖导入

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version>
</dependency>

2. 客户端连接

 String broker = "tcp://broker.emqx.io:1883";String clientId = "demo_client";MqttClient client;public MqttServerClient()  {try {MqttClient client = new MqttClient(broker, clientId);//MqttAsyncClient aClient = new MqttAsyncClient(broker, clientId); //异步通信客户端MqttConnectOptions options = new MqttConnectOptions();// 连接 MQTT Broker 的用户名密码options.setUserName("username");options.setPassword("password".toCharArray());// 是否清除会话options.setCleanSession(true);// 心跳间隔,单位为秒options.setKeepAliveInterval(300);// 连接超时时间,单位为秒options.setConnectionTimeout(30);// 是否自动重连options.setAutomaticReconnect(true);client.connect(options);//this.client = client;//init();}catch (MqttException e) {e.printStackTrace();}}

3. 客户端回调V3(V5增加了数个额外实现)

//方法在 CommsCallback.class  执行回调
client.setCallback(new MqttCallback() {//消息发送被接收到public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("topic: " + topic);System.out.println("qos: " + message.getQos());System.out.println("message content: " + new String(message.getPayload()));}public void connectionLost(Throwable cause) {System.out.println("connectionLost: " + cause.getMessage());}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete: " + token.isComplete());}});

注意
1.订阅的clientId 与 发布的clientId 需要保持不同,否则会发生客户端断连问题;
2. 使用的ClientId 尽可能复杂些,避免连接失败(使用官方提供的broker = "tcp://broker.emqx.io:1883";)

4. 订阅Topic

 public void subscribe(String topic) throws MqttException {// qos的数量需要与topic一致,存在方法签名为 public void subscribe(String[] topicFilters, int[] qos) throws MqttException【批量】int qos = 2;client.subscribe(topic, qos);
//允许插入回调 IMqttMessageListener}
//走的是异步客户端处理MqttToken token = new MqttToken(getClientId());token.setActionCallback(callback);token.setUserContext(userContext);token.internalTok.setTopics(topicFilters);MqttSubscribe register = new MqttSubscribe(topicFilters, qos);comms.sendNoWait(register, token);

+:单层通配,必须占据一个层级,例如 test/+/aa <===> {test/1/aa,test/abc/aa}、+、test/+
#:多层通配,必须占用一个层级,且是最后一个字符,例如 test/# <===> {test/aa,test/aa/bb... } 、test/demo/#、#
$share/{Share Name}/{Topic Filter}: 共享订阅,{shareName}定义的共享组名,{topicFilter}主题名与publish的一致,例如 $share/test1/demo(sub)、 demo(pub)
$queue/{Topic File}: MQTT3.1.1
负载均衡 EMQX

# etc/emqx.conf# 均衡策略
broker.shared_subscription_strategy = random# 当设备离线,或者消息等级为 QoS1、QoS2,因各种各样原因设备没有回复 ACK 确认,消息会被重新派发至群组内其他的设备。
broker.shared_dispatch_ack_enabled = false

5. 发布msg

 public void publish(String topic, String msg) throws MqttException {int qos = 1;MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(qos);client.publish(topic, message);
//允许插入回调 IMqttActionListener}
//同样走的是异步客户端处理MqttDeliveryToken token = new MqttDeliveryToken(getClientId());token.setActionCallback(callback);token.setUserContext(userContext);token.setMessage(message);token.internalTok.setTopics(new String[] { topic });MqttPublish pubMsg = new MqttPublish(topic, message);comms.sendNoWait(pubMsg, token);

Retain(保留消息) 生产者publish的消息会保存一份最新消息,当订阅此topic后会拿取到这份消息,且消息的retain属性为true,注意必须是后订阅读取到的消息才是保留消息
Will(遗嘱消息) 在连接到指定Broker前指定,options.setWill(),会创建一个独立的Topic

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/893638.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI训练进行web前后端开发协助

周五我们进行了针对web开发辅助的ai训练,以下是训练过程: 首先我搭建好了环境 然后我根据编程习惯将web前后端分成了这几点:之后我就将其丢给了deepseek,它直接帮我生成了上到mysql表格建立,下到前端jsp页面的所有文件, 哎,这之前怎么没觉得ai这么好用啊,要是用上这种开…

JavaWeb学习(六)

JavaWeb学习(六):Web前端开发 —— 其余内容 目录JavaWeb学习(六):Web前端开发 —— 其余内容Ajax前端工程化Vue 组件库 ElementVue 路由打包部署 本文为个人学习记录,内容学习自 黑马程序员Ajax概念:Asynchronous JavaScript And XML,异步的 JavaScript 和 XML作用:…

leetcode hot 19

解题思路:这题思路主要是用某个容器(数组等)来存储链表,然后再用双指针或者同时遍历的方法就可以判断。我使用栈进行存储,出栈的元素相当于链表倒序,如果和链表正序的结果相同就证明是回文。如果想用O(1)的空间,就把前一半或后一半的链表倒转,然后遍历就可以了。 /*** D…

code.c WriteUp

题目链接: https://pan.baidu.com/s/1u8bGbKcUF6_gLaw63L3jyA?pwd=h8r5 提取码: h8r5 WriteUp 题目名称:code.c 分类:Reverse 描述:得到源码的输出结果 解题思路 首先用Vscode查看code.c文件,发现该文件对argv[]进行判断处理,其中很容易联想到这和输出结果紧密联系。依次…

6. Calcite添加自定义函数

1. 简介 在上篇博文中介绍了如何使用calcite进行sql验证, 但是真正在实际生产环境中我们可能需要使用到用户自定义函数(UDF): 通过代码实现对应的函数逻辑并注册给calcitesql验证: 将UDF信息注册给calcite, SqlValidator.validator验证阶段即可通过验证 sql执行: calcite通过调…

P10945 Place the Robots 紫 题解

Part 1. 题意 在 \(N \times M\) 的矩阵中的空地放人机,任一人机上下左右走到边界或墙之前遇不到另一人机。 我已经尽力写得简短了。。Part 2. 思路 我们先思考无墙的情况。 若无墙,则同車的放置,把草方块当作禁止放車的方块即可,。 贴一下车的放置的代码: #include <b…

如何实现和调试REST API中的摘要认证(Digest Authentication)

如何实现和调试REST API中的摘要认证(Digest Authentication) 在保护REST API时,开发者通常会在多种认证机制之间进行选择,其中摘要认证(Digest Authentication)是一种常见的选择。本文探讨了使用摘要认证的原因,解释了其原理,提供了Java和Go语言的实现示例,并提供了测…

CF2068H. Statues

CF2068H. Statues 构造题. 思路 我们设 \(d_0 = a + b\) 是第 1 座雕像到第 \(n\) 座雕像的距离. 那么首先可以注意到两个必要条件:\(\displaystyle \sum_{i = 0}^{n - 1} d_i\) 为偶数. 对于 \(\forall i \in [0, n - 1]\), 都有 \(d_i \le d_0 + \dots + d_{i - 1} + d_{i + …

WEB攻防-机制验证篇重定向发送响应状态码跳过步骤验证码回传枚举

笔记: 验证码突破:回传的时候泄露了发送的验证码导致不需要知道目标的验证码是多少直接使用数据包里面的队列 规律爆破:就是常见的数字四位或者六位 10000 种可能在规定时间内爆破或者多次验证后网站不会出现新的验证码没有次数限制可以尝试爆破或者是汉字进行 重定向用户:通过…

Docker 安装 Redis 容器

1、下载Redis镜像下载指定版本的Redis镜像 (xxx指具体版本号) docker pull redis:xxx docker pull redis 下载最新版Redis镜像 (其实此命令就等同于 : docker pull redis:latest ),我用5.0.5版本。docker pull redis:5.0.52、 检查当前所有Docker下载的镜像docker images

Script-Server:用Web UI轻松管理你的脚本执行

# 监控 # 运维人员 在现代软件开发和运维中,脚本的使用频繁而广泛。然而,如何让非技术人员轻松、安全地运行这些脚本成为了一个挑战。 幸运的是,Script-Server应运而生,它是一个为脚本提供的Web用户界面,可以让用户通过一个直观的界面执行各种脚本,而无需编写代码。本文…

nuxtjs + scss + unocss + pinia 新建项目

1、通过命令行报错的,直接下载压缩包 pnpm dlx nuxi init <project-name>压缩包地址:https://codeload.github.com/nuxt/starter/tar.gz/refs/heads/v3 2、安装插件 1、安装unocss pnpm install --save-dev @unocss/nuxt unocss# nuxt.config.jsmodules: [@unocss/nuxt…