From 8d2e91b8b3cad306fc60316827d2fa74a2f5e297 Mon Sep 17 00:00:00 2001 From: pengqi Date: Thu, 26 Feb 2026 15:42:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BC=A0=E9=80=81=E5=B8=A6?= =?UTF-8?q?=E7=94=B5=E6=9C=BA=E3=80=81=E8=88=B5=E6=9C=BA=E3=80=81=E8=BE=BE?= =?UTF-8?q?=E5=A6=99=E7=94=B5=E6=9C=BA=E7=AD=89=E7=9B=B8=E5=85=B3=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DM/DM_Motor.py | 65 ++- DM/DM_Motor_controller.py | 247 +++++++++ DM/DM_Motor_test.py | 257 ---------- EMV/EMV_test.py | 50 +- conveyor_controller/conveyor_motor.py | 392 +++++++++++++++ conveyor_controller/error_code.py | 68 +++ conveyor_controller/modbus.py | 698 ++++++++++++++++++++++++++ main_control.py | 2 +- servo/servo_control.py | 145 ++++++ servo/servo_test.py | 49 +- 10 files changed, 1642 insertions(+), 331 deletions(-) create mode 100644 DM/DM_Motor_controller.py delete mode 100644 DM/DM_Motor_test.py create mode 100644 conveyor_controller/conveyor_motor.py create mode 100644 conveyor_controller/error_code.py create mode 100644 conveyor_controller/modbus.py create mode 100644 servo/servo_control.py diff --git a/DM/DM_Motor.py b/DM/DM_Motor.py index b5ca27b..e172675 100644 --- a/DM/DM_Motor.py +++ b/DM/DM_Motor.py @@ -22,7 +22,7 @@ class DMMotorController: """ 初始化电机控制器 :param slave_id: 从机ID,默认0x01 - :param master_id: 主机ID,默认0x11(不要设为0x00) + :param master_id: 主机ID,默认0x11 :param port: 串口端口,默认COM6 :param baudrate: 波特率,默认921600 """ @@ -40,7 +40,6 @@ class DMMotorController: # 初始化电机和串口 self.init_motor() - def init_motor(self): """初始化电机和串口""" try: @@ -112,8 +111,8 @@ class DMMotorController: :param v_desired: 目标速度(rad/s, 范围[-30, 30]) """ try: - # 归零 + 发送运动指令 - self.motor_control.set_zero_position(self.motor) + # 发送运动指令 + # self.motor_control.set_zero_position(self.motor) self.motor_control.control_Pos_Vel(self.motor, p_desired, v_desired) time.sleep(0.1) print(f"✅ 位置-速度控制:位置={p_desired}rad | 速度={v_desired}rad/s") @@ -135,7 +134,10 @@ class DMMotorController: return False def _get_mode_name(self, control_type): - """获取模式名称""" + """ + 获取模式名称 + :param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT) + """ # 1.定义[枚举值--中文名称]的映射字典 mode_map = { Control_Type.POS_VEL: "位置-速度模式", @@ -145,15 +147,55 @@ class DMMotorController: # 2.根据控制模式值获取中文名称 字典方法 return mode_map.get(control_type, "未知模式") # “未知模式”默认值 - def get_position(self): - self.refresh() - return self.motor.getPosition() + def save_param(self): + """保存所有电机参数""" + try: + if self.motor is None: + raise ValueError("电机实例为None,无法保存参数") - def refresh(self): - self.motor_control.refresh_motor_status(self.motor) + self.motor_control.save_motor_param(self.motor) + print("电机参数保存成功") + except Exception as e: + print(f"❌ 电机参数保存失败:{str(e)}") + + def refresh_motor_status(self): + """获得电机状态""" + try: + if self.motor is None: + raise ValueError("电机实例为None,无法保存参数") + + self.motor_control.refresh_motor_status(self.motor) + print("电机状态刷新成功") + except Exception as e: + print(f"❌ 电机状态刷新失败:{str(e)}") + + def get_position(self): + """获取电机位置""" + try: + if self.motor is None: + raise ValueError("电机实例为None,无法保存参数") + + position = self.motor.getPosition() + print(f"获取电机位置成功,当前位置: {position}") + return position + except Exception as e: + print(f"获取电机位置失败: {str(e)}") def change_limit_param(self, motor_type, pmax, vmax, tmax): - self.motor_control.change_limit_param(motor_type, pmax, vmax, tmax) + """ + 改变电机的PMAX VMAX TMAX + :param motor_type: 电机的类型 + :param pmax: 电机的PMAX + :param vmax: 电机的VMAX + :param tmax: 电机的TAMX + """ + try: + self.motor_control.change_limit_param(motor_type, pmax, vmax, tmax) + print( + f"电机限位参数修改成功 | 类型: {motor_type} | PMAX: {pmax} | VMAX: {vmax} | TMAX: {tmax}" + ) + except Exception as e: + print(f"修改电机限位参数失败: {str(e)}") def __del__(self): """析构函数:确保程序退出时失能电机、关闭串口""" @@ -168,7 +210,6 @@ class DMMotorController: except Exception as e: print(f"ℹ️ 析构函数执行警告:{str(e)}") - if __name__ == '__main__': # 1.创建电机控制器实例 motor_controller = DMMotorController( diff --git a/DM/DM_Motor_controller.py b/DM/DM_Motor_controller.py new file mode 100644 index 0000000..ac55f1b --- /dev/null +++ b/DM/DM_Motor_controller.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2026/1/6 13:55 +# @Author : reenrr +# @File : DM_Motor_test.py +# @Desc : 达妙电机测试 +''' +from DM_CAN import * +from DM_Motor import DMMotorController + +import time + +# -------------------------- 电机参数配置 -------------------------- +SLAVE_ID = 0x01 +MASTER_ID = 0x11 +PORT = 'COM10' +BAUDRATE = 921600 + +ORI_P_DESIRED = 0.0 # 吸附位置 +DROP_P_DESIRED = 0.0 # 放置位置 +V_DESIRED = 30 +P_DESIRED_MIN = -300 +P_DESIRED_MAX = 300 +V_DESIRED_MIN = -30 +V_DESIRED_MAX = 30 + +ORIGIN_POSITION = 0.0 # 电机原点位置 +POSITION_TOLERANCE = 0.1 # 位置误差阈值 + + +class DMMotorInterface: + def __init__(self): + """初始化电机控制器,切换模式并使能电机""" + self.motor_controller = DMMotorController( + slave_id=SLAVE_ID, + master_id=MASTER_ID, + port=PORT, + baudrate=BAUDRATE + ) + + try: + # 切换到位置-速度模式 + self.motor_controller.switch_control_mode(Control_Type.POS_VEL) + # 使能电机 + self.motor_controller.enable_motor() + except Exception as e: + raise RuntimeError(f"电机初始化失败:{str(e)}") from e + +# 全局变量 +_DMMotor = DMMotorInterface() + +def set_dm_motor_speed(value: int) -> bool: + """ + 设置达妙电机速度 + :param value: 速度值 + :return: 是否设置成功 + """ + global V_DESIRED_MIN, V_DESIRED_MAX, V_DESIRED + + try: + _min = V_DESIRED_MIN + _max = V_DESIRED_MAX + except NameError as e: + err_msg = f"速度限位常量未定义:{str(e)},请检查V_DESIRED_MIN/V_DESIRED_MAX的定义" + print(f"错误:{err_msg}") + raise Exception(err_msg) from e + + if not (_min <= value <= _max): + err_msg = f"速度值超出合法范围:{value},合法范围[{_min}, {_max}]" + print(f"错误:{err_msg}") + # 可选:若需要上层感知超限,改为抛ValueError(替代return False) + # raise ValueError(err_msg) + return False + + try: + old_value = V_DESIRED + V_DESIRED = value + print(f"成功:电机速度已更新,旧值={old_value},新值={value}(范围[{_min}, {_max}])") + return True + + except Exception as e: + # 5. 异常处理:保留原始异常链,加详细日志 + err_msg = f"设置电机速度失败:{str(e)}(尝试设置的值:{value})" + print(f"异常:{err_msg}") + # 保留原始异常链,便于上层定位根因 + raise Exception(err_msg) from e + +def set_ori_position(value: float) ->bool: + """ + 设置吸附位置 + :param value: 设置吸附位置值 + """ + global ORI_P_DESIRED, P_DESIRED_MIN, P_DESIRED_MAX + + try: + _min = P_DESIRED_MIN + _max = P_DESIRED_MAX + except NameError as e: + err_msg = f"位置限位常量未定义:{str(e)},请检查P_DESIRED_MIN/P_DESIRED_MAX的定义" + print(f"错误:{err_msg}") + raise Exception(err_msg) from e + + if not (_min <= value <= _max): + err_msg = f"位置值超出合法范围:{value},合法范围[{_min}, {_max}]" + print(f"错误:{err_msg}") + # 可选:若需要上层感知超限,改为抛ValueError(替代return False) + # raise ValueError(err_msg) + return False + + try: + old_value = ORI_P_DESIRED + ORI_P_DESIRED = value + print(f"成功:电机速度已更新,旧值={old_value},新值={value}(范围[{_min}, {_max}])") + return True + + except Exception as e: + err_msg = f"设置电机速度失败:{str(e)}(尝试设置的值:{value})" + print(f"异常:{err_msg}") + # 保留原始异常链,便于上层定位根因 + raise Exception(err_msg) from e + +def set_drop_position(value: float) -> bool: + """ + 设置放置位置 + :param value: 设置放置位置 + """ + global DROP_P_DESIRED, P_DESIRED_MIN, P_DESIRED_MAX + + try: + _min = P_DESIRED_MIN + _max = P_DESIRED_MAX + except NameError as e: + err_msg = f"位置限位常量未定义:{str(e)},请检查P_DESIRED_MIN/P_DESIRED_MAX的定义" + print(f"错误:{err_msg}") + raise Exception(err_msg) from e + + if not (_min <= value <= _max): + err_msg = f"位置值超出合法范围:{value},合法范围[{_min}, {_max}]" + print(f"错误:{err_msg}") + # 可选:若需要上层感知超限,改为抛ValueError(替代return False) + # raise ValueError(err_msg) + return False + + try: + old_value = DROP_P_DESIRED + DROP_P_DESIRED = value + print(f"成功:电机速度已更新,旧值={old_value},新值={value}(范围[{_min}, {_max}])") + return True + + except Exception as e: + # 5. 异常处理:保留原始异常链,加详细日志 + err_msg = f"设置电机速度失败:{str(e)}(尝试设置的值:{value})" + print(f"异常:{err_msg}") + # 保留原始异常链,便于上层定位根因 + raise Exception(err_msg) from e + +def move_to_ori_position(): + """ + 移动到吸附位置 + """ + try: + print(f"开始执行--移动到吸附位置:目标位置={ORI_P_DESIRED},速度={V_DESIRED}") + + _DMMotor.motor_controller.control_pos_vel(ORI_P_DESIRED, V_DESIRED) + time.sleep(5) # 等待电机移动完成 + print(f"吸附位置移动指令已发送") + + except Exception as e: + # 封装异常信息,保留原始异常链 + err_msg = f"移动到吸附位置失败:{str(e)}(目标位置={ORI_P_DESIRED},速度={V_DESIRED})" + print(f"❌ 异常:{err_msg}") + raise RuntimeError(err_msg) from e + +def move_to_drop_position(): + """ + 移动到放置位置 + """ + try: + print(f"开始执行--移动到放置位置:目标位置={DROP_P_DESIRED},速度={V_DESIRED}") + + _DMMotor.motor_controller.control_pos_vel(DROP_P_DESIRED, V_DESIRED) + time.sleep(5) # 等待电机移动完成 + print(f"放置位置移动指令已发送") + + except Exception as e: + # 封装异常信息,保留原始异常链 + err_msg = f"移动到放置位置失败:{str(e)}(目标位置={DROP_P_DESIRED},速度={V_DESIRED})" + print(f"❌ 异常:{err_msg}") + raise RuntimeError(err_msg) from e + +def set_zero_position() -> bool: + """ + 电机零点标定:将当前物理位置设置为软件0位 + 注意:调用前需确保电机处于稳定的物理零点位置 + """ + try: + print("🔍 开始执行电机零点标定...") + result = _DMMotor.motor_controller.motor_control.set_zero_position(_DMMotor.motor_controller.motor) + + if result: + # 保存标定参数(关键:否则断电后零点丢失) + _DMMotor.motor_controller.save_param() + print("✅ 零点标定成功!当前位置已设为0位,参数已保存") + return True + else: + print("❌ 零点标定失败!") + return False + + except Exception as e: + err_msg = f"零点标定异常:{str(e)}" + print(f"❌ {err_msg}") + raise RuntimeError(err_msg) from e + +def reset_motor(): + """复位达妙电机,防止断电""" + try: + # 读取断电后的当前位置 + print("开始电机复位流程:读取当前位置...") + current_pos = _DMMotor.motor_controller.get_position() + print(f"✅ 读取当前位置成功:{current_pos}") + + # 计算需要移动的偏移量(核心逻辑) + if abs(current_pos) <= POSITION_TOLERANCE: + print(f"ℹ️ 当前位置已在原点附近(误差={abs(current_pos)} ≤ {POSITION_TOLERANCE}),无需复位") + return True + + reset_target_p = -current_pos # 重点修改:p_desired = -当前位置 + print(f"📌 复位路径计算:当前位置={current_pos} → 需设置p_desired={reset_target_p} → 回到0位") + + print(f"🚀 开始复位移动:p_desired={reset_target_p},速度={V_DESIRED} rpm") + _DMMotor.motor_controller.control_pos_vel(p_desired=reset_target_p, v_desired=V_DESIRED) + time.sleep(5) # 等待电机复位 + + except Exception as e: + err_msg = f"电机复位失败:{str(e)}" + print(f"❌ 错误:{err_msg}") + raise RuntimeError(err_msg) from e + +# ---------调试接口---------- +if __name__ == '__main__': + # 首先在吸附位置处进行 设置零点位置 + set_zero_position() + + set_dm_motor_speed(30) + set_ori_position(282.6) + move_to_ori_position() diff --git a/DM/DM_Motor_test.py b/DM/DM_Motor_test.py deleted file mode 100644 index af6acb6..0000000 --- a/DM/DM_Motor_test.py +++ /dev/null @@ -1,257 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -# @Time : 2026/1/6 13:55 -# @Author : reenrr -# @File : DM_Motor_test.py -# @Desc : 达妙电机测试 -''' -from DM_CAN import * -import serial -import time - -# -------------------------- 电机参数配置 -------------------------- -SLAVE_ID = 0x01 -MASTER_ID = 0x11 -PORT = 'COM10' -BAUDRATE = 921600 - - -class DMMotorController: - """达妙电机控制器类""" - def __init__(self, slave_id=None, master_id=None, port=None, baudrate=None): - """ - 初始化电机控制器 - :param slave_id: 从机ID,默认0x01 - :param master_id: 主机ID,默认0x11 - :param port: 串口端口,默认COM6 - :param baudrate: 波特率,默认921600 - """ - # 初始化参数 - self.slave_id = slave_id if slave_id is not None else SLAVE_ID - self.master_id = master_id if master_id is not None else MASTER_ID - self.port = port if port is not None else PORT - self.baudrate = baudrate if baudrate is not None else BAUDRATE - - # 核心属性初始化 - self.serial_device = None # 串口设备 - self.motor = None # 电机实例 - self.motor_control = None # 电机控制器实例 - - # 初始化电机和串口 - self.init_motor() - - def init_motor(self): - """初始化电机和串口""" - try: - # 1.初始化串口 - self.serial_device = serial.Serial( - port=self.port, - baudrate=self.baudrate, - timeout=0.5 - ) - - # 2.创建电机实例 - self.motor = Motor(DM_Motor_Type.DM4310, self.slave_id, self.master_id) - # 3.创建电机控制器实例 - self.motor_control = MotorControl(self.serial_device) - # 4.添加电机 - self.motor_control.addMotor(self.motor) - - print(f"✅ 电机初始化成功") - print(f" - 串口:{self.port} | 波特率:{self.baudrate}") - print(f" - 从机ID:{hex(self.slave_id)} | 主机ID:{hex(self.master_id)}") - - except Exception as e: - raise RuntimeError(f"❌ 电机初始化失败:{e}") - - def switch_control_mode(self, control_type): - """ - 切换电机控制模式 - :param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT) - :return: 切换成功返回True,否则返回False - """ - try: - result = self.motor_control.switchControlMode(self.motor, control_type) - mode_name = self._get_mode_name(control_type) - if result: - print(f"✅ 切换到{mode_name}模式成功") - # 切换模式后保存参数 - self.motor_control.save_motor_param(self.motor) - else: - print(f"❌ 切换到{mode_name}模式失败") - return result - except Exception as e: - print(f"❌ 切换模式出错:{str(e)}") - return False - - def enable_motor(self): - """使能电机""" - try: - self.motor_control.enable(self.motor) - print("✅ 电机使能成功") - return True - except Exception as e: - print(f"❌ 电机使能失败:{str(e)}") - return False - - def disable_motor(self): - """失能电机""" - try: - self.motor_control.disable(self.motor) - print("✅ 电机失能成功") - return True - except Exception as e: - print(f"❌ 电机失能失败:{str(e)}") - return False - - def control_pos_vel(self, p_desired, v_desired): - """ - 位置-速度模式控制 - :param p_desired: 目标位置(rad, 范围[-300, 300]) - :param v_desired: 目标速度(rad/s, 范围[-30, 30]) - """ - try: - # 发送运动指令 - # self.motor_control.set_zero_position(self.motor) - self.motor_control.control_Pos_Vel(self.motor, p_desired, v_desired) - time.sleep(0.1) - print(f"✅ 位置-速度控制:位置={p_desired}rad | 速度={v_desired}rad/s") - return True - - except Exception as e: - print(f"❌ 位置-速度控制出错:{str(e)}") - return False - - def close_serial(self): - """关闭串口""" - try: - if self.serial_device and self.serial_device.is_open: - self.serial_device.close() - print("✅ 串口关闭成功") - return True - except Exception as e: - print(f"❌ 串口关闭失败:{str(e)}") - return False - - def _get_mode_name(self, control_type): - """ - 获取模式名称 - :param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT) - """ - # 1.定义[枚举值--中文名称]的映射字典 - mode_map = { - Control_Type.POS_VEL: "位置-速度模式", - Control_Type.VEL: "速度模式", - Control_Type.MIT: "MIT模式" - } - # 2.根据控制模式值获取中文名称 字典方法 - return mode_map.get(control_type, "未知模式") # “未知模式”默认值 - - def save_param(self): - """保存所有电机参数""" - try: - if self.motor is None: - raise ValueError("电机实例为None,无法保存参数") - - self.motor_control.save_motor_param(self.motor) - print("电机参数保存成功") - except Exception as e: - print(f"❌ 电机参数保存失败:{str(e)}") - - def refresh_motor_status(self): - """获得电机状态""" - try: - if self.motor is None: - raise ValueError("电机实例为None,无法保存参数") - - self.motor_control.refresh_motor_status(self.motor) - print("电机状态刷新成功") - except Exception as e: - print(f"❌ 电机状态刷新失败:{str(e)}") - - def get_position(self): - """获取电机位置""" - try: - if self.motor is None: - raise ValueError("电机实例为None,无法保存参数") - - position = self.motor.getPosition() - print(f"获取电机位置成功,当前位置: {position}") - return position - except Exception as e: - print(f"获取电机位置失败: {str(e)}") - - def change_limit_param(self, motor_type, pmax, vmax, tmax): - """ - 改变电机的PMAX VMAX TMAX - :param motor_type: 电机的类型 - :param pmax: 电机的PMAX - :param vmax: 电机的VMAX - :param tmax: 电机的TAMX - """ - try: - self.motor_control.change_limit_param(motor_type, pmax, vmax, tmax) - print( - f"电机限位参数修改成功 | 类型: {motor_type} | PMAX: {pmax} | VMAX: {vmax} | TMAX: {tmax}" - ) - except Exception as e: - print(f"修改电机限位参数失败: {str(e)}") - - - def __del__(self): - """析构函数:确保程序退出时失能电机、关闭串口""" - try: - # 先检查串口是否打开,避免重复操作 - if self.serial_device and self.serial_device.is_open: - self.disable_motor() - self.close_serial() - else: - # 串口已关闭,无需重复操作,仅打印日志 - print("ℹ️ 串口已关闭,析构函数无需重复释放资源") - except Exception as e: - print(f"ℹ️ 析构函数执行警告:{str(e)}") - -def dm_motor_control(): - # 1.创建电机控制器实例 - motor_controller = DMMotorController( - slave_id=SLAVE_ID, - master_id=MASTER_ID, - port=PORT, - baudrate=BAUDRATE - ) - - try: - # 切换到位置-速度模式 - motor_controller.switch_control_mode(Control_Type.POS_VEL) - - # 使能电机 - motor_controller.enable_motor() - - # 循环控制电机 - while True: - print("运动前的位置", motor_controller.get_position()) # 需要测试断电后是否能读取得到 - motor_controller.control_pos_vel(p_desired=282.6, v_desired=30) # 450mm 665-215 - time.sleep(20) - - motor_controller.refresh_motor_status() - print("运动1的位置", motor_controller.get_position()) # 刷新的比较慢,可以等位置不变一段时间之后,再获取位置 - motor_controller.refresh_motor_status() - - motor_controller.control_pos_vel(p_desired=-282.6, v_desired=30) - time.sleep(20) - - - except KeyboardInterrupt: - print("\n⚠️ 用户手动停止程序") - except Exception as e: - print(f"\n❌ 程序运行出错:{str(e)}") - finally: - # 5. 无论是否出错,最终都要失能电机、关闭串口 - motor_controller.disable_motor() - motor_controller.close_serial() - print("✅ 程序正常退出") - -# ---------调试接口---------- -if __name__ == '__main__': - dm_motor_control() \ No newline at end of file diff --git a/EMV/EMV_test.py b/EMV/EMV_test.py index c72a830..0689d90 100644 --- a/EMV/EMV_test.py +++ b/EMV/EMV_test.py @@ -116,6 +116,15 @@ class RelayController: self.sock = None self.is_connected = False # 长连接状态标记 + # 初始化时自动尝试建立长连接 + print(f"正在初始化网络继电器并建立长连接") + connect_success = self.connect() + + if connect_success: + print(f"网络继电器长连接建立成功:{HOST}:{PORT}") + else: + print(f"网络继电器长连接建立失败") + def connect(self) -> bool: """ 建立长连接 @@ -241,27 +250,8 @@ class RelayController: # 全局变量 -GLOBAL_RELAY = None - -# --------对外接口-------- -def init_relay(): - """初始化网络继电器实例+建立长连接""" - global GLOBAL_RELAY - - try: - GLOBAL_RELAY = RelayController() - GLOBAL_RELAY.connect() - - except Exception as e: - raise ValueError(f"初始化失败{e}") - -def deinit_relay(): - """断开长连接""" - global GLOBAL_RELAY - - if GLOBAL_RELAY is not None: - GLOBAL_RELAY.disconnect() - GLOBAL_RELAY = None +# private +_GLOBAL_RELAY = RelayController() def ng_push(): """NG推料流程""" @@ -288,8 +278,7 @@ def write_do(device_name: str, state: bool): :param device_name: 设备名称 :param state: True:打开 False:关闭 """ - global GLOBAL_RELAY - if GLOBAL_RELAY is None: + if _GLOBAL_RELAY is None: raise ValueError("未初始化实例") # 验证设备是否存在 @@ -298,12 +287,12 @@ def write_do(device_name: str, state: bool): raise ValueError(f"无效的设备名 '{device_name}'。有效设备: {valid_devices}") # 确保已连接 - if not GLOBAL_RELAY.is_connected: - if not GLOBAL_RELAY.connect(): + if not _GLOBAL_RELAY.is_connected: + if not _GLOBAL_RELAY.connect(): raise RuntimeError("无法连接到网络继电器") try: - GLOBAL_RELAY.set_device(device_name, state) + _GLOBAL_RELAY.set_device(device_name, state) except Exception as e: raise RuntimeError(f"控制设备 '{device_name}' 失败: {e}") @@ -313,13 +302,12 @@ def read_all_io() -> dict[str, dict[str, bool]]: 读取所有DI(传感器)和DO(设备)状态 :return: {'devices': {...}, 'sensors': {...}} """ - global GLOBAL_RELAY - if GLOBAL_RELAY is None: + if _GLOBAL_RELAY is None: raise ValueError("未初始化") try: - devices = GLOBAL_RELAY.get_all_device_status('devices') - sensors = GLOBAL_RELAY.get_all_device_status('sensors') + devices = _GLOBAL_RELAY.get_all_device_status('devices') + sensors = _GLOBAL_RELAY.get_all_device_status('sensors') return {'devices': devices, 'sensors': sensors} except Exception as e: @@ -328,7 +316,6 @@ def read_all_io() -> dict[str, dict[str, bool]]: # ------------测试接口------------- if __name__ == '__main__': - init_relay() write_do(SOLENOID_VALVE1, True) @@ -339,7 +326,6 @@ if __name__ == '__main__': status_str = "开启" if status else "关闭" print(f"{device_name_map.get(name, name)}: {status_str}") - deinit_relay() diff --git a/conveyor_controller/conveyor_motor.py b/conveyor_controller/conveyor_motor.py new file mode 100644 index 0000000..9e522bb --- /dev/null +++ b/conveyor_controller/conveyor_motor.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +# @Time : 2026/2/10 09:42 +# @Author : reenrr +# @File : conveyor_motor.py +# @Desc : 传送带电机相关接口 需要测试 +""" +import time + +from modbus import (write_single_register, open_serial_port, RTU_HANDLE_MAP, RTU_HANDLE_LOCK, read_holding_register) +from error_code import ModbusError + +PORT = 'COM4' +BAUDRATE = 115200 +DATABITS = 8 +STOPBITS = 1 +PARITY = 0 + +# --------寄存器地址--------- +MODE_REG_ADDR = 0x6200 +SPEED_REG_ADDR = 0x6203 +ACCELERATION_REG_ADDR = 0x6204 +DECELERATION_REG_ADDR = 0x6205 +START_OR_STOP_REG_ADDR = 0x6002 +# ---------相关写入值------------ +START_CMD = 0x0010 +STOP_CMD = 0x0040 +SPEED_CMD = 0x0002 +ABSOLUTE_POSITION_CMD = 0x0001 + +handle1 = open_serial_port( + port=PORT, + baudrate=BAUDRATE, + databits=DATABITS, + stopbits=STOPBITS, + parity=PARITY +) + +def _check_handle_valid(): + """通用句柄校验函数""" + if handle1 is None: + err_msg = ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE) + print(f"错误:{err_msg}") + raise RuntimeError(err_msg) + + with RTU_HANDLE_LOCK: + handle_obj = RTU_HANDLE_MAP.get(handle1) + if not handle_obj or not handle_obj.is_open: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)}(句柄{handle1})" + print(f"错误:{err_msg}") + raise RuntimeError(err_msg) + +def set_motor_speed(station_addr: int, speed_value: int) -> bool: + """ + 设置伺服电机速度 + :param station_addr: 从机地址 + :param speed_value: 速度值 + :return: True--设置成功,False--设置失败 + """ + _check_handle_valid() + + handle_obj = RTU_HANDLE_MAP.get(handle1) + + try: + value = handle_obj.decimal_to_16bit(speed_value) + print(f"速度值转换:{speed_value} → {value}(0x{value:04X})") + + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}" + print(f"错误:{err_msg}") + raise ValueError(err_msg) from e # 转换失败抛ValueError,保留原始异常栈 + + try: + ret_code = write_single_register(handle1, station_addr, SPEED_REG_ADDR, value, 0, 1) + + if ret_code == ModbusError.MODBUS_SUCCESS: + print(f"成功:电机速度设置完成(从站{station_addr},地址0x{SPEED_REG_ADDR:04X},值{value})") + return True + + else: + # 写寄存器返回错误码:抛OSError + err_desc = ModbusError.get_error_desc( + ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}" + err_msg = f"速度设置失败,{err_desc}" + print(f"失败:{err_msg}") + raise OSError(err_msg, ret_code) # 携带错误码,便于上层解析 + + except OSError: + raise + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}" + print(f"异常:{err_msg}") + raise Exception(err_msg) from e + +def set_motor_acceleration(station_addr: int, value: int) -> bool: + """ + 设置伺服电机加速度 + :param station_addr: 从机地址 + :param value: 加速度值 + :return: True--设置成功,False--设置失败 + """ + _check_handle_valid() + + handle_obj = RTU_HANDLE_MAP.get(handle1) + + try: + conv_value = handle_obj.decimal_to_16bit(value) + print(f"加速度值转换:{value} → {conv_value}(0x{conv_value:04X})") + + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}" + print(f"错误:{err_msg}") + raise ValueError(err_msg) from e + + try: + ret_code = write_single_register(handle1, station_addr, ACCELERATION_REG_ADDR, conv_value, 0, 1) + + if ret_code == ModbusError.MODBUS_SUCCESS: + print(f"成功:电机加速度设置完成(从站{station_addr},地址0x{ACCELERATION_REG_ADDR:04X},值{conv_value})") + return True + + else: + # 写寄存器返回错误码:抛OSError(携带错误码) + err_desc = ModbusError.get_error_desc( + ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}" + err_msg = f"加速度设置失败,{err_desc}" + print(f"失败:{err_msg}") + raise OSError(err_msg, ret_code) # 错误码存入args,上层可解析 + + except OSError: + # 主动抛出的OSError直接上抛,不做额外处理 + raise + + except Exception as e: + # 其他未知异常:抛通用Exception,保留原始栈 + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}" + print(f"异常:{err_msg}") + raise Exception(err_msg) from e + +def set_motor_deceleration(station_addr: int, value: int) -> bool: + """ + 设置伺服电机减速度 + :param station_addr: 从机地址 + :param value: 减速度值 + :return: True--设置成功,False--设置失败 + """ + _check_handle_valid() + + handle_obj = RTU_HANDLE_MAP.get(handle1) + + try: + conv_value = handle_obj.decimal_to_16bit(value) + print(f"减速度值转换:{value} → {conv_value}(0x{conv_value:04X})") + + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}" + print(f"错误:{err_msg}") + raise ValueError(err_msg) from e + + try: + ret_code = write_single_register(handle1, station_addr, DECELERATION_REG_ADDR, conv_value, 0, 1) + + if ret_code == ModbusError.MODBUS_SUCCESS: + print(f"成功:电机减速度设置完成(从站{station_addr},地址0x{DECELERATION_REG_ADDR:04X},值{conv_value})") + return True + + else: + err_desc = ModbusError.get_error_desc( + ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}" + err_msg = f"减速度设置失败,{err_desc}" + print(f"失败:{err_msg}") + raise OSError(err_msg, ret_code) + except OSError: + raise + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}" + print(f"异常:{err_msg}") + raise Exception(err_msg) from e + +def set_motor_mode(station_addr: int, value: int) -> bool: + """ + 设置电机模式 + :param station_addr: 从机地址 + :param value: 0--速度模式 1--绝对位置模式 + :return: bool 设置是否成功 + """ + _check_handle_valid() + + try: + if value == 0: + write_cmd = SPEED_CMD + mode_desc = "速度模式" + elif value == 1: + write_cmd = ABSOLUTE_POSITION_CMD + mode_desc = "绝对位置模式" + + ret_code = write_single_register(handle1, station_addr, MODE_REG_ADDR, write_cmd, 0, 1) + + if ret_code == ModbusError.MODBUS_SUCCESS: + print(f"成功:电机模式设置完成(从站{station_addr},地址0x{MODE_REG_ADDR:04X},模式:{mode_desc})") + return True + else: + err_desc = ModbusError.get_error_desc( + ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}" + err_msg = f"电机{mode_desc}设置失败,{err_desc}" + print(f"失败:{err_msg}") + raise OSError(err_msg, ret_code) + + except OSError: + # 主动抛出的OSError直接上抛,不做额外处理 + raise + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_MODE)} - {str(e)}" + print(f"异常:{err_msg}") + raise Exception(err_msg) from e + +def get_motor_speed(station_addr: int) -> int: + """ + 获取电机速度 + :param station_addr: 从站地址 + :return: 返回电机速度值 + """ + _check_handle_valid() + + handle_obj = RTU_HANDLE_MAP.get(handle1) + + try: + reg_values = read_holding_register(handle1, station_addr, SPEED_REG_ADDR, 1, 0, [], 1) + + if not reg_values or len(reg_values) != 1: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}" + print(f"错误:{err_msg}") + raise OSError(err_msg) + + unsigned_speed = reg_values[0] + signed_speed = handle_obj.uint16_to_int16(unsigned_speed) + + print(f"成功:获取电机速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}") + return signed_speed + + except OSError: + raise + + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}" + print(f"错误:{err_msg}") + raise Exception(err_msg) from e + +def get_motor_acceleration(station_addr: int) -> int: + """ + 获取电机加速度 + :param station_addr: 从站地址 + :return: 电机加速度值 + """ + _check_handle_valid() + + handle_obj = RTU_HANDLE_MAP.get(handle1) + + try: + reg_values = read_holding_register(handle1, station_addr, ACCELERATION_REG_ADDR, 1, 0, [], 1) + + if not reg_values or len(reg_values) != 1: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}" + print(f"错误:{err_msg}") + raise OSError(err_msg) + + unsigned_speed = reg_values[0] + signed_speed = handle_obj.uint16_to_int16(unsigned_speed) + + print(f"成功:获取电机加速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}") + return signed_speed + + except OSError: + raise + + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}" + print(f"错误:{err_msg}") + raise Exception(err_msg) from e + +def get_motor_deceleration(station_addr: int) -> int: + """ + 获取电机减速度 + :param station_addr: 从站地址 + :return: i电机减速度值 + """ + _check_handle_valid() + + handle_obj = RTU_HANDLE_MAP.get(handle1) + + try: + reg_values = read_holding_register(handle1, station_addr, DECELERATION_REG_ADDR, 1, 0, [], 1) + + if not reg_values or len(reg_values) != 1: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}" + print(f"错误:{err_msg}") + raise OSError(err_msg) + + unsigned_speed = reg_values[0] + signed_speed = handle_obj.uint16_to_int16(unsigned_speed) + + print(f"成功:获取电机减速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}") + return signed_speed + + except OSError: + raise + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}" + print(f"错误:{err_msg}") + raise Exception(err_msg) from e + +def move_motor(station_addr: int, value: bool) -> bool: + """ + 启停电机 + :param station_addr: 从站地址 + :param value: True--启动电机,False--停止电机 + :return: 是否设置成功 + """ + _check_handle_valid() + + try: + if value: + write_cmd = START_CMD + op = "启动" + else: + write_cmd = STOP_CMD + op = "停止" + + ret_code = write_single_register(handle1, station_addr, START_OR_STOP_REG_ADDR, write_cmd, 0,1) + + if ret_code == ModbusError.MODBUS_SUCCESS: + print(f"成功:电机{op}指令已发送(从站{station_addr})") + return True + else: + err_desc = ModbusError.get_error_desc( + ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}" + err_msg = f"电机{op}指令发送失败,{err_desc}" + print(f"失败:{err_msg}") + raise OSError(err_msg, ret_code) + + except OSError: + raise + except Exception as e: + op = "启动" if value else "停止" + err_msg = f"异常:电机{op}指令执行失败 - {str(e)}" + print(f"异常:{err_msg}") + raise Exception(err_msg) + +def sync_motor_move(value: bool): + """ + 同步传送带电机 + :param value: 启停传送点电机 True--同步启动传送带电机 False--同步停止传送带电机 + """ + op_desc = "启动" if value else "停止" + + if not isinstance(value, bool): + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_PARAM_TYPE)}(同步{op_desc}):参数必须是布尔值,当前值:{value}(类型:{type(value)})" + print(f"错误:{err_msg}") + raise TypeError(err_msg) + + try: + print(f"开始同步{op_desc}传送带电机(从站1、2)...") + move_motor(1, value) + move_motor(2, value) + print(f"成功:所有传送带电机已同步{op_desc}完成") + + except Exception as e: + err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_SYNC)}({op_desc}):{str(e)}" + print(f"错误:{err_msg}") + # 抛RuntimeError,保留原始异常链,便于上层定位具体失败电机 + raise RuntimeError(err_msg) from e + + +# ------------调试接口---------- +if __name__ == '__main__': + # 配置传送带电机参数 只需配置一次 + set_motor_mode(1, 0) # 配置成速度模式 + set_motor_speed(1, -30) + set_motor_acceleration(1, 50) + set_motor_deceleration(1, 50) + + set_motor_mode(2, 0) # 配置成速度模式 + set_motor_speed(2, -30) + set_motor_acceleration(2, 50) + set_motor_deceleration(2, 50) + + sync_motor_move(True) + + time.sleep(1) + + sync_motor_move(False) diff --git a/conveyor_controller/error_code.py b/conveyor_controller/error_code.py new file mode 100644 index 0000000..0e06cef --- /dev/null +++ b/conveyor_controller/error_code.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +# @Time : 2026/2/6 17:15 +# @Author : reenrr +# @File : error_code.py +# @Desc : Modbus RTU错误码定义 +""" +from enum import IntEnum + +class ModbusError(IntEnum): + """Modbus RTU操作错误码枚举(覆盖传送带电机所有场景)""" + # 基础成功/通用错误 + MODBUS_SUCCESS = 0 # 成功返回码(对齐C接口) + MODBUS_ERR_UNKNOWN = 99 # 未知错误 + + # 串口相关错误 + MODBUS_ERR_SERIAL = 1 # 串口错误(打开/关闭/配置失败) + MODBUS_ERR_SERIAL_HANDLE = 10 # 串口句柄无效/未初始化/未打开 + MODBUS_ERR_SERIAL_TIMEOUT = 5 # 串口通信超时 + + # 数据校验/转换错误 + MODBUS_ERR_CRC = 2 # CRC校验错误 + MODBUS_ERR_DATA_CONVERT = 11 # 十进制<->16位数据转换失败 + MODBUS_ERR_DATA_FORMAT = 12 # 寄存器返回数据格式异常(长度/值非法) + + # 参数错误 + MODBUS_ERR_PARAM = 4 # 参数错误(通用) + MODBUS_ERR_PARAM_STATION = 13 # 从站地址非法(非1-247) + MODBUS_ERR_PARAM_VALUE = 14 # 数值超出范围(如速度/加速度超出int16) + MODBUS_ERR_PARAM_TYPE = 15 # 参数类型错误(如启停指令非布尔值) + + # 响应/寄存器操作错误 + MODBUS_ERR_RESPONSE = 3 # 响应异常(从站无响应/响应非法) + MODBUS_ERR_REG_WRITE = 20 # 写寄存器失败(返回非0码) + MODBUS_ERR_REG_READ = 21 # 读寄存器失败(返回数据为空/长度不匹配) + MODBUS_ERR_REG_ADDR = 22 # 寄存器地址非法 + + # 电机模式/控制错误 + MODBUS_ERR_MOTOR_MODE = 30 # 电机模式设置失败 + MODBUS_ERR_MOTOR_START_STOP = 31 # 电机启停指令执行失败 + MODBUS_ERR_MOTOR_SYNC = 32 # 电机同步操作失败 + + @classmethod + def get_error_desc(cls, err_code: int) -> str: + """获取错误码对应的描述信息""" + desc_map = { + cls.MODBUS_SUCCESS: "操作成功", + cls.MODBUS_ERR_UNKNOWN: "未知错误", + cls.MODBUS_ERR_SERIAL: "串口错误(打开/关闭/配置失败)", + cls.MODBUS_ERR_SERIAL_HANDLE: "串口句柄无效/未初始化/未打开", + cls.MODBUS_ERR_SERIAL_TIMEOUT: "串口通信超时", + cls.MODBUS_ERR_CRC: "CRC校验错误", + cls.MODBUS_ERR_DATA_CONVERT: "十进制<->16位数据转换失败", + cls.MODBUS_ERR_DATA_FORMAT: "寄存器返回数据格式异常", + cls.MODBUS_ERR_PARAM: "参数错误(通用)", + cls.MODBUS_ERR_PARAM_STATION: "从站地址非法(必须是1-247的整数)", + cls.MODBUS_ERR_PARAM_VALUE: "数值超出范围(如int16:-32768~32767)", + cls.MODBUS_ERR_PARAM_TYPE: "参数类型错误", + cls.MODBUS_ERR_RESPONSE: "从站响应异常", + cls.MODBUS_ERR_REG_WRITE: "写寄存器失败", + cls.MODBUS_ERR_REG_READ: "读寄存器失败", + cls.MODBUS_ERR_REG_ADDR: "寄存器地址非法", + cls.MODBUS_ERR_MOTOR_MODE: "电机模式设置失败", + cls.MODBUS_ERR_MOTOR_START_STOP: "电机启停指令执行失败", + cls.MODBUS_ERR_MOTOR_SYNC: "电机同步操作失败" + } + return desc_map.get(err_code, f"未定义的错误码:{err_code}") \ No newline at end of file diff --git a/conveyor_controller/modbus.py b/conveyor_controller/modbus.py new file mode 100644 index 0000000..54d547a --- /dev/null +++ b/conveyor_controller/modbus.py @@ -0,0 +1,698 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +# @Time : 2026/2/6 16:45 +# @Author : reenrr +# @File : modbus.py +# @Desc : Modbus RTU通用接口 +""" +import time +import serial +import threading + +from typing import Optional + +from error_code import ModbusError + +# -------全局485半双工锁-------- +RTU_GLOBAL_LOCKS: dict[str, threading.Lock] = {} # key=串口号,value=独占锁 +RTU_LOCK_INIT_LOCK = threading.Lock() # 锁初始化的线程安全锁 + +# -------全局句柄管理------- +RTU_HANDLE_MAP: dict[int, "RTUSerialHandle"] = {} # 句柄ID +RTU_NEXT_HANDLE_ID = 1 +RTU_HANDLE_LOCK = threading.Lock() + +# -------Modbus RTU 核心常量----------- +MODBUS_FUNC_READ_HOLDING_REG = 0x03 # 读保持寄存器功能码 +MODBUS_FUNC_WRITE_SINGLE_REG = 0x06 # 写单个寄存器功能码 +MODBUS_FUNC_WRITE_MULTI_REG = 0x10 # 写多个寄存器功能码 +MODBUS_CRC_LENGTH = 2 # CRC校验码长度 +MODBUS_MAX_READ_REG = 125 # 读寄存器最大数量(协议规定) +MODBUS_MAX_WRITE_MULTI_REG = 123 # 写多寄存器最大数量(协议规定) +MODBUS_REG_VAL_MAX = 0xFFFF # 单寄存器最大取值(16位) +MODBUS_STATION_ADDR_MIN = 1 # 从站地址最小值 +MODBUS_STATION_ADDR_MAX = 247 # 从站地址最大值 + +class RTUSerialHandle: + """ + Modbus RTU串口句柄类,封装串口实例、配置、状态 + """ + def __init__(self, port: str, baudrate: int, + databits: int, stopbits: int, parity: str): + self.port = port + self.baudrate = baudrate + self.databits = databits + self.stopbits = stopbits + self.parity = parity # 校验位 + self.ser: Optional[serial.Serial] = None # 串口实例 + self.is_open = False + + def open(self) -> int: + """ + 打开串口,转换pyserial兼容参数,初始化485锁 + :return: ModbusError错误码 + """ + try: + # 串口参数转换:将类内存储的参数转为pyserial可识别的格式 + # 1. 停止位转换(int -> pyserial常量) + stopbits_map = { + 1: serial.STOPBITS_ONE, + 2: serial.STOPBITS_TWO + } + pyserial_stopbits = stopbits_map.get(self.stopbits, serial.STOPBITS_ONE) + + # 2. 数据位转换(int -> pyserial常量) + databits_map = { + 7: serial.SEVENBITS, + 8: serial.EIGHTBITS + } + pyserial_databits = databits_map.get(self.databits, serial.EIGHTBITS) + + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=pyserial_databits, + stopbits=pyserial_stopbits, + parity=self.parity, + timeout=0.1, + write_timeout=0.1, + xonxoff=False, + rtscts=False, + dsrdtr=False + ) + + if self.ser.is_open: + self.is_open = True + # 初始化该串口的485半双工锁 + with RTU_LOCK_INIT_LOCK: + if self.port not in RTU_GLOBAL_LOCKS: + RTU_GLOBAL_LOCKS[self.port] = threading.Lock() + return ModbusError.MODBUS_SUCCESS + return ModbusError.MODBUS_ERR_SERIAL + + except Exception as e: + print(f"串口打开失败{self.port}:{e}") + return ModbusError.MODBUS_ERR_SERIAL + + def close(self): + """关闭串口,清理资源,置空状态""" + if self.ser and self.is_open: + try: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + self.ser.close() + except Exception as e: + print(f"【串口警告】关闭{self.port}时异常:{str(e)}") + finally: + self.is_open = False + self.ser = None + + def get_lock(self) ->threading.Lock: + """ + 获取当前串口的485半双工锁 + :return: 独占锁实例 + """ + with RTU_LOCK_INIT_LOCK: + return RTU_GLOBAL_LOCKS.get(self.port, threading.Lock()) + + def decimal_to_16bit(self, value: int) -> int: + """ + 将十进制有符号整数转换成十六进制字符串 + :param value: 十进制有符号整数 + :return: 十六进制字符串 + """ + # 1. 限制数值在16位有符号整数范围内(-32768 ~ 32767) + min_16bit = -32768 # 16位有符号整数最小值 + max_16bit = 32767 # 16位有符号整数最大值 + + # 超出范围自动截断并提示 + if value < min_16bit: + print(f"警告:数值{value}超出16位最小值{min_16bit},已截断为{min_16bit}") + value = min_16bit + elif value > max_16bit: + print(f"警告:数值{value}超出16位最大值{max_16bit},已截断为{max_16bit}") + value = max_16bit + + # 2. 转换为16位无符号补码(核心逻辑) + if value < 0: + unsigned_value = 65536 + value # 负数转补码(如-30 → 65506) + else: + unsigned_value = value # 正数直接保留(如30 → 30) + + # 3. 返回十六进制格式的整数(0x开头,Python中整数本质不变,仅显示格式) + return unsigned_value + + def uint16_to_int16(self, unsigned_value: int) -> int: + """ + 将16位无符号十进制数转换为16位有符号十进制数 + :param unsigned_value: 无符号十进制数 + :return: 有符号十进制数 + """ + # 先校验输入范围(必须是16位无符号数) + if not isinstance(unsigned_value, int): + raise ValueError(f"输入必须是整数,当前是{type(unsigned_value)}") + if unsigned_value < 0 or unsigned_value > 65535: + raise ValueError(f"输入必须是0~65535的整数,当前是{unsigned_value}") + + # 核心转换逻辑 + if unsigned_value > 32767: + return unsigned_value - 65536 + else: + return unsigned_value + + def __del__(self): + """析构函数,程序退出时自动关闭串口,防止资源泄露""" + self.close() + +# -------对外接口-------- +# public +def open_serial_port(port: str, baudrate: int, databits: int, stopbits: int, parity: int) -> Optional[int]: + """ + 打开并初始化Modbus串口 + :param port: 串口号 + :param baudrate: 波特率 + :param databits: 数据位 + :param stopbits: 停止位 + :param parity: 校验位 + :return: 串口句柄 + """ + # 1. 参数合法性校验 + if not port: + print("参数错误:串口号不能为空") + return None + if baudrate <= 0: + print(f"参数错误:波特率{baudrate}非法(必须>0)") + return None + if databits not in [7, 8]: + print(f"参数错误:数据位{databits}非法(仅支持7/8)") + return None + if stopbits not in [1, 2]: + print(f"参数错误:停止位{stopbits}非法(仅支持1/2)") + return None + if parity not in [0, 1, 2]: + print(f"参数错误:校验位{parity}非法(仅支持0=无/1=奇/2=偶)") + return None + + # 2. 校验位转换(int -> 字符串:0->N,1->O,2->E) + parity_map = {0: 'N', 1: 'O', 2: 'E'} + parity_str = parity_map[parity] # 已校验parity在0-2,无需默认值 + + # 3. 创建串口句柄实例 + handle_obj = RTUSerialHandle( + port=port, + baudrate=baudrate, + databits=databits, + stopbits=stopbits, + parity=parity_str + ) + + # 4. 打开串口 + ret = handle_obj.open() + if ret != ModbusError.MODBUS_SUCCESS: + print(f"串口{port}打开失败,错误码:{ret}") + return None + + # 5. 分配全局句柄ID并存储 + global RTU_NEXT_HANDLE_ID + with RTU_HANDLE_LOCK: + handle_id = RTU_NEXT_HANDLE_ID + RTU_HANDLE_MAP[handle_id] = handle_obj + RTU_NEXT_HANDLE_ID += 1 + + print(f"串口[{port}]打开成功,句柄ID:{handle_id}") + return handle_id + +def close_serial_port(handle: int): + """ + 关闭Modbus串口 + :param handle:串口句柄ID + """ + with RTU_HANDLE_LOCK: + handle_obj = RTU_HANDLE_MAP.get(handle) + if not handle_obj: + print(f"句柄{handle}不存在,关闭失败") + return + + # 关闭串口并清理资源 + handle_obj.close() + del RTU_HANDLE_MAP[handle] + + # 清理485半双工锁(可选) + with RTU_LOCK_INIT_LOCK: + if handle_obj.port in RTU_GLOBAL_LOCKS: + del RTU_GLOBAL_LOCKS[handle_obj.port] + + print(f"句柄{handle}(串口[{handle_obj.port}])已关闭") + +# --------Modbus RTU CRC16校验函数----------- +def modbus_crc16(data: bytes) -> bytes: + """ + 计算Modbus RTU CRC16校验码(小端序) + :param data: 待校验的字节流(不含CRC) + :return: 2字节校验码 + """ + crc = 0xFFFF + for byte in data: + crc ^= byte + + # 对当前字节的每1位(共8位)做移位+异或操作 + for _ in range(8): + # 判断CRC寄存器的最低位是否为1 + if crc & 0x0001: + # 如果最低位是1:先右移1位,再和多项式0xA001异或 + crc = (crc >> 1) ^ 0xA001 + else: + # 如果最低位是0:仅右移1位 + crc >>= 1 + + # 把16位CRC值拆成2个字节(小端序:低字节在前,高字节在后) + return bytes([crc & 0xFF, (crc >> 8) & 0xFF]) + +def verify_modbus_crc(data: bytes) -> bool: + """ + 验证Modbus RTU数据的CRC16校验码 + :param data: 包含CRC的完整字节流 + :return: True=校验通过,False=校验失败 + """ + if len(data) < MODBUS_CRC_LENGTH: + return False + + # 拆分出“原始数据部分”(去掉最后2字节的CRC) + data_part = data[:-MODBUS_CRC_LENGTH] + + # 拆分出“附带的CRC校验码部分”(取最后2字节) + crc_part = data[-MODBUS_CRC_LENGTH:] + + # 核心验证:重新计算原始数据的CRC,和附带的CRC对比 + return modbus_crc16(data_part) == crc_part + + +# ------Modbus RTU核心接口------- +def read_holding_register(handle: int, station_addr: int, start_reg_addr: int, + reg_count: int, resp_offset: int, out_buffer: list[int], use_crc: int) -> list[int]: + """ + 读保持寄存器 + :param handle: 串口句柄ID + :param station_addr: 从机地址 + :param start_reg_addr: 起始寄存器地址 + :param reg_count: 读取寄存器数量 + :param resp_offset: 响应数据偏移量 + :param out_buffer: 输出缓冲区 + :param use_crc: 是否启用crc校验 + :return: 读取到的寄存器值列表 + """ + # 1. 句柄校验 + with RTU_HANDLE_LOCK: + handle_obj = RTU_HANDLE_MAP.get(handle) + if not handle_obj or not handle_obj.is_open: + print(f"句柄{handle}无效或串口未打开") + raise ModbusError.MODBUS_ERR_SERIAL + + # 2. 参数合法性校验 + if station_addr < 1 or station_addr > 247: + print(f"从站地址错误:{station_addr}(必须1-247)") + raise ModbusError.MODBUS_ERR_PARAM + if reg_count < 1 or reg_count > 125: # Modbus RTU最大读取125个寄存器 + print(f"寄存器数量错误:{reg_count}(必须1-125)") + raise ModbusError.MODBUS_ERR_PARAM + if resp_offset < 0: + print(f"响应偏移量错误:{resp_offset}(必须≥0)") + raise ModbusError.MODBUS_ERR_PARAM + if use_crc not in [0, 1]: + print(f"CRC参数错误:{use_crc}(必须0/1)") + raise ModbusError.MODBUS_ERR_PARAM + + # 3. 构建Modbus RTU读指令帧 + cmd = bytearray() + cmd.append(station_addr) + cmd.append(MODBUS_FUNC_READ_HOLDING_REG) + cmd.extend(start_reg_addr.to_bytes(2, byteorder='big')) # 起始寄存器地址(大端) + cmd.extend(reg_count.to_bytes(2, byteorder='big')) # 寄存器数量(大端) + + # 添加CRC校验(如果启用) + if use_crc == 1: + crc = modbus_crc16(bytes(cmd)) + cmd.extend(crc) + cmd_bytes = bytes(cmd) + + # 4. 485半双工发送+接收(加全局锁) + lock = handle_obj.get_lock() + with lock: + # 清空缓冲区 + handle_obj.ser.reset_input_buffer() + handle_obj.ser.reset_output_buffer() + + # 发送指令 + try: + handle_obj.ser.write(cmd_bytes) + handle_obj.ser.flush() + except Exception as e: + print(f"发送读指令失败:{e}") + raise ModbusError.MODBUS_ERR_SERIAL + + # 接收响应 + start_time = time.time() + response = b"" + # 最小响应长度:地址(1)+功能码(1)+字节数(1)+数据(2*N)+CRC(2) + min_resp_len = 3 + 2 * reg_count +(MODBUS_CRC_LENGTH if use_crc == 1 else 0) + while (time.time() - start_time) < handle_obj.ser.timeout * 3: + if handle_obj.ser.in_waiting > 0: + response += handle_obj.ser.read(handle_obj.ser.in_waiting) + if len(response) >= min_resp_len: + break + time.sleep(0.001) + if len(response) == 0: + print(f"读寄存器超时(从站{station_addr},起始地址{start_reg_addr})") + raise ModbusError.MODBUS_ERR_TIMEOUT + + # 5. CRC校验(如果启用) + if use_crc == 1: + if not verify_modbus_crc(response): + print(f"CRC校验失败 | 响应数据:{response.hex(' ')}") + raise ModbusError.MODBUS_ERR_CRC + response = response[:-MODBUS_CRC_LENGTH] # 去掉CRC + + # 6. 响应格式校验 + if len(response) < 3: + print(f"响应数据过短:{response.hex(' ')}") + raise ModbusError.MODBUS_ERR_RESPONSE + # 校验从站地址和功能码 + if response[0] != station_addr: + print(f"从站地址不匹配:请求{station_addr},响应{response[0]}") + raise ModbusError.MODBUS_ERR_RESPONSE + if response[1] != MODBUS_FUNC_READ_HOLDING_REG: + # 功能码最高位为1表示错误响应 + if response[1] == MODBUS_FUNC_READ_HOLDING_REG | 0x80: + err_code = response[2] # 提取错误差 + print(f"Modbus错误响应:从站{station_addr},错误码{err_code}") + else: + print(f"功能码不匹配:请求{MODBUS_FUNC_READ_HOLDING_REG},响应{response[1]}") + raise ModbusError.MODBUS_ERR_RESPONSE + # 校验数据长度 + resp_byte_count = response[2] + expected_byte_count = 2 * reg_count + if resp_byte_count != expected_byte_count: + print(f"数据长度不匹配:预期{expected_byte_count}字节,实际{resp_byte_count}字节") + raise ModbusError.MODBUS_ERR_RESPONSE + + # 7. 解析数据并填充输出缓冲区 + data_part = response[3 + resp_offset:] # 跳过偏移量 + if len(data_part) < 2 * reg_count: + print(f"偏移后数据不足:需要{2 * reg_count}字节,实际{len(data_part)}字节") + raise ModbusError.MODBUS_ERR_RESPONSE + + out_buffer.clear() + for i in range(reg_count): + reg_data = data_part[2 * i: 2 * (i + 1)] + reg_value = int.from_bytes(reg_data, byteorder='big') + out_buffer.append(reg_value) + + print(f"读寄存器成功 | 从站{station_addr} | 起始地址{start_reg_addr} | 数量{reg_count} | 数据:{out_buffer}") + return out_buffer + +def write_single_register(handle: int, station_addr: int, reg_addr: int, write_value: int, + resp_offset: int, use_crc: int) -> int: + """ + 写单个寄存器 + :param handle: 串口句柄ID + :param station_addr: 从站地址 + :param reg_addr: 寄存器地址 + :param write_value: 写入值 + :param resp_offset: 响应数据偏移量 + :param use_crc: 是否启用CRC校验 + :return: 状态码 + """ + # 1. 句柄校验 + with RTU_HANDLE_LOCK: + handle_obj = RTU_HANDLE_MAP.get(handle) + if not handle_obj or not handle_obj.is_open: + print(f"句柄{handle}无效或串口未打开") + return ModbusError.MODBUS_ERR_SERIAL + + # 2. 参数合法性校验 + if station_addr < 1 or station_addr > 247: + print(f"从站地址错误:{station_addr}(必须1-247)") + return ModbusError.MODBUS_ERR_PARAM + if write_value < 0 or write_value > 0xFFFF: + print(f"写入值错误:{write_value}(必须0-65535)") + return ModbusError.MODBUS_ERR_PARAM + if use_crc not in [0, 1]: + print(f"CRC参数错误:{use_crc}(必须0/1)") + return ModbusError.MODBUS_ERR_PARAM + + # 3. 构建Modbus RTU写指令帧 + cmd = bytearray() + cmd.append(station_addr) # 从站地址 + cmd.append(MODBUS_FUNC_WRITE_SINGLE_REG) # 功能码 + cmd.extend(reg_addr.to_bytes(2, byteorder='big')) # 寄存器地址(大端) + cmd.extend(write_value.to_bytes(2, byteorder='big')) # 写入值(大端) + + # 添加CRC校验(如果启用) + if use_crc == 1: + crc = modbus_crc16(bytes(cmd)) + cmd.extend(crc) + cmd_bytes = bytes(cmd) + + print(f"命令:{cmd_bytes.hex(' ')}") + + # 4. 485半双工发送+接收 + lock = handle_obj.get_lock() + with lock: + handle_obj.ser.reset_input_buffer() + handle_obj.ser.reset_output_buffer() + + # 发送指令 + try: + handle_obj.ser.write(cmd_bytes) + handle_obj.ser.flush() + except Exception as e: + print(f"发送写指令失败:{e}") + return ModbusError.MODBUS_ERR_SERIAL + + # 接收响应(写单个寄存器的响应和请求帧一致) + start_time = time.time() + response = b"" + expected_resp_len = len(cmd_bytes) - (MODBUS_CRC_LENGTH if use_crc == 1 else 0) + while (time.time() - start_time) < handle_obj.ser.timeout * 3: + if handle_obj.ser.in_waiting > 0: + response += handle_obj.ser.read(handle_obj.ser.in_waiting) + if len(response) >= expected_resp_len: + break + time.sleep(0.001) + + if len(response) == 0: + print(f"写寄存器超时(从站{station_addr},地址{reg_addr})") + return ModbusError.MODBUS_ERR_TIMEOUT + + # 5. CRC校验(如果启用) + if use_crc == 1: + if not verify_modbus_crc(response): + print(f"CRC校验失败 | 响应数据:{response.hex(' ')}") + return ModbusError.MODBUS_ERR_CRC + response = response[:-MODBUS_CRC_LENGTH] + + # 6. 响应校验(跳过偏移量后匹配) + expected_resp = cmd_bytes[:-MODBUS_CRC_LENGTH] if use_crc == 1 else cmd_bytes + if response[resp_offset:] != expected_resp[resp_offset:]: + print(f"写响应不匹配 | 请求:{expected_resp.hex(' ')} | 响应:{response.hex(' ')}") + return ModbusError.MODBUS_ERR_RESPONSE + + print(f"写单个寄存器成功 | 从站{station_addr} | 地址{reg_addr} | 值{write_value}") + return ModbusError.MODBUS_SUCCESS + +def write_multi_register(handle: int, station_addr: int, start_reg_addr: int, reg_count: int, + write_values: list[int], resp_offset: int, use_crc: int) -> int: + """ + 写多个寄存器 + :param handle: 串口句柄ID + :param station_addr: 从站地址 + :param start_reg_addr: 起始寄存器地址 + :param reg_count: 写入寄存器数量 + :param write_values: 写入值列表 + :param resp_offset: 响应数据偏移量 + :param use_crc: 是否启用CRC校验 + :return: 状态码 + """ + # 1. 句柄校验 + with RTU_HANDLE_LOCK: + handle_obj = RTU_HANDLE_MAP.get(handle) + if not handle_obj or not handle_obj.is_open: + print(f"句柄{handle}无效或串口未打开") + return ModbusError.MODBUS_ERR_SERIAL + + # 2. 参数合法性校验 + if station_addr < 1 or station_addr > 247: + print(f"从站地址错误:{station_addr}(必须1-247)") + return ModbusError.MODBUS_ERR_PARAM + if reg_count < 1 or reg_count > 123: # Modbus RTU最大写入123个寄存器 + print(f"寄存器数量错误:{reg_count}(必须1-123)") + return ModbusError.MODBUS_ERR_PARAM + if len(write_values) != reg_count: + print(f"写入值数量不匹配:预期{reg_count}个,实际{len(write_values)}个") + return ModbusError.MODBUS_ERR_PARAM + for val in write_values: + if val < 0 or val > 0xFFFF: + print(f"写入值错误:{val}(必须0-65535)") + return ModbusError.MODBUS_ERR_PARAM + if use_crc not in [0, 1]: + print(f"CRC参数错误:{use_crc}(必须0/1)") + return ModbusError.MODBUS_ERR_PARAM + + # 3. 构建Modbus RTU批量写指令帧 + cmd = bytearray() + cmd.append(station_addr) # 从站地址 + cmd.append(MODBUS_FUNC_WRITE_MULTI_REG) # 功能码 + cmd.extend(start_reg_addr.to_bytes(2, byteorder='big')) # 起始地址(大端) + cmd.extend(reg_count.to_bytes(2, byteorder='big')) # 寄存器数量(大端) + cmd.append(2 * reg_count) # 字节数(每个寄存器2字节) + # 追加写入值 + for val in write_values: + cmd.extend(val.to_bytes(2, byteorder='big')) + # 添加CRC校验(如果启用) + if use_crc == 1: + crc = modbus_crc16(bytes(cmd)) + cmd.extend(crc) + cmd_bytes = bytes(cmd) + + print(f"命令:{cmd_bytes.hex(' ')}") + + # 4. 485半双工发送+接收 + lock = handle_obj.get_lock() + with lock: + handle_obj.ser.reset_input_buffer() + handle_obj.ser.reset_output_buffer() + + # 发送指令 + try: + handle_obj.ser.write(cmd_bytes) + handle_obj.ser.flush() + except Exception as e: + print(f"发送批量写指令失败:{e}") + return ModbusError.MODBUS_ERR_SERIAL + + # 接收响应(批量写响应帧:地址+功能码+起始地址+数量+CRC) + start_time = time.time() + response = b"" + expected_resp_len = 7 + (MODBUS_CRC_LENGTH if use_crc == 1 else 0) # 基础响应长度 + while (time.time() - start_time) < handle_obj.ser.timeout * 3: + if handle_obj.ser.in_waiting > 0: + response += handle_obj.ser.read(handle_obj.ser.in_waiting) + if len(response) >= expected_resp_len: + break + time.sleep(0.001) + + if len(response) == 0: + print(f"批量写寄存器超时(从站{station_addr},起始地址{start_reg_addr})") + return ModbusError.MODBUS_ERR_TIMEOUT + + # 5. CRC校验(如果启用) + if use_crc == 1: + if not verify_modbus_crc(response): + print(f"CRC校验失败 | 响应数据:{response.hex(' ')}") + return ModbusError.MODBUS_ERR_CRC + response = response[:-MODBUS_CRC_LENGTH] + + # 6. 响应格式校验 + if len(response) < 7: + print(f"批量写响应过短:{response.hex(' ')}") + return ModbusError.MODBUS_ERR_RESPONSE + + resp_data = response[resp_offset:] + if len(resp_data) < 6: # 偏移后至少保留:地址(1)+功能码(1)+起始地址(2)+数量(2) + print(f"【响应错误】偏移后数据不足 | 偏移量{resp_offset} | 剩余长度{len(resp_data)}字节(需≥6)") + return ModbusError.MODBUS_ERR_RESPONSE + + # 校验核心字段 + if response[0] != station_addr: + print(f"从站地址不匹配:请求{station_addr},响应{response[0]}") + return ModbusError.MODBUS_ERR_RESPONSE + if response[1] != MODBUS_FUNC_WRITE_MULTI_REG: + print(f"功能码不匹配:请求{MODBUS_FUNC_WRITE_MULTI_REG},响应{response[1]}") + return ModbusError.MODBUS_ERR_RESPONSE + resp_start_addr = int.from_bytes(response[2:4], byteorder='big') + resp_reg_count = int.from_bytes(response[4:6], byteorder='big') + if resp_start_addr != start_reg_addr or resp_reg_count != reg_count: + print( + f"批量写响应参数不匹配 | 预期(start={start_reg_addr}, count={reg_count}) | 实际(start={resp_start_addr}, count={resp_reg_count})") + return ModbusError.MODBUS_ERR_RESPONSE + + print(f"批量写寄存器成功 | 从站{station_addr} | 起始地址{start_reg_addr} | 数量{reg_count} | 值:{write_values}") + return ModbusError.MODBUS_SUCCESS + +# ---------测试接口-------- +if __name__ == '__main__': + # handle1 = open_serial_port( + # port='COM4', + # baudrate=115200, + # databits=8, + # stopbits=1, + # parity=0 + # ) + # + # # + # # if handle1: + # # close_serial_port(handle1) + # + # # 测试校验码是否正确 + # # data1 = b'\x01\x06\x62\x00\x00\x02' + # # crc1 = modbus_crc16(data1) + # # print(f"计算出的CRC16(小端序,十六进制):{crc1.hex(' ')}") # 预期结果:84 0A + # # print(f"完整Modbus指令帧:{(data1 + crc1).hex(' ')}\n") + # 手动构造句柄(无硬件时) + handle_id = None + mock_handle = None + + if not handle_id: + + mock_handle = RTUSerialHandle( + port="COM99", baudrate=9600, databits=8, stopbits=1, parity="N" + ) + mock_handle.is_open = True + mock_handle.ser = None + + with RTU_HANDLE_LOCK: + handle_id = RTU_NEXT_HANDLE_ID + RTU_HANDLE_MAP[handle_id] = mock_handle + RTU_NEXT_HANDLE_ID += 1 + print(f"已创建模拟句柄,ID:{handle_id}") + + result = mock_handle.decimal_to_16bit(-30) + print("result:{result}") + + write_single_register( + handle_id, + 1, + 0x6203, + result, + 0, + 1 + ) + + + + + # ret = write_multi_register( + # handle=handle_id, + # station_addr=1, + # start_reg_addr=0x6000, + # reg_count=3, + # write_values=[1, 2, 3], + # resp_offset=0, + # use_crc=1 + # ) + # + # # 步骤4:验证结果 + # print("\n===== 测试结果 =====") + # print(f"函数返回码:{ret} (0=成功,5=超时,2=CRC错误,3=响应错误)") + # print(f"测试是否成功:{ret == ModbusError.MODBUS_SUCCESS}") + + # 步骤5:清理资源 + close_serial_port(handle_id) + + + + + + + diff --git a/main_control.py b/main_control.py index 0416ce1..885d554 100644 --- a/main_control.py +++ b/main_control.py @@ -13,7 +13,7 @@ import logging import os from conveyor_controller.conveyor_master_controller2_test import conveyor_control from client import IndustrialPCClient -from DM.DM_Motor_test import DMMotorController, Control_Type, dm_motor_control +from DM.DM_Motor_controller import DMMotorController, Control_Type, dm_motor_control # ------------ 日志+参数配置 ------------ script_dir = os.path.dirname(os.path.abspath(__file__)) diff --git a/servo/servo_control.py b/servo/servo_control.py new file mode 100644 index 0000000..daa77e7 --- /dev/null +++ b/servo/servo_control.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +# @Time : 2026/2/26 14:07 +# @Author : reenrr +# @File : servo_control.py +# @Desc : 舵机对外控制接口 +""" +import time + +from servo_test import ServoController + +# -------------参数配置-------------- +BAUDRATE = 115200 # 舵机的波特率 +PORT = 'COM4' +SERVO_IDS = [1, 2, 3, 4, 5] # 舵机们的 ID 号 +POS_START = 2047 +POS_END = 0 +SPEED = 1500 +ACC = 0 + +# 舵机参数换算常量(核心:0度=0,360度=4095) +DEGREE_TO_RAW_RATIO = 4095 / 360 # 1度对应的原始值 +RAW_TO_DEGREE_RATIO = 360 / 4095 # 1个原始值对应的角度 + +custom_config = { + 'port': PORT, + 'baudrate': BAUDRATE, + 'servo_ids': SERVO_IDS, + 'pos_start': POS_START, + 'pos_end': POS_END, + 'speed': SPEED, + 'acc': ACC, +} + +class ServoInterface: + def __init__(self, config=None): + self.servo_controller = ServoController(config) + + try: + # 初始化串口 + self.servo_controller.setup_port() + self.servo_controller.enable_all_servos() + print("舵机控制器初始化成功(串口/舵机启用完成)") + + except Exception as e: + raise RuntimeError(f"舵机初始化失败:{str(e)}") from e + +# 全局变量 +_Servo = ServoInterface(custom_config) + +def _degree_to_raw(degree: int) -> int: + """ + 角度转原始值 + :param degree: 角度值(0-360) + :return: 原始值(0-4095) + """ + if not (0 <= degree <= 360): + raise ValueError(f"角度值必须在0-360度之间,当前输入:{degree}") + raw_value = int(degree * DEGREE_TO_RAW_RATIO) + + return max(0, min(raw_value, 4095)) + +def _raw_to_degree(raw: int) -> float: + """ + 原始值转角度 + :param raw: 原始值(0-4095) + :return: 角度值(0-360) + """ + return round(raw * RAW_TO_DEGREE_RATIO, 2) + +def set_servo_speed(value: int): + """ + 设置舵机速度 + :param value: 速度值 + """ + global SPEED + + if 0 <= value <= 3400: + SPEED = value + print(f"舵机速度已设置为:{SPEED}") + else: + raise ValueError("速度值超出限定范围") + +def set_servo_acceleration(value: int): + """ + 设置舵机加速度 + :param value: 加速度值 + """ + global ACC + if 0 <= value <= 254: + ACC = value + print(f"舵机加速度已设置为:{ACC}") + else: + raise ValueError("加速度值超出限定范围") + +def set_servo_ori_position(ori_position: int): + """ + 设置舵机原点位 + :param ori_position: 舵机原点位 + """ + global POS_START + POS_START = _degree_to_raw(ori_position) + print(f"舵机原点位置已设置为:{ori_position}度(对应原始值:{POS_START})") + +def set_servo_rot_position(rot_position: int): + """ + 设置舵机翻转位置 + :param rot_position: 舵机翻转位 + """ + global POS_END + POS_END = _degree_to_raw(rot_position) + print(f"舵机翻转位置已设置为:{rot_position}度(对应原始值:{POS_END})") + +def move_to_rot_position(): + """舵机旋转到翻转位置""" + try: + _Servo.servo_controller.write_position(POS_END, SPEED, ACC) + time.sleep(2) + target_degree = _raw_to_degree(POS_END) + print(f"舵机已移动到翻转位置:{target_degree}度(原始值:{POS_END})") + + except Exception as e: + raise RuntimeError(f"舵机移动到翻转位置失败:{str(e)}") from e + +def move_to_ori_position(): + """舵机旋转到原点""" + try: + _Servo.servo_controller.write_position(POS_START, SPEED, ACC) + time.sleep(2) + target_degree = _raw_to_degree(POS_START) + print(f"舵机已移动到原点位置:{target_degree}度(原始值:{POS_START})") + + except Exception as e: + raise RuntimeError(f"舵机移动到原点位置失败:{str(e)}") from e + +# ----------调试接口---------- +if __name__ == '__main__': + set_servo_speed(1500) + set_servo_acceleration(0) + set_servo_ori_position(0) + set_servo_rot_position(180) + + move_to_rot_position() + time.sleep(1) diff --git a/servo/servo_test.py b/servo/servo_test.py index 6d14203..2386b9e 100644 --- a/servo/servo_test.py +++ b/servo/servo_test.py @@ -1,20 +1,11 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -''' +""" # @Time : 2026/1/5 17:01 # @Author : reenrr # @File : servo_test.py # @Desc : 测试舵机 -''' -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -# @Time : 2025/12/2 14:05 -# @Author : reenrr -# @File :servo.py -# @Description : 控制舵机正反转 -''' - +""" from servo_sdk import * import logging @@ -49,13 +40,13 @@ class ServoController: """配置串口参数""" # 打开串口 if self.port_handler.openPort(): - logging.info("成功打开串口") + print("成功打开串口") else: raise RuntimeError("打开串口失败") # 设置波特率 if self.port_handler.setBaudRate(self.config['baudrate']): - logging.info("设置波特率成功") + print("设置波特率成功") else: self.port_handler.closePort() raise RuntimeError("设置波特率失败") @@ -66,16 +57,16 @@ class ServoController: for servo_id in self.config['servo_ids']: enable_result = self.servo_handler.torqueEnable(servo_id) if enable_result[1] != 0: - logging.info(f"[ID:{servo_id:02d}] 舵机使能失败,错误信息:{enable_result[1]}") + print(f"[ID:{servo_id:02d}] 舵机使能失败,错误信息:{enable_result[1]}") enable_success = False else: - logging.info(f"[ID:{servo_id:02d}] 舵机使能成功") + print(f"[ID:{servo_id:02d}] 舵机使能成功") if not enable_success: self.cleanup() raise RuntimeError("使能舵机失败,程序退出") - logging.info("所有舵机使能成功") + print("所有舵机使能成功") def disable_all_servos(self): """失能所有舵机""" @@ -83,16 +74,16 @@ class ServoController: for servo_id in self.config['servo_ids']: disable_result = self.servo_handler.torqueDisable(servo_id) if disable_result[1] != 0: - logging.info(f"[ID:{servo_id:02d}] 舵机失能失败,错误信息:{disable_result[1]}") + print(f"[ID:{servo_id:02d}] 舵机失能失败,错误信息:{disable_result[1]}") disable_success = False else: - logging.info(f"[ID:{servo_id:02d}] 舵机失能成功") + print(f"[ID:{servo_id:02d}] 舵机失能成功") if not disable_success: self.cleanup() raise RuntimeError("失能舵机失败,程序退出") - logging.info("所有舵机失能成功") + print("所有舵机失能成功") def write_position(self, position, speed, acc): """ @@ -106,13 +97,13 @@ class ServoController: servo_id, position, speed, acc ) if not add_param_result: - logging.info(f"[ID:{servo_id:02d}] 添加参数失败") + print(f"[ID:{servo_id:02d}] 添加参数失败") continue result = self.servo_handler.GroupSyncWrite.txPacket() if result != COMM_SUCCESS: - logging.info(f"[ID:{servo_id:02d}] 发送指令失败:{result.getTxRxResult(result)}") - logging.info("复位成功") + print(f"[ID:{servo_id:02d}] 发送指令失败:{result.getTxRxResult(result)}") + print("复位成功") # 清空参数缓存 self.servo_handler.GroupSyncWrite.clearParam() @@ -126,26 +117,26 @@ class ServoController: self.disable_all_servos() if self.port_handler.is_open(): self.port_handler.closePort() - logging.info("串口已关闭") + print("串口已关闭") def run_cycle(self): """运行舵机循环运动""" self.is_running = True - logging.info("舵机开始循环运动(按Ctrl+C终止)...") + print("舵机开始循环运动(按Ctrl+C终止)...") while self.is_running: # 运动到起始位置(180度) self.write_position(self.config['pos_start'], self.config['speed'], self.config['acc']) - logging.info("运动到180度") + print("运动到180度") time.sleep(self.time_interval1) # 运动到结束位置(0度) self.write_position(self.config['pos_end'], self.config['speed'], self.config['acc']) - logging.info("运动到0度") + print("运动到0度") time.sleep(self.time_interval2) def run(self): @@ -162,12 +153,12 @@ class ServoController: # 运行循环 self.run_cycle() except KeyboardInterrupt: - logging.info("\n用户终止程序...") + print("\n用户终止程序...") self.stop() except RuntimeError as e: - logging.info(f"\n运行错误:{str(e)}") + print(f"\n运行错误:{str(e)}") except Exception as e: - logging.info(f"\n程序运行异常:{str(e)}") + print(f"\n程序运行异常:{str(e)}") finally: self.cleanup()