Python pymodbus类库使用学习总结

news/2025/3/17 17:17:06/文章来源:https://www.cnblogs.com/shouke/p/18342169

实践环境

Python 3.9.13

https://www.python.org/ftp/python/3.9.13/python-3.9.13-amd64.exe

pymodbus-3.6.8-py3-none-any.whl

https://files.pythonhosted.org/packages/35/19/a9d16f74548d6750acf6604fa74c2cd165b5bc955fe021bf5e1fa04acf14/pymodbus-3.6.8-py3-none-any.whl

pyserial-3.5

备注:如果不安装该模块,采用串口通信时(下述代码中 comm = 'serial'时的通信),会报错NameError: name 'serial' is not defined

pip install pyserial

Virtual Serial Port Driver 6.9

链接:https://pan.baidu.com/s/15pCeYyOZWSsbEr_rU1e5hg?pwd=vspd
提取码:vspd

代码实践

修改pymodbus\logging.py

路径: PYTHON_HOME\Lib\site-packages\pymodbus\logging.py

修改源码是为了更方便使用类库自带日志打印器

# ...略
class Log:"""Class to hide logging complexity.:meta private:"""# _logger = logging.getLogger(__name__) # commented by shouke_logger = logging.getLogger('logger') # added by shouke # 方便在不同模块获取日志打印器@classmethoddef apply_logging_config(cls, level, log_file_name):"""Apply basic logging configuration."""if level == logging.NOTSET:level = cls._logger.getEffectiveLevel()if isinstance(level, str):level = level.upper()log_stream_handler = logging.StreamHandler()log_formatter = logging.Formatter("%(asctime)s %(levelname)-5s %(module)s:%(lineno)s %(message)s")log_stream_handler.setFormatter(log_formatter)##################### commented by shouke ################## cls._logger.addHandler(log_stream_handler)# if log_file_name:#     log_file_handler = logging.FileHandler(log_file_name)#     log_file_handler.setFormatter(log_formatter)#     cls._logger.addHandler(log_file_handler)############################################################################### added by shouke #################if not cls._logger.handlers: # 如果不加这个判断,在不同模块都执行pymodbus_apply_logging_config函数时,会导致同一条日志被重复打印cls._logger.addHandler(log_stream_handler)if log_file_name:log_file_handler = logging.FileHandler(log_file_name)log_file_handler.setFormatter(log_formatter)cls._logger.addHandler(log_file_handler)##########################################################cls.setLevel(level)# ...略pymodbus_apply_logging_config(level=logging.INFO) # 统一日志打印器配置

异步服务器和异步客户端实现

异步服务器代码实现

server_async.py

