SpringBoot 实现EMQ设备的上下线告警

前言

上下线通知

我遇到了一个难题,即在使用EMQ X 4.4.10的开源版本时,我需要实现设备的上下线状态监控,但该4.4.10开源版本并未内置设备上下线提醒模块,只有企业版才内置了该模块。这为我带来了一些技术上的难题,迫使我必须另辟蹊径,通过其他方法来监听设备的上下线状态。为了解决这个问题,我采取了以下步骤:

首先,我修改了EMQ Xacl.config文件,添加了以下规则:

{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.

图示:

这个规则允许订阅$SYS/brokers/+/clients/#主题的所有客户端。

接下来,我使用Spring Boot创建了一个应用程序,其中我设置了与EMQ X代理的连接。在这个应用程序中,我创建了一个监听器,用于订阅$SYS/brokers/+/clients/#主题,以感知设备的上下线信息。

通过这个解决方案,我能够实时监控设备的连接和断开事件,而不受EMQ X开源版本的功能限制。这使我能够根据设备状态采取适当的措施,比如发送通知或执行其他自定义操作。这个方法允许我灵活地定制解决方案,以满足我的特定需求,而无需依赖EMQ X的功能。

EMQ简介


EMQErlang MQTT Broker)是一种基于Erlang编程语言开发的开源消息传递代理(MQTT broker),专门用于支持MQTTMessage Queuing Telemetry Transport)协议。MQTT是一种轻量级、高效的消息传递协议,最初设计用于连接受限的设备,如嵌入式系统和物联网设备。EMQ作为MQTT broker,提供了可靠的消息传递机制,使设备能够相互通信,同时也可与后端应用程序集成,使其成为物联网生态系统中的重要组成部分。

环境

  • EMQX安装方式:Docker
  • EMQX版本:4.4.10开源版本
  • 操作系统:CentOS 7

EMQX4.4版本官方文档

下载

下载 EMQX

准备工作

安装EMQX

修改EMQX的ACL规则内容

EMQX的Docker容器配置文件所在目录

# 进入EMQX容器
docker exec -it emqx /bin/sh
# 进入配置文件目录
cd /opt/emqx/etc
# 查看acl配置文件
cat acl.conf
# 编辑acl配置文件
vi acl.conf

非Docker容器配置文件所在目录

# 进入配置文件目录
cd /etc/emqx
# 查看acl配置文件
cat acl.conf
# 编辑acl配置文件
vi acl.conf

acl的默认文件内容

%%--------------------------------------------------------------------
%% [ACL](https://docs.emqx.io/broker/v3/en/config.html)
%%
%% -type(who() :: all | binary() |
%%                {ipaddr, esockd_access:cidr()} |
%%                {ipaddrs, [esockd_access:cidr()]} |
%%                {client, binary()} |
%%                {user, binary()}).
%%
%% -type(access() :: subscribe | publish | pubsub).
%%
%% -type(topic() :: binary()).
%%
%% -type(rule() :: {allow, all} |
%%                 {allow, who(), access(), list(topic())} |
%%                 {deny, all} |
%%                 {deny, who(), access(), list(topic())}).
%%--------------------------------------------------------------------{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.{allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.{allow, all}.

新增一条ACL规则

allow, all, subscribe, ["$SYS/brokers/+/clients/#"]}.
  • allow: 表示这是一个允许(allow)访问的规则。这意味着与此规则匹配的操作将被允许。
  • all: 表示这个规则适用于所有的客户端。
  • subscribe: 表示这个规则定义了对主题的订阅权限。
  • $SYS/brokers/+/clients/#: 这是一个主题过滤器,它指定了匹配的主题模式。在这里,$SYS/brokers/+/clients/# 表示以 $SYS/brokers/ 开头,然后是一个通配符 +,它可以匹配单个层级的任何名称,接着是 clients/,最后又有一个 # 通配符,它可以匹配零个或多个层级的名称。因此,这个主题过滤器匹配了所有以 $SYS/brokers/ 开头,然后紧跟着 clients/ 的主题。

综合起来,这个规则允许所有的客户端订阅以 $SYS/brokers/ 开头,然后跟着 clients/的所有主题。通常,这种规则用于允许所有客户端订阅系统级别的信息或监控数据,如经纪人(Broker)的客户端连接状态等。这可以用于监视和诊断MQTT Broker 的性能和状态。

注意:修改完毕之后使用emqx_ctl reload_acl重新加载acl规则或者直接重启emqx服务

搭建一个能够监听EMQX主题的Spring Boot应用程序

MQTT相关依赖

<!-- MQTT相关依赖 -->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>

MQTT接受订阅的主题

$SYS/brokers/+/clients/#

处理设备上下线事件

获取EMQX消息主题

// 从消息请求头中获取消息主题topic
String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));

获取topic最后的节点字符串

