Files
wire_controlsystem/RK1106/test.py

292 lines
12 KiB
Python
Raw Normal View History

2026-01-05 18:11:56 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2026/1/5 16:18
# @Author : reenrr
# @File : test.py
# @Desc : 步进电机添加记录脉冲数量(防止断电情况)
'''
import time
import json
import os
from periphery import GPIO
# ------------参数配置-------------
# 1. 脉冲PUL引脚配置 → GPIO32
PUL_Pin = 32
# 2. 方向DIR引脚配置 → GPIO33
DIR_Pin = 33
# 3. 驱动器参数(根据拨码调整,默认不变)
PULSES_PER_ROUND = 400 # 每圈脉冲数SW5~SW8拨码默认400
PULSE_FREQUENCY = 2500 # 脉冲频率Hz新手建议500~2000最大200KHz
# 4. 计数持久化配置
COUNT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "motor_pulse_count.json")
SAVE_INTERVAL = 10 # 每发送10个脉冲保存一次计数减少IO开销可根据需求调整
class StepperMotor:
"""新力川MA860H驱动器步进电机控制类"""
# 方向常量定义
CLOCKWISE = "clockwise" # 顺时针
COUNTER_CLOCKWISE = "counterclockwise" # 逆时针
def __init__(self,
pul_pin: int = PUL_Pin,
dir_pin: int = DIR_Pin,
pulses_per_round: int = PULSES_PER_ROUND,
pulse_frequency: int = PULSE_FREQUENCY,
clockwise_level: bool = True,
counter_clockwise_level: bool = False):
"""
初始化步进电机控制器
:param pul_pin: 脉冲引脚
:param dir_pin: 方向引脚
:param pulses_per_round: 每圈脉冲数SW5~SW8拨码默认400
:param pulse_frequency: 脉冲频率Hz新手建议500~2000最大200KHz
:param clockwise_level: 顺时针对应的DIR电平
:param counter_clockwise_level: 逆时针对应的DIR电平
"""
# 硬件配置参数
self.pul_pin = pul_pin
self.dir_pin = dir_pin
# 驱动器参数
self.pulses_per_round = pulses_per_round
self.pulse_frequency = pulse_frequency
self.clockwise_level = clockwise_level
self.counter_clockwise_level = counter_clockwise_level
# GPIO对象初始化
self.pul_gpio = None
self.dir_gpio = None
# 脉冲计数相关(核心:记录已发送的脉冲数)
self.current_pulse_count = 0 # 本次旋转已发送的脉冲数
self.total_pulse_history = self._load_pulse_count() # 历史总脉冲数(从文件加载)
self.current_rotate_target = 0 # 本次旋转的目标脉冲数
# 初始化GPIO
self._init_gpio()
def _load_pulse_count(self):
"""加载历史脉冲计数(程序启动时执行)"""
try:
if os.path.exists(COUNT_FILE):
with open(COUNT_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# 返回历史总脉冲数默认0
return int(data.get("total_pulses", 0))
except Exception as e:
print(f"[WARN] 加载历史计数失败:{e}将从0开始计数")
return 0
def _save_pulse_count(self):
"""保存当前总脉冲数到文件(持久化)"""
try:
with open(COUNT_FILE, "w", encoding="utf-8") as f:
json.dump({
"total_pulses": self.total_pulse_history,
"update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[ERROR] 保存计数失败:{e}")
def _init_gpio(self):
"""初始化PUL和DIR引脚内部方法"""
try:
# 初始化脉冲引脚(输出模式)
self.pul_gpio = GPIO(self.pul_pin, "out")
# 初始化方向引脚(输出模式)
self.dir_gpio = GPIO(self.dir_pin, "out")
# 初始电平置低(避免电机误动作)
self.pul_gpio.write(False)
self.dir_gpio.write(False)
print(f"[OK] PUL引脚初始化完成{self.pul_pin} 引脚")
print(f"[OK] DIR引脚初始化完成{self.dir_pin} 引脚")
print(
f"[INFO] 历史累计脉冲数:{self.total_pulse_history} → 对应圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
except PermissionError:
raise RuntimeError("权限不足请用sudo运行程序sudo python xxx.py")
except Exception as e:
raise RuntimeError(f"GPIO初始化失败{str(e)}") from e
def _validate_rounds(self, rounds: float) -> bool:
"""验证圈数是否合法(内部方法)"""
if rounds <= 0:
print("[ERROR] 圈数必须为正数")
return False
return True
def _validate_direction(self, direction: str) -> bool:
"""验证方向参数是否合法(内部方法)"""
if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]:
print(f"[ERROR] 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}")
return False
return True
def rotate(self, rounds: float, direction: str = CLOCKWISE):
"""
控制电机旋转支持正反转实时记录脉冲数
:param rounds: 旋转圈数可小数如0.5=半圈
:param direction: 方向clockwise=顺时针counterclockwise=逆时针
"""
# 参数验证
if not self._validate_rounds(rounds) or not self._validate_direction(direction):
return
# 重置本次旋转的计数
self.current_pulse_count = 0
# 计算本次旋转的目标脉冲数
self.current_rotate_target = int(rounds * self.pulses_per_round)
# 设置旋转方向DIR电平
if direction == self.CLOCKWISE: # 顺时针
self.dir_gpio.write(self.clockwise_level)
print(f"\n=== 顺时针旋转 {rounds} 圈 ===")
else: # 逆时针
self.dir_gpio.write(self.counter_clockwise_level)
print(f"\n=== 逆时针旋转 {rounds} 圈 ===")
# 计算总脉冲数和时序(精准控制,避免丢步)
total_pulses = self.current_rotate_target
pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒)
half_period = pulse_period / 2 # 占空比50%MA860H最优
print(f"目标脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
start_time = time.perf_counter() # 高精度计时(避免丢步)
try:
# 发送脉冲序列核心占空比50%的方波,实时计数)
for _ in range(total_pulses):
# 高电平
self.pul_gpio.write(True)
# 精准延时比time.sleep稳定适配高频脉冲
while time.perf_counter() - start_time < half_period:
pass
# 低电平
self.pul_gpio.write(False)
# 更新下一个脉冲的起始时间
start_time += pulse_period
# 🌟 核心:累加本次脉冲计数
self.current_pulse_count += 1
self.total_pulse_history += 1
# 每发送SAVE_INTERVAL个脉冲保存一次计数减少IO开销
if self.current_pulse_count % SAVE_INTERVAL == 0:
self._save_pulse_count()
print(
f"[OK] 旋转完成 → 本次发送脉冲:{self.current_pulse_count} | 累计脉冲:{self.total_pulse_history} | 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
except Exception as e:
# 🌟 关键:异常发生时(如断电前的程序崩溃),立即保存当前计数
self._save_pulse_count()
# 计算已完成的圈数
completed_rounds = self.current_pulse_count / self.pulses_per_round
remaining_pulses = self.current_rotate_target - self.current_pulse_count
remaining_rounds = remaining_pulses / self.pulses_per_round
print(f"\n[ERROR] 旋转过程中异常:{e}")
print(f"[INFO] 异常时已发送脉冲:{self.current_pulse_count} → 已完成圈数:{completed_rounds:.2f}")
print(f"[INFO] 剩余未发送脉冲:{remaining_pulses} → 剩余圈数:{remaining_rounds:.2f}")
print(
f"[INFO] 累计总脉冲:{self.total_pulse_history} → 累计总圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
raise # 抛出异常,让上层处理
def get_current_status(self):
"""获取当前电机状态(脉冲数、圈数)"""
return {
"total_pulses": self.total_pulse_history,
"total_rounds": self.total_pulse_history / self.pulses_per_round,
"last_rotate_completed_pulses": self.current_pulse_count,
"last_rotate_completed_rounds": self.current_pulse_count / self.pulses_per_round,
"last_rotate_target_pulses": self.current_rotate_target,
"last_rotate_target_rounds": self.current_rotate_target / self.pulses_per_round
}
def reset_count(self):
"""重置累计计数(按需使用,如电机归位后)"""
self.total_pulse_history = 0
self._save_pulse_count()
print("[INFO] 累计脉冲计数已重置为0")
def stop(self):
"""紧急停止(置低脉冲引脚)"""
if self.pul_gpio:
self.pul_gpio.write(False)
# 停止时保存当前计数
self._save_pulse_count()
print("[STOP] 电机已停止,当前计数已保存")
def close(self):
"""释放GPIO资源"""
# 安全释放GPIO资源关键避免引脚电平残留
if self.pul_gpio:
self.pul_gpio.write(False) # 脉冲引脚置低
self.pul_gpio.close()
print("\n[OK] PUL引脚已关闭电平置低")
if self.dir_gpio:
self.dir_gpio.write(False) # 方向引脚置低
self.dir_gpio.close()
print("[OK] DIR引脚已关闭电平置低")
# 关闭时保存最终计数
self._save_pulse_count()
print(
f"[INFO] 最终累计脉冲:{self.total_pulse_history} → 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
# 重置GPIO对象
self.pul_gpio = None
self.dir_gpio = None
def __del__(self):
"""析构函数:确保资源释放"""
self.close()
# 使用示例
def motor_demo():
"""电机控制示例"""
motor = None
try:
# 创建电机实例(使用默认配置)
motor = StepperMotor()
print("\n=== 步进电机控制程序启动 ===")
# 打印初始状态
init_status = motor.get_current_status()
print(f"[INIT] 初始状态 → 累计圈数:{init_status['total_rounds']:.2f}")
while True:
# 靠近电机方向 逆时针
motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE)
time.sleep(5) # 暂停5秒
# 远离电机方向 顺时针
motor.rotate(rounds=10.0, direction=motor.CLOCKWISE)
time.sleep(5) # 暂停5秒
except PermissionError:
print("\n[ERROR] 权限不足:请用 sudo 运行!")
print("命令sudo python3 double_direction_motor.py")
except ImportError:
print("\n[ERROR] 缺少依赖请安装python-periphery")
print("命令pip install python-periphery")
except KeyboardInterrupt:
print("\n[INFO] 用户手动停止程序")
except Exception as e:
print(f"\n[ERROR] 程序异常:{str(e)}")
finally:
if motor:
# 打印最终状态
final_status = motor.get_current_status()
print(f"\n[FINAL] 程序退出状态 → 累计脉冲:{final_status['total_pulses']} | 累计圈数:{final_status['total_rounds']:.2f}")
motor.close()
print("[OK] 程序退出完成")
if __name__ == '__main__':
motor_demo()