#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2026/1/5 16:18 # @Author : reenrr # @File : test.py # @Desc : 步进电机添加记录脉冲数量(防止断电情况) ''' import time import json import os from periphery import GPIO # ------------参数配置------------- # 1. 脉冲(PUL)引脚配置 → GPIO32 PUL_Pin = 32 # 2. 方向(DIR)引脚配置 → GPIO33 DIR_Pin = 33 # 3. 驱动器参数(根据拨码调整,默认不变) PULSES_PER_ROUND = 400 # 每圈脉冲数(SW5~SW8拨码,默认400) PULSE_FREQUENCY = 2500 # 脉冲频率(Hz,新手建议500~2000,最大200KHz) # 4. 计数持久化配置 COUNT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "motor_pulse_count.json") SAVE_INTERVAL = 10 # 每发送10个脉冲保存一次计数(减少IO开销,可根据需求调整) class StepperMotor: """新力川MA860H驱动器步进电机控制类""" # 方向常量定义 CLOCKWISE = "clockwise" # 顺时针 COUNTER_CLOCKWISE = "counterclockwise" # 逆时针 def __init__(self, pul_pin: int = PUL_Pin, dir_pin: int = DIR_Pin, pulses_per_round: int = PULSES_PER_ROUND, pulse_frequency: int = PULSE_FREQUENCY, clockwise_level: bool = True, counter_clockwise_level: bool = False): """ 初始化步进电机控制器 :param pul_pin: 脉冲引脚 :param dir_pin: 方向引脚 :param pulses_per_round: 每圈脉冲数(SW5~SW8拨码,默认400) :param pulse_frequency: 脉冲频率(Hz,新手建议500~2000,最大200KHz) :param clockwise_level: 顺时针对应的DIR电平 :param counter_clockwise_level: 逆时针对应的DIR电平 """ # 硬件配置参数 self.pul_pin = pul_pin self.dir_pin = dir_pin # 驱动器参数 self.pulses_per_round = pulses_per_round self.pulse_frequency = pulse_frequency self.clockwise_level = clockwise_level self.counter_clockwise_level = counter_clockwise_level # GPIO对象初始化 self.pul_gpio = None self.dir_gpio = None # 脉冲计数相关(核心:记录已发送的脉冲数) self.current_pulse_count = 0 # 本次旋转已发送的脉冲数 self.total_pulse_history = self._load_pulse_count() # 历史总脉冲数(从文件加载) self.current_rotate_target = 0 # 本次旋转的目标脉冲数 # 初始化GPIO self._init_gpio() def _load_pulse_count(self): """加载历史脉冲计数(程序启动时执行)""" try: if os.path.exists(COUNT_FILE): with open(COUNT_FILE, "r", encoding="utf-8") as f: data = json.load(f) # 返回历史总脉冲数(默认0) return int(data.get("total_pulses", 0)) except Exception as e: print(f"[WARN] 加载历史计数失败:{e},将从0开始计数") return 0 def _save_pulse_count(self): """保存当前总脉冲数到文件(持久化)""" try: with open(COUNT_FILE, "w", encoding="utf-8") as f: json.dump({ "total_pulses": self.total_pulse_history, "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) }, f, ensure_ascii=False, indent=2) except Exception as e: print(f"[ERROR] 保存计数失败:{e}") def _init_gpio(self): """初始化PUL和DIR引脚(内部方法)""" try: # 初始化脉冲引脚(输出模式) self.pul_gpio = GPIO(self.pul_pin, "out") # 初始化方向引脚(输出模式) self.dir_gpio = GPIO(self.dir_pin, "out") # 初始电平置低(避免电机误动作) self.pul_gpio.write(False) self.dir_gpio.write(False) print(f"[OK] PUL引脚初始化完成:{self.pul_pin} 引脚") print(f"[OK] DIR引脚初始化完成:{self.dir_pin} 引脚") print( f"[INFO] 历史累计脉冲数:{self.total_pulse_history} → 对应圈数:{self.total_pulse_history / self.pulses_per_round:.2f} 圈") except PermissionError: raise RuntimeError("权限不足!请用sudo运行程序(sudo python xxx.py)") except Exception as e: raise RuntimeError(f"GPIO初始化失败:{str(e)}") from e def _validate_rounds(self, rounds: float) -> bool: """验证圈数是否合法(内部方法)""" if rounds <= 0: print("[ERROR] 圈数必须为正数") return False return True def _validate_direction(self, direction: str) -> bool: """验证方向参数是否合法(内部方法)""" if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]: print(f"[ERROR] 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}") return False return True def rotate(self, rounds: float, direction: str = CLOCKWISE): """ 控制电机旋转(支持正反转,实时记录脉冲数) :param rounds: 旋转圈数(可小数,如0.5=半圈) :param direction: 方向(clockwise=顺时针,counterclockwise=逆时针) """ # 参数验证 if not self._validate_rounds(rounds) or not self._validate_direction(direction): return # 重置本次旋转的计数 self.current_pulse_count = 0 # 计算本次旋转的目标脉冲数 self.current_rotate_target = int(rounds * self.pulses_per_round) # 设置旋转方向(DIR电平) if direction == self.CLOCKWISE: # 顺时针 self.dir_gpio.write(self.clockwise_level) print(f"\n=== 顺时针旋转 {rounds} 圈 ===") else: # 逆时针 self.dir_gpio.write(self.counter_clockwise_level) print(f"\n=== 逆时针旋转 {rounds} 圈 ===") # 计算总脉冲数和时序(精准控制,避免丢步) total_pulses = self.current_rotate_target pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒) half_period = pulse_period / 2 # 占空比50%(MA860H最优) print(f"目标脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms") start_time = time.perf_counter() # 高精度计时(避免丢步) try: # 发送脉冲序列(核心:占空比50%的方波,实时计数) for _ in range(total_pulses): # 高电平 self.pul_gpio.write(True) # 精准延时(比time.sleep稳定,适配高频脉冲) while time.perf_counter() - start_time < half_period: pass # 低电平 self.pul_gpio.write(False) # 更新下一个脉冲的起始时间 start_time += pulse_period # 🌟 核心:累加本次脉冲计数 self.current_pulse_count += 1 self.total_pulse_history += 1 # 每发送SAVE_INTERVAL个脉冲,保存一次计数(减少IO开销) if self.current_pulse_count % SAVE_INTERVAL == 0: self._save_pulse_count() print( f"[OK] 旋转完成 → 本次发送脉冲:{self.current_pulse_count} | 累计脉冲:{self.total_pulse_history} | 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}") except Exception as e: # 🌟 关键:异常发生时(如断电前的程序崩溃),立即保存当前计数 self._save_pulse_count() # 计算已完成的圈数 completed_rounds = self.current_pulse_count / self.pulses_per_round remaining_pulses = self.current_rotate_target - self.current_pulse_count remaining_rounds = remaining_pulses / self.pulses_per_round print(f"\n[ERROR] 旋转过程中异常:{e}") print(f"[INFO] 异常时已发送脉冲:{self.current_pulse_count} → 已完成圈数:{completed_rounds:.2f}") print(f"[INFO] 剩余未发送脉冲:{remaining_pulses} → 剩余圈数:{remaining_rounds:.2f}") print( f"[INFO] 累计总脉冲:{self.total_pulse_history} → 累计总圈数:{self.total_pulse_history / self.pulses_per_round:.2f}") raise # 抛出异常,让上层处理 def get_current_status(self): """获取当前电机状态(脉冲数、圈数)""" return { "total_pulses": self.total_pulse_history, "total_rounds": self.total_pulse_history / self.pulses_per_round, "last_rotate_completed_pulses": self.current_pulse_count, "last_rotate_completed_rounds": self.current_pulse_count / self.pulses_per_round, "last_rotate_target_pulses": self.current_rotate_target, "last_rotate_target_rounds": self.current_rotate_target / self.pulses_per_round } def reset_count(self): """重置累计计数(按需使用,如电机归位后)""" self.total_pulse_history = 0 self._save_pulse_count() print("[INFO] 累计脉冲计数已重置为0") def stop(self): """紧急停止(置低脉冲引脚)""" if self.pul_gpio: self.pul_gpio.write(False) # 停止时保存当前计数 self._save_pulse_count() print("[STOP] 电机已停止,当前计数已保存") def close(self): """释放GPIO资源""" # 安全释放GPIO资源(关键:避免引脚电平残留) if self.pul_gpio: self.pul_gpio.write(False) # 脉冲引脚置低 self.pul_gpio.close() print("\n[OK] PUL引脚已关闭(电平置低)") if self.dir_gpio: self.dir_gpio.write(False) # 方向引脚置低 self.dir_gpio.close() print("[OK] DIR引脚已关闭(电平置低)") # 关闭时保存最终计数 self._save_pulse_count() print( f"[INFO] 最终累计脉冲:{self.total_pulse_history} → 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}") # 重置GPIO对象 self.pul_gpio = None self.dir_gpio = None def __del__(self): """析构函数:确保资源释放""" self.close() # ----------对外接口----------- def motor_demo(): """电机控制示例""" motor = None try: # 创建电机实例(使用默认配置) motor = StepperMotor() print("\n=== 步进电机控制程序启动 ===") # 打印初始状态 init_status = motor.get_current_status() print(f"[INIT] 初始状态 → 累计圈数:{init_status['total_rounds']:.2f} 圈") while True: # 靠近电机方向 逆时针 motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE) time.sleep(5) # 暂停5秒 # 远离电机方向 顺时针 motor.rotate(rounds=10.0, direction=motor.CLOCKWISE) time.sleep(5) # 暂停5秒 except PermissionError: print("\n[ERROR] 权限不足:请用 sudo 运行!") print("命令:sudo python3 double_direction_motor.py") except ImportError: print("\n[ERROR] 缺少依赖:请安装python-periphery") print("命令:pip install python-periphery") except KeyboardInterrupt: print("\n[INFO] 用户手动停止程序") except Exception as e: print(f"\n[ERROR] 程序异常:{str(e)}") finally: if motor: # 打印最终状态 final_status = motor.get_current_status() print(f"\n[FINAL] 程序退出状态 → 累计脉冲:{final_status['total_pulses']} | 累计圈数:{final_status['total_rounds']:.2f} 圈") motor.close() print("[OK] 程序退出完成") if __name__ == '__main__': motor_demo()