以下方式通过主题topic来获取ClientId

// topic最后的节点字符串
String lastPart = extractLastPart(topic);
// 获取消息内容
String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
// 判断设备是上线或下线消息
if ("connected".equals(lastPart)) {// 设备上线消息clientId = extractClientIdFromTopic(topic);log.info("设备上线提醒 -> IMEI:{}", clientId);
} else if ("disconnected".equals(lastPart)) {// 设备下线消息clientId = extractClientIdFromTopic(topic);log.info("设备下线警告 -> IMEI:{}", clientId);
}/*** 获取最后一个节点** @param topic 主题* @return 节点内容*/
public static String extractLastPart(String topic) {// 使用split方法将字符串根据'/'分割成数组String[] parts = topic.split("/");// 获取最后一个元素String lastPart = parts[parts.length - 1];return lastPart;
}/*** 从Topic中提取ClientId** @param topic 主题* @return ClientId*/
public static String extractClientIdFromTopic(String topic) {// 使用正则表达式匹配主题中的ClientIdString regex = "\\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected)";Pattern pattern = Pattern.compile(regex);Matcher matcher = pattern.matcher(topic);// matcher.groupCount() 是一个方法,用于获取正则表达式中定义的组数。在正则表达式中,使用括号 () 来定义捕获组。在这个情况下,正则表达式 \\$SYS/brokers/[^/]+/clients/([^/]+)/(connected|disconnected) 中有两组,分别是括号内的 ([^/]+) 部分和 (connected|disconnected) 部分。matcher.groupCount() 返回的是正则表达式中捕获组的数量if (matcher.matches() && matcher.groupCount() == 2) {// 如果正则匹配成功,提取ClientId并返回return matcher.group(1);} else {// 如果匹配失败,返回null或者抛出异常,视情况而定return null;
}

当然你也可以通过解析payload来获取更多详细信息,可参照官方文档:客户端上下线事件

主题 (Topic)说明
${clientid}/connected上线事件。当任意客户端上线时,EMQX 就会发布该主题的消息
${clientid}/disconnected下线事件。当任意客户端下线时,EMQX 就会发布该主题的消息

connected 事件消息的 Payload 解析成 JSON 格式如下:

{"username": "foo","ts": 1625572213873,"sockport": 1883,"proto_ver": 4,"proto_name": "MQTT","keepalive": 60,"ipaddress": "127.0.0.1","expiry_interval": 0,"connected_at": 1625572213873,"connack": 0,"clientid": "emqtt-8348fe27a87976ad4db3","clean_start": true
}

disconnected 事件消息的 Payload 解析成 JSON 格式如下:

{"username": "foo","ts": 1625572213873,"sockport": 1883,"reason": "tcp_closed","proto_ver": 4,"proto_name": "MQTT","ipaddress": "127.0.0.1","disconnected_at": 1625572213873,"clientid": "emqtt-8348fe27a87976ad4db3"
}

可以解析JOSN数据拿到clientid,注意此处使用的JSON解析工具是Hutool的。

// 获取消息内容
String payload = StrUtil.str((byte[]) message.getPayload(), "utf-8");
// 解析JSON字符串
JSONObject payloadJsonObject = JSONUtil.parseObj(payload);
// 获取ClientId
String clientid = payloadJsonObject.getStr("clientid");

实现效果

总结

  1. 修改 EMQ X ACL 配置: 你在 EMQ X 中修改了 acl.config 文件,添加了相应的 ACL 规则,允许订阅 $SYS/brokers/+/clients/# 主题的所有客户端。这个步骤允许你在开源版本中访问关键的设备连接信息。

  2. 创建 Spring Boot 应用程序: 通过创建一个 Spring Boot 应用程序,你建立了一个连接到 EMQ X 代理的桥梁。这个应用程序充当了监听器,用于订阅 $SYS/brokers/+/clients/# 主题,以实时感知设备的连接和断开事件。

  3. 实时设备监控: 你的解决方案允许你实时监控设备的连接状态,而无需依赖 EMQ X 企业版的内置功能。这使你能够快速响应设备状态的变化,并采取相应的行动,如发送通知或执行自定义操作。

  4. 定制性: 通过这个方法,你能够灵活地定制解决方案,以满足你的特定需求。你可以根据设备状态采取不同的操作,因此具有更大的灵活性。

  5. 避免功能限制: 尽管 EMQ X 4.4.10 开源版本没有内置设备上下线提醒模块,但你成功地绕过了这个限制,通过自定义配置和应用程序开发来实现了所需的功能。

  6. 无需升级或许可证: 你的解决方案不需要升级到 EMQ X 企业版或购买额外的许可证。这使得你可以在开源版本的基础上构建所需的功能。

  7. 总而言之,你的解决方案是一种巧妙的方式,通过修改配置和创建一个自定义的 Spring Boot 应用程序,实现了设备上下线状态的监控和管理。这个方法不仅解决了技术挑战,还提供了灵活性和定制性,以满足你的特定需求。这是一个创造性的方法,适用于需要在 EMQ X 开源版本中实现设备监控的情况。