#!/usr/bin/env python3
'''Pymodbus 异步服务器示例多线程异步服务器的一个示例。
'''import asyncioimport logging
### 不修改 PYTHON_HOME\Lib\site-packages\pymodbus\logging.py源码的基础上,获取日志打印器相关方法及说明:
# 方法1
# _logger = logging.getLogger(__name__) # 采用该_logger打印的日志看不到
# logging.basicConfig(level=logging.DEBUG) # 不注释上行代码的基础上添加这行代码,日志能打印出来,但是日志所在模块不为当前模块# 方法2
# _logger = logging.getLogger(__file__)
# logging.basicConfig(level=logging.INFO) #  添加这行代码,确保日志能打印出来,但是打印出来的日志所在模块不为当前模块# 方法3:
# from pymodbus.logging import Log,pymodbus_apply_logging_config
# pymodbus_apply_logging_config(level=logging.INFO) # 如果缺少这行代码,下面async_helper中的日志打印将无法在控制台输出
# _logger = Log._logger# 修改 PYTHON_HOME\Lib\site-packages\pymodbus\logging.py源码的基础上,获取日志打印器相关方法:
_logger = logging.getLogger('logger')
# _logger.setLevel(logging.INFO)from pymodbus import __version__ as pymodbus_version
from pymodbus.datastore import (ModbusSequentialDataBlock,ModbusServerContext,ModbusSlaveContext,ModbusSparseDataBlock,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server import (StartAsyncSerialServer,StartAsyncTcpServer,StartAsyncTlsServer,StartAsyncUdpServer,
)# hook函数
def server_request_tracer(request, *_addr):"""跟踪请求所有服务器请求在被处理之前都经过此过滤器"""_logger.info(f'---> REQUEST: {request}') # 输出,类似这样: ---> REQUEST: ReadBitRequest(0,1)def server_response_manipulator( response):"""操纵响应所有服务器响应在发送之前都通过此过滤器过滤器返回:- 响应,原始或修改后的- 跳过编码,发出是否对响应进行编码的信号"""_logger.info(f'---> RESPONSE: {response}') # 输出,类似这样:---> RESPONSE: ReadCoilsResponse(1)# response.should_respond = False # 如果让该行代码生效,则客户端收不到服务器响应return response, Falseclass Args:comm = 'tcp' # 通讯模式,可选值 tcp、udp serial、tlscomm_defaults = {'tcp': ['socket', 5020],# 'tcp': ['rtu', 5020], # 如果采用ModbusRTU协议 则使用这个'udp': ['socket', 5020],# 'serial': ['rtu', '/dev/ptyp0'], # Linux'serial': ['rtu', 'COM1']  # Windows(本例中COM1和COM2是成对的)'tls': ['tls', 5020]}framer = comm_defaults[comm][0] # 帧类型port = comm_defaults[comm][1] # 站点端口host = ''baudrate = 9600 # 串行设备波特率 即每秒传输的位数# 连续的、无间隙顺序存储数据块(寄存器块)datablock = lambda : ModbusSequentialDataBlock(0x01, [17] * 100)  # pylint: disable=unnecessary-lambda-assignment# 连续的,或者可能有间隙的稀疏、不规则存储数据块# datablock = lambda : ModbusSparseDataBlock({0x00: 0, 0x05: 1})  # pylint: disable=unnecessary-lambda-assignment# 工厂模式# datablock = lambda : ModbusSequentialDataBlock.create()  # pylint: disable=unnecessary-lambda-assignment,unnecessary-lambda# 如果从节点数量不为0#服务器使用服务器上下文,该上下文允许服务器,以针对不同的从设备ID使用不同的从站上下文(slave context)进行响应。# 默认情况下,它将为提供的每个slave ID返回相同的上下文(广播模式)。# 但是,可以通过将single标识设置为False并且提供slave ID到上下文映射的字典来覆盖这种行为:# 从机上下文也可以按zero_mode初始化,这意味着到地址(0-7)的请求将映射到地址(0-7)。# 默认值为False,因此地址(0-7)将映射到(1-8):# context = {#     0x01: ModbusSlaveContext( # 0x01为从设备、从机地址#         di=datablock(), # 输入离散量#         co=datablock(), # 输出线圈#         hr=datablock(), # 保持寄存器#         ir=datablock(), # 输入寄存器#     ),#     0x02: ModbusSlaveContext(#         di=datablock(),#         co=datablock(),#         hr=datablock(),#         ir=datablock(),#     ),#     0x03: ModbusSlaveContext(#         di=datablock(),#         co=datablock(),#         hr=datablock(),#         ir=datablock(),#         zero_mode=True,#     ),#     }# single = False#如果从节点数量为0context = ModbusSlaveContext(di=datablock(), co=datablock(), hr=datablock(), ir=datablock())single = True# 构建数据存储context = ModbusServerContext(slaves=context,single=single)# ----------------------------------------------------------------------- ## 初始化服务器信息# 不对属性字段做任何设置,则字段值默认为空字符串# ----------------------------------------------------------------------- #identity = ModbusDeviceIdentification(info_name={"VendorName": "Pymodbus","ProductCode": "PM","VendorUrl": "https://github.com/pymodbus-dev/pymodbus/","ProductName": "Pymodbus Server","ModelName": "Pymodbus Server","MajorMinorRevision": pymodbus_version,})# sslctx=sslctx,  # 用于TLS的SSLContext (默认为None, 自动创建)cert_file_path = './certificates/pymodbus.crt' # 用于TLS 证书文件路径  (如果sslctx为None时使用该文件)key_file_path = './certificates/pymodbus.key'  # 用于TLS 私钥文件路径 (如果sslctx为None时使用该文件)async def run_async_server():"""Run server."""txt = f'### start ASYNC server, listening on {Args.port} - {Args.comm}'_logger.info(txt)if Args.comm == 'tcp':address = (Args.host if Args.host else '', Args.port if Args.port else None)server = await StartAsyncTcpServer(context=Args.context,  # 数据存储identity=Args.identity,  # 服务器标识# TBD host=# TBD port=address=address,  # 监听地址# custom_functions=[],  # 允许自定义处理函数framer=Args.framer,  # 使用的帧策略# ignore_missing_slaves=True,  # 忽略对缺失的slave的请求# broadcast_enable=False,  # 是否允许广播 将 slave id 0视为广播地址# timeout=1  # 等待请求完成的时间 # waiting time for request to complete# 可选参数,实现hook功能request_tracer=server_request_tracer,response_manipulator=server_response_manipulator)elif Args.comm == 'udp':address = (Args.host if Args.host else "127.0.0.1",Args.port if Args.port else None,)server = await StartAsyncUdpServer(context=Args.context,identity=Args.identity,address=address,# custom_functions=[],framer=Args.framer,# ignore_missing_slaves=True,# broadcast_enable=False,# timeout=1)elif Args.comm == 'serial':# socat -d -d PTY,link=/tmp/ptyp0,raw,echo=0,ispeed=9600#             PTY,link=/tmp/ttyp0,raw,echo=0,ospeed=9600server = await StartAsyncSerialServer(context=Args.context,identity=Args.identity,# timeout=1,  # waiting time for request to completeport=Args.port,  # 串口# custom_functions=[],framer=Args.framer,# stopbits=1,  # 要使用的停止位数(The number of stop bits to use)# bytesize=8,  # 序列化消息字节大小(The bytesize of the serial messages)# parity="N",  # 使用哪种奇偶校验baudrate=Args.baudrate,  # 用于串行设备的波特率# handle_local_echo=False,  # 处理USB-to-RS485适配器的本地echo(Handle local echo of the USB-to-RS485 adaptor)# ignore_missing_slaves=True,  # ignore request to a missing slave# broadcast_enable=False,  ## strict=True,  # 使用严格的计时,针对Modbus RTU 为t1.5(use strict timing, t1.5 for Modbus RTU))elif Args.comm == 'tls':address = (Args.host if Args.host else '', Args.port if Args.port else None)server = await StartAsyncTlsServer(context=Args.context,  # Data storagehost='localhost',  # 定义用于连接的tcp地址# port=port,  # tcp监听端口identity=Args.identity,  # server identify# custom_functions=[],  # allow custom handlingaddress=address,framer=Args.framer,certfile=Args.cert_file_path,  # The cert file path for TLS (used if sslctx is None)# sslctx=sslctx,  # The SSLContext to use for TLS (default None and auto create)keyfile=Args.key_file_path,  # The key file path for TLS (used if sslctx is None)# password="none",  # 用于解密私钥文件的密码# ignore_missing_slaves=True,# broadcast_enable=False,# timeout=1)return serverasync def async_helper():_logger.info("Starting server...")await run_async_server()if __name__ == "__main__":asyncio.run(async_helper(), debug=True)

异步客户端代码实现

client_async.py

#!/usr/bin/env python3'''Pymodbus异步客户端示例
'''
import asyncio
import loggingimport pymodbus.client as modbusClient
from pymodbus import ModbusException_logger = logging.getLogger('logger')class Args:comm = 'tcp' # 通讯模式,可选值 tcp、udp serial、tlscomm_defaults = {'tcp': ['socket', 5020],# 'tcp': ['rtu', 5020], # 如果采用ModbusRTU协议 则使用这个'udp': ['socket', 5020],# 'serial': ['rtu', '/dev/ptyp0'], # Linux'serial': ['rtu', 'COM2']  # Windows(本例中COM1和COM2是用Virtual Serial Port Driver 6.9软件成对添加的虚拟端口)'tls': ['tls', 5020]}framer = comm_defaults[comm][0] # 帧类型port = comm_defaults[comm][1] # 站点端口host = '127.0.0.1' # 服务端地址baudrate = 9600 # 串行设备波特率timeout = 10 # 客户端访问服务器超时时间(该参数仅用于客户端(slave节点)),float型# sslctx=sslctx,  # 用于TLS的SSLContext (默认为None, 自动创建)cert_file_path = './certificates/pymodbus.crt' # 用于TLS 证书文件路径  (如果sslctx为None时使用该文件)key_file_path = './certificates/pymodbus.key'  # 用于TLS 私钥文件路径 (如果sslctx为None时使用该文件)async def run_async_client(modbus_calls=None):"""Run sync client."""_logger.info("### Create client object")if Args.comm == "tcp":client = modbusClient.AsyncModbusTcpClient(Args.host,port=Args.port,  # on which port# Common optional parameters:framer=Args.framer,timeout=Args.timeout,retries=3,reconnect_delay=1,reconnect_delay_max=10,#    retry_on_empty=False,# TCP setup parameters#    source_address=("localhost", 0),)elif Args.comm == "udp":client = modbusClient.AsyncModbusUdpClient(Args.host,port=Args.port,# Common optional parameters:framer=Args.framer,timeout=Args.timeout,#    retries=3,#    retry_on_empty=False,# UDP setup parameters#    source_address=None,)elif Args.comm == "serial":client = modbusClient.AsyncModbusSerialClient(Args.port,# Common optional parameters:#    framer=ModbusRtuFramer,timeout=Args.timeout,#    retries=3,#    retry_on_empty=False,# Serial setup parametersbaudrate=Args.baudrate,#    bytesize=8,#    parity="N",#    stopbits=1,#    handle_local_echo=False,#    strict=True,)elif Args.comm == "tls":client = modbusClient.AsyncModbusTlsClient(Args.host,port=Args.port,# Common optional parameters:framer=Args.framer,timeout=Args.timeout,#    retries=3,#    retry_on_empty=False,# TLS setup parameterssslctx=modbusClient.AsyncModbusTlsClient.generate_ssl(certfile=Args.cert_file_path,keyfile=Args.key_file_path,#    password="none",),server_hostname="localhost",)_logger.info("### Client starting")await client.connect()assert client.connectedif modbus_calls:await modbus_calls(client)client.close()_logger.info("### End of Program")async def run_a_few_calls(client):try:# 读取线圈状态rr = await client.read_coils(0, 1, slave=0) # 从 0x00 地址开始读取1个线圈print(rr.bits) # 输出:[True, False, False, False, False, False, False, False]# assert len(rr.bits) == 8print(rr) # 输出:ReadCoilsResponse(8)# 读输入离散量rr = await client.read_holding_registers(0, 1, slave=0) # 从 0x00 地址开始读取1个线圈print(rr.registers) # 输出:[17]rr = await client.read_holding_registers(4, 2, slave=0) # 从0x04 地址开始读取2个线圈# assert rr.registers[0] == 17# assert rr.registers[1] == 17print(rr.registers) # 输出:[17, 17]# 读保持寄存器rr = await client.read_holding_registers(5, 4) # 从 0x05 地址开始读取4个线圈print(rr.registers) # 输出:[17, 17, 17, 17]# 读输入寄存器rr = await client.read_input_registers(0x0F, 3, slave=0) # 从 0x0F 地址开始读取4个线圈print(rr.registers) # 输出:[17, 17, 17]rr = await client.read_input_registers(15, 3, slave=0) # 从 0x0F 地址开始读取4个线圈print(rr) # 输出:ReadInputRegistersResponse (3)print(rr.registers) # 输出:[17, 17, 17]# 写单个线圈rr = await client.write_coil(9, False, slave=0) # 将布尔值False写入 0x09 地址print(rr) # 输出:WriteCoilResponse(9) => Falserr = await client.read_coils(9, 1, slave=0)print(rr.bits) # 输出:[False, False, False, False, False, False, False, False]# 写多个线圈await client.write_coils(10, False, slave=0) # 将布尔值False写入 0x0A 地址rr = await client.read_coils(10, 1, slave=0)print(rr.bits) # 输出:[False, False, False, False, False, False, False, False]rr = await client.read_coils(11, 1, slave=0)print(rr.bits) # 输出:[True, False, False, False, False, False, False, False]await client.write_coils(10, [False, False], slave=0) # 将布尔值False写入 0x0A 0x0B 地址rr = await client.read_coils(11, 1, slave=0)print(rr.bits) # 输出:[False, False, False, False, False, False, False, False]# 写单个保持寄存器await client.write_register(12, 0x0F, slave=0) # 将0x0F写入 0x0C 地址rr = await client.read_input_registers(12, 3, slave=0)print(rr.registers) # 输出:[17, 17, 17]rr = await client.read_holding_registers(12, 4)print(rr.registers) # 输出:[15, 17, 17, 17]# 写多个保持寄存器await client.write_registers(13, 0x0F, slave=0) # 将0x0F写入 0x0D 地址rr = await client.read_holding_registers(13, 2)print(rr.registers) # 输出:[15, 17]await client.write_registers(13, [0x0F, 0x0E], slave=0) # 将0x0F写入 0x0D,0x0E 地址rr = await client.read_holding_registers(13, 2)print(rr.registers) # 输出:[15, 14]except ModbusException:passasync def main():await run_async_client(modbus_calls=run_a_few_calls)if __name__ == "__main__":asyncio.run(main(), debug=True)

相关说明

  1. pymodbus.datastore.ModbusSequentialDataBlock

    在 Modbus 协议中,数据通常被组织成多个数据块,而每个数据块包含一定数量的数据寄存器、者线圈或者离散量。该类是一个用于创建顺序排列的Modbus顺序数据存储数据块的类。

    例如:

    ModbusSequentialDataBlock(0x00, [17] * 100) # 创建了一个从地址 0x00 开始,包含 100(即包含100个地址) 个初始值为 17 的数据块
    

    实践时发现,此时通过read_coils读取线圈,读取线圈起始地址不能超过99,否则服务端会报错Exception Response(129, 1, IllegalAddress)

    client.read_coils(98, 1, slave=0) # 可正常读取
    client.read_coils(99, 1, slave=0) # 报错
    

    修改下服务端数据块起始地址

    ModbusSequentialDataBlock(0x01, [17] * 100) # 创建了一个从地址 0x01 开始
    

    实践时发现,此时通过read_coils读取线圈,读取线圈起始地址不能超过100,否则服务端会报错Exception Response(129, 1, IllegalAddress)

    client.read_coils(99, 1, slave=0) # 可正常读取
    client.read_coils(100, 1, slave=0) # 报错
    
  2. pymodbus.datastore.ModbusSparseDataBlock

    用于创建稀疏数据块的类。该类允许创建包含不连续地址的数据块(可随机访问)。具体来说,可以在数据块中指定特定地址的数据,而无需为数据块的每个地址都分配内存。这种方式可以有效地节省内存空间,尤其是在处理大量数据时。

    例如:

    sparse = ModbusSparseDataBlock({10: [3, 5, 6, 8], 30: 1, 40: [0]*20})
    

    创建一个拥有3个地址的数据块。

    一个地址从0x10开始,长度为4(即包含4个地址),初始值分别为3,5,6,8,一个地址从0x30开始,长度为1,初始值为10,一个地址从0x40开始,长度为20,初始为0

    sparse = ModbusSparseDataBlock([10]*100)
    

    创建从地址0x00开始,长度为100,初始值为10的数据块

    sparse = ModbusSparseDataBlock() # 创建空的数据块
    sparse.setValues(0, [10]*10)  # 添加从地址0x00开始,长度为10,值为10的数据块
    sparse.setValues(30, [20]*5) # 添加从地址0x30开始,长度为5, 值为20的数据块
    # 注意,除非执行类__init__初始化函数时,将 mutable 属性设置为True(默认值),否则无法使用 setValues 函数来添加新的数据块
    
  3. pymodbus.datastore.ModbusSlaveContext

    用于创建每种数据访问都存储在一个块中的一个modbus数据模型。该类可用来模拟 Modbus 从设备上下文。可以在这个上下文中添加多个不同类型的数据块,模拟一个完整的 Modbus 从设备。

    例子:

     ModbusSlaveContext(di=datablock(), # 输入离散量(Discrete Inputs)co=datablock(), # 输出线圈 (Coils)hr=datablock(), # 保持寄存器(Holding Register)ir=datablock(), # 输入寄存器(Input Register))
    
  4. pymodbus.datastore.ModbusServerContext

    这表示从上下文的主集合,用于创建一个服务器上下文,并将从站上下文添加到服务器上下文中。

    如果初始化时,属性single被设置为True,它将被视为单个上下文(所有的从设备共享相同的 Modbus 地址空间,没有独立的地址范围),因此每个slave ID都返回相同的上下文。通过分析源码可知,当single被设置为True时,会创建一个从设备上下文,设备地址默认为 0,

    如果single设置为False,它将被解释为从站上下文的集合从属上下文(每个从设备都有独立的 Modbus 地址空间,它们的地址范围是相互独立的)

  5. pymodbus.client.mixin.ModbusClientMixin

    1. def read_coils(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)

      读线圈(功能码 0x01)

      • address 要读取数据的起始地址
      • count 可选参数,要读取的线圈数量(针对read_coils,发现count设置大于1的数和设置为1是一样的效果)
      • slave 可选参数,Modbus从机ID(实践发现,服务端构建服务器实例时,如果single设置为True时, 这里的slave只要不超出合法值范围,可以随便填,但是如果single设置为False,则必须填写正确的从机ID)
      • kwargs可选参数,实验性参数

      异常抛出ModbusException,下同,不再赘述

    2. def read_discrete_inputs(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)

      读输入离散量(对应功能码 0x02)

      参数说明参考read_coils

    3. def read_holding_registers(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)

      读保持寄存器(对应功能码 0x03)

      参数说明参考read_coils

    4. def read_input_registers(self, address: int, count: int = 1, slave: int = 0, **kwargs: Any)

      读输入寄存器(对应功能码 0x04)

      参数说明参考read_coils

    5. def write_coil(self, address: int, value: bool, slave: int = 0, **kwargs: Any)

      写单个线圈(对应功能码 0x05)

      • address 要写入数据的起始地址
      • value 要写入的布尔值
      • slave 可选参数,Modbus从机ID
      • kwargs可选参数,实验性参数
    6. def write_coils( self, address: int, values: list[bool] | bool, slave: int = 0, **kwargs: Any)

      写多个线圈(对应功能码 0x0F)

      • address 要写入数据的起始地址
      • values 要写入的布尔值列表、或者单个布尔值
      • slave 可选参数,Modbus从机ID
      • kwargs可选参数,实验性参数
    7. def write_register(self, address: int, value: int, slave: int = 0, **kwargs: Any)

      写单个寄存器(功能码 0x06)

      • address 要写入数据的起始地址
      • value 要写入的整数
      • slave 可选参数,Modbus从机ID
      • kwargs可选参数,实验性参数
    8. def write_registers( self, address: int, values: list[int] | int, slave: int = 0, **kwargs: Any)

      写多个寄存器(功能码 0x10)

      • address 要写入数据的起始地址
      • values 要写入的整数列表、或者单个整数
      • slave 可选参数,Modbus从机ID
      • kwargs可选参数,实验性参数

为服务器设置初始化 payload实现

server_payload.py

#!/usr/bin/env python3
'''Pymodbus服务器Payload示例。
此示例展示如何使用builder初始化具复杂的内存layout的服务器'''import asyncioimport logging
_logger = logging.getLogger('logger')from pymodbus.constants import Endian
from pymodbus.datastore import (ModbusSequentialDataBlock,ModbusServerContext,ModbusSlaveContext
)
from pymodbus.server import StartAsyncTcpServerfrom pymodbus.payload import BinaryPayloadBuilderasync def run_async_server():"""Run server."""builder = BinaryPayloadBuilder(byteorder=Endian.LITTLE, wordorder=Endian.LITTLE)builder.add_string("abcdefgh")builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])builder.add_8bit_int(-0x12)builder.add_8bit_uint(0x12)builder.add_16bit_int(-0x5678)builder.add_16bit_uint(0x1234)builder.add_32bit_int(-0x1234)builder.add_32bit_uint(0x12345678)builder.add_16bit_float(12.34)builder.add_16bit_float(-12.34)builder.add_32bit_float(22.34)builder.add_32bit_float(-22.34)builder.add_64bit_int(-0xDEADBEEF)builder.add_64bit_uint(0x12345678DEADBEEF)builder.add_64bit_uint(0xDEADBEEFDEADBEED)builder.add_64bit_float(123.45)builder.add_64bit_float(-123.45)datablock = ModbusSequentialDataBlock(1, builder.to_registers())context = ModbusSlaveContext(di=datablock, co=datablock, hr=datablock, ir=datablock # 注意,datablock不能加括号())single = True# 构建数据存储context = ModbusServerContext(slaves=context, single=single)txt = f'### start ASYNC server, listening on 5020 - tcp'_logger.info(txt)address = ('', 5020)server = await StartAsyncTcpServer(context=context,  # 数据存储# TBD host=# TBD port=address=address,  # 监听地址# custom_functions=[],  # 允许自定义处理函数framer='socket',  # 使用的帧策略# ignore_missing_slaves=True,  # 忽略对缺失的slave的请求# broadcast_enable=False,  # 是否允许广播 将 slave id 0视为广播地址# timeout=1  # 等待请求完成的时间 # waiting time for request to complete)return serverasync def async_helper():_logger.info("Starting server...")await run_async_server()if __name__ == "__main__":asyncio.run(async_helper(), debug=True)

带有更新任务的服务器代码实现

server_updating.py

#!/usr/bin/env python3
'''带有更新任务的Pymodbus异步服务器示例。异步服务器以及随服务器一起连续运行并更新值的任务示例
'''import asyncioimport logging
_logger = logging.getLogger('logger')from pymodbus.datastore import (ModbusSequentialDataBlock,ModbusServerContext,ModbusSlaveContext
)
from pymodbus.server import StartAsyncTcpServerasync def updating_task(context):'''更新服务器中的数据值此任务伴随服务器持续运行,它将每两秒增加一些值需要注意的是,getValues和setValues不是并发安全的'''# fc_as_hex = 3 # 功能码,例如3、0x03 表示读保持寄存器fc_as_hex = 4slave_id = 0x00 # 从节点IDaddress = 0x00  # 数据读取起始地址count = 6 # 需要获取的值的数量values = context[slave_id].getValues(fc_as_hex, address, count=count)# set values to zerovalues = [0 for v in values]context[slave_id].setValues(fc_as_hex, address, values)txt = f'updating_task: started: initialised values: {values!s} at address {address!s}'print(txt)_logger.debug(txt)# 循环递增while True:await asyncio.sleep(2)values = context[slave_id].getValues(fc_as_hex, address, count=count)values = [v + 1 for v in values]context[slave_id].setValues(fc_as_hex, address, values)txt = f'updating_task: incremented values: {values!s} at address {address!s}'print(txt)_logger.debug(txt)async def run_async_server(context):"""Run server."""txt = f'### start ASYNC server, listening on 5020 - tcp'_logger.info(txt)address = ('', 5020)server = await StartAsyncTcpServer(context=context,  # 数据存储# TBD host=# TBD port=address=address,  # 监听地址# custom_functions=[],  # 允许自定义处理函数framer='socket',  # 使用的帧策略# ignore_missing_slaves=True,  # 忽略对缺失的slave的请求# broadcast_enable=False,  # 是否允许广播 将 slave id 0视为广播地址# timeout=1  # 等待请求完成的时间 # waiting time for request to complete)return serverasync def async_helper():datablock = lambda : ModbusSequentialDataBlock(0x01, [17] * 100)  # pylint: disable=unnecessary-lambda-assignmentcontext = ModbusSlaveContext(di=datablock(), co=datablock(), hr=datablock(), ir=datablock() # 注意,datablock不能加括号())single = True# 构建数据存储context = ModbusServerContext(slaves=context, single=single)task = asyncio.create_task(updating_task(context))task.set_name("example updating task")_logger.info("Starting server...")await run_async_server(context)task.cancel()if __name__ == "__main__":asyncio.run(async_helper(), debug=True)

客户端访问验证

#!/usr/bin/env python3import asyncio
import loggingimport pymodbus.client as modbusClient
from pymodbus import ModbusException_logger = logging.getLogger('logger')async def run_async_client(modbus_calls=None):"""Run sync client."""_logger.info("### Create client object")client = modbusClient.AsyncModbusTcpClient('127.0.0.1',port=5020,  # on which port# Common optional parameters:framer='socket', # 客户端访问服务器超时时间(该参数仅用于客户端(slave节点)),float型timeout=10,retries=3,reconnect_delay=1,reconnect_delay_max=10,#    retry_on_empty=False,# TCP setup parameters#    source_address=("localhost", 0),)_logger.info("### Client starting")await client.connect()assert client.connectedif modbus_calls:await modbus_calls(client)client.close()_logger.info("### End of Program")async def run_a_few_calls(client):try:# 读保持寄存器# rr = await client.read_holding_registers(0, 4) # print(rr.registers) # 读输入寄存器rr = await client.read_input_registers(0, 7, slave=0) print(rr.registers)except ModbusException:passasync def main(cmdline=None):"""Combine setup and run."""await run_async_client(modbus_calls=run_a_few_calls)if __name__ == "__main__":asyncio.run(main(), debug=True)
class ModbusSlaveContext(ModbusBaseSlaveContext):def getValues(self, fc_as_hex, address, count=1):"""Get `count` values from datastore.:param fc_as_hex: The function we are working with:param address: The starting address:param count: The number of values to retrieve:returns: The requested values from a:a+c"""if not self.zero_mode:address += 1Log.debug("getValues: fc-[{}] address-{}: count-{}", fc_as_hex, address, count)return self.store[self.decode(fc_as_hex)].getValues(address, count)def setValues(self, fc_as_hex, address, values):"""Set the datastore with the supplied values.:param fc_as_hex: The function we are working with:param address: The starting address:param values: The new values to be set"""if not self.zero_mode:address += 1Log.debug("setValues[{}] address-{}: count-{}", fc_as_hex, address, len(values))self.store[self.decode(fc_as_hex)].setValues(address, values)

同步服务器和异步客户端实现

同步服务器代码实现

server_sync.py

#!/usr/bin/env python3
'''Pymodbus 同步服务器示例
'''import loggingfrom pymodbus.datastore import (ModbusSequentialDataBlock,ModbusServerContext,ModbusSlaveContext
)from pymodbus.server import StartTcpServer_logger = logging.getLogger('logger')def run_sync_server():# 连续的、无间隙顺序存储数据块(寄存器块)datablock = lambda : ModbusSequentialDataBlock(0x01, [17] * 100)context = ModbusSlaveContext(di=datablock(), co=datablock(), hr=datablock(), ir=datablock())single = True# 构建数据存储context = ModbusServerContext(slaves=context,single=single)txt = f'### start SYNC server'_logger.info(txt)address = ('', 5020)server = StartTcpServer(context=context,  # Data storage# identity=identity,  # server identify# TBD host=# TBD port=address=address,  # listen address# custom_functions=[],  # allow custom handlingframer='socket',  # The framer strategy to use# ignore_missing_slaves=True,  # ignore request to a missing slave# broadcast_enable=False,  # treat slave_id 0 as broadcast address,# timeout=1,  # waiting time for request to complete)return serverdef sync_helper():server = run_sync_server()server.shutdown()if __name__ == "__main__":sync_helper()

同步客户端代码实现

client_sync.py

#!/usr/bin/env python3'''Pymodbus同步客户端示例
'''import loggingimport pymodbus.client as modbusClient
from pymodbus import ModbusException_logger = logging.getLogger('logger')def run_sync_client(modbus_calls=None):"""Run sync client."""_logger.info("### Create client object")client = modbusClient.ModbusTcpClient('127.0.0.1',port=5020,  # on which port# Common optional parameters:framer='socket', # 客户端访问服务器超时时间(该参数仅用于客户端(slave节点)),float型timeout=10,retries=3,reconnect_delay=1,reconnect_delay_max=10,#    retry_on_empty=False,# TCP setup parameters#    source_address=("localhost", 0),)_logger.info("### Client starting")client.connect()assert client.connectedif modbus_calls:modbus_calls(client)client.close()_logger.info("### End of Program")def run_a_few_calls(client):try:# 读取线圈状态rr = client.read_coils(0, 1, slave=1) # 从 0x00 地址开始读取1个线圈print(rr.bits) # 输出:[True, False, False, False, False, False, False, False]# assert len(rr.bits) == 8except ModbusException:passdef main():run_sync_client(modbus_calls=run_a_few_calls)if __name__ == "__main__":main()

参考链接

https://pypi.org/project/pymodbus/

https://pymodbus.readthedocs.io/en/dev/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/777865.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ThinkAdmin_v6两个简单漏洞(文件读取+信息泄露)

危险函数:file_get_contents()第二次遇到侵权声明 本文章中的所有内容(包括但不限于文字、图像和其他媒体)仅供教育和参考目的。如果在本文章中使用了任何受版权保护的材料,我们满怀敬意地承认该内容的版权归原作者所有。 如果您是版权持有人,并且认为您的作品被侵犯,请通…

ubuntu22.04容器安装ssh服务

除了特别说明外,否则以下命令均为ubuntu 22.04 容器内执行!!!安装 查看ubuntu22.04 有没有安装openssh-server,执行命令:sudo dpkg --list | grep ssh没有找到openssh-server的包,很显然,没有安装,那么就开始安装,执行:sudo apt install openssh-server安装完成后,…

3.Java基础语法

注释单行注释 //单行注释 //输入一个Hello World!多行注释:可以注释一段文字 /* 注释 */ //多行注释:可以注释一段文字 /* 注释 */ /* 多行注释 多行注释 多行注释 */文档注释:JavaDoc /** * @Description HelloWorld * @Author 爱吃麻辣烫的妹纸 */⭐️注意:书写注释是…

分享圣诞树+雪人+全屏动效

分享圣诞树+雪人+全屏动效 创建时间:2024年8月4号 分享之前学习老师发的几个小玩意 一、圣诞树 运行: 点击该exe即可出来一棵圣诞树。退出: 鼠标点击该圣诞树右键二、雪人 运行: 双击点开exe即可退出: 和圣诞树一样、 三、音乐:满满都是爱 运行:双击打开,该程序对鼠标的…

java:一键生成二维码工具类

前言:本工具选择了Zxing,他是一个开源的,使用java实现多种格式的1D/2D条码图像处理库, 1.pom添加依赖<!-- 二维码生成&识别组件 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>…

Luogu P10842 Piggy and Trees 题解 [ 绿 ] [ 拆边 ] [ 贡献思维 ]

Piggy and Trees:把路径拆成边的思维题。 思路 一看到这题的路径,就想到了 Luogu P3177 树上染色 这题化路径为边的贡献,分别计算的思维。 那么对于此题,先来观察题目里式子的意思:对于树上的每个无序点对,求出树上每个点 到这些点对之间的最短路径的 距离之和。枚举点对…

ZeRO:一种去除冗余的数据并行方案

ZeRO:一种去除冗余的数据并行方案 目前训练超大规模语言模型主要有两条技术路线:TPU + XLA + TensorFlow/JAX GPU + Pytorch + Megatron + DeepSpeed 前者由Google主导,由于TPU和自家云平台GCP深度绑定,对于非Googler来说并不友好 后者背后则有NVIDIA、Meta、MS等大厂加持,…

【攻防技术系列+权限维持】注册表运行键

在红队行动期间在网络中获得初步立足点是一项耗时的任务。因此,持久化是红队行动成功的关键,因为这将使团队能够专注于交战目标,而不会失去与指挥和控制服务器的通信。 创建将在 Windows 登录期间执行任意负载的注册表项是红队剧本中最古老的隐藏技巧之一。这种持久性技术需…

链表part02

今天是8月3日,学习了链表的第二部分。交换链表两个节点,考察对next的操作和tmp的灵活运用。 删除链表的倒数第N个节点,双指针减少遍历次数。 链表相交,移动链表尾对齐,其实就是动长链表的指针。 环形链表,记住方法。4. 24交换链表两个节点 题目:给你一个链表,两两交换其…

Qt-pyqt6与QTDesginers的相互使用技巧

1. 先在QT Designers Tools 设计器中画好框架,再保存为.ui文件导出 2. 再pycharm中创建一个main.py文件用来加载和使用这个.ui文件,通用的代码如下: main.py from PyQt6.QtGui import QIntValidator, QIcon from PyQt6.QtWidgets import QApplication, QLabel, QListWidgetI…

Go中使用Zap日志库与Lumberjack日志切割

Go中使用Zap日志库与Lumberjack日志切割Go中使用Zap日志库与Lumberjack日志切割 原创 何泽丰 ProgrammerHe2024年06月11日 20:15 广东 听全文Go中使用Zap日志库与Lumberjack日志切割 概述 在项目中使用日志记录有助于快速定位和修复问题,能帮助我们监控系统健康状态及时发现问…

[UnrealCircle]腾讯 罗谦 | UnLua-UE4下的Lua脚本插件

传送门:[UnrealCircle]腾讯 罗谦 | UnLua-UE4下的Lua脚本插件_哔哩哔哩_bilibili参考PPT:UnrealCircle921北京PPT_免费高速下载|百度网盘-分享无限制一. UnLua 基础 1.1 概念UnLua 是一个脚本插件 UnLua 不是蓝图的替代,而是一种补充没有 Asset 预览 不支持 nativization 无…