python基于ModBusTCP服务端的业务实现特定的client

python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。

一、业务描述

我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。

1、ModBusTCP服务端交互流程

大概类似如下交互时序,其中PLC将用我们的脚本代替。

2、控制

根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。

3、状态

根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。

4、结果

根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。

5、 地址空间

(1)控制

类型:HoldingRegisters、Coils

起始地址与寄存器数量:自定义

(2)状态

类型:HoldingRegisters、DiscreteInputs、InputRegisters

起始地址与寄存器数量:自定义

(3)结果

类型:HoldingRegisters、InputRegisters

起始地址与寄存器数量:自定义

二、程序整体设计

class myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":...else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":...elif addrtype=="DiscreteInputs":...else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):if addrtype=="HoldingRegisters":...elif addrtype=="InputRegisters":...else:raise ValueError("plc_out_addrtype的值错误!")if __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():myclient = myModBusTCPclient(MDclient)myclient.ctrl(ByteSwap=1, binary="1100000000000000")time.sleep(1)myclient.status(ByteSwap=1)

1、程序结构

上述代码定义了一个名为 myModBusTCPclient 的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:

构造函数 __init__

接收一个参数 obj,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj 中。

custom_binary 方法:

将给定的整数 num 转换为指定长度 length 的二进制字符串。可选参数 ByteSwap 用于指定是否进行字节交换。如果 ByteSwap 为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。

custom_num 方法:

接收一个二进制字符串 binary,根据给定的长度 length 和是否进行字节交换 ByteSwap 将其转换为整数。返回转换得到的整数。

ctrl 方法:

根据给定的地址类型 addrtype(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap、二进制字符串 binary、Modbus 地址 address 和从站号 slave,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers 方法写入寄存器。

status 方法:

根据给定的地址类型 addrtype、是否进行字节交换 ByteSwap、寄存器地址 reg_addr、寄存器数量 reg_nb 和从站号 slave,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers 方法读取寄存器。

plc_out 方法:

根据给定的地址类型 addrtype 和是否进行字节交换 ByteSwap,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。

if __name__ == "__main__": 部分:

在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient。通过 myModBusTCPclient 类创建了一个自定义的客户端对象 myclient。调用了 ctrl 方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status 方法,从 Modbus 服务器读取数据。

2、不同地址空间的请求

在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。

为了满足业务,需要对原有的pymodbus进行封装改造。

为了满足大端序和小端序,需要加入字节交换的操作。

三、程序实现

import json
import time
import socket
from pymodbus.client import ModbusTcpClientclass myModBusTCPclient(object):def __init__(self, obj):self.obj = obj# 状态读取后转成二进制,分别对应TriggerReady等状态def custom_binary(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_str# 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值result_str = result_str[::-1]if ByteSwap==0:return result_strelif ByteSwap==1:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")# 控制写之前先将TriggerEnable等二进制控制位转成数值def custom_num(self, binary, length=16, ByteSwap=0):assert len(binary) == length, "输入的二进制长度不正确!"binary = binary[::-1]  # 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值if ByteSwap==0:return int(binary, 2)elif ByteSwap==1:  # 需要字节交换时return int(binary[8:] + binary[:8], 2)else:raise ValueError("ByteSwap 的值错误!")def result_ByteSwap(self, num, length=16, ByteSwap=0):# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = length - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strif ByteSwap == 1:return result_strelif ByteSwap == 0:  # 需要字节交换时return result_str[8:] + result_str[:8]else:raise ValueError("ByteSwap 的值错误!")def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):if addrtype=="HoldingRegisters":value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)self.obj.write_registers(address, value, slave=slave)elif addrtype=="Coils":values = []for i in binary:if i == "0":values.append(False)elif i == "1":values.append(True)print(values)# 线圈不存在字节交换# value = self.custom_num(binary[0:16], ByteSwap=0)# print("------------------------------values :", [i for i in bin(value)[2:]])# self.obj.write_coils(address=address, values=[i for i in bin(value)[2:]], slave=slave)self.obj.write_coils(address=address, values=values, slave=slave)else:raise ValueError("ctrl_addrtype的值错误!")def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("status HoldingRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(address=reg_addr, count=reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]elif addrtype=="DiscreteInputs":# 离散寄存器不存在字节交换value = self.obj.read_discrete_inputs(address=reg_addr, count=reg_nb, slave=slave)print([value.bits[i] for i in range(reg_nb)])result = ""for i in [value.bits[i] for i in range(reg_nb)]:if i == False:result += "0"elif i == True:result += "1"# [value.bits[i] for i in range(reg_nb)]return [result[i:i+16] for i in range(0, len(result), 16)]else:raise ValueError("status_addrtype的值错误!")def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):if addrtype=="HoldingRegisters":value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result HoldingRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]elif addrtype=="InputRegisters":value = self.obj.read_input_registers(reg_addr, reg_nb, slave=slave)value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表print("result InputRegisters:", value_list)print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]else:raise ValueError("plc_out_addrtype的值错误!")def plc_out_only_result(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):result_list = self.plc_out(addrtype=addrtype, ByteSwap=ByteSwap, reg_addr=reg_addr, reg_nb=reg_nb, slave=slave)[5:]result_list_have = [i for i in result_list if i != 0]result_binary = ""result = ""for num in result_list_have:# 将整数转换为二进制字符串binary_str = bin(num)[2:]# 计算需要补充的零的个数zeros_to_add = 16 - len(binary_str)# 构造符合规则的二进制字符串result_str = '0' * zeros_to_add + binary_strresult_binary += result_strfor binary in [result_binary[i:i+8] for i in range(0, len(result_binary), 8)]:  # 以长度8进行分割if binary != "00000000":result += chr(int(binary, 2))return resultif __name__ == "__main__":# Modbus TCP服务器的IP地址和端口号server_ip = "192.168.1.196"port = 502station = 1# 创建Modbus TCP客户端MDclient = ModbusTcpClient(server_ip, port)if MDclient.connect():ByteSwap = 0ctrl_addrtype = "Coils"  # HoldingRegisters、Coilsstatus_addrtype = "HoldingRegisters"  # HoldingRegisters、DiscreteInputs、InputRegistersresult_addrtype = "InputRegisters"  # HoldingRegisters、InputRegistersmyclient = myModBusTCPclient(MDclient)# 初始化控制位状态ctrl = "0000000000000000"myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary="0000000000000000")# 使能TriggerEnable并常置为1TriggerEnable = "1"ctrl = TriggerEnable + ctrl[1:]while True:print("ctrl: ", ctrl)# 写入控制myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary=ctrl, address=0)ResultACK = "0"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# 查询状态status = myclient.status(addrtype=status_addrtype, ByteSwap=ByteSwap, reg_addr=2, reg_nb=2)[0]TriggerReady = status[0]print("------TriggerReady", TriggerReady)TriggerACK = status[1]ResultOKorNG = status[11]# print("---------------------------------ResultOKorNG:", ResultOKorNG)if TriggerReady == "1":Trigger = "1"ctrl = ctrl[:1] + TriggerEnable + ctrl[2:]print("-----------------ctrl")if TriggerACK == "1":Trigger = "0"ctrl = ctrl[:1] + Trigger + ctrl[2:]if ResultOKorNG == "1":# print("---------------------------------------")ResultACK = "1"ctrl = ctrl[:3] + ResultACK + ctrl[4:]# myclient.plc_out(ByteSwap=1)result = myclient.plc_out_only_result(addrtype=result_addrtype, ByteSwap=ByteSwap)print(result)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/265652.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

推荐几款常用Web自动化测试神器!

1、介绍 Web自动化测试在保证质量、提升效率、软件开发加速迭代上起到关键作用,它已经成为现代软件测试中不可或缺的一部分,今天给大家介绍推荐几款常用的Web自动化测试工具。 2、常用测试工具 常用的Web自动化测试工具包括: Selenium&am…

什么是双电机打包机?

我们都知道有的机器可能不只有一个电机,像我们公司最新生产的打包机,双电机打包机就是有两个电机,一个是退带电机,一个是进带电机,两个电机的好处就是更能提高包装速度,而且还能减少故障录,双电…

风控之Android设备指纹技术

标识性参数——Android ID、IMEI、OAID非标识性参数 非标识性参数——手机运营商 1 设备指纹 简单来讲,设备指纹是指用于标识出该设备的设备特征。可以是单一设备特征,也可以是多种设备特征的组合,以方便风控系统对设备的唯一性进行识别。…

图解python | 元组

1.Python元组 Python的元组与列表类似,不同之处在于元组的元素不能修改。 元组使用小括号,列表使用方括号。 元组创建很简单,只需要在括号中添加元素,并使用逗号隔开即可。 python 复制代码 tup1 (ByteDance, ShowMeAI, 199…

E4990A 阻抗分析仪,20 Hz 至 10/20/30/50/120 MHz

01 E4990A 阻抗分析仪 20 Hz 至 10/20/30/50/120 MHz 产品综述: E4990A 阻抗分析仪具有 20 Hz 至 120 MHz 的频率范围,可在宽阻抗范围内提供出色的 0.045%(典型值)基本准确度,并内置 40 V 直流偏置源,适…

空中消防员:无人机森林防火应用全面升级

森林是生态系统的重要组成部分,也是人类得以生存的关键。我国森林面积广大,存在火灾频发的困境。提升森林火灾防控能力是维护生态平衡、保护资源和保障人民生命安全的必要步骤。随着无人机技术的发展,其在无人机森林防火中的应用为传统巡查工…

租房小程序源码开发中的5大注意事项

在租房小程序源码开发过程中,有许多关键的注意事项需要开发者和团队特别留意。作为该领域的专家,我将分享5大重要事项,以确保你的租房小程序源码开发顺利进行。 1. 确保代码可靠性和安全性 租房小程序源码的可靠性和安全性是开发过程中最重…

2024年天津农学院专升本专业课网上报名确认时间的通知

天津农学院2024年高职升本科专业课考试报名的通知 一、专业课考试报名时间及办法 1、网上报名 考生请于2023年12月18日9点至12月21日24点(退役大学生士兵考生无须参加此次报名)登陆报名系统http://gzsb.tjau.edu.cn,填写报名信息&#xff0…

centos7上安装mysql5.7

1 下载mysql5.7网址 下载后缀名为“.tar.gz”的压缩包 连接虚拟机后 输入: rz 找到你下载的压缩包 2 解压缩 tar -zxvf mysql-5.7.26-linux-glibc2.12-x86_64.tar.gz将减压后的文件移动到/usr/local文件夹下并重命名为mysql mv mysql-5.7.26-linux-glibc2.12-x8…

少儿编程考级:激发孩子逻辑思维能力的关键

在当今信息化时代,少儿编程已经成为孩子们不可或缺的一项技能。而少儿编程考级,则是检验孩子们在这一技能上所取得的成就的重要途径。少儿编程考级不仅能够激发孩子们的逻辑思维能力,还能够提高他们的动手能力和创造力。6547网将详细介绍少儿…

在Ascend昇腾硬件用npu加速paddleLite版本ocr(nnadapter)

在Ascend昇腾硬件用npu加速paddleLite版本ocr(nnadapter) 参考文档* nnadapter参考文档地址* 华为昇腾 NPU参考文档地址* PaddleLite的CAPI参考文档 一.确保cpu版本运行正常二.编译Ascend上npu加速库三.跑通npu加速版本Demo1.Demo下载地址2.参考手册网址…

【设计模式--结构型--外观模式】

设计模式--结构型--外观模式 外观模式定义结构案例优缺点使用场景 外观模式 定义 又称门面模式,时一种通过多个复杂的子系统提供一个一致的接口,而使这些子系统更加容易被访问的模式。该模式对外有一个统一接口,外部应用程序不用关心内部 子…