参考文献

EMQX4.4官方文档
SpringBoot整合MQTT(EMQ)设备上下线告警
【EMQX 5.0】 Spring Cloud 集成MQTT并异步入库 + 客户端上报数据 + 上下线主题订阅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/129941.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记录宝塔面板申请ssl证书报错 Invalid version. The only valid version for X509Req is 0

问题 宝塔面板申请ssl证书报错 Invalid version. The only valid version for X509Req is 0。 原因是由于服务器端使用了不兼容的 OpenSSL 版本导致的&#xff0c;服务器端的X509Req 版本只支持 0&#xff0c;而宝塔这边默认的版本为2。 我的是之前可以申请ssl&#xff0c;过…

玄子Share- IDEA 2023 SpringBoot 热部署

玄子Share- IDEA 2023 SpringBoot 热部署 修改 IDEA 部署设置 IDEA 勾选如下选项 新建 SpringBoot 项目 项目构建慢的将 Spring Initializr 服务器 URL 改为阿里云&#xff1a;https://start.aliyun.com/ 在这里直接勾选Spring Boot Devtools插件即可 测试 切出 IDEA 项目文…

HTML5+CSSday4综合案例二——banner效果

bannerCSS展示图&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"wi…

【C++】List -- 详解

一、list的介绍及使用 https://cplusplus.com/reference/list/list/?kwlist list 是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 list 的底层是双向链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&…

Redis AOF重写原原理

重写aof之前 appendonly.aof.1.base.aof appendonly.aof.1.incr.aof appendonly.aof.manifest 重写aof 一次 appendonly.aof.2.base.aof 大小变化 appendonly.aof.2.incr.aof 大小o appendonly.aof.manifest 大小不变 AOF文件重写并不是对原文件进行重新整理&#xff0c;而是直…

Elasticsearch:什么是检索增强生成 - RAG?

在人工智能的动态格局中&#xff0c;检索增强生成&#xff08;Retrieval Augmented Generation - RAG&#xff09;已经成为游戏规则的改变者&#xff0c;彻底改变了我们生成文本和与文本交互的方式。 RAG 使用大型语言模型 (LLMs) 等工具将信息检索的能力与自然语言生成无缝结合…

hive add columns 后查询不到新字段数据的问题

分区表add columns 查询不到新增字段数据的问题&#xff1b; 5.1元数据管理 &#xff08;1&#xff09;基本架构 Hive的2个重要组件&#xff1a;hiveService2 和metastore,一个负责转成MR进行执行&#xff0c;一个负责元数据服务管理 beeline-->hiveService2/spar…

软件企业找第三方软件测评机构做确认测试有什么优势?

软件确认测试是一个在软件开发过程中十分重要的环节。它确保了软件的功能符合预期&#xff0c;达到了用户的需求和期望。确认测试主要验证软件的功能、性能、易用性、稳定性等方面&#xff0c;旨在发现和修复潜在的问题和缺陷。通过进行全面的确认测试&#xff0c;软件企业可以…

视频怎么压缩?这样做视频变小还清晰

在我们的日常生活和工作中&#xff0c;视频已经成为了不可或缺的一部分。然而&#xff0c;随着视频文件的增大&#xff0c;如何有效地压缩视频以方便存储和传输成了一个重要的问题&#xff0c;如果你还不知道怎么压缩视频大小&#xff0c;不妨试试下面的方法吧~ 方法一&#xf…

每日leetcode_2441_对应负数同时存在的最大整数

Leetcode每日一题_2441_对应负数同时存在的最大整数 记录自己的成长&#xff0c;加油。 题目 解题 class Solution {public int findMaxK(int[] nums) {int k -1;Set<Integer> set new HashSet<Integer>();for (int x : nums) {set.add(x);}for (int x : nums) …

Flink-SQL join 优化 -- MiniBatch + local-global

背景 问题1. 近期在开发flink-sql期间&#xff0c;发现数据在启动后&#xff0c;任务总是进行重试&#xff0c;运行一段时间后&#xff0c;container心跳超时&#xff0c;内存溢出&#xff0c;作业无法进行正常工作 023-10-07 14:53:30,408 | INFO | [flink-akka.actor.defa…

家政服务行业做开发微信小程序可以实现什么功能

家政服务行业开发微信小程序可以实现多种功能&#xff0c;从而提升服务品质和效率&#xff0c;下面我们来详细介绍一些可能实现的功能。 一、展示服务信息 家政服务微信小程序可以展示各种服务信息&#xff0c;包括各类家政服务项目、价格、服务流程、服务人员信息等。用户可以…