MQTT服务器源码解析

目录

1、关于header问题 

2、MQTT 连接参数的使用

2.1连接地址

2.2 基于 TCP 的 MQTT 连接

2.3 基于 WebSocket 的连接

3、订阅topic

 4、推送消息给订阅者

5、QOS 机制

5.1 QOS是什么

5.2 QOS的实现原理

5.3 发送流程

6、reatain机制

总结:给还没上线的人留言

7、遗嘱消息

总结:给还在等消息的人留言

8、Clean Session

9、cleint id

如果客户端使用一个重复的 Client ID 连接至服务器,将会把已使用该 Client ID 连接成功的客户端踢下线。

10、连接超时(Connect Timeout)

11、总结


 

1、关于header问题 

mqtt的一直说header比较节省流量,这是为什么呐、?看下结构图

可以看到有不同的header结构,字段也很少,确实节省流量

2、MQTT 连接参数的使用

2.1连接地址

MQTT 的连接地址通常包含 :服务器 IP 或者域名、服务器端口、连接协议。

2.2 基于 TCP 的 MQTT 连接

mqtt 是普通的 TCP 连接,端口一般为 1883。

mqtts 是基于 TLS/SSL 的安全连接,端口一般为 8883。

比如 mqtt://broker.emqx.io:1883 是一个基于普通 TCP 的 MQTT 连接地址。

2.3 基于 WebSocket 的连接

ws 是普通的 WebSocket 连接,端口一般为 8083。

wss 是基于 WebSocket 的安全连接,端口一般为 8084。

当使用 WebSocket 连接时,连接地址还需要包含 Path,EMQX 默认配置的 Path 是 /mqtt。比如 ws://broker.emqx.io:8083/mqtt 是一个基于 WebSocket 的 MQTT 连接地址。

3、订阅topic

客户端订阅topic之后,服务器是如何保存,并且如何转发的。

可以看到服务端订阅之后会放入一个set,在做转发的时候动态匹配,匹配成功之后才会进行转发。

这里也是用了线程池

