Files
wire_controlsystem/conveyor_controller/conveyor_motor.py

397 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2026/2/10 09:42
# @Author : reenrr
# @File : conveyor_motor.py
# @Desc : 传送带电机相关接口 需要测试
"""
import time
from modbus import (write_single_register, open_serial_port, RTU_HANDLE_MAP, RTU_HANDLE_LOCK, read_holding_register)
from error_code import ModbusError
PORT = 'COM4'
BAUDRATE = 115200
DATABITS = 8
STOPBITS = 1
PARITY = 0
# --------寄存器地址---------
MODE_REG_ADDR = 0x6200
SPEED_REG_ADDR = 0x6203
ACCELERATION_REG_ADDR = 0x6204
DECELERATION_REG_ADDR = 0x6205
START_OR_STOP_REG_ADDR = 0x6002
# ---------相关写入值------------
START_CMD = 0x0010
STOP_CMD = 0x0040
SPEED_CMD = 0x0002
ABSOLUTE_POSITION_CMD = 0x0001
handle1 = open_serial_port(
port=PORT,
baudrate=BAUDRATE,
databits=DATABITS,
stopbits=STOPBITS,
parity=PARITY
)
def _check_handle_valid():
"""通用句柄校验函数"""
if handle1 is None:
err_msg = ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)
print(f"错误:{err_msg}")
raise RuntimeError(err_msg)
with RTU_HANDLE_LOCK:
handle_obj = RTU_HANDLE_MAP.get(handle1)
if not handle_obj or not handle_obj.is_open:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)}(句柄{handle1}"
print(f"错误:{err_msg}")
raise RuntimeError(err_msg)
def set_motor_speed(station_addr: int, speed_value: int) -> bool:
"""
设置伺服电机速度
:param station_addr: 从机地址
:param speed_value: 速度值
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
value = handle_obj.decimal_to_16bit(speed_value)
print(f"速度值转换:{speed_value}{value}0x{value:04X}")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e # 转换失败抛ValueError保留原始异常栈
try:
ret_code = write_single_register(handle1, station_addr, SPEED_REG_ADDR, value, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机速度设置完成(从站{station_addr}地址0x{SPEED_REG_ADDR:04X},值{value}")
return True
else:
# 写寄存器返回错误码抛OSError
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"速度设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code) # 携带错误码,便于上层解析
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_acceleration(station_addr: int, value: int) -> bool:
"""
设置伺服电机加速度
:param station_addr: 从机地址
:param value: 加速度值
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
conv_value = handle_obj.decimal_to_16bit(value)
print(f"加速度值转换:{value}{conv_value}0x{conv_value:04X}")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e
try:
ret_code = write_single_register(handle1, station_addr, ACCELERATION_REG_ADDR, conv_value, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机加速度设置完成(从站{station_addr}地址0x{ACCELERATION_REG_ADDR:04X},值{conv_value}")
return True
else:
# 写寄存器返回错误码抛OSError携带错误码
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"加速度设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code) # 错误码存入args上层可解析
except OSError:
# 主动抛出的OSError直接上抛不做额外处理
raise
except Exception as e:
# 其他未知异常抛通用Exception保留原始栈
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_deceleration(station_addr: int, value: int) -> bool:
"""
设置伺服电机减速度
:param station_addr: 从机地址
:param value: 减速度值
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
conv_value = handle_obj.decimal_to_16bit(value)
print(f"减速度值转换:{value}{conv_value}0x{conv_value:04X}")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e
try:
ret_code = write_single_register(handle1, station_addr, DECELERATION_REG_ADDR, conv_value, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机减速度设置完成(从站{station_addr}地址0x{DECELERATION_REG_ADDR:04X},值{conv_value}")
return True
else:
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"减速度设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code)
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_mode(station_addr: int, value: int) -> bool:
"""
设置电机模式
:param station_addr: 从机地址
:param value: 0--速度模式 1--绝对位置模式
:return: bool 设置是否成功
"""
_check_handle_valid()
try:
if value == 0:
write_cmd = SPEED_CMD
mode_desc = "速度模式"
elif value == 1:
write_cmd = ABSOLUTE_POSITION_CMD
mode_desc = "绝对位置模式"
ret_code = write_single_register(handle1, station_addr, MODE_REG_ADDR, write_cmd, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机模式设置完成(从站{station_addr}地址0x{MODE_REG_ADDR:04X},模式:{mode_desc}")
return True
else:
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"电机{mode_desc}设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code)
except OSError:
# 主动抛出的OSError直接上抛不做额外处理
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_MODE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def get_motor_speed(station_addr: int) -> int:
"""
获取电机速度
:param station_addr: 从站地址
:return: 返回电机速度值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, SPEED_REG_ADDR, 1, 0, [], 1)
if not reg_values or len(reg_values) != 1:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
unsigned_speed = reg_values[0]
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
print(f"成功:获取电机速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
return signed_speed
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
def get_motor_acceleration(station_addr: int) -> int:
"""
获取电机加速度
:param station_addr: 从站地址
:return: 电机加速度值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, ACCELERATION_REG_ADDR, 1, 0, [], 1)
if not reg_values or len(reg_values) != 1:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
unsigned_speed = reg_values[0]
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
print(f"成功:获取电机加速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
return signed_speed
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
def get_motor_deceleration(station_addr: int) -> int:
"""
获取电机减速度
:param station_addr: 从站地址
:return: i电机减速度值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, DECELERATION_REG_ADDR, 1, 0, [], 1)
if not reg_values or len(reg_values) != 1:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
unsigned_speed = reg_values[0]
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
print(f"成功:获取电机减速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
return signed_speed
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
def move_motor(station_addr: int, value: bool) -> bool:
"""
启停电机
:param station_addr: 从站地址
:param value: True--启动电机False--停止电机
:return: 是否设置成功
"""
_check_handle_valid()
try:
if value:
write_cmd = START_CMD
op = "启动"
else:
write_cmd = STOP_CMD
op = "停止"
ret_code = write_single_register(handle1, station_addr, START_OR_STOP_REG_ADDR, write_cmd, 0,1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机{op}指令已发送(从站{station_addr}")
return True
else:
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"电机{op}指令发送失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code)
except OSError:
raise
except Exception as e:
op = "启动" if value else "停止"
err_msg = f"异常:电机{op}指令执行失败 - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg)
def sync_motor_move(value: bool):
"""
同步传送带电机
:param value: 启停传送点电机 True--同步启动传送带电机 False--同步停止传送带电机
"""
op_desc = "启动" if value else "停止"
if not isinstance(value, bool):
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_PARAM_TYPE)}(同步{op_desc}):参数必须是布尔值,当前值:{value}(类型:{type(value)}"
print(f"错误:{err_msg}")
raise TypeError(err_msg)
try:
print(f"开始同步{op_desc}传送带电机从站1、2...")
move_motor(1, value)
move_motor(2, value)
print(f"成功:所有传送带电机已同步{op_desc}完成")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_SYNC)}{op_desc}{str(e)}"
print(f"错误:{err_msg}")
# 抛RuntimeError保留原始异常链便于上层定位具体失败电机
raise RuntimeError(err_msg) from e
# ------------调试接口----------
if __name__ == '__main__':
# 配置传送带电机参数 只需配置一次
set_motor_mode(1, 0) # 配置成速度模式
set_motor_speed(1, -30)
set_motor_acceleration(1, 50)
set_motor_deceleration(1, 50)
set_motor_mode(2, 0) # 配置成速度模式
set_motor_speed(2, -30)
set_motor_acceleration(2, 50)
set_motor_deceleration(2, 50)
sync_motor_move(True)
time.sleep(1)
sync_motor_move(False)
time.sleep(0.5)
while(True):
time.sleep(1)