393 lines
14 KiB
Python
393 lines
14 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
# @Time : 2026/2/10 09:42
|
|||
|
|
# @Author : reenrr
|
|||
|
|
# @File : conveyor_motor.py
|
|||
|
|
# @Desc : 传送带电机相关接口 需要测试
|
|||
|
|
"""
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
from modbus import (write_single_register, open_serial_port, RTU_HANDLE_MAP, RTU_HANDLE_LOCK, read_holding_register)
|
|||
|
|
from error_code import ModbusError
|
|||
|
|
|
|||
|
|
PORT = 'COM4'
|
|||
|
|
BAUDRATE = 115200
|
|||
|
|
DATABITS = 8
|
|||
|
|
STOPBITS = 1
|
|||
|
|
PARITY = 0
|
|||
|
|
|
|||
|
|
# --------寄存器地址---------
|
|||
|
|
MODE_REG_ADDR = 0x6200
|
|||
|
|
SPEED_REG_ADDR = 0x6203
|
|||
|
|
ACCELERATION_REG_ADDR = 0x6204
|
|||
|
|
DECELERATION_REG_ADDR = 0x6205
|
|||
|
|
START_OR_STOP_REG_ADDR = 0x6002
|
|||
|
|
# ---------相关写入值------------
|
|||
|
|
START_CMD = 0x0010
|
|||
|
|
STOP_CMD = 0x0040
|
|||
|
|
SPEED_CMD = 0x0002
|
|||
|
|
ABSOLUTE_POSITION_CMD = 0x0001
|
|||
|
|
|
|||
|
|
handle1 = open_serial_port(
|
|||
|
|
port=PORT,
|
|||
|
|
baudrate=BAUDRATE,
|
|||
|
|
databits=DATABITS,
|
|||
|
|
stopbits=STOPBITS,
|
|||
|
|
parity=PARITY
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def _check_handle_valid():
|
|||
|
|
"""通用句柄校验函数"""
|
|||
|
|
if handle1 is None:
|
|||
|
|
err_msg = ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise RuntimeError(err_msg)
|
|||
|
|
|
|||
|
|
with RTU_HANDLE_LOCK:
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
if not handle_obj or not handle_obj.is_open:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)}(句柄{handle1})"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise RuntimeError(err_msg)
|
|||
|
|
|
|||
|
|
def set_motor_speed(station_addr: int, speed_value: int) -> bool:
|
|||
|
|
"""
|
|||
|
|
设置伺服电机速度
|
|||
|
|
:param station_addr: 从机地址
|
|||
|
|
:param speed_value: 速度值
|
|||
|
|
:return: True--设置成功,False--设置失败
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
value = handle_obj.decimal_to_16bit(speed_value)
|
|||
|
|
print(f"速度值转换:{speed_value} → {value}(0x{value:04X})")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise ValueError(err_msg) from e # 转换失败抛ValueError,保留原始异常栈
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
ret_code = write_single_register(handle1, station_addr, SPEED_REG_ADDR, value, 0, 1)
|
|||
|
|
|
|||
|
|
if ret_code == ModbusError.MODBUS_SUCCESS:
|
|||
|
|
print(f"成功:电机速度设置完成(从站{station_addr},地址0x{SPEED_REG_ADDR:04X},值{value})")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
# 写寄存器返回错误码:抛OSError
|
|||
|
|
err_desc = ModbusError.get_error_desc(
|
|||
|
|
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
|
|||
|
|
err_msg = f"速度设置失败,{err_desc}"
|
|||
|
|
print(f"失败:{err_msg}")
|
|||
|
|
raise OSError(err_msg, ret_code) # 携带错误码,便于上层解析
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
|
|||
|
|
print(f"异常:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def set_motor_acceleration(station_addr: int, value: int) -> bool:
|
|||
|
|
"""
|
|||
|
|
设置伺服电机加速度
|
|||
|
|
:param station_addr: 从机地址
|
|||
|
|
:param value: 加速度值
|
|||
|
|
:return: True--设置成功,False--设置失败
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conv_value = handle_obj.decimal_to_16bit(value)
|
|||
|
|
print(f"加速度值转换:{value} → {conv_value}(0x{conv_value:04X})")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise ValueError(err_msg) from e
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
ret_code = write_single_register(handle1, station_addr, ACCELERATION_REG_ADDR, conv_value, 0, 1)
|
|||
|
|
|
|||
|
|
if ret_code == ModbusError.MODBUS_SUCCESS:
|
|||
|
|
print(f"成功:电机加速度设置完成(从站{station_addr},地址0x{ACCELERATION_REG_ADDR:04X},值{conv_value})")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
# 写寄存器返回错误码:抛OSError(携带错误码)
|
|||
|
|
err_desc = ModbusError.get_error_desc(
|
|||
|
|
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
|
|||
|
|
err_msg = f"加速度设置失败,{err_desc}"
|
|||
|
|
print(f"失败:{err_msg}")
|
|||
|
|
raise OSError(err_msg, ret_code) # 错误码存入args,上层可解析
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
# 主动抛出的OSError直接上抛,不做额外处理
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
# 其他未知异常:抛通用Exception,保留原始栈
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
|
|||
|
|
print(f"异常:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def set_motor_deceleration(station_addr: int, value: int) -> bool:
|
|||
|
|
"""
|
|||
|
|
设置伺服电机减速度
|
|||
|
|
:param station_addr: 从机地址
|
|||
|
|
:param value: 减速度值
|
|||
|
|
:return: True--设置成功,False--设置失败
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
conv_value = handle_obj.decimal_to_16bit(value)
|
|||
|
|
print(f"减速度值转换:{value} → {conv_value}(0x{conv_value:04X})")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise ValueError(err_msg) from e
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
ret_code = write_single_register(handle1, station_addr, DECELERATION_REG_ADDR, conv_value, 0, 1)
|
|||
|
|
|
|||
|
|
if ret_code == ModbusError.MODBUS_SUCCESS:
|
|||
|
|
print(f"成功:电机减速度设置完成(从站{station_addr},地址0x{DECELERATION_REG_ADDR:04X},值{conv_value})")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
err_desc = ModbusError.get_error_desc(
|
|||
|
|
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
|
|||
|
|
err_msg = f"减速度设置失败,{err_desc}"
|
|||
|
|
print(f"失败:{err_msg}")
|
|||
|
|
raise OSError(err_msg, ret_code)
|
|||
|
|
except OSError:
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
|
|||
|
|
print(f"异常:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def set_motor_mode(station_addr: int, value: int) -> bool:
|
|||
|
|
"""
|
|||
|
|
设置电机模式
|
|||
|
|
:param station_addr: 从机地址
|
|||
|
|
:param value: 0--速度模式 1--绝对位置模式
|
|||
|
|
:return: bool 设置是否成功
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if value == 0:
|
|||
|
|
write_cmd = SPEED_CMD
|
|||
|
|
mode_desc = "速度模式"
|
|||
|
|
elif value == 1:
|
|||
|
|
write_cmd = ABSOLUTE_POSITION_CMD
|
|||
|
|
mode_desc = "绝对位置模式"
|
|||
|
|
|
|||
|
|
ret_code = write_single_register(handle1, station_addr, MODE_REG_ADDR, write_cmd, 0, 1)
|
|||
|
|
|
|||
|
|
if ret_code == ModbusError.MODBUS_SUCCESS:
|
|||
|
|
print(f"成功:电机模式设置完成(从站{station_addr},地址0x{MODE_REG_ADDR:04X},模式:{mode_desc})")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
err_desc = ModbusError.get_error_desc(
|
|||
|
|
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
|
|||
|
|
err_msg = f"电机{mode_desc}设置失败,{err_desc}"
|
|||
|
|
print(f"失败:{err_msg}")
|
|||
|
|
raise OSError(err_msg, ret_code)
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
# 主动抛出的OSError直接上抛,不做额外处理
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_MODE)} - {str(e)}"
|
|||
|
|
print(f"异常:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def get_motor_speed(station_addr: int) -> int:
|
|||
|
|
"""
|
|||
|
|
获取电机速度
|
|||
|
|
:param station_addr: 从站地址
|
|||
|
|
:return: 返回电机速度值
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
reg_values = read_holding_register(handle1, station_addr, SPEED_REG_ADDR, 1, 0, [], 1)
|
|||
|
|
|
|||
|
|
if not reg_values or len(reg_values) != 1:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise OSError(err_msg)
|
|||
|
|
|
|||
|
|
unsigned_speed = reg_values[0]
|
|||
|
|
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
|
|||
|
|
|
|||
|
|
print(f"成功:获取电机速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
|
|||
|
|
return signed_speed
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def get_motor_acceleration(station_addr: int) -> int:
|
|||
|
|
"""
|
|||
|
|
获取电机加速度
|
|||
|
|
:param station_addr: 从站地址
|
|||
|
|
:return: 电机加速度值
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
reg_values = read_holding_register(handle1, station_addr, ACCELERATION_REG_ADDR, 1, 0, [], 1)
|
|||
|
|
|
|||
|
|
if not reg_values or len(reg_values) != 1:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise OSError(err_msg)
|
|||
|
|
|
|||
|
|
unsigned_speed = reg_values[0]
|
|||
|
|
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
|
|||
|
|
|
|||
|
|
print(f"成功:获取电机加速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
|
|||
|
|
return signed_speed
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def get_motor_deceleration(station_addr: int) -> int:
|
|||
|
|
"""
|
|||
|
|
获取电机减速度
|
|||
|
|
:param station_addr: 从站地址
|
|||
|
|
:return: i电机减速度值
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
handle_obj = RTU_HANDLE_MAP.get(handle1)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
reg_values = read_holding_register(handle1, station_addr, DECELERATION_REG_ADDR, 1, 0, [], 1)
|
|||
|
|
|
|||
|
|
if not reg_values or len(reg_values) != 1:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise OSError(err_msg)
|
|||
|
|
|
|||
|
|
unsigned_speed = reg_values[0]
|
|||
|
|
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
|
|||
|
|
|
|||
|
|
print(f"成功:获取电机减速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
|
|||
|
|
return signed_speed
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise Exception(err_msg) from e
|
|||
|
|
|
|||
|
|
def move_motor(station_addr: int, value: bool) -> bool:
|
|||
|
|
"""
|
|||
|
|
启停电机
|
|||
|
|
:param station_addr: 从站地址
|
|||
|
|
:param value: True--启动电机,False--停止电机
|
|||
|
|
:return: 是否设置成功
|
|||
|
|
"""
|
|||
|
|
_check_handle_valid()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if value:
|
|||
|
|
write_cmd = START_CMD
|
|||
|
|
op = "启动"
|
|||
|
|
else:
|
|||
|
|
write_cmd = STOP_CMD
|
|||
|
|
op = "停止"
|
|||
|
|
|
|||
|
|
ret_code = write_single_register(handle1, station_addr, START_OR_STOP_REG_ADDR, write_cmd, 0,1)
|
|||
|
|
|
|||
|
|
if ret_code == ModbusError.MODBUS_SUCCESS:
|
|||
|
|
print(f"成功:电机{op}指令已发送(从站{station_addr})")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
err_desc = ModbusError.get_error_desc(
|
|||
|
|
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
|
|||
|
|
err_msg = f"电机{op}指令发送失败,{err_desc}"
|
|||
|
|
print(f"失败:{err_msg}")
|
|||
|
|
raise OSError(err_msg, ret_code)
|
|||
|
|
|
|||
|
|
except OSError:
|
|||
|
|
raise
|
|||
|
|
except Exception as e:
|
|||
|
|
op = "启动" if value else "停止"
|
|||
|
|
err_msg = f"异常:电机{op}指令执行失败 - {str(e)}"
|
|||
|
|
print(f"异常:{err_msg}")
|
|||
|
|
raise Exception(err_msg)
|
|||
|
|
|
|||
|
|
def sync_motor_move(value: bool):
|
|||
|
|
"""
|
|||
|
|
同步传送带电机
|
|||
|
|
:param value: 启停传送点电机 True--同步启动传送带电机 False--同步停止传送带电机
|
|||
|
|
"""
|
|||
|
|
op_desc = "启动" if value else "停止"
|
|||
|
|
|
|||
|
|
if not isinstance(value, bool):
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_PARAM_TYPE)}(同步{op_desc}):参数必须是布尔值,当前值:{value}(类型:{type(value)})"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
raise TypeError(err_msg)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
print(f"开始同步{op_desc}传送带电机(从站1、2)...")
|
|||
|
|
move_motor(1, value)
|
|||
|
|
move_motor(2, value)
|
|||
|
|
print(f"成功:所有传送带电机已同步{op_desc}完成")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_SYNC)}({op_desc}):{str(e)}"
|
|||
|
|
print(f"错误:{err_msg}")
|
|||
|
|
# 抛RuntimeError,保留原始异常链,便于上层定位具体失败电机
|
|||
|
|
raise RuntimeError(err_msg) from e
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ------------调试接口----------
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
# 配置传送带电机参数 只需配置一次
|
|||
|
|
set_motor_mode(1, 0) # 配置成速度模式
|
|||
|
|
set_motor_speed(1, -30)
|
|||
|
|
set_motor_acceleration(1, 50)
|
|||
|
|
set_motor_deceleration(1, 50)
|
|||
|
|
|
|||
|
|
set_motor_mode(2, 0) # 配置成速度模式
|
|||
|
|
set_motor_speed(2, -30)
|
|||
|
|
set_motor_acceleration(2, 50)
|
|||
|
|
set_motor_deceleration(2, 50)
|
|||
|
|
|
|||
|
|
sync_motor_move(True)
|
|||
|
|
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
sync_motor_move(False)
|