Files
wire_controlsystem/RK1106/test.py
2026-01-05 18:11:56 +08:00

292 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2026/1/5 16:18
# @Author : reenrr
# @File : test.py
# @Desc : 步进电机添加记录脉冲数量(防止断电情况)
'''
import time
import json
import os
from periphery import GPIO
# ------------参数配置-------------
# 1. 脉冲PUL引脚配置 → GPIO32
PUL_Pin = 32
# 2. 方向DIR引脚配置 → GPIO33
DIR_Pin = 33
# 3. 驱动器参数(根据拨码调整,默认不变)
PULSES_PER_ROUND = 400 # 每圈脉冲数SW5~SW8拨码默认400
PULSE_FREQUENCY = 2500 # 脉冲频率Hz新手建议500~2000最大200KHz
# 4. 计数持久化配置
COUNT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "motor_pulse_count.json")
SAVE_INTERVAL = 10 # 每发送10个脉冲保存一次计数减少IO开销可根据需求调整
class StepperMotor:
"""新力川MA860H驱动器步进电机控制类"""
# 方向常量定义
CLOCKWISE = "clockwise" # 顺时针
COUNTER_CLOCKWISE = "counterclockwise" # 逆时针
def __init__(self,
pul_pin: int = PUL_Pin,
dir_pin: int = DIR_Pin,
pulses_per_round: int = PULSES_PER_ROUND,
pulse_frequency: int = PULSE_FREQUENCY,
clockwise_level: bool = True,
counter_clockwise_level: bool = False):
"""
初始化步进电机控制器
:param pul_pin: 脉冲引脚
:param dir_pin: 方向引脚
:param pulses_per_round: 每圈脉冲数SW5~SW8拨码默认400
:param pulse_frequency: 脉冲频率Hz新手建议500~2000最大200KHz
:param clockwise_level: 顺时针对应的DIR电平
:param counter_clockwise_level: 逆时针对应的DIR电平
"""
# 硬件配置参数
self.pul_pin = pul_pin
self.dir_pin = dir_pin
# 驱动器参数
self.pulses_per_round = pulses_per_round
self.pulse_frequency = pulse_frequency
self.clockwise_level = clockwise_level
self.counter_clockwise_level = counter_clockwise_level
# GPIO对象初始化
self.pul_gpio = None
self.dir_gpio = None
# 脉冲计数相关(核心:记录已发送的脉冲数)
self.current_pulse_count = 0 # 本次旋转已发送的脉冲数
self.total_pulse_history = self._load_pulse_count() # 历史总脉冲数(从文件加载)
self.current_rotate_target = 0 # 本次旋转的目标脉冲数
# 初始化GPIO
self._init_gpio()
def _load_pulse_count(self):
"""加载历史脉冲计数(程序启动时执行)"""
try:
if os.path.exists(COUNT_FILE):
with open(COUNT_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# 返回历史总脉冲数默认0
return int(data.get("total_pulses", 0))
except Exception as e:
print(f"[WARN] 加载历史计数失败:{e}将从0开始计数")
return 0
def _save_pulse_count(self):
"""保存当前总脉冲数到文件(持久化)"""
try:
with open(COUNT_FILE, "w", encoding="utf-8") as f:
json.dump({
"total_pulses": self.total_pulse_history,
"update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[ERROR] 保存计数失败:{e}")
def _init_gpio(self):
"""初始化PUL和DIR引脚内部方法"""
try:
# 初始化脉冲引脚(输出模式)
self.pul_gpio = GPIO(self.pul_pin, "out")
# 初始化方向引脚(输出模式)
self.dir_gpio = GPIO(self.dir_pin, "out")
# 初始电平置低(避免电机误动作)
self.pul_gpio.write(False)
self.dir_gpio.write(False)
print(f"[OK] PUL引脚初始化完成{self.pul_pin} 引脚")
print(f"[OK] DIR引脚初始化完成{self.dir_pin} 引脚")
print(
f"[INFO] 历史累计脉冲数:{self.total_pulse_history} → 对应圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
except PermissionError:
raise RuntimeError("权限不足请用sudo运行程序sudo python xxx.py")
except Exception as e:
raise RuntimeError(f"GPIO初始化失败{str(e)}") from e
def _validate_rounds(self, rounds: float) -> bool:
"""验证圈数是否合法(内部方法)"""
if rounds <= 0:
print("[ERROR] 圈数必须为正数")
return False
return True
def _validate_direction(self, direction: str) -> bool:
"""验证方向参数是否合法(内部方法)"""
if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]:
print(f"[ERROR] 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}")
return False
return True
def rotate(self, rounds: float, direction: str = CLOCKWISE):
"""
控制电机旋转(支持正反转,实时记录脉冲数)
:param rounds: 旋转圈数可小数如0.5=半圈)
:param direction: 方向clockwise=顺时针counterclockwise=逆时针)
"""
# 参数验证
if not self._validate_rounds(rounds) or not self._validate_direction(direction):
return
# 重置本次旋转的计数
self.current_pulse_count = 0
# 计算本次旋转的目标脉冲数
self.current_rotate_target = int(rounds * self.pulses_per_round)
# 设置旋转方向DIR电平
if direction == self.CLOCKWISE: # 顺时针
self.dir_gpio.write(self.clockwise_level)
print(f"\n=== 顺时针旋转 {rounds} 圈 ===")
else: # 逆时针
self.dir_gpio.write(self.counter_clockwise_level)
print(f"\n=== 逆时针旋转 {rounds} 圈 ===")
# 计算总脉冲数和时序(精准控制,避免丢步)
total_pulses = self.current_rotate_target
pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒)
half_period = pulse_period / 2 # 占空比50%MA860H最优
print(f"目标脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
start_time = time.perf_counter() # 高精度计时(避免丢步)
try:
# 发送脉冲序列核心占空比50%的方波,实时计数)
for _ in range(total_pulses):
# 高电平
self.pul_gpio.write(True)
# 精准延时比time.sleep稳定适配高频脉冲
while time.perf_counter() - start_time < half_period:
pass
# 低电平
self.pul_gpio.write(False)
# 更新下一个脉冲的起始时间
start_time += pulse_period
# 🌟 核心:累加本次脉冲计数
self.current_pulse_count += 1
self.total_pulse_history += 1
# 每发送SAVE_INTERVAL个脉冲保存一次计数减少IO开销
if self.current_pulse_count % SAVE_INTERVAL == 0:
self._save_pulse_count()
print(
f"[OK] 旋转完成 → 本次发送脉冲:{self.current_pulse_count} | 累计脉冲:{self.total_pulse_history} | 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
except Exception as e:
# 🌟 关键:异常发生时(如断电前的程序崩溃),立即保存当前计数
self._save_pulse_count()
# 计算已完成的圈数
completed_rounds = self.current_pulse_count / self.pulses_per_round
remaining_pulses = self.current_rotate_target - self.current_pulse_count
remaining_rounds = remaining_pulses / self.pulses_per_round
print(f"\n[ERROR] 旋转过程中异常:{e}")
print(f"[INFO] 异常时已发送脉冲:{self.current_pulse_count} → 已完成圈数:{completed_rounds:.2f}")
print(f"[INFO] 剩余未发送脉冲:{remaining_pulses} → 剩余圈数:{remaining_rounds:.2f}")
print(
f"[INFO] 累计总脉冲:{self.total_pulse_history} → 累计总圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
raise # 抛出异常,让上层处理
def get_current_status(self):
"""获取当前电机状态(脉冲数、圈数)"""
return {
"total_pulses": self.total_pulse_history,
"total_rounds": self.total_pulse_history / self.pulses_per_round,
"last_rotate_completed_pulses": self.current_pulse_count,
"last_rotate_completed_rounds": self.current_pulse_count / self.pulses_per_round,
"last_rotate_target_pulses": self.current_rotate_target,
"last_rotate_target_rounds": self.current_rotate_target / self.pulses_per_round
}
def reset_count(self):
"""重置累计计数(按需使用,如电机归位后)"""
self.total_pulse_history = 0
self._save_pulse_count()
print("[INFO] 累计脉冲计数已重置为0")
def stop(self):
"""紧急停止(置低脉冲引脚)"""
if self.pul_gpio:
self.pul_gpio.write(False)
# 停止时保存当前计数
self._save_pulse_count()
print("[STOP] 电机已停止,当前计数已保存")
def close(self):
"""释放GPIO资源"""
# 安全释放GPIO资源关键避免引脚电平残留
if self.pul_gpio:
self.pul_gpio.write(False) # 脉冲引脚置低
self.pul_gpio.close()
print("\n[OK] PUL引脚已关闭电平置低")
if self.dir_gpio:
self.dir_gpio.write(False) # 方向引脚置低
self.dir_gpio.close()
print("[OK] DIR引脚已关闭电平置低")
# 关闭时保存最终计数
self._save_pulse_count()
print(
f"[INFO] 最终累计脉冲:{self.total_pulse_history} → 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
# 重置GPIO对象
self.pul_gpio = None
self.dir_gpio = None
def __del__(self):
"""析构函数:确保资源释放"""
self.close()
# 使用示例
def motor_demo():
"""电机控制示例"""
motor = None
try:
# 创建电机实例(使用默认配置)
motor = StepperMotor()
print("\n=== 步进电机控制程序启动 ===")
# 打印初始状态
init_status = motor.get_current_status()
print(f"[INIT] 初始状态 → 累计圈数:{init_status['total_rounds']:.2f}")
while True:
# 靠近电机方向 逆时针
motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE)
time.sleep(5) # 暂停5秒
# 远离电机方向 顺时针
motor.rotate(rounds=10.0, direction=motor.CLOCKWISE)
time.sleep(5) # 暂停5秒
except PermissionError:
print("\n[ERROR] 权限不足:请用 sudo 运行!")
print("命令sudo python3 double_direction_motor.py")
except ImportError:
print("\n[ERROR] 缺少依赖请安装python-periphery")
print("命令pip install python-periphery")
except KeyboardInterrupt:
print("\n[INFO] 用户手动停止程序")
except Exception as e:
print(f"\n[ERROR] 程序异常:{str(e)}")
finally:
if motor:
# 打印最终状态
final_status = motor.get_current_status()
print(f"\n[FINAL] 程序退出状态 → 累计脉冲:{final_status['total_pulses']} | 累计圈数:{final_status['total_rounds']:.2f}")
motor.close()
print("[OK] 程序退出完成")
if __name__ == '__main__':
motor_demo()