com.lxr.iot.bootstrap.channel.MqttHandlerService#subscribe

 /*** 订阅*/@Overridepublic void subscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription ->mqttTopicSubscription.topicName()).collect(Collectors.toSet());mqttChannelService.suscribeSuccess(mqttChannelService.getDeviceId(channel), topics);subBack(channel, mqttSubscribeMessage, topics.size());}/*** 订阅成功后 (发送保留消息)*/public void suscribeSuccess(String deviceId, Set<String> topics){doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {MqttChannel mqttChannel = mqttChannels.get(deviceId);mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识mqttChannel.addTopic(strings);executorService.execute(() -> {Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {if(mqttChannel1.isLogin()){strings.parallelStream().forEach(topic -> {addChannel(topic,mqttChannel);sendRetain(topic,mqttChannel); // 发送保留消息});}});});});}

 4、推送消息给订阅者

遍历所有的channel,根据不同的QOS进行转发。

这里的channel也做了使用一个map进行保存

protected  static  Cache<String, Collection<MqttChannel>> mqttChannelCache = CacheBuilder.newBuilder().maximumSize(100).build();com.lxr.iot.bootstrap.channel.MqttChannelService#push/*** 推送消息给订阅者*/private  void push(String topic, MqttQoS qos, byte[] bytes, boolean isRetain){Collection<MqttChannel> subChannels = getChannels(topic, topic1 -> cacheMap.getData(getTopic(topic1)));if(!CollectionUtils.isEmpty(subChannels)){subChannels.parallelStream().forEach(subChannel -> {switch (subChannel.getSessionStatus()){case OPEN: // 在线if(subChannel.isActive()){ // 防止channel失效  但是离线状态没更改switch (qos){case AT_LEAST_ONCE:sendQosConfirmMsg(MqttQoS.AT_LEAST_ONCE,subChannel,topic,bytes);break;case AT_MOST_ONCE:sendQos0Msg(subChannel.getChannel(),topic,bytes);break;case EXACTLY_ONCE:sendQosConfirmMsg(MqttQoS.EXACTLY_ONCE,subChannel,topic,bytes);break;}}else{if(!subChannel.isCleanSession() & !isRetain){clientSessionService.saveSessionMsg(subChannel.getDeviceId(),SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );break;}}break;case CLOSE: // 连接 设置了 clean session =falseclientSessionService.saveSessionMsg(subChannel.getDeviceId(),SessionMessage.builder().byteBuf(bytes).qoS(qos).topic(topic).build() );break;}});}}

5、QOS 机制

5.1 QOS是什么

可靠的消息传递

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
  • QoS 1:消息至少传送一次。
  • QoS 2:消息只传送一次。

5.2 QOS的实现原理

PublishApiSevice

1.2.1、QOS中1和2 需要确认,这里做了一个缓存

channel代表会话

@Getter
@Setter
public class MqttChannel {private transient  volatile  Channel channel;private String deviceId;private boolean isWill;private volatile SubStatus subStatus; // 是否订阅过主题private  Set<String> topic  ;private volatile SessionStatus sessionStatus;  // 在线 - 离线private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除  此channel// messageId - message(qos1)  // 待确认消息private ConcurrentHashMap<Integer,SendMqttMessage>  message ;private  AtomicInteger index ;

看下消息的定义

/*** mqtt 消息**/
@Builder
@Data
public class SendMqttMessage {private int messageId;private Channel channel;private volatile ConfirmStatus confirmStatus;private long time;private byte[]  byteBuf;private boolean isRetain;private MqttQoS qos;private String topic;}
/*** 确认状态***/
public enum ConfirmStatus {PUB,PUBREC,PUBREL,COMPLETE,
}
public enum MqttQoS {AT_MOST_ONCE(0),AT_LEAST_ONCE(1),EXACTLY_ONCE(2),FAILURE(0x80);private final int value;MqttQoS(int value) {this.value = value;}public int value() {return value;}public static MqttQoS valueOf(int value) {for (MqttQoS q: values()) {if (q.value == value) {return q;}}throw new IllegalArgumentException("invalid QoS: " + value);}
}

这里面有几个针对QOS的字段

messageId 是消息的唯一Id

ConfirmStatus 是消息的状态

MqttQoS 是消息确认状态的枚举

5.3 发送流程

protected void sendQosConfirmMsg(MqttQoS qos, MqttChannel mqttChannel, String topic, byte[] bytes) {if(mqttChannel.isLogin()){int messageId = mqttChannel.messageId();switch (qos){case AT_LEAST_ONCE:mqttChannel.addSendMqttMessage(messageId,sendQos1Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));break;case EXACTLY_ONCE:mqttChannel.addSendMqttMessage(messageId,sendQos2Msg(mqttChannel.getChannel(),topic,false,bytes,messageId));break;}}}

待客户端响应之后,修改message的ConfirmStatus

/*** 消息回复确认(qos1 级别 保证收到消息  但是可能会重复)*/@Overridepublic void puback(Channel channel, MqttMessage mqttMessage) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();int messageId = messageIdVariableHeader.messageId();Optional.ofNullable(mqttChannelService.getMqttChannel(mqttChannelService.getDeviceId(channel)).getSendMqttMessage(messageId)).ifPresent(msg->msg.setConfirmStatus(ConfirmStatus.COMPLETE)); // 复制为空messageTransfer.removeQueue(channel,messageId);}

待状态都确认完成之后,移除消息

MQTT QoS 0, 1, 2 介绍 | EMQ

6、reatain机制

发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。

AbstractChannelService

// topic - 保留消息
protected  static  ConcurrentHashMap<String,ConcurrentLinkedQueue<RetainMessage>> retain = new ConcurrentHashMap<>(); 

下面的代码将retainMessage加入到缓存中

/*** 保存保留消息* @param topic 主题* @param retainMessage 信息*/private void saveRetain(String topic, RetainMessage retainMessage, boolean isClean){ConcurrentLinkedQueue<RetainMessage> retainMessages = retain.getOrDefault(topic, new ConcurrentLinkedQueue<>());if(!retainMessages.isEmpty() && isClean){retainMessages.clear();}boolean flag;do{flag = retainMessages.add(retainMessage);}while (!flag);retain.put(topic, retainMessages);}

订阅成功后发送retain消息

/*** 订阅成功后 (发送保留消息)*/public void suscribeSuccess(String deviceId, Set<String> topics){doIfElse(topics,topics1->!CollectionUtils.isEmpty(topics1),strings -> {MqttChannel mqttChannel = mqttChannels.get(deviceId);mqttChannel.setSubStatus(SubStatus.YES); // 设置订阅主题标识mqttChannel.addTopic(strings);executorService.execute(() -> {Optional.ofNullable(mqttChannel).ifPresent(mqttChannel1 -> {if(mqttChannel1.isLogin()){strings.parallelStream().forEach(topic -> {addChannel(topic,mqttChannel);sendRetain(topic,mqttChannel); // 发送保留消息});}});});});}

总结:给还没上线的人留言

7、遗嘱消息

遗嘱消息是 MQTT 为那些可能出现意外断线的设备提供的将遗嘱优雅地发送给其他客户端的能力。设置了遗嘱消息消息的 MQTT 客户端异常下线时,MQTT 服务器会发布该客户端设置的遗嘱消息。

  • 当设备意外断线时,遗嘱消息将被发送至遗嘱 Topic;
   public void doSend( String deviceId) {  // 客户端断开连接后 开启遗嘱消息发送if(StringUtils.isNotBlank(deviceId)&&(willMeaasges.get(deviceId))!=null){WillMeaasge willMeaasge = willMeaasges.get(deviceId);channelService.sendWillMsg(willMeaasge); // 发送遗嘱消息if(!willMeaasge.isRetain()){ // 移除willMeaasges.remove(deviceId);log.info("deviceId will message["+willMeaasge.getWillMessage()+"] is removed");}}}

总结:给还在等消息的人留言

8、Clean Session

为 false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

持久会话避免了客户端掉线重连后消息的丢失,并且免去了客户端连接后重复的订阅开销。这一功能在带宽小,网络不稳定的物联网场景中非常实用。

MqttChannel

private volatile boolean cleanSession; // 当为 true 时 channel close 时 从缓存中删除  此channel

9、cleint id

如果客户端使用一个重复的 Client ID 连接至服务器,将会把已使用该 Client ID 连接成功的客户端踢下线。

10、连接超时(Connect Timeout)

连接超时时长,收到服务器连接确认前的等待时间,等待时间内未收到连接确认则为连接失败。

AbsMqttProducer

  protected   void  connectTo(ConnectOptions connectOptions){checkConnectOptions(connectOptions);if(this.nettyBootstrapClient ==null){this.nettyBootstrapClient = new NettyBootstrapClient(connectOptions);}this.channel =nettyBootstrapClient.start();initPool(connectOptions.getMinPeriod());try {countDownLatch.await(connectOptions.getConnectTime(), TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("InterruptedException",e);nettyBootstrapClient.doubleConnect(); // 重新连接}}

11、总结

在工作中一直使用emqx,但是不知道业务原理,虽然emqx是开源的,但是因为开发语言是erlang,也不好下手去读,在网上随便找了一个开源的实现,代码很老,但是基本的功能属性都有

https://github.com/1ssqq1lxr/iot_push

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/128658.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧工地:数字革命下的建筑业新趋势

在当今建筑领域&#xff0c;智慧工地正迅速崭露头角。这个概念不仅代表了技术进步&#xff0c;还预示着建筑行业的数字化和智能化未来。从多个角度来看&#xff0c;智慧工地都具有深远的意义&#xff0c;它正在改变着我们建筑的方式和未来。 提高工程效率 智慧工地利用物联网&…

软件工程与计算总结(五)软件需求基础

本帖介绍软件需求涉及的诸多基本概念&#xff0c;通过对这些概念的阐述&#xff0c;剖析软件需求的来源、层次、类别、作用等重要知识~ 目录 ​编辑 一.引言 二.需求工程基础 1.简介 2.活动 3.需求获取 4.需求分析 5.需求规格说明 6.需求验证 7.需求管理 三.需求基…

2023年MES系统研究报告-介绍及主要结论 | 百世慧®

随着国内制造业的蓬勃发展&#xff0c;制造企业的数字化转型需求也在不断的增强&#xff0c;使得工业软件的需求也不断被激发。 现在&#xff0c;我国软件产业迎来一个高速发展时期&#xff0c;软件产业高质量发展上升为国家战略&#xff0c;工业软件、基础软件等关键核心技术…

【Python查找算法】二分查找、线性查找、哈希查找

目录 1 二分查找算法 2 线性查找算法 3 哈希查找算法 1 二分查找算法 二分查找&#xff08;Binary Search&#xff09;是一种用于在有序数据集合中查找特定元素的高效算法。它的工作原理基于将数据集合分成两半&#xff0c;然后逐步缩小搜索范围&#xff0c;直到找到目标元素…

线性代数小例子

这样做有什么问题呢&#xff1a; A 2 A > A ( A − E ) 0 > A E A 0 A^2 A > A(A - E) 0> A E \quad A 0 A2A>A(A−E)0>AEA0 上述做法是错误的&#xff0c;这是因为两个矩阵的乘积结果为0&#xff0c;并不能说明这两个矩阵就是0&#xff0c;即上述…

解决方案:AI赋能工业生产3.0,从工业“制造”到“智造”

视频监控技术是一种既成熟又广泛应用于工业制造领域的先进技术。它可以通过安装各种摄像头和传感器来监测整个生产流程&#xff0c;包括原材料的采购、加工、装配和物流等环节&#xff0c;从而实现对生产过程的实时监控和管理&#xff0c;以及对异常事件的及时预警和响应。 在…

ueditor

下载文件 文档 UEditor入门部署 入门部署和体验 1.1 下载编辑器 到官网下载 UEditor 最新版&#xff1a;http://ueditor.baidu.com/website/download.html#ueditor 1.2 创建demo文件 解压下载的包&#xff0c;在解压后的目录创建 demo.html 文件&#xff0c;填入下面的…

MES生产执行解决方案提供商,可定制工厂MES精益制造管理系统-亿发

亿发智能制造MES系统&#xff1a;驱动制造业创新&#xff0c;实现数字化生产和管理 MES管理系统以实时协同思想为核心&#xff0c;着重于精益生产计划的实施和车间实时调度。对生产现场和业务经营的数据进行全面的系统化管理&#xff0c;以数据分析的结果为基础&#xff0c;协助…

docker虚拟网桥和业务网段冲突处理

ifconfig查看docker虚拟网桥ip地址 docker inspect --format{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}} $(docker ps -aq)查询所有容器的ip 修改docker-compose networks networks xxx-network: driver: bridge ipam: c…

微服务技术栈-Nacos配置管理和Feign远程调用

文章目录 前言一、统一配置管理1.添加配置文件2.微服务拉取配置3.配置共享 三、Feign远程调用总结 前言 在上篇文章中介绍了微服务技术栈中Nacos这个组件的概念&#xff0c;Nacos除了可以做注册中心&#xff0c;同样可以做配置管理来使用。同时我们将学习一种新的远程调用方式…

路径总和 III

题目链接 路径总和 III 题目描述 注意点 二叉树的节点个数的范围是 [0,1000]求该二叉树里节点值之和等于 targetSum 的 路径 的数目 解答思路 可根据前缀和的思路解决本题&#xff0c;前缀和表示从根节点开始&#xff0c;往左或往右组成的路径和&#xff0c;统计从根节点开…

前端uniapp生成海报并保存相册

uiapp插件 目录 图片qrcode.vue源码完整版封装源码qrcodeSwiper.vue最后 图片 qrcode.vue源码完整版 <template><view class"qrcode"><div class"qrcode_swiper SourceHanSansSC-Normal"><!-- <cc-scroolCard :dataInfo"dat…