golang开源的可嵌入应用程序高性能的MQTT服务

golang开源的可嵌入应用程序高性能的MQTT服务

什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息传输协议,设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发,现已成为OASIS标准。
MQTT的设计目标是提供一种简单、轻量、可扩展的协议,适用于各种设备和网络条件。它通常用于物联网(IoT)和传感器网络,其中设备需要以有效的方式进行通信,并且资源(如带宽和电池寿命)可能受到限制。
MQTT的简单设计和适用性使其成为物联网中常用的通信协议之一。它被广泛用于传感器网络、嵌入式设备、移动应用程序和其他场景中,提供了一种可靠、高效的消息传输机制。

什么是Mochi-MQTT

源代码地址:https://github.com/mochi-mqtt/server

Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中间件/服务器,完全使用 Go 语言编写,旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用,也可以嵌入到你自己的应用程序中库来使用,经过提出的设计以实现问题的轻量化和快速部署,同时也非常重视代码的质量和可维护性。

用途

物联网项目开发时,常常需要使用MQTT协议对设备接入,在很多场景中,私有化部署物联网系统时资源比较少,性能要求高,一些大型的MQTT服务不满足要求,而且代码不可控。
还有在边缘场景下,需要在边缘网关,边缘控制器设备上部署物联网系统,但是边缘网关的资源很少,内存大约只有4G,所以使用java开发的物联网系统就很难部署上去;使用C/C++开发效率又很低,所以Go语言是最合适的,
Mochi-MQTT刚好又完全是Go编写的开源的,可以嵌入到自己的程序启动。

Mochi MQTT独立部署

Golang的环境配置这里不做说明,请看我前面的博文说明

Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。

cd cmd
go build -o mqtt && ./mqtt

docker部署

可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server

也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest

嵌入自己项目运行和开发

下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2

将Mochi MQTT作为包导入使用, 示例代码如下

import (mqttServer "github.com/mochi-mqtt/server/v2""github.com/mochi-mqtt/server/v2/listeners""github.com/mochi-mqtt/server/v2/packets"
)var Server *mqttServer.Serverfunc ServerMqttInit() {// 创建新的 MQTT 服务器。Server = mqttServer.New(&mqttServer.Options{InlineClient: true, // 启动内联客户端})// 初始化数据库实例edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),productDao:     productDao.NewProductRepository(),}// 添加自定义权限方法err := Server.AddHook(edge, nil)if err != nil {log.Fatal(err)}// 在1883端口上创建一个 TCP 服务端。tcp := listeners.NewTCP("t1", ":1883", nil)err = Server.AddListener(tcp)if err != nil {log.Fatal(err)}// 在1882端口上创建一个 Websocket 服务端。ws := listeners.NewWebsocket("ws1", ":1882", nil)err = server.AddListener(ws)if err != nil {log.Fatal(err)}go func() {err := Server.Serve()if err != nil {log.Fatal(err)}}()
}type edgeHook struct {mqttServer.HookBasedeviceDao      deviceDao.DeviceRepositoryproductDao     productDao.ProductRepository
}func (h *edgeHook) ID() string {return "mqtt-auth"
}func (h *edgeHook) Provides(b byte) bool {// 实现钩子函数return bytes.Contains([]byte{//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。mqttServer.OnConnectAuthenticate,//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。mqttServer.OnACLCheck,//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。mqttServer.OnSessionEstablish,//当客户端因任何原因断开连接时调用。mqttServer.OnDisconnect,//当客户端向订阅者发布消息后调用。mqttServer.OnPublished,}, []byte{b})
}// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {username := string(pk.Connect.Username)password := string(pk.Connect.Password)if username == "" || len(username) == 0 {return false}if password == "" || len(password) == 0 {return false}return true
}// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {username := string(cl.Properties.Username)if username == "" || len(username) == 0 {return false}if topic == "" || len(topic) == 0 {return false}return true
}// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {username := string(cl.Properties.Username)if username == "" || len(username) == 0 {return}//设备连接MQTT成功后保存设备在线状态
}// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {username := string(cl.Properties.Username)if username == "" || len(username) == 0 {return}//设备断开MQTT成功后保存设备离线状态
}// OnPublished 当客户端向订阅者发布消息后调用。
func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))//收到客户端消息后做业务逻辑处理
}// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {err := Server.Publish(topic, msg, false, 0)if err != nil {Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)return false}return true
}// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,"topic", pk.TopicName, "payload", string(pk.Payload))callback(pk.TopicName, pk.Payload)}_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {_ = Server.Unsubscribe(topic, subscriptionId)
}func main() {// 创建信号用于等待服务端关闭信号sigs := make(chan os.Signal, 1)done := make(chan bool, 1)signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)go func() {<-sigsdone <- true}()<-doneLog.Error("caught signal, stopping...")Server.Close()Log.Error("main.go finished")
}

监控MQTT指标信息

mqttRouters := r.Group("/mqtt", func(context *gin.Context) {}){mqttRouters.GET("stats", func(c *gin.Context) {util.R(c, nil, mqtt.Server.Info)})}

