Files

724 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2026/2/6 16:45
# @Author : reenrr
# @File : modbus.py
# @Desc : Modbus RTU通用接口
"""
import time
import serial
import threading
from typing import Optional
from error_code import ModbusError
# -------全局485半双工锁--------
RTU_GLOBAL_LOCKS: dict[str, threading.Lock] = {} # key=串口号value=独占锁
RTU_LOCK_INIT_LOCK = threading.Lock() # 锁初始化的线程安全锁
# -------全局句柄管理-------
RTU_HANDLE_MAP: dict[int, "RTUSerialHandle"] = {} # 句柄ID
RTU_NEXT_HANDLE_ID = 1
RTU_HANDLE_LOCK = threading.Lock()
# -------Modbus RTU 核心常量-----------
MODBUS_FUNC_READ_HOLDING_REG = 0x03 # 读保持寄存器功能码
MODBUS_FUNC_WRITE_SINGLE_REG = 0x06 # 写单个寄存器功能码
MODBUS_FUNC_WRITE_MULTI_REG = 0x10 # 写多个寄存器功能码
MODBUS_CRC_LENGTH = 2 # CRC校验码长度
MODBUS_MAX_READ_REG = 125 # 读寄存器最大数量(协议规定)
MODBUS_MAX_WRITE_MULTI_REG = 123 # 写多寄存器最大数量(协议规定)
MODBUS_REG_VAL_MAX = 0xFFFF # 单寄存器最大取值16位
MODBUS_STATION_ADDR_MIN = 1 # 从站地址最小值
MODBUS_STATION_ADDR_MAX = 247 # 从站地址最大值
class RTUSerialHandle:
"""
Modbus RTU串口句柄类封装串口实例、配置、状态
"""
def __init__(self, port: str, baudrate: int,
databits: int, stopbits: int, parity: str):
self.port = port
self.baudrate = baudrate
self.databits = databits
self.stopbits = stopbits
self.parity = parity # 校验位
self.ser: Optional[serial.Serial] = None # 串口实例
self.is_open = False
def open(self) -> int:
"""
打开串口转换pyserial兼容参数初始化485锁
:return: ModbusError错误码
"""
try:
# 串口参数转换将类内存储的参数转为pyserial可识别的格式
# 1. 停止位转换int -> pyserial常量
stopbits_map = {
1: serial.STOPBITS_ONE,
2: serial.STOPBITS_TWO
}
pyserial_stopbits = stopbits_map.get(self.stopbits, serial.STOPBITS_ONE)
# 2. 数据位转换int -> pyserial常量
databits_map = {
7: serial.SEVENBITS,
8: serial.EIGHTBITS
}
pyserial_databits = databits_map.get(self.databits, serial.EIGHTBITS)
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=pyserial_databits,
stopbits=pyserial_stopbits,
parity=self.parity,
timeout=0.1,
write_timeout=0.1,
xonxoff=False,
rtscts=False,
dsrdtr=False
)
if self.ser.is_open:
self.is_open = True
# 初始化该串口的485半双工锁
with RTU_LOCK_INIT_LOCK:
if self.port not in RTU_GLOBAL_LOCKS:
RTU_GLOBAL_LOCKS[self.port] = threading.Lock()
return ModbusError.MODBUS_SUCCESS
return ModbusError.MODBUS_ERR_SERIAL
except Exception as e:
print(f"串口打开失败{self.port}:{e}")
return ModbusError.MODBUS_ERR_SERIAL
def close(self):
"""关闭串口,清理资源,置空状态"""
if self.ser and self.is_open:
try:
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
self.ser.close()
except Exception as e:
print(f"【串口警告】关闭{self.port}时异常:{str(e)}")
finally:
self.is_open = False
self.ser = None
def get_lock(self) ->threading.Lock:
"""
获取当前串口的485半双工锁
:return: 独占锁实例
"""
with RTU_LOCK_INIT_LOCK:
return RTU_GLOBAL_LOCKS.get(self.port, threading.Lock())
def decimal_to_16bit(self, value: int) -> int:
"""
将十进制有符号整数转换成十六进制字符串
:param value: 十进制有符号整数
:return: 十六进制字符串
"""
# 1. 限制数值在16位有符号整数范围内-32768 ~ 32767
min_16bit = -32768 # 16位有符号整数最小值
max_16bit = 32767 # 16位有符号整数最大值
# 超出范围自动截断并提示
if value < min_16bit:
print(f"警告:数值{value}超出16位最小值{min_16bit},已截断为{min_16bit}")
value = min_16bit
elif value > max_16bit:
print(f"警告:数值{value}超出16位最大值{max_16bit},已截断为{max_16bit}")
value = max_16bit
# 2. 转换为16位无符号补码核心逻辑
if value < 0:
unsigned_value = 65536 + value # 负数转补码(如-30 → 65506
else:
unsigned_value = value # 正数直接保留如30 → 30
# 3. 返回十六进制格式的整数0x开头Python中整数本质不变仅显示格式
return unsigned_value
def uint16_to_int16(self, unsigned_value: int) -> int:
"""
将16位无符号十进制数转换为16位有符号十进制数
:param unsigned_value: 无符号十进制数
:return: 有符号十进制数
"""
# 先校验输入范围必须是16位无符号数
if not isinstance(unsigned_value, int):
raise ValueError(f"输入必须是整数,当前是{type(unsigned_value)}")
if unsigned_value < 0 or unsigned_value > 65535:
raise ValueError(f"输入必须是0~65535的整数当前是{unsigned_value}")
# 核心转换逻辑
if unsigned_value > 32767:
return unsigned_value - 65536
else:
return unsigned_value
def uint32_to_int32(self, unsigned_value: int) -> int:
"""
将32位无符号十进制数转换为32位有符号十进制数
:param unsigned_value: 无符号十进制数
:return: 有符号十进制数
"""
# 先校验输入范围必须是32位无符号数
if not isinstance(unsigned_value, int):
raise ValueError(f"输入必须是整数,当前是{type(unsigned_value)}")
if unsigned_value < 0 or unsigned_value > 4294967295:
raise ValueError(f"输入必须是0~4294967295的整数当前是{unsigned_value}")
# 核心转换逻辑
if unsigned_value > 2147483647: # 0x7FFFFFFF
return unsigned_value - 4294967296 # 0x100000000
else:
return unsigned_value
def __del__(self):
"""析构函数,程序退出时自动关闭串口,防止资源泄露"""
self.close()
# -------对外接口--------
# public
def open_serial_port(port: str, baudrate: int, databits: int, stopbits: int, parity: int) -> Optional[int]:
"""
打开并初始化Modbus串口
:param port: 串口号
:param baudrate: 波特率
:param databits: 数据位
:param stopbits: 停止位
:param parity: 校验位
:return: 串口句柄
"""
# 1. 参数合法性校验
if not port:
print("参数错误:串口号不能为空")
return None
if baudrate <= 0:
print(f"参数错误:波特率{baudrate}非法(必须>0")
return None
if databits not in [7, 8]:
print(f"参数错误:数据位{databits}非法仅支持7/8")
return None
if stopbits not in [1, 2]:
print(f"参数错误:停止位{stopbits}非法仅支持1/2")
return None
if parity not in [0, 1, 2]:
print(f"参数错误:校验位{parity}非法仅支持0=无/1=奇/2=偶)")
return None
# 2. 校验位转换int -> 字符串0->N1->O2->E
parity_map = {0: 'N', 1: 'O', 2: 'E'}
parity_str = parity_map[parity] # 已校验parity在0-2无需默认值
# 3. 创建串口句柄实例
handle_obj = RTUSerialHandle(
port=port,
baudrate=baudrate,
databits=databits,
stopbits=stopbits,
parity=parity_str
)
# 4. 打开串口
ret = handle_obj.open()
if ret != ModbusError.MODBUS_SUCCESS:
print(f"串口{port}打开失败,错误码:{ret}")
return None
# 5. 分配全局句柄ID并存储
global RTU_NEXT_HANDLE_ID
with RTU_HANDLE_LOCK:
handle_id = RTU_NEXT_HANDLE_ID
RTU_HANDLE_MAP[handle_id] = handle_obj
RTU_NEXT_HANDLE_ID += 1
print(f"串口[{port}]打开成功句柄ID{handle_id}")
return handle_id
def close_serial_port(handle: int):
"""
关闭Modbus串口
:param handle:串口句柄ID
"""
with RTU_HANDLE_LOCK:
handle_obj = RTU_HANDLE_MAP.get(handle)
if not handle_obj:
print(f"句柄{handle}不存在,关闭失败")
return
# 关闭串口并清理资源
handle_obj.close()
del RTU_HANDLE_MAP[handle]
# 清理485半双工锁可选
with RTU_LOCK_INIT_LOCK:
if handle_obj.port in RTU_GLOBAL_LOCKS:
del RTU_GLOBAL_LOCKS[handle_obj.port]
print(f"句柄{handle}(串口[{handle_obj.port}])已关闭")
# --------Modbus RTU CRC16校验函数-----------
def modbus_crc16(data: bytes) -> bytes:
"""
计算Modbus RTU CRC16校验码小端序
:param data: 待校验的字节流不含CRC
:return: 2字节校验码
"""
crc = 0xFFFF
for byte in data:
crc ^= byte
# 对当前字节的每1位共8位做移位+异或操作
for _ in range(8):
# 判断CRC寄存器的最低位是否为1
if crc & 0x0001:
# 如果最低位是1先右移1位再和多项式0xA001异或
crc = (crc >> 1) ^ 0xA001
else:
# 如果最低位是0仅右移1位
crc >>= 1
# 把16位CRC值拆成2个字节小端序低字节在前高字节在后
return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def verify_modbus_crc(data: bytes) -> bool:
"""
验证Modbus RTU数据的CRC16校验码
:param data: 包含CRC的完整字节流
:return: True=校验通过False=校验失败
"""
if len(data) < MODBUS_CRC_LENGTH:
return False
# 拆分出“原始数据部分”去掉最后2字节的CRC
data_part = data[:-MODBUS_CRC_LENGTH]
# 拆分出“附带的CRC校验码部分”取最后2字节
crc_part = data[-MODBUS_CRC_LENGTH:]
# 核心验证重新计算原始数据的CRC和附带的CRC对比
return modbus_crc16(data_part) == crc_part
# ------Modbus RTU核心接口-------
def read_holding_register(handle: int, station_addr: int, start_reg_addr: int,
reg_count: int, resp_offset: int, out_buffer: list[int], use_crc: int) -> list[int]:
"""
读保持寄存器
:param handle: 串口句柄ID
:param station_addr: 从机地址
:param start_reg_addr: 起始寄存器地址
:param reg_count: 读取寄存器数量
:param resp_offset: 响应数据偏移量
:param out_buffer: 输出缓冲区
:param use_crc: 是否启用crc校验
:return: 读取到的寄存器值列表
"""
# 1. 句柄校验
with RTU_HANDLE_LOCK:
handle_obj = RTU_HANDLE_MAP.get(handle)
if not handle_obj or not handle_obj.is_open:
print(f"句柄{handle}无效或串口未打开")
raise ModbusError.MODBUS_ERR_SERIAL
# 2. 参数合法性校验
if station_addr < 1 or station_addr > 247:
print(f"从站地址错误:{station_addr}必须1-247")
raise ModbusError.MODBUS_ERR_PARAM
if reg_count < 1 or reg_count > 125: # Modbus RTU最大读取125个寄存器
print(f"寄存器数量错误:{reg_count}必须1-125")
raise ModbusError.MODBUS_ERR_PARAM
if resp_offset < 0:
print(f"响应偏移量错误:{resp_offset}必须≥0")
raise ModbusError.MODBUS_ERR_PARAM
if use_crc not in [0, 1]:
print(f"CRC参数错误{use_crc}必须0/1")
raise ModbusError.MODBUS_ERR_PARAM
# 3. 构建Modbus RTU读指令帧
cmd = bytearray()
cmd.append(station_addr)
cmd.append(MODBUS_FUNC_READ_HOLDING_REG)
cmd.extend(start_reg_addr.to_bytes(2, byteorder='big')) # 起始寄存器地址(大端)
cmd.extend(reg_count.to_bytes(2, byteorder='big')) # 寄存器数量(大端)
# 添加CRC校验如果启用
if use_crc == 1:
crc = modbus_crc16(bytes(cmd))
cmd.extend(crc)
cmd_bytes = bytes(cmd)
# 4. 485半双工发送+接收(加全局锁)
lock = handle_obj.get_lock()
with lock:
# 清空缓冲区
handle_obj.ser.reset_input_buffer()
handle_obj.ser.reset_output_buffer()
# 发送指令
try:
handle_obj.ser.write(cmd_bytes)
handle_obj.ser.flush()
except Exception as e:
print(f"发送读指令失败:{e}")
raise ModbusError.MODBUS_ERR_SERIAL
# 接收响应
start_time = time.time()
response = b""
# 最小响应长度地址1+功能码1+字节数1+数据2*N+CRC2
min_resp_len = 3 + 2 * reg_count +(MODBUS_CRC_LENGTH if use_crc == 1 else 0)
while (time.time() - start_time) < handle_obj.ser.timeout * 3:
if handle_obj.ser.in_waiting > 0:
response += handle_obj.ser.read(handle_obj.ser.in_waiting)
if len(response) >= min_resp_len:
break
time.sleep(0.001)
if len(response) == 0:
print(f"读寄存器超时(从站{station_addr},起始地址{start_reg_addr}")
raise ModbusError.MODBUS_ERR_TIMEOUT
# 5. CRC校验如果启用
if use_crc == 1:
if not verify_modbus_crc(response):
print(f"CRC校验失败 | 响应数据:{response.hex(' ')}")
raise ModbusError.MODBUS_ERR_CRC
response = response[:-MODBUS_CRC_LENGTH] # 去掉CRC
# 6. 响应格式校验
if len(response) < 3:
print(f"响应数据过短:{response.hex(' ')}")
raise ModbusError.MODBUS_ERR_RESPONSE
# 校验从站地址和功能码
if response[0] != station_addr:
print(f"从站地址不匹配:请求{station_addr},响应{response[0]}")
raise ModbusError.MODBUS_ERR_RESPONSE
if response[1] != MODBUS_FUNC_READ_HOLDING_REG:
# 功能码最高位为1表示错误响应
if response[1] == MODBUS_FUNC_READ_HOLDING_REG | 0x80:
err_code = response[2] # 提取错误差
print(f"Modbus错误响应从站{station_addr},错误码{err_code}")
else:
print(f"功能码不匹配:请求{MODBUS_FUNC_READ_HOLDING_REG},响应{response[1]}")
raise ModbusError.MODBUS_ERR_RESPONSE
# 校验数据长度
resp_byte_count = response[2]
expected_byte_count = 2 * reg_count
if resp_byte_count != expected_byte_count:
print(f"数据长度不匹配:预期{expected_byte_count}字节,实际{resp_byte_count}字节")
raise ModbusError.MODBUS_ERR_RESPONSE
# 7. 解析数据并填充输出缓冲区
data_part = response[3 + resp_offset:] # 跳过偏移量
if len(data_part) < 2 * reg_count:
print(f"偏移后数据不足:需要{2 * reg_count}字节,实际{len(data_part)}字节")
raise ModbusError.MODBUS_ERR_RESPONSE
out_buffer.clear()
for i in range(reg_count):
reg_data = data_part[2 * i: 2 * (i + 1)]
reg_value = int.from_bytes(reg_data, byteorder='big')
out_buffer.append(reg_value)
print(f"读寄存器成功 | 从站{station_addr} | 起始地址{start_reg_addr} | 数量{reg_count} | 数据:{out_buffer}")
return out_buffer
def write_single_register(handle: int, station_addr: int, reg_addr: int, write_value: int,
resp_offset: int, use_crc: int) -> int:
"""
写单个寄存器
:param handle: 串口句柄ID
:param station_addr: 从站地址
:param reg_addr: 寄存器地址
:param write_value: 写入值
:param resp_offset: 响应数据偏移量
:param use_crc: 是否启用CRC校验
:return: 状态码
"""
# 1. 句柄校验
with RTU_HANDLE_LOCK:
handle_obj = RTU_HANDLE_MAP.get(handle)
if not handle_obj or not handle_obj.is_open:
print(f"句柄{handle}无效或串口未打开")
return ModbusError.MODBUS_ERR_SERIAL
# 2. 参数合法性校验
if station_addr < 1 or station_addr > 247:
print(f"从站地址错误:{station_addr}必须1-247")
return ModbusError.MODBUS_ERR_PARAM
if write_value < 0 or write_value > 0xFFFF:
print(f"写入值错误:{write_value}必须0-65535")
return ModbusError.MODBUS_ERR_PARAM
if use_crc not in [0, 1]:
print(f"CRC参数错误{use_crc}必须0/1")
return ModbusError.MODBUS_ERR_PARAM
# 3. 构建Modbus RTU写指令帧
cmd = bytearray()
cmd.append(station_addr) # 从站地址
cmd.append(MODBUS_FUNC_WRITE_SINGLE_REG) # 功能码
cmd.extend(reg_addr.to_bytes(2, byteorder='big')) # 寄存器地址(大端)
cmd.extend(write_value.to_bytes(2, byteorder='big')) # 写入值(大端)
# 添加CRC校验如果启用
if use_crc == 1:
crc = modbus_crc16(bytes(cmd))
cmd.extend(crc)
cmd_bytes = bytes(cmd)
print(f"命令:{cmd_bytes.hex(' ')}")
# 4. 485半双工发送+接收
lock = handle_obj.get_lock()
with lock:
handle_obj.ser.reset_input_buffer()
handle_obj.ser.reset_output_buffer()
# 发送指令
try:
handle_obj.ser.write(cmd_bytes)
handle_obj.ser.flush()
except Exception as e:
print(f"发送写指令失败:{e}")
return ModbusError.MODBUS_ERR_SERIAL
# 接收响应(写单个寄存器的响应和请求帧一致)
start_time = time.time()
response = b""
expected_resp_len = len(cmd_bytes) - (MODBUS_CRC_LENGTH if use_crc == 1 else 0)
while (time.time() - start_time) < handle_obj.ser.timeout * 3:
if handle_obj.ser.in_waiting > 0:
response += handle_obj.ser.read(handle_obj.ser.in_waiting)
if len(response) >= expected_resp_len:
break
time.sleep(0.001)
if len(response) == 0:
print(f"写寄存器超时(从站{station_addr},地址{reg_addr}")
return ModbusError.MODBUS_ERR_TIMEOUT
# 5. CRC校验如果启用
if use_crc == 1:
if not verify_modbus_crc(response):
print(f"CRC校验失败 | 响应数据:{response.hex(' ')}")
return ModbusError.MODBUS_ERR_CRC
response = response[:-MODBUS_CRC_LENGTH]
# 6. 响应校验(跳过偏移量后匹配)
expected_resp = cmd_bytes[:-MODBUS_CRC_LENGTH] if use_crc == 1 else cmd_bytes
if response[resp_offset:] != expected_resp[resp_offset:]:
print(f"写响应不匹配 | 请求:{expected_resp.hex(' ')} | 响应:{response.hex(' ')}")
return ModbusError.MODBUS_ERR_RESPONSE
# print(f"写单个寄存器成功 | 从站{station_addr} | 地址{reg_addr} | 值{write_value}")
return ModbusError.MODBUS_SUCCESS
def write_multi_register(handle: int, station_addr: int, start_reg_addr: int, reg_count: int,
write_values: list[int], resp_offset: int, use_crc: int) -> int:
"""
写多个寄存器
:param handle: 串口句柄ID
:param station_addr: 从站地址
:param start_reg_addr: 起始寄存器地址
:param reg_count: 写入寄存器数量
:param write_values: 写入值列表
:param resp_offset: 响应数据偏移量
:param use_crc: 是否启用CRC校验
:return: 状态码
"""
# 1. 句柄校验
with RTU_HANDLE_LOCK:
handle_obj = RTU_HANDLE_MAP.get(handle)
if not handle_obj or not handle_obj.is_open:
print(f"句柄{handle}无效或串口未打开")
return ModbusError.MODBUS_ERR_SERIAL
# 2. 参数合法性校验
if station_addr < 1 or station_addr > 247:
print(f"从站地址错误:{station_addr}必须1-247")
return ModbusError.MODBUS_ERR_PARAM
if reg_count < 1 or reg_count > 123: # Modbus RTU最大写入123个寄存器
print(f"寄存器数量错误:{reg_count}必须1-123")
return ModbusError.MODBUS_ERR_PARAM
if len(write_values) != reg_count:
print(f"写入值数量不匹配:预期{reg_count}个,实际{len(write_values)}")
return ModbusError.MODBUS_ERR_PARAM
for val in write_values:
if val < 0 or val > 0xFFFF:
print(f"写入值错误:{val}必须0-65535")
return ModbusError.MODBUS_ERR_PARAM
if use_crc not in [0, 1]:
print(f"CRC参数错误{use_crc}必须0/1")
return ModbusError.MODBUS_ERR_PARAM
# 3. 构建Modbus RTU批量写指令帧
cmd = bytearray()
cmd.append(station_addr) # 从站地址
cmd.append(MODBUS_FUNC_WRITE_MULTI_REG) # 功能码
cmd.extend(start_reg_addr.to_bytes(2, byteorder='big')) # 起始地址(大端)
cmd.extend(reg_count.to_bytes(2, byteorder='big')) # 寄存器数量(大端)
cmd.append(2 * reg_count) # 字节数每个寄存器2字节
# 追加写入值
for val in write_values:
cmd.extend(val.to_bytes(2, byteorder='big'))
# 添加CRC校验如果启用
if use_crc == 1:
crc = modbus_crc16(bytes(cmd))
cmd.extend(crc)
cmd_bytes = bytes(cmd)
print(f"命令:{cmd_bytes.hex(' ')}")
# 4. 485半双工发送+接收
lock = handle_obj.get_lock()
with lock:
handle_obj.ser.reset_input_buffer()
handle_obj.ser.reset_output_buffer()
# 发送指令
try:
handle_obj.ser.write(cmd_bytes)
handle_obj.ser.flush()
except Exception as e:
print(f"发送批量写指令失败:{e}")
return ModbusError.MODBUS_ERR_SERIAL
# 接收响应(批量写响应帧:地址+功能码+起始地址+数量+CRC
start_time = time.time()
response = b""
expected_resp_len = 7 + (MODBUS_CRC_LENGTH if use_crc == 1 else 0) # 基础响应长度
while (time.time() - start_time) < handle_obj.ser.timeout * 3:
if handle_obj.ser.in_waiting > 0:
response += handle_obj.ser.read(handle_obj.ser.in_waiting)
if len(response) >= expected_resp_len:
break
time.sleep(0.001)
if len(response) == 0:
print(f"批量写寄存器超时(从站{station_addr},起始地址{start_reg_addr}")
return ModbusError.MODBUS_ERR_TIMEOUT
# 5. CRC校验如果启用
if use_crc == 1:
if not verify_modbus_crc(response):
print(f"CRC校验失败 | 响应数据:{response.hex(' ')}")
return ModbusError.MODBUS_ERR_CRC
response = response[:-MODBUS_CRC_LENGTH]
# 6. 响应格式校验
if len(response) < 7:
print(f"批量写响应过短:{response.hex(' ')}")
return ModbusError.MODBUS_ERR_RESPONSE
resp_data = response[resp_offset:]
if len(resp_data) < 6: # 偏移后至少保留:地址(1)+功能码(1)+起始地址(2)+数量(2)
print(f"【响应错误】偏移后数据不足 | 偏移量{resp_offset} | 剩余长度{len(resp_data)}字节需≥6")
return ModbusError.MODBUS_ERR_RESPONSE
# 校验核心字段
if response[0] != station_addr:
print(f"从站地址不匹配:请求{station_addr},响应{response[0]}")
return ModbusError.MODBUS_ERR_RESPONSE
if response[1] != MODBUS_FUNC_WRITE_MULTI_REG:
print(f"功能码不匹配:请求{MODBUS_FUNC_WRITE_MULTI_REG},响应{response[1]}")
return ModbusError.MODBUS_ERR_RESPONSE
resp_start_addr = int.from_bytes(response[2:4], byteorder='big')
resp_reg_count = int.from_bytes(response[4:6], byteorder='big')
if resp_start_addr != start_reg_addr or resp_reg_count != reg_count:
print(
f"批量写响应参数不匹配 | 预期(start={start_reg_addr}, count={reg_count}) | 实际(start={resp_start_addr}, count={resp_reg_count})")
return ModbusError.MODBUS_ERR_RESPONSE
print(f"批量写寄存器成功 | 从站{station_addr} | 起始地址{start_reg_addr} | 数量{reg_count} | 值:{write_values}")
return ModbusError.MODBUS_SUCCESS
# ---------测试接口--------
if __name__ == '__main__':
# handle1 = open_serial_port(
# port='COM4',
# baudrate=115200,
# databits=8,
# stopbits=1,
# parity=0
# )
#
# #
# # if handle1:
# # close_serial_port(handle1)
#
# # 测试校验码是否正确
# # data1 = b'\x01\x06\x62\x00\x00\x02'
# # crc1 = modbus_crc16(data1)
# # print(f"计算出的CRC16小端序十六进制{crc1.hex(' ')}") # 预期结果84 0A
# # print(f"完整Modbus指令帧{(data1 + crc1).hex(' ')}\n")
# 手动构造句柄(无硬件时)
handle_id = None
mock_handle = None
if not handle_id:
mock_handle = RTUSerialHandle(
port="COM99", baudrate=9600, databits=8, stopbits=1, parity="N"
)
mock_handle.is_open = True
mock_handle.ser = None
with RTU_HANDLE_LOCK:
handle_id = RTU_NEXT_HANDLE_ID
RTU_HANDLE_MAP[handle_id] = mock_handle
RTU_NEXT_HANDLE_ID += 1
print(f"已创建模拟句柄ID{handle_id}")
result = mock_handle.decimal_to_16bit(-30)
print("result:{result}")
write_single_register(
handle_id,
1,
0x6203,
result,
0,
1
)
# ret = write_multi_register(
# handle=handle_id,
# station_addr=1,
# start_reg_addr=0x6000,
# reg_count=3,
# write_values=[1, 2, 3],
# resp_offset=0,
# use_crc=1
# )
#
# # 步骤4验证结果
# print("\n===== 测试结果 =====")
# print(f"函数返回码:{ret} (0=成功5=超时2=CRC错误3=响应错误)")
# print(f"测试是否成功:{ret == ModbusError.MODBUS_SUCCESS}")
# 步骤5清理资源
close_serial_port(handle_id)