基于MQTT协议实现微服务架构事件总线

一、场景描述

昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。

对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

在这里插入图片描述

二、几种消息总线适用性比较

常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。

Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。

而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。

由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。

三、MQTT简介

几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。

(一)MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:

  • 至多一次:
    消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。
  • 至少一次:
    确保消息送达订阅者,但消息可能重复,适用于幂等性操作。
  • 只有一次:
    最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。

MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT.png

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:

  • connect:客户端建立与服务器的连接
  • disconnect:等待客户端完成工作后,端口与总线的会话
  • subscribe:客户端向消息总线注册订阅主题
  • unsubscribe:客户端等待消息总线取消所注册的订阅
  • publish:客户端向消息总线发送某主题的消息

(二)开源消息总线EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。

软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:

  • 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
  • 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。

(三)MQTT.js客户端

MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。

1. 安装和帮助文件

$ pnpm i mqtt -S #安装
$ npx mqtt help  #帮助
MQTT.js command line interface, available commands are:* publish     publish a message to the broker* subscribe   subscribe for updates from the broker* version     the current MQTT.js version* help        help about commandsLaunch 'mqtt help [command]' to know more about the commands.

2. 使用方法

// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt'  //CommonJS模块// 连接选项
const options = {clean: true, // true: 清除会话, false: 保留会话connectTimeout: 4000, // 超时时间// 认证信息clientId: 'user_id', // 要保证唯一性// 若在控制台配置了用户名和密码:// username: 'xxx',// password: 'xxx',
}// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)client.on('reconnect', error => {console.error('正在重连:', error)
})client.on('error', error => {console.error('连接失败:', error)
})//收到消息
client.on('message', (topic, message) => {console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})//订阅主题
const topic='/user_id/#'
const qos=0  //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{  //订阅user_id主题下所有消息if(error){console.error('订阅主题失败:', error)return}console.log('订阅成功')
})//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{if(error){console.error('发布消息失败:', error)}
})//取消订阅
client.unsubscrib(topic, qos, error=>{if(error){console.error('取消订阅失败', error)return}
})//断开连接
if(client.connected){try{client.end(false,()=>{console.log('成功断开连接')})catch(error){console.error('断开连接失败:', error)}}
}

(四)安全性

1. 排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

2. JWT认证

系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/497112.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构与算法】回溯法解题20240229

【数据结构与算法】回溯法解题20240229 一、46. 全排列1、以[1,2,3]为例,抽象成树形结构2、回溯三部曲 二、LCR 084. 全排列 II1、以[1,1,2]为例,抽象成树形结构 三、面试题 08.07. 无重复字符串的排列组合四、面试题 08.08. 有重复字符串的排列组合 一、…

Oracle 直接路径插入(Direct-Path Insert)

直接路径插入(Direct Path Insert)是Oracle一种数据加载提速技术,可以在使用insert语句或SQL*Loader工具大批量加载数据时使用。直接路径插入处理策略与普通insert语句完全不同,Oracle会通过牺牲空间,安全性&#xff0…

LeetCode19. 删除链表的倒数第 N 个结点(C++)

LeetCode19. 删除链表的倒数第 N 个结点 题目链接代码 题目链接 https://leetcode.cn/problems/remove-nth-node-from-end-of-list/description/ 代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : …

DAY10-内容安全过滤技术概述

文件过滤技术流程: 应用行为控制技术具体:

npm run dev和npm run serve两个命令的区别

npm run dev和npm run serve两个命令的区别 前端开发过程中运行Vue项目的时候,有时候使用npm run serve命令可以启动项目,有时候却会报错;有时候使用npm run dev命令可以启动项目,有时候却也会报错。是什么原因造成这种情况呢&am…

如何正确处理 Go 项目中关于文件路径的问题

嗨,大家好!我是波罗学。本文是系列文章 Go 技巧第十九篇,系列文章查看:Go 语言技巧。 在使用 Go 开发项目时,估计有不少人遇到过无法正确处理文件路径的问题,特别是刚从如 PHP、python 这类动态语言转向 Go…

复盘昨天的内容

vue调节css 后端做业务处理 1.分类管理 GetMapping("/queryCtc")public ApiResult queryCtc(){return ctcService.queryCtc();}/*** 修改类目信息* return*/PutMapping("/updateCtc")public ApiResult updateCtc(RequestBody ShopCtc shopCtc){return c…

Python Web开发记录 Day4:JavaScript

名人说:莫道桑榆晚,为霞尚满天。——刘禹锡(刘梦得,诗豪) 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 目录 四、JavaScript1、JavaScript-基础①JavaScript…

【AI绘画·24年1月最新】Stable Diffusion整合包安装!解压即用--秋葉aaaki 大佬的作品,试用

前言 Stable Diffusion 之前费老大的劲部署安装,解决报错。搞完之后,突然发现有个现成集成包可以用,真是效率高到不行,今天搞下来试试 我电脑配置: CPU: 12th Gen Intel Core™ i7-12700F 2.10 GHz 内存32G&#xff0…

【MySQL】内置函数 -- 详解

一、日期函数 日期:年月日时间:时分秒 1、获得年月日 2、获得时分秒 3、获得时间戳 4、在日期的基础上加日期 5、在日期的基础上减去时间 6、计算两个日期之间相差多少天 7、获得当前时间 ⚪练习 (1)记录生日 (2&…

蓝桥杯备战刷题three(自用)

1.合法日期 #include <iostream> #include <map> #include <string> using namespace std; int main() {map<string,int>mp;int days[13]{0,31,28,31,30,31,30,31,31,30,31,30,31};for(int i1;i<12;i){for(int j1;j<days[i];j){string sto_strin…

硬件工程师入门基础知识(三)钽电容应用(二)

钽电容应用&#xff08;二&#xff09; 1.钽电容使用容量选择2.非固体电解质钽电容器使用时应注意的问题2.1 容量和损耗2.2 直流漏电流2.3 使用电压2.4 反向电压2.5 纹波电流2.6 失效率的影响因素2.7 补充说明&#xff1a; 1.钽电容使用容量选择 许多情况下&#xff0c;高能混…