- 1 MQTT
- 1.1 MQTT介绍
- 1.1.1 简介
- 1.1.2 特点和应用
- 1.1.3 为什么要用 MQTT协议
- 1.2 MQTT控制报文的结构
- 1.2.1 固定报文头(Fixed Header)
- 1.2.2 可变报文头(Variable Header)
- 1.2.3 有效负荷和消息类型
- 1.2.4 消息质量(QoS)
- 1.4 搭建MQTT服务
- 1.5 SpringBoot搭建提供端
- 1.5.1 pom.xml
- 1.5.2 修改配置文件
- 1.5.3 消息发布者客户端配置
- 1.5.4 消息发布客户端回调
- 1.5.5 创建控制器测试发布消息
- 1.6 SpringBoot搭建消费端
- 1.6.1 pom.xml
- 1.6.2 配置文件
- 1.6.3 消费者客户端配置
- 1.6.4 消息消费者客户端回调
- 1.6.5 控制器提供手动建立连接和断开连接方法
- 1.7 测试提供端和消费端
- 1.1 MQTT介绍
1 MQTT
1.1 MQTT介绍
1.1.1 简介
MQTT
全称(Message Queue Telemetry Transport
):一种基于发布/订阅(publish/subscribe
)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing
)中的一个标准传输协议。
MQTT
是一种基于发布/订阅
模式的轻量级通讯协议,该协议构建在TCP/IP
协议上。 MQTT
最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽
占用的即时通讯协议,MQTT
在物联网、小型设备、移动应用等方面有广泛应用。
MQTT
协议将消息的发布者(publisher
)与订阅者(subscriber
)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ
有点类似。
TCP
协议位于传输层,MQTT
协议位于应用层,MQTT
协议构建于TCP/IP
协议上,也就是说只要支持TCP/IP
协议栈的地方,都可以使用MQTT
协议。
1.1.2 特点和应用
特点:
- 开放消息协议,简单易实现
- 发布订阅模式,一对多消息发布
- 基于
TCP/IP
网络连接,提供有序,无损,双向连接
- 2字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量
- 消息
QoS
支持,可靠传输保证
应用:
- 物联网M2M通信,物联网大数据采集
- Android消息推送,WEB消息推送
- 智能硬件、智能家具、智能电器
- 车联网通信,电动车站桩采集
- 智慧城市、远程医疗、远程教育
- 电力、石油与能源等行业市场
1.1.3 为什么要用 MQTT协议
MQTT
协议为什么在物联网(IOT
)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP
协议呢?
首先HTTP
协议它是一种同步协议
,客户端请求后需要等待服务器的响应。而在物联网(IOT
)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信
不稳定等,显然异步消息协议更为适合IOT
应用程序。
HTTP
是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT
)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。
通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP
要实现这样的功能不但很困难,而且成本极高。
1.2 MQTT控制报文的结构
MQTT
通过交换一些预定义的MQTT
控制报文来工作,每条MQTT
命令消息的消息头都包含一个固定的报头
,有些消息会携带一个可变报文头
和一个负荷
。消息格式如下:
固定包头,存在于所有MQTT控制包
可变包头,存在于某些MQTT控制包
载荷,存在于某些MQTT控制包
1.2.1 固定报文头(Fixed Header)
MQTT
固定报文头最少有两个字节
,第一个字节包含消息类型(Message Type)
和QoS级别
等标志位。第二个字节开始是剩余长度
字段,该长度是后面的可变报文头加消息负载的总长度,该字段最多允许四个字节。
剩余长度
使用了一种可变长度的结构来编码,这种结构使用单一字节
表示0-127
的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。
例如十进制的数字64可以被编码成一个单独的字节,十进制为64,八进制为0x40。十进制数字321(=65+2×128)被编码为两个字节,低位在前。第一个字节是65+128 = 193。注意最高位的128表示后面至少还有一个字节。第二个字节是2,表示2*127。(注:321 = 11000001 00000010,第一个字节是“标识符后面还有一个字节”+65,第二个字节是“标识符后面没有字节了”+256)。
1.2.2 可变报文头(Variable Header)
可变报文头主要包含协议名
、协议版本
、连接标志(Connect Flags)
、心跳间隔时间(Keep Alive timer)
、连接返回码(Connect Return Code)
、主题名(Topic Name)
等
1.2.3 有效负荷和消息类型
有效负荷(Payload),可以理解为消息主题(body)
当 MQTT
发送的消息类型是 CONNECT
(连接)、PUBLISH
(发布)、SUBSCRIBE
(订阅)、SUBACK
(订阅确认)、则会带有负荷。
各种类型消息的控制报文参考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html
MQTT
的消息类型(Message Type)(控制报文类型)
名字 | 值 | 报文流动方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客户端到服务端 | 客户端请求连接服务端 |
CONNACK | 2 | 服务端到客户端 | 连接报文确认 |
PUBLISH | 3 | 两个方向都允许 | 发布消息 |
PUBACK | 4 | 两个方向都允许 | QoS 1消息发布收到确认 |
PUBREC | 5 | 两个方向都允许 | 发布收到(保证交付第一步) |
PUBREL | 6 | 两个方向都允许 | 发布释放(保证交付第二步) |
PUBCOMP | 7 | 两个方向都允许 | QoS 2消息发布完成(保证交互第三步) |
SUBSCRIBE | 8 | 客户端到服务端 | 客户端订阅请求 |
SUBACK | 9 | 服务端到客户端 | 订阅请求报文确认 |
UNSUBSCRIBE | 10 | 客户端到服务端 | 客户端取消订阅请求 |
UNSUBACK | 11 | 服务端到客户端 | 取消订阅报文确认 |
PINGREQ | 12 | 客户端到服务端 | 心跳请求 |
PINGRESP | 13 | 服务端到客户端 | 心跳响应 |
DISCONNECT | 14 | 客户端到服务端 | 客户端断开连接 |
Reserved | 15 | 禁止 | 保留 |
1.2.4 消息质量(QoS)
消息质量(QoS
):
QoS 0
:最多分发一次。消息的传递完全依赖于底层的TCP/IP
协议,协议里没有定义应答和重试,消息要么只会到达服务端一次,要么根本没有到达。
只会发送一次,不管成不成功QoS 1
:至少分发一次。服务器的消息接收由PUBACK
消息进行确认,如果通信链路或发送设备异常,或者指定时间内没有收到确认消息,发送端会重发这条在消息头中设置了DUP位的消息。
未成功会继续发送,直到成功,可能会收到多次QoS 2
:只分发一次。这是最高级别的消息传递,消息丢失和重复都是不可接受的,使用这个服务质量等级会有额外的开销。
未成功会继续发送,但会保证只收到一次
通过下面的例子可以更深刻的理解上面三个传输质量等级。
比如目前流行的共享单车智能锁,智能锁可以定时使用QoS level 0
质量消息请求服务器,发送单车的当前位置,如果服务器没收到也没关系,反正过一段时间又会再发送一次。
之后用户可以通过App查询周围单车位置,找到单车后需要进行解锁,这时候可以使用QoS level 1
质量消息,手机App不断的发送解锁消息给单车锁,确保有一次消息能达到以解锁单车。
最后用户用完单车后,需要提交付款表单,可以使用QoS level 2
质量消息,这样确保只传递一次数据,否则用户就会多付钱了。
1.4 搭建MQTT服务
在Linux上搭建MQTT服务
打开EMQ官网:https://www.emqx.io/cn/products/broker
点击开始试用
选择服务器对应版本
复制下载命令到ssh工具中执行
下载完成
下载完成后执行安装命令
安装成功后执行命令:
sudo emqx start
出现以下信息表示启动成功
浏览器访问ip:18083
进入管理界面,默认账号为admin,密码为public
1.5 SpringBoot搭建提供端
1.5.1 pom.xml
<!--mqtt相关依赖-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.5.2 修改配置文件
spring:application:name: provider#MQTT配置信息mqtt:#MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883url: tcp://ip:1883#用户名username: admin#密码password: public#客户端id(不能重复)client:id: provider-id#MQTT默认的消息推送主题,实际可在调用接口时指定default:topic: topic
server:port: 8081
1.5.3 消息发布者客户端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class MqttProviderConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;/*** 客户端对象*/private MqttClient client;/*** 客户端连接服务端*/public void connect(){try {//创建MQTT客户端对象client = new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);//设置回调client.setCallback(new MqttProviderCallBack());client.connect(options);} catch (MqttException e) {e.printStackTrace();}}/*** 发布消息* @param qos 服务质量等级* 0 只会发送一次,不管成不成功* 1 未成功会继续发送,直到成功,可能会收到多次* 2 未成功会继续发送,但会保证只收到一次* @param retained 保留标志* 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订阅这个主题时,会把消息推送给这个订阅者* 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)*/public void publish(int qos,boolean retained,String topic,String message){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(message.getBytes());//主题目的地,用于发布/订阅消息MqttTopic mqttTopic = client.getTopic(topic);//提供一种机制来跟踪消息的传递进度。//用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。//一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}
}
1.5.4 消息发布客户端回调
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;@Configuration
public class MqttProviderCallBack implements MqttCallback {@Value("${spring.mqtt.client.id}")private String clientId;/*** 与服务器断开连接的回调*/@Overridepublic void connectionLost(Throwable throwable) {System.out.println(clientId + "与服务器断开连接");}/*** 消息到达的回调*/@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {IMqttAsyncClient client = iMqttDeliveryToken.getClient();System.out.println(client.getClientId() + "发布消息成功!");}
}
1.5.5 创建控制器测试发布消息
import com.xdemo.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class SendController {@Autowiredprivate MqttProviderConfig providerClient;@RequestMapping("/sendMessage")@ResponseBodypublic String sendMessage(int qos,boolean retained,String topic,String message){try {providerClient.publish(qos,retained,topic,message);return "发送成功";}catch (Exception e){e.printStackTrace();return "发送失败";}}
}
1.6 SpringBoot搭建消费端
在父工程下创建一个Springboot项目作为消息消费者,导入以下依赖
1.6.1 pom.xml
<!--mqtt相关依赖-->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.6.2 配置文件
spring:application:name: consumer#MQTT配置信息mqtt:#MQTT服务端地址,端口默认为1883,如果有多个,用逗号隔开,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883url: tcp://ip:1883#用户名username: admin#密码password: public#客户端id(不能重复)client:id: consumer-id#MQTT默认的消息推送主题,实际可在调用接口时指定default:topic: topic
server:port: 8082
1.6.3 消费者客户端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
public class MqttConsumerConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Value("${spring.mqtt.default.topic}")private String defaultTopic;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try {//创建MQTT客户端对象client = new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份options.setCleanSession(true);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);//设置回调client.setCallback(new MqttConsumerCallBack());client.connect(options);//订阅主题//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息int[] qos = {1,1};//主题String[] topics = {"topic1","topic2"};//订阅主题client.subscribe(topics,qos);} catch (MqttException e) {e.printStackTrace();}}/*** 断开连接*/public void disConnect(){try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic,int qos){try {client.subscribe(topic,qos);} catch (MqttException e) {e.printStackTrace();}}
}
1.6.4 消息消费者客户端回调
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttConsumerCallBack implements MqttCallback {/*** 客户端断开连接的回调*/@Overridepublic void connectionLost(Throwable throwable) {System.out.println("与服务器断开连接,可重连");}/*** 消息到达的回调*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收消息主题 : %s",topic));System.out.println(String.format("接收消息Qos : %d",message.getQos()));System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));System.out.println(String.format("接收消息retained : %b",message.isRetained()));}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}
}
1.6.5 控制器提供手动建立连接和断开连接方法
import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class TestController {@Autowiredprivate MqttConsumerConfig client;@Value("${spring.mqtt.client.id}")private String clientId;@RequestMapping("connect")@ResponseBodypublic String connect(){client.connect();return clientId + "连接到服务器";}@RequestMapping("disConnect")@ResponseBodypublic String disConnect(){client.disConnect();return clientId + "与服务器断开连接";}
}
1.7 测试提供端和消费端
分别启动两个项目,可以在管理界面看到创建的两个客户端
调用发布消息接口发布消息
消费者控制台打印
客户端断线消息恢复
把消费者与服务端断开连接
再调用发布消息接口发送两条消息到topic1,然后再把消费者连接到服务端
控制台没有东西打印
修改消费者客户端配置,把setCleanSession改为false
重启项目,把消费者客户端断开连接,调用发布消息接口发布两条消息,再把消费者和服务端连接上