先看效果
一、准备工作
1.从官网下载 EMQX 压缩包,解压到本地磁盘,解压路径中不要包含中文
下载 EMQX
2.进入解压路径下的 bin 目录,在该目录打开 cmd,执行以下命令启动 emqx 服务
emqx start
3.访问控制台,页面能打开就说明启动成功;如需登录,使用官网提供的默认账号密码(后续步骤用不到登录)
http://localhost:18083
二、pom依赖
<!-- Eclipse Paho MQTT v3 client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
三、工具类复制就可以使用
mqtt信息
import com.mybatisflex.annotation.Id;
import com.mybatisflex.annotation.KeyType;
import com.mybatisflex.annotation.Table;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * MQTT connection settings entity.
 *
 * @author Administrator
 * @since 2024-04-23
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Mqtt {

    /** MQTT record id (key type {@code KeyType.Auto}). */
    @Id(keyType = KeyType.Auto)
    private Integer id;

    /** Broker address, e.g. {@code tcp://127.0.0.1:1883}. */
    private String broker;

    /** User name. */
    private String username;

    /** Password. */
    private String password;

    /** Client id used by the publishing client. */
    private String clientid1;

    /** Client id used by the subscribing client. */
    private String clientid2;

    /** Topic the request message is published to. */
    private String topic;

    /** Topic the reply message is published to. */
    private String replyTopic;

    /**
     * Quality of service.
     * 0: delivered at most once; if the client is currently unavailable the message is lost.
     * 1: delivered at least once.
     * 2: delivered exactly once.
     */
    private Integer qos;
}
Publisher(发布主题)
import com.alibaba.fastjson.JSONObject;
import com.test.entity.Mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.HashMap;public class Publisher {public static void main(String[] args) throws MqttException {Mqtt mqtt = new Mqtt();mqtt.setBroker("tcp://127.0.0.1:1883");mqtt.setUsername("emqx");mqtt.setPassword("public");mqtt.setClientid1("publish_client");mqtt.setClientid2("subscribe_client");mqtt.setTopic("mqtt/test");mqtt.setReplyTopic("mqtt/reply");mqtt.setQos(1);publisher(mqtt,"你好订阅主题");}/*** 发布主题** @param mqtt* @param content*/public static void publisher(Mqtt mqtt, String content) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqtt.getBroker(), mqtt.getClientid1(), persistence);MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);client.connect(options);// 订阅回复消息的主题client.subscribe(mqtt.getReplyTopic(), 2);MqttMessage message = new MqttMessage(content.getBytes());client.publish(mqtt.getTopic(), message);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("接收到了订阅主题回复: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {}});// client.disconnect();} catch (Exception e) {e.printStackTrace();}}}
Subscriber(订阅主题)
import com.alibaba.fastjson.JSONObject;
import com.xiaoqiu.gate2.entity.Mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.util.HashMap;public class Subscriber {public static void main(String[] args) throws MqttException {Mqtt mqtt = new Mqtt();mqtt.setBroker("tcp://127.0.0.1:1883");mqtt.setUsername("emqx");mqtt.setPassword("public");mqtt.setClientid1("publish_client");mqtt.setClientid2("subscribe_client");mqtt.setTopic("mqtt/test");mqtt.setReplyTopic("mqtt/reply");mqtt.setQos(1);subscriber(mqtt);}/*** 订阅主题** @param mqtt*/public static void subscriber(Mqtt mqtt) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqtt.getBroker(), mqtt.getClientid2(), persistence);MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);client.connect(options);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("接收到了发布主题请求: " + new String(message.getPayload()));// 回复消息MqttMessage replyMessage = new MqttMessage("你好,发布主题".getBytes());client.publish(mqtt.getReplyTopic(), replyMessage);}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {}});client.subscribe(mqtt.getTopic(), 2);} catch (Exception e) {e.printStackTrace();}}}
五、先启动订阅主题类,最后启动发布主题类,运行效果
这个订阅工具类的回复内容写错了,正确的应为:你好,发布主题
更正后