通过物联网管理多台MQTT设备-基于米尔T527开发板

本篇测评由电子工程世界的优秀测评者“JerryZhen”提供。


本文将介绍基于米尔电子MYD-LT527开发板的网关方案测试。

一、系统概述

基于米尔-全志 T527设计一个简易的物联网网关,该网关能够管理多台MQTT设备,通过MQTT协议对设备进行读写操作,同时提供HTTP接口,允许用户通过HTTP协议与网关进行交互,并对设备进行读写操作。

二、系统架构

  1. 网关服务:基于FastAPI框架构建的Web服务,提供HTTP接口。

  2. MQTT客户端:负责与MQTT设备通信,管理设备连接、消息发布和订阅。

  3. 设备管理:维护一个设备列表,记录设备的基本信息和状态。

  4. 数据存储:使用内存或数据库存储设备数据,确保数据持久化。

三、组件设计

  1. MQTT组件

  • 负责与MQTT broker建立连接。
  • 订阅设备主题,接收设备发送的消息。
  • 发布消息到设备,实现远程控制。
  1. 设备管理组件:

  • 维护一个设备列表,记录设备的唯一标识符(如设备ID)、MQTT主题、连接状态等信息。
  • 提供设备增删改查的方法。
  1. HTTP组件:

  • 基于FastAPI定义HTTP接口。
  • 接收用户请求,调用MQTT组件和设备管理组件进行相应操作。
  • 返回操作结果给用户。

四、接口设计

  1. 设备列表:

  • GET /devices:返回所有设备的列表。
  • POST /devices:添加新设备到网关。
  • DELETE /devices/{device_id}:从网关中删除指定设备。
  1. 设备详情:

  • GET /devices/{device_id}:返回指定设备的详细信息。
  1. 设备数据:

  • GET /devices/{device_id}/data:获取指定设备的最新数据。
  • POST /devices/{device_id}/data:发送数据到指定设备。
  1. 设备控制:

  • POST /devices/{device_id}/control:发送控制命令到指定设备。

五、数据结构设计

  1. 设备信息:

  • 设备ID (device_id):唯一标识设备的字符串。
  • MQTT主题 (mqtt_topic):设备在MQTT broker上的主题。
  • 连接状态 (connection_status):表示设备是否在线的布尔值。
  • 其他设备属性(如名称、描述等)。
  1. 设备数据:

  • 设备ID (device_id):关联设备信息的设备ID。
  • 时间戳 (timestamp):数据发送或接收的时间。
  • 数据内容 (data):设备发送或接收的具体数据,可以是JSON格式或其他格式。

六、安全性考虑

  • 使用HTTPS协议提供安全的HTTP通信。

  • 实现用户认证和授权机制,确保只有授权用户可以访问和操作设备。

  • 对于敏感操作(如删除设备),要求用户进行二次确认或提供额外的安全措施。

七、部署与扩展

  • 使用Docker容器化部署网关服务,便于管理和扩展。

  • 根据需要,可以水平扩展网关实例以处理更多的设备连接和请求。

八、实现步骤

  1. 安装所需的Python库:fastapi, uvicorn, paho-mqtt等。

  2. 创建FastAPI应用并定义路由。

  3. 实现MQTT组件,包括与MQTT broker的连接、订阅、发布等功能。

  4. 实现设备管理组件,维护设备列表并提供增删改查的方法。

  5. 实现HTTP组件,调用MQTT组件和设备管理组件处理用户请求。

  6. 编写测试代码,验证网关的各项功能是否正常工作。

  7. 部署网关服务并监控其运行状态。

该设计方案仅仅是概述,具体实现细节可能需要根据实际需求和项目环境进行调整和优化。在实际开发中,还需要考虑异常处理、日志记录、性能优化等方面的问题。基于上述设计方案,以下是一个简化版的参考代码,展示了如何使用FastAPI和paho-mqtt库来创建一个物联网网关。需要注意,示例中不包含完整的错误处理、用户认证和授权机制,这些在实际生产环境中都是必不可少的。依赖的主要库版本:

fastapi==0.108.0

paho-mqtt==1.6.1

网关模拟代码gateway.py:

from fastapi import FastAPI, HTTPException, Body, status
from paho.mqtt.client import Client as MQTTClient
from typing import List, Dict, Any
import asyncio
import jsonapp = FastAPI()
mqtt_client = None
device_data = {}  subtopic="gateway/device/#"# MQTT回调函数
def on_message(client, userdata, msg): payload = msg.payload.decode()topic = msg.topicdevice_id = topic.split('/')[-1]device_data[device_id] = payloadprint(f"Received message from {device_id}: {payload}")  # MQTT连接和订阅
def mqtt_connect_and_subscribe(broker_url, broker_port):global mqtt_clientmqtt_client = MQTTClient()mqtt_client.on_message = on_messagemqtt_client.connect(broker_url, broker_port, 60)mqtt_client.subscribe(subtopic)mqtt_client.loop_start()# MQTT发布消息
async def mqtt_publish(topic: str, message: str):if mqtt_client is not None and mqtt_client.is_connected():mqtt_client.publish(topic, message)else: print("MQTT client is not connected!")# 设备管理:添加设备
@app.post("/devices/", status_code=status.HTTP_201_CREATED)
async def add_device(device_id: str):device_data[device_id] = Nonereturn {"message": f"Device {device_id} added"}# 设备管理:获取设备列表
@app.get("/devices/")
async def get_devices(): return list(device_data.keys())# 设备管理:获取设备数据
@app.get("/devices/{device_id}/data")
async def get_device_data(device_id: str):if device_id not in device_data:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Device {device_id} not found")return device_data.get(device_id)# 设备管理:发送数据到设备
@app.post("/devices/{device_id}/data")
async def send_data_to_device(device_id: str, data: Dict[str, Any] = Body(...)):topic = f"devices/{device_id}"message = json.dumps(data)await mqtt_publish(topic, message)return {"message": f"Data sent to {device_id}"}# 设备控制:发送控制命令到设备
@app.post("/devices/{device_id}/control")
async def control_device(device_id: str, command: str):topic = f"devices/device/{device_id}"await mqtt_publish(topic, command)return {"message": f"Control command sent to {device_id}"}# FastAPI启动事件
@app.on_event("startup")
async def startup_event():mqtt_connect_and_subscribe("127.0.0.1", 1883)# FastAPI关闭事件
@app.on_event("shutdown")
async def shutdown_event():if mqtt_client is not None:mqtt_client.loop_stop()mqtt_client.disconnect()# 运行FastAPI应用
if __name__ == "__main__":import uvicornuvicorn.run(app, host="127.0.0.1", port=8000)

设备1模拟代码 dev1.py:

import paho.mqtt.client as mqtt# 连接成功回调
def on_connect(client, userdata, flags, rc):print('Connected with result code '+str(rc))client.subscribe('devices/1')# 消息接收回调
def on_message(client, userdata, msg):print(msg.topic+" "+str(msg.payload))client.publish('gateway/device/1',payload=f'echo {msg.payload}',qos=0)client = mqtt.Client()# 指定回调函数
client.on_connect = on_connect
client.on_message = on_message# 建立连接
client.connect('127.0.0.1', 1883)
# 发布消息
client.publish('gateway/device/1',payload='Hello, I am device',qos=0)client.loop_forever()

设备2模拟代码 dev2.py

import paho.mqtt.client as mqtt# 连接成功回调
def on_connect(client, userdata, flags, rc):print('Connected with result code '+str(rc))client.subscribe('devices/2')# 消息接收回调
def on_message(client, userdata, msg):print(msg.topic+" "+str(msg.payload))client.publish('gateway/device/2',payload=f'echo {msg.payload}',qos=0)client = mqtt.Client()# 指定回调函数
client.on_connect = on_connect
client.on_message = on_message# 建立连接
client.connect('127.0.0.1', 1883)
# 发布消息
client.publish('gateway/device/2',payload='Hello, I am device',qos=0)client.loop_forever()

运行网关代码,打开网页得到api接口:

 通过api分别添加设备1和设备2,

在另外两个控制台中分别运行模拟设备1和模拟设备2的代码

通过网页API向设备1发送数据

通过网页API获得设备回复的数据,设备代码中只是简单的把网关发过来的数据进行回传

我们在网关的后台可以看到完整的数据流

至此一个简易的网关已经实现了,接下来将会尝试实现楼宇里的最常见的bacnet设备进行通讯管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/691232.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

找不到或无法加载主类 com.ruoyi.RuoYiApplication

