java项目应用MQTT传输数据

一、概述

近期做的一个项目需要传输数据给第三方。根据协定,采用MQTT进行数据的发送和订阅。一般来说,不通系统进行数据对接,一般采用RESTFul接口,走http。mqtt的话,顾名思义,就是一个消息队列。相比RESTFul接口,MQTT方式也许有个好处就是,数据传输给对方后,对方可以收到一个提醒。这个提醒来自于消息队列,不用自己搞。利用这个提醒,也许可以做点啥。除此之外,我不知道还有什么更多的好处。

MQTT的要素:
1)broker,经纪人,即代理地址,如:tcp://10.0.2.18:1883
2)clientID,客户端ID,如"Client001"; 客户端标识,可以自定义,但不能跟receiver同名
3)topic,// 要发布的主题,接收端据此接收,如”monkey/huaguo-moutain“。主题一经定义,可以多次使用。
4)qos,质量服务等级 0,1,2。2最高。

依赖:

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

二、发送

如果单纯发送,客户端无须安装mqtt。Java中发送消息代码如下:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttPublisher {String broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String clientId     = "Client001"; // 客户端标识String topic        = "mqttdemo/mytopic001"; // 要发布的主题int qos             = 2; // 质量服务等级MqttClient client = null;public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;this.client = new MqttClient(broker, clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("发布者正在连接到broker代理 : " + broker);this.client.connect(connOpts);System.out.println("发布者连接成功!");}public void SendMessage(String content){try {//System.out.println("发布者发送的消息: " + content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);this.client.publish(topic, message);System.out.println("发布者已经发送消息!");} catch (Exception e) {e.printStackTrace();}}public void DisconnectMqtt() throws MqttException {this.client.disconnect();System.out.println("Disconnected");}
}

三、订阅

如果想接收mqtt消息,本机则要安装mqtt服务。windows可安装一个名为mosquitto的软件。但是,它天然好像不对外,如果想被外部访问,比如局域网的其他机器访问,要做一些设置。具体如何设置,我还不清楚。

java中接收消息代码如下:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttReceiver {String broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String clientId     = "Client002"; // 客户端标识String topic        = "mqttdemo/mytopic001"; // 订阅的主题int qos             = 2; // 质量服务等级MqttReceiver(String broker, String clientId, String topic, int qos){this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;}public void StartMqttReceiver(){try {MqttClient sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("正在连接到 broker 代理: " + this.broker);sampleClient.connect(connOpts);System.out.println("接受者连接成功!");// 设置回调sampleClient.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("链接丢失!");}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("接收者接收到了消息: " + topic + " : " + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {// Called when a message has been delivered}});// 订阅sampleClient.subscribe(this.topic, this.qos);System.out.println("订阅的topic是: " + this.topic);} catch (Exception e) {e.printStackTrace();}}
}

四、测试

订阅和发送,可以是同一个IP,也就是自己发给自己。但注意订阅和发送的clientID不能相同。比如以下代码,自己发给自己,特别利于测试:

import org.eclipse.paho.client.mqttv3.MqttException;public class MainClass {public static void main(String[] args) throws MqttException, InterruptedException {String recever_broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String recever_clientId     = "ClientReceiver001"; // 客户端标识String recever_topic        = "ocean/south/message/status"; // 订阅的主题int recever_qos             = 2; // 质量服务等级MqttReceiver mqttReceiver = new MqttReceiver(recever_broker, recever_clientId, recever_topic, recever_qos);mqttReceiver.StartMqttReceiver();String publisher_broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址String publisher_clientId     = "ClientPublisher002"; // 客户端标识String publisher_topic        = "ocean/south/message/status"; // 发布的主题int publisher_qos             = 2; // 质量服务等级MqttPublisher mqttPublisher = new MqttPublisher(publisher_broker, publisher_clientId, publisher_topic,publisher_qos);int cnt = 0;while(true){cnt ++;mqttPublisher.SendMessage("{\"msg\":\"send some message to you!\",\"data\":"+Integer.toString(cnt)+"}");Thread.sleep(1000);}}
}

在这里插入图片描述

五、重连

以上例子还比较简单,需要考虑连接失败或突然断掉的时候重连。

1、发送端重连

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;public class MqttPublisher extends Thread {String broker; //"tcp://10.0.2.18:1883";String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟receiver同名String topic; // 要发布的主题,接收端据此接收int qos; // 质量服务等级 0,1,2。2最高。MqttClient client = null;public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;this.client = new MqttClient(broker, clientId, new MemoryPersistence());_connect();}public boolean sendMessage(String content) {boolean ok = false;MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));message.setQos(qos);int count = 2;while (!ok && count > 0) {count--;try {this.client.publish(topic, message);ok = true;} catch (MqttException e) {System.err.println(e.getMessage());reconnect(1);} catch (Exception e) {System.err.println(e.getMessage());}}return ok;}public void disconnectMqtt() throws MqttException {this.client.disconnect();System.out.println("Disconnected");}public boolean reconnect(long retryTimes) {boolean ok = false;long count = retryTimes;while (!ok && count > 0) {count--;ok = _reconnect();}return ok;}private boolean _reconnect() {boolean ok = false;try {// 关闭现有连接if (this.client != null && client.isConnected()) {this.client.disconnect();}// 重新连接ok = _connect();} catch (MqttException e) {// 处理重新连接失败的情况System.err.println(e.getCause());}return ok;}private boolean _connect() {boolean ok = false;MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.print(String.format("正在连接到远程mqtt服务器: %s ......", broker));try {this.client.connect(connOpts);ok = true;System.out.println("连接成功!");} catch (MqttException e) {System.out.println("连接失败!");}return ok;}}

2、订阅端重连

订阅端重连当时是采用这样的思路:连接失败时延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次,直到成功。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class MqttReceiver {String broker; //"tcp://10.0.2.18:1883";String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟publish同名String topic; // 要发布的主题,接收端据此接收int qos; // 质量服务等级 0,1,2。2最高。private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public MqttReceiver(String broker, String clientId, String topic, int qos) {this.broker = broker;this.clientId = clientId;this.topic = topic;this.qos = qos;}public void StartMqttReceiver() {MqttClient sampleClient = null;try {sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);// 设置回调MqttClient finalSampleClient = sampleClient;sampleClient.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("监听mqtt服务器连接丢失!");scheduleReconnect(finalSampleClient);}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收到消息(%s): %s", topic, new String(message.getPayload())));}public void deliveryComplete(IMqttDeliveryToken token) {//消息传递完成//System.out.println("a message has been delivered");}});// 订阅sampleClient.subscribe(this.topic, this.qos);sampleClient.connect(connOpts);System.out.println(String.format("正在连接到监听mqtt服务器: %s 成功", this.broker));} catch(MqttException e){System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));scheduleReconnect(sampleClient);} catch (Exception e) {System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));}}private void scheduleReconnect(MqttClient sampleClient) {if(sampleClient == null) return;final Runnable reconnectTask = () -> {try {if (!sampleClient.isConnected()) {// 尝试重新连接sampleClient.connect();// 重新订阅sampleClient.subscribe(topic, qos);System.out.println("重新连接监听mqtt服务器成功!");// 取消定时任务,因为连接成功了//scheduler.shutdownNow();}} catch (MqttException e) {System.out.println("重新连接监听mqtt服务器出现异常: " + e.getMessage());}};// 延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次scheduler.scheduleWithFixedDelay(() -> {reconnectTask.run();}, 10, 30, TimeUnit.SECONDS);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/305282.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL:多表操作

介绍 实际开发中&#xff0c;一个项目通常需要很多张表才能完成。例如&#xff1a;一个商城项目就需要分类表(category)、商品表(products)、订单表(orders)等多张表&#xff0c;且这些表的数据之间存在一定的关系。 多表关系 MySQL多表之间的关系可以概括为&#xff1a;一对一…

【我与Java的成长记】之this引用和构造方法的使用详解

系列文章目录 能看懂文字就能明白系列 C语言笔记传送门 &#x1f31f; 个人主页&#xff1a;古德猫宁- &#x1f308; 信念如阳光&#xff0c;照亮前行的每一步 文章目录 系列文章目录&#x1f308; *信念如阳光&#xff0c;照亮前行的每一步* 前言一、this的使用this引用的特…

【C语言】初识C语言

本章节主要目的是基本了解C语言的基础知识&#xff0c;对C语言有一个大概的认识。 什么是C语言 在日常生活中&#xff0c;语言就是一种人与人之间沟通的工具&#xff0c;像汉语&#xff0c;英语&#xff0c;法语……等。而人与计算机之间交流沟通的工具则被称为计算机语言&am…

机器学习——损失函数

【说明】文章内容来自《机器学习——基于sklearn》&#xff0c;用于学习记录。若有争议联系删除。 1、简介 损失函数(loss function)又称为误差函数(error function)&#xff0c;是衡量模型好坏的标准&#xff0c;用于估量模型的预测值与真实值的不一致程度&#xff0c;是一个…

Translation翻译插件

Translation插件是为IntelliJ IDEA开发的&#xff0c;因此只能在IntelliJ IDEA中使用。但是&#xff0c;如果你需要在其他软件中进行翻译&#xff0c;可以考虑使用其他的翻译工具或服务。例如&#xff0c;一些在线翻译网站&#xff08;如Google翻译、百度翻译等&#xff09;提供…

Android Security PIN 相关代码

开发项目遇到一个问题&#xff0c;具体描述及复制步骤如下&#xff1a; 就是开启"Enhanced PIN privacy"(增强的PIN隐私)的时候输入秘密的时候还是会显示数字 如下图&#xff0c;应该是直接是“.” 不应该出现PIN 密码 想要的效果如下图&#xff1a; 设置的步骤如下图…

编写第一个APP自动化脚本 appium_helloworld ,将脚本跑起来

一、前置说明 我们把学习 Appium 的第一个脚本称为 appium_helloworld&#xff0c;它用于展示 Appium 的基本用法&#xff0c;验证配置和环境是否正确。 Appium 自动化操作 APP 的基本流程&#xff08;Android平台&#xff09;&#xff1a; 启动 Appium Serveradb 连接设备&…

【JavaScript】垃圾回收与内存泄漏

✨ 专栏介绍 在现代Web开发中&#xff0c;JavaScript已经成为了不可或缺的一部分。它不仅可以为网页增加交互性和动态性&#xff0c;还可以在后端开发中使用Node.js构建高效的服务器端应用程序。作为一种灵活且易学的脚本语言&#xff0c;JavaScript具有广泛的应用场景&#x…

Python 进阶(十八):配置文件(configparser 模块)

大家好&#xff0c;我是水滴~~ configparser模块是Python标准库中的一个模块&#xff0c;用于解析配置文件。它提供了一种简单而灵活的方式来读取、修改和写入INI格式的配置文件。本文将介绍该模块是如何操作配置文件的。 文章中包含大量的示例代码&#xff0c;希望能够帮助新…

Docker七 | 搭建Swarm集群

目录 创建Swarm集群 创建管理节点 增加工作节点 查看集群 部署服务 新建服务 查看服务 服务伸缩 增加服务 减少服务 删除服务 创建Swarm集群 创建管理节点 在192.168.117.131下执行docker swarm init命令的节点自动成为管理节点 [rootlocalhost ~]# docker swar…

同化的题解

时间限制: 1000ms 空间限制: 524288kB 题目描述 古人云&#xff1a;“近朱者赤近墨者黑”。这句话是很有道理的。这不鱼大大和一群苦命打工仔被安排进厂拧螺丝了。 进厂第一天&#xff0c;每个人拧螺丝的动力k都是不同且十分高涨的。但是当大家坐在一起后会聊天偷懒&#xf…

axios配置请求头content-type 和 get/post请求方式

axios配置请求头content-type https://blog.csdn.net/wojiushiwo945you/article/details/107653962 axios 是Ajax的一个插件&#xff0c;axios虽然是一个插件&#xff0c;但是我们不需要通过Vue.use(axios)来使用&#xff0c;下载完成后&#xff0c;只需在项目中引入即可。(一…