Files
wire_controlsystem/servo/servo.py

184 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/12/2 14:05
# @Author : reenrr
# @File :servo.py
# @Description : 控制舵机正反转
'''
from servo_sdk import *
import logging
# -------------参数配置--------------
BAUDRATE = 115200 # 舵机的波特率
PORT = 'COM4'
SERVO_IDS = [1, 2, 3, 4, 5] # 舵机们的 ID 号
POS_START = 2047
POS_END = 0
SPEED = 1500
ACC = 0
TIME_INTERVAL1 = ((2047-0) / 1500) + 2 # 翻转回来的时间
TIME_INTERVAL2 = 10 # 不用翻转运行的时间
class ServoController:
def __init__(self, config=None):
"""
初始化舵机控制器
"""
self.config = config
self.time_interval1 = self.config['time_interval1']
self.time_interval2 = self.config['time_interval2']
# 初始化串口和舵机处理器
self.port_handler = PortHandler(self.config['port'])
self.servo_handler = HxServoHandler(self.port_handler)
# 初始化状态
self.is_running = False
def setup_port(self):
"""配置串口参数"""
# 打开串口
if self.port_handler.openPort():
logging.info("成功打开串口")
else:
raise RuntimeError("打开串口失败")
# 设置波特率
if self.port_handler.setBaudRate(self.config['baudrate']):
logging.info("设置波特率成功")
else:
self.port_handler.closePort()
raise RuntimeError("设置波特率失败")
def enable_all_servos(self):
"""使能所有舵机"""
enable_success = True
for servo_id in self.config['servo_ids']:
enable_result = self.servo_handler.torqueEnable(servo_id)
if enable_result[1] != 0:
logging.info(f"[ID:{servo_id:02d}] 舵机使能失败,错误信息:{enable_result[1]}")
enable_success = False
else:
logging.info(f"[ID:{servo_id:02d}] 舵机使能成功")
if not enable_success:
self.cleanup()
raise RuntimeError("使能舵机失败,程序退出")
logging.info("所有舵机使能成功")
def disable_all_servos(self):
"""失能所有舵机"""
disable_success = True
for servo_id in self.config['servo_ids']:
disable_result = self.servo_handler.torqueDisable(servo_id)
if disable_result[1] != 0:
logging.info(f"[ID:{servo_id:02d}] 舵机失能失败,错误信息:{disable_result[1]}")
disable_success = False
else:
logging.info(f"[ID:{servo_id:02d}] 舵机失能成功")
if not disable_success:
self.cleanup()
raise RuntimeError("失能舵机失败,程序退出")
logging.info("所有舵机失能成功")
def write_position(self, position, speed, acc):
"""
写入目标位置到所有舵机
:param position: 目标位置
:param speed: 速度
:param acc: 加速度
"""
for servo_id in self.config['servo_ids']:
add_param_result = self.servo_handler.syncWritePosEx(
servo_id, position, speed, acc
)
if not add_param_result:
logging.info(f"[ID:{servo_id:02d}] 添加参数失败")
continue
result = self.servo_handler.GroupSyncWrite.txPacket()
if result != COMM_SUCCESS:
logging.info(f"[ID:{servo_id:02d}] 发送指令失败:{result.getTxRxResult(result)}")
logging.info("复位成功")
# 清空参数缓存
self.servo_handler.GroupSyncWrite.clearParam()
def stop(self):
"""停止舵机运行"""
self.is_running = False
def cleanup(self):
"""清理资源"""
self.disable_all_servos()
if self.port_handler.is_open:
self.port_handler.closePort()
logging.info("串口已关闭")
# ---------舵机对外接口----------
def servo_rotate():
"""复位位置之后先转到180度延时TIME_INTERVAL1一段时间再转到0度"""
custom_config = {
'port': PORT,
'baudrate': BAUDRATE,
'servo_ids': SERVO_IDS,
'pos_start': POS_START,
'pos_end': POS_END,
'speed': SPEED,
'acc': ACC,
'time_interval1': TIME_INTERVAL1,
'time_interval2': TIME_INTERVAL2
}
controller = ServoController(custom_config)
try:
# 初始化串口
controller.setup_port()
# 使能所有舵机
controller.enable_all_servos()
# 复位位置
controller.write_position(controller.config['pos_end'],
controller.config['speed'],
controller.config['acc'])
"""运行舵机运动"""
logging.info("舵机开始循环运动按Ctrl+C终止...")
# 运动到起始位置180度
controller.write_position(controller.config['pos_start'],
controller.config['speed'],
controller.config['acc'])
logging.info("运动到180度")
time.sleep(controller.time_interval1)
# 运动到结束位置0度
controller.write_position(controller.config['pos_end'],
controller.config['speed'],
controller.config['acc'])
logging.info("运动到0度")
time.sleep(controller.time_interval2)
except KeyboardInterrupt:
logging.info("\n用户终止程序...")
controller.stop()
except RuntimeError as e:
logging.info(f"\n运行错误:{str(e)}")
except Exception as e:
logging.info(f"\n程序运行异常:{str(e)}")
finally:
controller.cleanup()
# -----------测试接口------------
if __name__ == '__main__':
servo_rotate()