若依项目,很久不启动,莫名其妙报错 找不到或无法加载主类 com.ruoyi.RuoYiApplication 解决方式 参考文章 找不到或无法加载主类_错误: 找不到或无法加载主类 com.ruoyi.ruoyiapplication-CSDN博客

pytest + yaml 框架 - 录制接口转 yaml 用例实现

pytest yaml 框架基本不用写 python 代码,只需写yaml 文件用例就能实现接口自动化。 现在引入接口录制功能,连 yaml 文件也不用写了,点点点就能生成 yaml 用例文件了。 录制功能在v1.3.4版本上实现 pip instal pytest-yaml-yoyo 环境准备 …

yo!这里是socket网络编程相关介绍

目录 前言 基本概念 源ip&&目的ip 源端口号&&目的端口号 udp&&tcp初识 socket编程 网络字节序 socket常见接口 socket bind listen accept connect 地址转换函数 字符串转in_addr in_addr转字符串 套接字读写函数 recvfrom&&a…

如何同时或者按顺序间隔启动多个程序

首先,需要用到的这个工具: 度娘网盘 提取码:qwu2 蓝奏云 提取码:2r1z 1、打开工具,切换到定时器模块,快捷键:Ctrl3 2、新建一个定时器,我这里演示同时打开多个程序(比…

棱镜七彩参编《网络安全技术 软件供应链安全要求》国家标准发布

据全国标准信息公共服务平台消息显示,《网络安全技术 软件供应链安全要求》(GB/T 43698-2024)国家标准已于2024年4月25日正式发布,并将于2024年11月1日正式实施。棱镜七彩作为主要编制单位之一参与该国家标准的编制,为…

标准引领 | 竹云参编《面向云计算的零信任体系》行业标准正式发布!

近日,中华人民共和国工业和信息化部公告2024年第4号文件正式发布行业标准:YD/T 4598.1-2024《面向云计算的零信任体系 第1部分:总体架构》(后简称“总体架构”),并于2024年7月1日起正式实施。 该标准汇集大…

5.神经网络-激活函数

目录 1. 激活函数不是阶跃函数 1.1 激活函数和阶跃函数都是非线性函数 1.2 激活函数不是阶跃函数 2. sigmoid 函数 2.1 sigmoid 函数表达式 2.2 sigmoid 函数 Python 实现 2.4 sigmoid 函数图 3. ReLU 函数 3.1 ReLU 函数表达式 3.2 ReLU 函数 Python 实现 3.4 ReLU…

重写muduo之TcpServer

目录 1、Callbacks.h 2、TcpServer.h 3、TcpServer.cc 1、Callbacks.h 回调操作 #pragma once#include <memory> #include <functional>class Buffer; class TcpConnection;using TcpConnectionPtrstd::shared_ptr<TcpConnection>; using ConnectionCall…

【class5】建立人工智能系统(2)

【昨日内容复习】 进行监督学习时&#xff0c;第一个步骤是提取数据集的文本特征和对应的标签。 提取文本特征的具体步骤如下&#xff1a; STEP1. 构造词袋模型&#xff0c;提取数据集中的文本特征 STEP2. 使用toarray()函数&#xff0c;将X转换为一个NumPy数组&#xff0c;方…

74从零开始学Java之排序算法中的冒泡和选择排序

作者:孙玉昌,昵称【一一哥】,另外【壹壹哥】也是我哦 CSDN博客专家、万粉博主、阿里云专家博主、掘金优质作者 前言 我们要想成为一个优秀的程序员,其实非常关键的一点就是要锻炼培养自己的编程思维,就好比一个狙击手,要通过大量的射击训练要用大量的子弹喂出来。同样的…

Linux(centos7)系统配置 ntpd服务设置时间同步

一 、应用场景 两台服务器,要求使他们时间同步,有人问为什么要时间同步?如果一个集群中,时间相差很大,那么会出现很多诡异的问题,你也不想在一个无法解决的问题上浪费几天时间吧!总之,设置服务器之间时间同步,为了避免很多问题的发生! ntpd(Network Time Protocol …

超详细的胎教级Stable Diffusion使用教程(五)

这套课程分为五节课&#xff0c;会系统性的介绍sd的全部功能和实操案例&#xff0c;让你打下坚实牢靠的基础 一、为什么要学Stable Diffusion&#xff0c;它究竟有多强大&#xff1f; 二、三分钟教你装好Stable Diffusion 三、小白快速上手Stable Diffusion 四、Stable dif…