在这里插入图片描述

详情使用指南请看:https://github.com/mochi-mqtt/server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/452514.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

fastjson 导致的OOM

fastjson 导致的OOM 示例代码 public static void main(String[] args) throws Exception {try {List<Integer> list JSONObject.parseArray("[2023,2024", Integer.class);}catch (Exception e){System.err.println("error");}System.out.println…

【Java集合框架ArrayList、LinkedList、HashSet之间的区别】

Java集合框架ArrayList、LinkedList、HashSet之间的区别 1. 实现方式&#xff1a;2. 插入和删除操作的效率&#xff1a;3. 随机访问的效率&#xff1a;4. 内存占用&#xff1a;综上所述&#xff0c;选择ArrayList还是LinkedList或HashSet取决于具体的使用场景。如果需要频繁的插…

Javascript入门:第三个知识点:javascript里的数据类型、运算符

数字类型 123 //整数 123.1 //浮点数 1.123e3 //科学计数法 -10 //负数 NaN //not a number Infinity //无限大 以上的类型在javascript里都是数字类型 字符串类型 在开始之前&#xff0c;我需要先说明白两个知识点&#xff1a; console.log()是啥&#xff1f; let 与 v…

onlyfans无法订阅?2024年订阅onlyfans最新教程一键直达

讲在前面-关于OnlyFans 欧美除了脸书和推特之外&#xff0c;又新起了一个社交软件&#xff0c;它就是onlyfans&#xff0c;简称o站。 在极短的时间内&#xff0c;它就拥有了1.2亿的用户量&#xff0c;而全站订阅金额更是达到了17亿英镑&#xff0c;换成人民币&#xff0c;数额…

Palworld幻兽帕鲁自建服务器32人联机开黑!

玩转幻兽帕鲁服务器&#xff0c;阿里云推出新手0基础一键部署幻兽帕鲁服务器教程&#xff0c;傻瓜式一键部署&#xff0c;3分钟即可成功创建一台Palworld专属服务器&#xff0c;成本仅需26元&#xff0c;阿里云服务器网aliyunfuwuqi.com分享2024年新版基于阿里云搭建幻兽帕鲁服…

ASM-HEMT参数提取和模型验证测试

参数提取程序 直流I-V参数提取 DC模型参数提取流程对于ASM-GaN-HEMT模型可以总结在下图中。 以下步骤描述了该流程&#xff1a; 在模型中设置物理参数&#xff0c;如L&#xff08;沟道长度&#xff09;、W&#xff08;沟道宽度&#xff09;、NF&#xff08;栅指数&#xf…

【软件设计师笔记】一篇文章让你读懂什么是软件工程与系统开发

【考证须知】IT行业高含金量的证书(传送门)&#x1f496; 【软件设计师笔记】计算机系统基础知识考点(传送门) &#x1f496; 【软件设计师笔记】程序语言设计考点(传送门) &#x1f496; 【软件设计师笔记】操作系统考点(传送门)&#x1f496; &#x1f413; 软件工程 软件…

PyTorch 2.2 中文官方教程(十四)

参数化教程 原文&#xff1a; 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 作者&#xff1a;Mario Lezcano 注意 点击这里下载完整示例代码 在本教程中&#xff0c;您将学习如何实现并使用此模式来对模型进行约束。这样做就像编写自己的nn.Module一样容易。 对深…

企业计算机服务器中了mallox勒索病毒怎么办?Mallox勒索病毒解密

随着社会的不断发展&#xff0c;数字化经济已经成为常态化&#xff0c;企业的数据慢慢地成为生存发展的根本&#xff0c;保护企业的核心数据是众多企业关心的主要话题。近期&#xff0c;云天数据恢复中心接到很多企业的求助&#xff0c;企业的计算机服务器遭到了mallox勒索病毒…

Mysql+MybatisPlus+Vue实现基础增删改查CRUD

数据库 设计数据库 设计几个字段&#xff0c;主键id自动增长且不可为空 create table if not exists user (id bigint(20) primary key auto_increment comment 主键id,username varchar(255) not null comment 用户名,sex char(1) not null comment 性…

力扣热门100题刷题笔记 - 3.无重复字符的最长子串

力扣热门100题 - 3.无重复字符的最长子串 题目链接&#xff1a;3. 无重复字符的最长子串 题目描述&#xff1a; 给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。示例&#xff1a; 输入: s "abcabcbb" 输出: 3 解释: 因为无重复字…

基于ChatGLM.cpp实现低成本对ChatGLM3-6B的量化加速

文章目录 1. 参考2. ChatGLM3 介绍3. 本地运行3.1 硬件配置3.2 下载ChatGLM3代码3.3 量化模型3.4 编译和运行3.4.1 编译3.4.12 运行 4. python绑定4.1 安装4.2 使用预先转换的 GGML 模型 总结 前面两章分别有讲到基于MacBook Pro M1芯片运行chatglm2-6b大模型和如何在本地部署c…