#!/usr/bin/env python # -*- coding: utf-8 -*- """ # @Time : 2026/1/5 17:01 # @Author : reenrr # @File : servo_test.py # @Desc : 测试舵机 """ from servo_sdk import * import logging # -------------参数配置-------------- BAUDRATE = 115200 # 舵机的波特率 PORT = '/dev/ttyACM0 ' SERVO_IDS = [1, 2, 3, 4, 5] # 舵机们的 ID 号 POS_START = 2047 POS_END = 0 SPEED = 1500 ACC = 0 TIME_INTERVAL1 = ((2047-0) / 1500) + 2 # 翻转回来的时间 TIME_INTERVAL2 = 10 # 不用翻转运行的时间 class ServoController: def __init__(self, config=None): """ 初始化舵机控制器 """ self.config = config # self.time_interval1 = self.config['time_interval1'] # self.time_interval2 = self.config['time_interval2'] # 初始化串口和舵机处理器 self.port_handler = PortHandler(self.config['port']) self.servo_handler = HxServoHandler(self.port_handler) # 初始化状态 self.is_running = False def setup_port(self): """配置串口参数""" # 打开串口 if self.port_handler.openPort(): print("成功打开串口") else: raise RuntimeError("打开串口失败") # 设置波特率 if self.port_handler.setBaudRate(self.config['baudrate']): print("设置波特率成功") else: self.port_handler.closePort() raise RuntimeError("设置波特率失败") def enable_all_servos(self): """使能所有舵机""" enable_success = True for servo_id in self.config['servo_ids']: enable_result = self.servo_handler.torqueEnable(servo_id) if enable_result[1] != 0: print(f"[ID:{servo_id:02d}] 舵机使能失败,错误信息:{enable_result[1]}") enable_success = False else: print(f"[ID:{servo_id:02d}] 舵机使能成功") if not enable_success: self.cleanup() raise RuntimeError("使能舵机失败,程序退出") print("所有舵机使能成功") def disable_all_servos(self): """失能所有舵机""" disable_success = True for servo_id in self.config['servo_ids']: disable_result = self.servo_handler.torqueDisable(servo_id) if disable_result[1] != 0: print(f"[ID:{servo_id:02d}] 舵机失能失败,错误信息:{disable_result[1]}") disable_success = False else: print(f"[ID:{servo_id:02d}] 舵机失能成功") if not disable_success: self.cleanup() raise RuntimeError("失能舵机失败,程序退出") print("所有舵机失能成功") def write_position(self, position, speed, acc): """ 写入目标位置到所有舵机 :param position: 目标位置 :param speed: 速度 :param acc: 加速度 """ for servo_id in self.config['servo_ids']: add_param_result = self.servo_handler.syncWritePosEx( servo_id, position, speed, acc ) if not add_param_result: print(f"[ID:{servo_id:02d}] 添加参数失败") continue result = self.servo_handler.GroupSyncWrite.txPacket() if result != COMM_SUCCESS: print(f"[ID:{servo_id:02d}] 发送指令失败:{result.getTxRxResult(result)}") print("复位成功") # 清空参数缓存 self.servo_handler.GroupSyncWrite.clearParam() def stop(self): """停止舵机运行""" self.is_running = False def cleanup(self): """清理资源""" self.disable_all_servos() if self.port_handler.is_open(): self.port_handler.closePort() print("串口已关闭") def run_cycle(self): """运行舵机循环运动""" self.is_running = True print("舵机开始循环运动(按Ctrl+C终止)...") while self.is_running: # 运动到起始位置(180度) self.write_position(self.config['pos_start'], self.config['speed'], self.config['acc']) print("运动到180度") time.sleep(2) # 运动到结束位置(0度) self.write_position(self.config['pos_end'], self.config['speed'], self.config['acc']) print("运动到0度") time.sleep(2) def run(self): """循环运行:复位位置之后,先转到180度,延时TIME_INTERVAL1一段时间,再转到0度""" try: # 初始化串口 self.setup_port() # 使能所有舵机 self.enable_all_servos() # 复位位置 self.write_position(self.config['pos_end'], self.config['speed'], self.config['acc']) # 运行循环 self.run_cycle() except KeyboardInterrupt: print("\n用户终止程序...") self.stop() except RuntimeError as e: print(f"\n运行错误:{str(e)}") except Exception as e: print(f"\n程序运行异常:{str(e)}") finally: self.cleanup() # ---------舵机对外接口---------- def servo_rotate(): custom_config = { 'port': PORT, 'baudrate': BAUDRATE, 'servo_ids': SERVO_IDS, 'pos_start': POS_START, 'pos_end': POS_END, 'speed': SPEED, 'acc': ACC, 'time_interval1': TIME_INTERVAL1, 'time_interval2': TIME_INTERVAL2 } controller = ServoController(custom_config) controller.run() if __name__ == '__main__': servo_rotate()