Files
wire_controlsystem/conveyor_controller/conveyor_motor.py

919 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2026/2/10 09:42
# @Author : reenrr
# @File : conveyor_motor.py
# @Desc : 传送带电机相关接口 需要测试
"""
import time
from modbus import (write_single_register, open_serial_port, RTU_HANDLE_MAP, RTU_HANDLE_LOCK, read_holding_register)
from error_code import ModbusError
PORT = '/dev/ttyUSB0'
BAUDRATE = 115200
DATABITS = 8
STOPBITS = 1
PARITY = 0
# --------寄存器地址---------
MODE_REG_ADDR = 0x6200
POSITION_H_REG_ADDR = 0x6201
POSITION_L_REG_ADDR = 0x6202
SPEED_REG_ADDR = 0x6203
ACCELERATION_REG_ADDR = 0x6204
DECELERATION_REG_ADDR = 0x6205
START_OR_STOP_REG_ADDR = 0x6002
ENCODER_H_REG_ADDR = 0x0B1C # 读取电机位置--编码器单位
COMMAND_H_REG_ADDR = 0x602C # 读电机位置--指令单位
POSITION_ERROR_H_REG_ADDR = 0x0B1E # 位置误差
# ---------相关写入值------------
START_CMD = 0x0010 # 启动指令
STOP_CMD = 0x0040 # 停止指令
SPEED_CMD = 0x0002 # 速度指令
ABSOLUTE_POSITION_CMD = 0x0001 # 绝对位置指令
RELATIVE_POSITION_CMD = 0x0041 # 相对位置指令
handle1 = open_serial_port(
port=PORT,
baudrate=BAUDRATE,
databits=DATABITS,
stopbits=STOPBITS,
parity=PARITY
)
def _check_handle_valid():
"""通用句柄校验函数"""
if handle1 is None:
err_msg = ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)
print(f"错误:{err_msg}")
raise RuntimeError(err_msg)
with RTU_HANDLE_LOCK:
handle_obj = RTU_HANDLE_MAP.get(handle1)
if not handle_obj or not handle_obj.is_open:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_SERIAL_HANDLE)}(句柄{handle1}"
print(f"错误:{err_msg}")
raise RuntimeError(err_msg)
def set_motor_position(station_addr: int, position_value: int) -> bool:
"""
设置伺服电机位置
:param station_addr: 从机地址
:param position_value: 位置值32位整数 正数:远离电机方向;负数:靠近电机方向
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
try:
# 有符号32位位置值拆分为高16位和低16位
position_value = position_value & 0xFFFFFFFF # 确保为32位无符号整数
position_h = (position_value >> 16) & 0xFFFF # 高16位
position_l = position_value & 0xFFFF # 低16位
print(f"位置值拆分:{position_value} → 高16位:(0x{position_h:04x}) 低16位:(0x{position_l:04x})")
except Exception as e:
err_msg = f"位置值拆分失败 - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e
try:
# 写入高16位到0x6201
ret_code_h = write_single_register(handle1, station_addr, POSITION_H_REG_ADDR,
position_h, resp_offset=0, use_crc=1)
if ret_code_h != ModbusError.MODBUS_SUCCESS:
err_desc = ModbusError.get_error_desc(ret_code_h)
raise OSError(f"位置高位写入失败:{err_desc}", ret_code_h)
# 写入低16位到 0x6202
ret_code_l = write_single_register(handle1, station_addr, POSITION_L_REG_ADDR,
position_l, resp_offset=0, use_crc=1)
if ret_code_l != ModbusError.MODBUS_SUCCESS:
err_desc = ModbusError.get_error_desc(ret_code_l)
raise OSError(f"位置低位写入失败:{err_desc}", ret_code_l)
print(f"成功:电机位置设置完成(从站{station_addr},位置值:{position_value}")
return True
except OSError:
raise
except Exception as e:
err_msg = f"位置设置异常 - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_speed(station_addr: int, speed_value: int) -> bool:
"""
设置伺服电机速度
:param station_addr: 从机地址
:param speed_value: 速度值 速度模式时,正负值表示方向,正值--远离电机方向,负值--靠近电机方向
其他模式时,速度没有方向概念,只表示速度大小
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
value = handle_obj.decimal_to_16bit(speed_value)
# print(f"速度值转换:{speed_value} → {value}0x{value:04X}")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e # 转换失败抛ValueError保留原始异常栈
try:
ret_code = write_single_register(handle1, station_addr, SPEED_REG_ADDR, value, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机速度设置完成(从站{station_addr}地址0x{SPEED_REG_ADDR:04X},值{value}")
return True
else:
# 写寄存器返回错误码抛OSError
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"速度设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code) # 携带错误码,便于上层解析
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_acceleration(station_addr: int, value: int) -> bool:
"""
设置伺服电机加速度
:param station_addr: 从机地址
:param value: 加速度值
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
conv_value = handle_obj.decimal_to_16bit(value)
# print(f"加速度值转换:{value} → {conv_value}0x{conv_value:04X}")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e
try:
ret_code = write_single_register(handle1, station_addr, ACCELERATION_REG_ADDR, conv_value, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机加速度设置完成(从站{station_addr}地址0x{ACCELERATION_REG_ADDR:04X},值{conv_value}")
return True
else:
# 写寄存器返回错误码抛OSError携带错误码
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"加速度设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code) # 错误码存入args上层可解析
except OSError:
# 主动抛出的OSError直接上抛不做额外处理
raise
except Exception as e:
# 其他未知异常抛通用Exception保留原始栈
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_deceleration(station_addr: int, value: int) -> bool:
"""
设置伺服电机减速度
:param station_addr: 从机地址
:param value: 减速度值
:return: True--设置成功False--设置失败
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
conv_value = handle_obj.decimal_to_16bit(value)
# print(f"减速度值转换:{value} → {conv_value}0x{conv_value:04X}")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_CONVERT)} - {str(e)}"
print(f"错误:{err_msg}")
raise ValueError(err_msg) from e
try:
ret_code = write_single_register(handle1, station_addr, DECELERATION_REG_ADDR, conv_value, 0, 1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机减速度设置完成(从站{station_addr}地址0x{DECELERATION_REG_ADDR:04X},值{conv_value}")
return True
else:
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"减速度设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code)
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_WRITE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def set_motor_mode(station_addr: int, mode: int) -> bool:
"""
设置电机模式
:param station_addr: 从机地址
:param mode: 0--速度模式 1--绝对位置模式 2--相对位置模式
:return: bool 设置是否成功
"""
_check_handle_valid()
try:
if mode == 0:
write_cmd = SPEED_CMD
mode_desc = "速度模式"
elif mode == 1:
write_cmd = ABSOLUTE_POSITION_CMD
mode_desc = "绝对位置模式"
elif mode == 2:
write_cmd = RELATIVE_POSITION_CMD
mode_desc = "相对位置模式"
ret_code = write_single_register(handle1, station_addr, MODE_REG_ADDR, write_cmd, resp_offset=0, use_crc=1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机模式设置完成(从站{station_addr}地址0x{MODE_REG_ADDR:04X},模式:{mode_desc}")
return True
else:
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"电机{mode_desc}设置失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code)
except OSError:
# 主动抛出的OSError直接上抛不做额外处理
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_MODE)} - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def get_motor_speed(station_addr: int) -> int:
"""
获取电机速度
:param station_addr: 从站地址
:return: 返回电机速度值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, SPEED_REG_ADDR, 1, 0, [], 1)
if not reg_values or len(reg_values) != 1:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
unsigned_speed = reg_values[0]
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
print(f"成功:获取电机速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
return signed_speed
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
def get_motor_acceleration(station_addr: int) -> int:
"""
获取电机加速度
:param station_addr: 从站地址
:return: 电机加速度值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, ACCELERATION_REG_ADDR, 1, 0, [], 1)
if not reg_values or len(reg_values) != 1:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
unsigned_speed = reg_values[0]
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
print(f"成功:获取电机加速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
return signed_speed
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
def get_motor_deceleration(station_addr: int) -> int:
"""
获取电机减速度
:param station_addr: 从站地址
:return: i电机减速度值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, DECELERATION_REG_ADDR, 1, 0, [], 1)
if not reg_values or len(reg_values) != 1:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_DATA_FORMAT)},数据:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
unsigned_speed = reg_values[0]
signed_speed = handle_obj.uint16_to_int16(unsigned_speed)
print(f"成功:获取电机减速度(从站{station_addr})-->无符号值:{unsigned_speed}-->有符号值:{signed_speed}")
return signed_speed
except OSError:
raise
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_REG_READ)} - {str(e)}"
print(f"错误:{err_msg}")
raise Exception(err_msg) from e
def move_motor(station_addr: int, value: bool) -> bool:
"""
启停电机
:param station_addr: 从站地址
:param value: True--启动电机False--停止电机
:return: 是否设置成功
"""
_check_handle_valid()
try:
if value:
write_cmd = START_CMD
op = "启动"
else:
write_cmd = STOP_CMD
op = "停止"
ret_code = write_single_register(handle1, station_addr, START_OR_STOP_REG_ADDR, write_cmd, 0,1)
if ret_code == ModbusError.MODBUS_SUCCESS:
print(f"成功:电机{op}指令已发送(从站{station_addr}")
return True
else:
err_desc = ModbusError.get_error_desc(
ret_code) if ret_code in ModbusError.__members__.values() else f"未知错误码{ret_code}"
err_msg = f"电机{op}指令发送失败,{err_desc}"
print(f"失败:{err_msg}")
raise OSError(err_msg, ret_code)
except OSError:
raise
except Exception as e:
op = "启动" if value else "停止"
err_msg = f"异常:电机{op}指令执行失败 - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg)
def sync_motor_move(value: bool):
"""
同步传送带电机
:param value: 启停传送点电机 True--同步启动传送带电机 False--同步停止传送带电机
"""
op_desc = "启动" if value else "停止"
if not isinstance(value, bool):
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_PARAM_TYPE)}(同步{op_desc}):参数必须是布尔值,当前值:{value}(类型:{type(value)}"
print(f"错误:{err_msg}")
raise TypeError(err_msg)
try:
print(f"开始同步{op_desc}传送带电机从站1、2...")
move_motor(1, value)
move_motor(2, value)
print(f"成功:所有传送带电机已同步{op_desc}完成")
except Exception as e:
err_msg = f"{ModbusError.get_error_desc(ModbusError.MODBUS_ERR_MOTOR_SYNC)}{op_desc}{str(e)}"
print(f"错误:{err_msg}")
# 抛RuntimeError保留原始异常链便于上层定位具体失败电机
raise RuntimeError(err_msg) from e
def read_motor_value(station_addr: int) -> int:
"""
读取电机位置值
:param station_addr: 从站地址
:return: 电机位置值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, ENCODER_H_REG_ADDR,
reg_count=2, resp_offset=0, out_buffer=[], use_crc=1)
if not reg_values or len(reg_values) != 2:
err_msg = f"电机位置值读取失败,数据长度错误:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
# 组合高16位和低16位
encoder_h = reg_values[0] # 高16位
encoder_l = reg_values[1] # 低16位
encoder_value = (encoder_h << 16) | encoder_l
# 转换为有符号数
signed_encoder_value = handle_obj.uint32_to_int32(encoder_value)
print(f"成功:读取电机位置值(从站{station_addr})-->高位:{encoder_h} 低位:{encoder_l} 总值:{signed_encoder_value}")
return signed_encoder_value
except OSError:
raise
except Exception as e:
err_msg = f"电机位置值读取异常 - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def read_motor_value_command(station_addr: int) -> int:
"""
读取电机位置值--指令单位
:param station_addr: 从站地址
:return: 电机位置值
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, COMMAND_H_REG_ADDR,
reg_count=2, resp_offset=0, out_buffer=[], use_crc=1)
if not reg_values or len(reg_values) != 2:
err_msg = f"电机位置值读取失败,数据长度错误:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
# 组合高16位和低16位
command_value_h = reg_values[0] # 高16位
command_value_l = reg_values[1] # 低16位
command_value = (command_value_h << 16) | command_value_l
# 转换为有符号数
signed_command_value = handle_obj.uint32_to_int32(command_value)
print(f"成功:读取电机位置值(从站{station_addr})-->高位:{command_value_h} 低位:{command_value_l} 总值:{signed_command_value}")
return signed_command_value
except OSError:
raise
except Exception as e:
err_msg = f"电机位置值读取异常 - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
def read_position_error(station_addr: int) -> int:
"""
读取电机位置误差--编码器单位
:param station_addr: 从站地址
:return: 电机位置误差
"""
_check_handle_valid()
handle_obj = RTU_HANDLE_MAP.get(handle1)
try:
reg_values = read_holding_register(handle1, station_addr, POSITION_ERROR_H_REG_ADDR,
reg_count=2, resp_offset=0, out_buffer=[], use_crc=1)
if not reg_values or len(reg_values) != 2:
err_msg = f"电机位置误差读取失败,数据长度错误:{reg_values}"
print(f"错误:{err_msg}")
raise OSError(err_msg)
# 组合高16位和低16位
position_error_h = reg_values[0] # 高16位
position_error_l = reg_values[1] # 低16位
position_error = (position_error_h << 16) | position_error_l
# 转换为有符号数
signed_position_error = handle_obj.uint32_to_int32(position_error)
print(f"成功:读取电机位置误差(从站{station_addr})-->高位:{position_error_h} 低位:{position_error_l} 总值:{signed_position_error}")
return signed_position_error
except OSError:
raise
except Exception as e:
err_msg = f"电机位置误差读取异常 - {str(e)}"
print(f"异常:{err_msg}")
raise Exception(err_msg) from e
# ------------对外测试接口------------
def test_speed_mode():
# 配置传送带电机参数 只需配置一次
# 速度模式
set_motor_mode(station_addr=1, mode=0) # 配置成速度模式
set_motor_speed(station_addr=1, speed_value=-30)
set_motor_acceleration(station_addr=1, acceleration=50)
set_motor_deceleration(station_addr=1, deceleration=50)
set_motor_mode(station_addr=2, mode=0) # 配置成速度模式
set_motor_speed(station_addr=2, speed_value=-30)
set_motor_acceleration(station_addr=2, acceleration=50)
set_motor_deceleration(station_addr=2, deceleration=50)
sync_motor_move(True)
time.sleep(2)
sync_motor_move(False)
time.sleep(0.5)
while True:
time.sleep(1)
def test_absolute_position_mode():
# 绝对位置模式
set_motor_mode(station_addr=1, mode=1) # 配置成绝对位置模式
set_motor_position(station_addr=1, position_value=0) # 每圈10000脉冲跑两圈
set_motor_speed(station_addr=1, speed_value=30)
set_motor_acceleration(station_addr=1, value=50)
set_motor_deceleration(station_addr=1, value=50)
set_motor_mode(station_addr=2, mode=1) # 配置成绝对位置模式
set_motor_position(station_addr=2, position_value=0)
set_motor_speed(station_addr=2, speed_value=30)
set_motor_acceleration(station_addr=2, value=50)
set_motor_deceleration(station_addr=2, value=50)
# 运行前查看电机位置初始值
print("\n=== 运行前电机位置初始值 ===")
motor1_initial = read_motor_value(station_addr=1)
motor2_initial = read_motor_value(station_addr=2)
print(f"电机1初始电机位置值: {motor1_initial}")
print(f"电机2初始电机位置值: {motor2_initial}")
sync_motor_move(True)
# 等待电机完成运动(根据实际情况调整等待时间)
time.sleep(5)
# 读取编码器值检查是否丢步
print("\n=== 检查电机是否丢步 ===")
target_position = 0 # 目标位置值
# 检查电机1
motor1 = read_motor_value(station_addr=1)
error1 = abs(motor1 - target_position)
print(f"电机1 - 目标位置: {target_position}, 实际位置: {motor1}, 误差: {error1}")
# 检查电机2
encoder2 = read_motor_value(station_addr=2)
error2 = abs(encoder2 - target_position)
print(f"电机2 - 目标位置: {target_position}, 实际位置: {encoder2}, 误差: {error2}")
encoder_unit3 = read_motor_value_command(station_addr=1)
encoder_unit4 = read_motor_value_command(station_addr=2)
print(f"电机1运动后电机的指令值: {encoder_unit3}")
print(f"电机2运动后电机的指令值: {encoder_unit4}")
# 读取电机位置误差
position_error1 = read_position_error(station_addr=1)
position_error2 = read_position_error(station_addr=2)
print(f"电机1位置误差: {position_error1}")
print(f"电机2位置误差: {position_error2}")
while True:
time.sleep(1)
def test_relative_position_mode():
"""
相对位置模式测试已修复CRC错误、数据符号、通信延时问题
功能每3秒相对移动6000脉冲实时监控编码器误差误差>100自动补偿
"""
try:
# ===================== 1. 基础配置 =====================
station_list = [1, 2]
base_pulse = 6000
interval = 3
error_threshold = 100
move_wait_time = 1.5
# 补偿值
compensate_map = {1: 0, 2: 0}
print("===== 相对位置模式 + 误差监控 + 自动补偿(稳定版)=====")
print(f"基础移动:{base_pulse} 脉冲 | 误差阈值:>{error_threshold}")
# 初始化电机
for station in station_list:
set_motor_mode(station, 2)
set_motor_speed(station, 30)
set_motor_acceleration(station, 50)
set_motor_deceleration(station, 50)
time.sleep(0.1)
print(f"✅ 电机{station} 配置完成")
print("\n===== 开始循环运动 =====")
count = 0
while True:
count += 1
print(f"\n==================== 第{count}次运动 ====================")
# 写入位置(带重试 + 间隔解决CRC
for station in station_list:
compensate = compensate_map[station]
final_pulse = base_pulse - compensate
# 关键修复:相对位置必须用 有符号32位
final_pulse = int(final_pulse)
retry = 3
while retry > 0:
try:
set_motor_position(station, final_pulse)
time.sleep(0.05)
break
except:
retry -= 1
time.sleep(0.1)
if retry == 0:
print(f"❌ 电机{station} 写入位置失败")
print(f"电机{station}:移动 {final_pulse} 脉冲")
# 启动
sync_motor_move(True)
time.sleep(move_wait_time)
# 停止
sync_motor_move(False)
time.sleep(0.2)
# 读取误差
print("\n---------------- 误差监控 ----------------")
for station in station_list:
try:
real_pos = read_motor_value(station)
pos_error = read_position_error(station)
if abs(pos_error) > error_threshold:
new_comp = pos_error - error_threshold
compensate_map[station] = new_comp
status = f"⚠️ 补偿:{new_comp}"
else:
compensate_map[station] = 0
status = "✅ 正常"
print(f"电机{station} | 实际:{real_pos} | 误差:{pos_error} | {status}")
except:
print(f"⚠️ 电机{station} 读取失败")
print(f"\n{interval}秒后继续...")
time.sleep(interval)
except KeyboardInterrupt:
print("\n🛑 手动停止")
sync_motor_move(False)
except Exception as e:
print(f"\n❌ 异常:{e}")
sync_motor_move(False)
def test_cycle_relative_position_mode():
"""
相对位置模式 —— 前20次每次+5之后固定使用6007脉冲
稳定版防CRC报错 + 自动重试
"""
try:
# ===================== 配置 =====================
station_list = [1, 2]
base_pulse = 6007 # 初始脉冲
move_wait_time = 3 # 运动时间
send_interval = 2 # 间隔2秒
speed = 30
acc = 50
dec = 50
max_increase_times = 50 # 前20次递增
increase_step = 2 # 每次加5
current_count = 0 # 运行次数计数
print("===== 前20次每次+520次后恢复6007脉冲 =====")
# 初始化电机
for station in station_list:
set_motor_mode(station, 2)
time.sleep(0.1)
set_motor_speed(station, speed)
time.sleep(0.1)
set_motor_acceleration(station, acc)
time.sleep(0.1)
set_motor_deceleration(station, dec)
time.sleep(0.1)
print(f"✅ 电机{station} 初始化完成")
# ===================== 主循环 =====================
while True:
current_count += 1
print(f"\n==================== 第 {current_count} 次运动 ====================")
# ============== 核心逻辑 ==============
if current_count <= max_increase_times:
# 前20次每次 +5
send_pulse = base_pulse + current_count * increase_step
print(f"📈 前20次递增 → 本次发送:{send_pulse}")
else:
# 20次以后固定 6007
send_pulse = 6007
print(f"✅ 已超过20次 → 固定发送:{send_pulse}")
# ========== 写入位置(带重试)==========
for station in station_list:
retry = 5
while retry > 0:
try:
set_motor_position(station, send_pulse)
time.sleep(0.1)
break
except Exception as e:
retry -= 1
print(f"⚠️ 电机{station} 写入失败,重试 {retry}")
time.sleep(0.2)
# ========== 启动电机 ==========
sync_motor_move(True)
# ========== 运动中实时监控误差 ==========
print("\n------------ 运动中实时误差 ------------")
start_time = time.time()
while time.time() - start_time < move_wait_time:
for station in station_list:
try:
err = read_position_error(station)
print(f"电机{station} 实时误差 → {err}")
except:
pass
time.sleep(0.1)
# ========== 停止 + 等待 ==========
sync_motor_move(False)
time.sleep(0.3)
print(f"\n✅ 运动完成,等待 {send_interval} 秒后继续...")
time.sleep(send_interval)
except KeyboardInterrupt:
print("\n🛑 手动停止")
try:
sync_motor_move(False)
except:
pass
except Exception as e:
print(f"\n❌ 异常:{e}")
try:
sync_motor_move(False)
except:
pass
def test_cycle_relative_position_mode_test():
"""
相对位置模式 —— 电机1每次+2电机2每次+4
稳定版防CRC报错 + 自动重试
"""
try:
# ===================== 配置 =====================
station_list = [1, 2]
base_pulse = 6007 # 初始基准脉冲
move_wait_time = 3 # 运动时间
send_interval = 2 # 间隔2秒
speed = 30
acc = 50
dec = 50
current_count = 0 # 运行次数计数
print("===== 电机1每次+2电机2每次+4 循环运行 =====")
# 初始化电机
for station in station_list:
set_motor_mode(station, 2)
time.sleep(0.1)
set_motor_speed(station, speed)
time.sleep(0.1)
set_motor_acceleration(station, acc)
time.sleep(0.1)
set_motor_deceleration(station, dec)
time.sleep(0.1)
print(f"✅ 电机{station} 初始化完成")
# ===================== 主循环 =====================
while True:
current_count += 1
print(f"\n==================== 第 {current_count} 次运动 ====================")
# ============== 核心:两台电机不同增量 ==============
pulse_1 = base_pulse + current_count * 10 # 电机1每次 +2 靠近出料口的电机
pulse_2 = base_pulse + current_count * 5 # 电机2每次 +4 远离出料口的电机
print(f"📌 电机1本次脉冲{pulse_1}")
print(f"📌 电机2本次脉冲{pulse_2}")
# ========== 分别写入两台电机(带重试)==========
# 写入电机1
retry = 5
while retry > 0:
try:
set_motor_position(1, pulse_1)
time.sleep(0.1)
break
except Exception as e:
retry -= 1
print(f"⚠️ 电机1写入失败重试 {retry}")
time.sleep(0.2)
# 写入电机2
retry = 5
while retry > 0:
try:
set_motor_position(2, pulse_2)
time.sleep(0.1)
break
except Exception as e:
retry -= 1
print(f"⚠️ 电机2写入失败重试 {retry}")
time.sleep(0.2)
# ========== 启动电机 ==========
sync_motor_move(True)
# ========== 运动中实时监控误差 ==========
print("\n------------ 运动中实时误差 ------------")
start_time = time.time()
while time.time() - start_time < move_wait_time:
for station in station_list:
try:
err = read_position_error(station)
print(f"电机{station} 实时误差 → {err}")
except:
pass
time.sleep(0.1)
time.sleep(1)
print(f"\n✅ 运动完成,等待 {send_interval} 秒后继续...")
time.sleep(send_interval)
except KeyboardInterrupt:
print("\n🛑 手动停止")
try:
sync_motor_move(False)
except:
pass
except Exception as e:
print(f"\n❌ 异常:{e}")
try:
sync_motor_move(False)
except:
pass
# ------------调试接口----------
if __name__ == '__main__':
test_cycle_relative_position_mode()