diff --git a/EMV/EMV.py b/EMV/EMV.py index 7cb607d..0e88d1f 100644 --- a/EMV/EMV.py +++ b/EMV/EMV.py @@ -49,19 +49,19 @@ valve_commands = { 'close': '000000000006010500020000', }, ABSORB_SOLENOID_VALVE2: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050003FF00', 'close': '000000000006010500030000', }, ABSORB_SOLENOID_VALVE3: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050004FF00', 'close': '000000000006010500040000', }, ABSORB_SOLENOID_VALVE4: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050005FF00', 'close': '000000000006010500050000', }, ABSORB_SOLENOID_VALVE5: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050006FF00', 'close': '000000000006010500060000', } } diff --git a/EMV/EMV_test.py b/EMV/EMV_test.py index 00ca765..c72a830 100644 --- a/EMV/EMV_test.py +++ b/EMV/EMV_test.py @@ -1,37 +1,37 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- ''' -# @Time : 2026/1/8 16:52 +# @Time : 2025/12/12 14:39 # @Author : reenrr -# @File : EMV_test.py -# @Desc : 网络继电器控制输入、输出设备测试程序 +# @File : EMV.py +# @Desc : 网络继电器控制输入、输出设备 修改了接口,需测试 ''' import socket import binascii import time -from threading import Event, Lock -import threading import logging +from typing import Union + # 网络继电器的 IP 和端口 HOST = '192.168.5.18' PORT = 50000 # 控件命名映射 -SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 -SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 -ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1 -ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2 -ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3 -ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4 -ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5 +SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 +SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 +ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1 +ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2 +ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3 +ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4 +ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5 # 传感器命名映射 -CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 -CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 -PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 -PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 -FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 +CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 +CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 +PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 +PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 +FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 # 控件控制报文 valve_commands = { @@ -48,19 +48,19 @@ valve_commands = { 'close': '000000000006010500020000', }, ABSORB_SOLENOID_VALVE2: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050003FF00', 'close': '000000000006010500030000', }, ABSORB_SOLENOID_VALVE3: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050004FF00', 'close': '000000000006010500040000', }, ABSORB_SOLENOID_VALVE4: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050005FF00', 'close': '000000000006010500050000', }, ABSORB_SOLENOID_VALVE5: { - 'open': '00000000000601050002FF00', + 'open': '00000000000601050006FF00', 'close': '000000000006010500060000', } } @@ -80,7 +80,6 @@ device_bit_map = { ABSORB_SOLENOID_VALVE3: 4, ABSORB_SOLENOID_VALVE4: 5, ABSORB_SOLENOID_VALVE5: 6 - } device_name_map = { @@ -111,50 +110,77 @@ sensor_name_map = { CONVEYOR2_SENSOR: '传送带2开关' } - -# -------------全局事件------------- -press_sensors_triggered = Event() -fiber_triggered = Event() # 光纤传感器触发事件 -fiber_lock = Lock() # 线程锁,保护共享变量 -valve1_open_flag = False # 电磁阀1打开标志 - - class RelayController: def __init__(self): """初始化继电器控制器""" - self.socket = None + self.sock = None + self.is_connected = False # 长连接状态标记 - # 线程相关状态 - self.fiber_thread = None # 保存光纤传感器线程对象 - self.fiber_monitor_running = False # 光纤传感器监听线程运行标志 + def connect(self) -> bool: + """ + 建立长连接 + :return: True-连接成功,False-连接失败 + """ + if not self.is_connected: + try: + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(3) + self.sock.connect((HOST, PORT)) + self.is_connected = True + print(f"长连接建立成功:{HOST}:{PORT}") + return True + except Exception as e: + print(f"长连接建立失败:{e}") + self.sock = None + self.is_connected = False + return False + return True - self.press_sensors_thread = None # 保存按压开关线程对象 - self.press_sensors_monitor_running = False # 按压传感器监听线程运行标志 - self.last_press_sensor_status = False # 记录传感器上一次状态,初始为无信号(用于上升沿检测) + def disconnect(self): + """断开长连接""" + if self.sock and self.is_connected: + try: + self.sock.shutdown(socket.SHUT_RDWR) + self.sock.close() + print("长连接已正常断开") + except Exception as e: + print(f"断开长连接时出现异常:{e}") + finally: + self.sock = None + self.is_connected = False - def send_command(self, command): + def send_command(self, command: str) -> Union[bytes, bool]: """ 将十六进制字符串转换为字节数据并发送 :param command: 十六进制字符串 :return: 响应字节数据 / False """ - try: - byte_data = binascii.unhexlify(command) - - # 创建套接字并连接到继电器 - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.connect((HOST, PORT)) - sock.send(byte_data) - # 接收响应 - response = sock.recv(1024) - # logging.info(f"收到响应: {binascii.hexlify(response)}") - # 校验响应 - return response - except Exception as e: - logging.info(f"通信错误: {e}") + if not self.is_connected or not self.sock: + print("长连接未建立,无法发送指令") return False - def get_all_device_status(self, command_type='devices'): + try: + byte_data = binascii.unhexlify(command) + self.sock.sendall(byte_data) # sendall确保数据全部发送 + # 接收响应 + response = self.sock.recv(1024) + if not response: + logging.warning("长连接接收空响应,可能连接已断开") + self.is_connected = False + return False + # print(f"收到响应: {binascii.hexlify(response)}") + # 校验响应 + return response + except socket.timeout: + print("长连接发送/接收超时") + self.is_connected = False + return False + except Exception as e: + print(f"通信错误: {e}") + self.is_connected = False + return False + + def get_all_device_status(self, command_type: str='devices') -> dict[str, bool]: """ 获取所有设备/传感器状态 :param command_type: 'devices'(控件) / 'sensors'(传感器) @@ -162,7 +188,7 @@ class RelayController: """ command = read_status_command.get(command_type) if not command: - logging.info(f"未知的读取类型: {command_type}") + print(f"未知的读取类型: {command_type}") return {} response = self.send_command(command) @@ -179,295 +205,143 @@ class RelayController: bit_map = sensor_bit_map name_map = sensor_name_map else: - logging.info("不支持的映射类型") - return{} + print("不支持的映射类型") + return {} for key, bit_index in bit_map.items(): state = status_bin[bit_index] == '1' status_dict[key] = state # readable = "开启" if state else "关闭" - # logging.info(f"{device.capitalize()} 状态: {readable}") + # print(f"{device.capitalize()} 状态: {readable}") else: - logging.info("读取状态失败或响应无效") + print("读取状态失败或响应无效") return status_dict - def get_device_status(self, device_name, command_type='devices'): + def set_device(self, device_name: str, state: bool): """ - 获取单个控件/传感器状态 - :param device_name:设备名称 - :param command_type: 'devices'/'sensors' - :return:True(开启) / False(关闭) / None(无法读取) + 设置指定输出设备(DO)的开关状态 + :param device_name: 设备名称 + :param state: 目标状态 True--打开 FALSE--关闭 """ - status = self.get_all_device_status(command_type) - return status.get(device_name, None) + if device_name not in valve_commands: + raise ValueError(f"未知设备:{device_name}") - def open(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, - absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, - absorb_solenoid_valve5=False): - """ - 根据状态决定是否执行开操作 - :param solenoid_valve1:是否打开电磁阀1 - :param solenoid_valve2:是否打开电磁阀2 - :param absorb_solenoid_valve1:是否打开吸取装置电磁阀1 - :param absorb_solenoid_valve2:是否打开吸取装置电磁阀2 - :param absorb_solenoid_valve3:是否打开吸取装置电磁阀3 - :param absorb_solenoid_valve4:是否打开吸取装置电磁阀4 - :param absorb_solenoid_valve5:是否打开吸取装置电磁阀5 + try: + current = self.get_all_device_status() + is_on = current.get(device_name, False) - :return: - """ - global valve1_open_time, valve1_open_flag - status = self.get_all_device_status() + if state and not is_on: + self.send_command(valve_commands[device_name]['open']) + elif not state and is_on: + self.send_command(valve_commands[device_name]['close']) - if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False): - print("打开电磁阀1") - self.send_command(valve_commands[SOLENOID_VALVE1]['open']) - # 记录电磁阀1打开时的时间戳和标志 - with fiber_lock: - valve1_open_time = time.time() - valve1_open_flag = True - - if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False): - print("打开电磁阀2") - self.send_command(valve_commands[SOLENOID_VALVE2]['open']) - - if absorb_solenoid_valve1 and not status.get(ABSORB_SOLENOID_VALVE1, False): - print("打开吸取装置电磁阀1") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['open']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve2 and not status.get(ABSORB_SOLENOID_VALVE2, False): - print("打开吸取装置电磁阀2") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['open']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve3 and not status.get(ABSORB_SOLENOID_VALVE3, False): - print("打开吸取装置电磁阀3") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['open']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve4 and not status.get(ABSORB_SOLENOID_VALVE4, False): - print("打开吸取装置电磁阀4") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['open']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve5 and not status.get(ABSORB_SOLENOID_VALVE5, False): - print("打开吸取装置电磁阀5") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['open']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - # 根据状态决定是否执行关操作 - def close(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, - absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, - absorb_solenoid_valve5=False): - """ - 根据状态决定是否执行关操作 - :param solenoid_valve1:是否关闭电磁阀1 - :param solenoid_valve2:是否关闭电磁阀2 - :param absorb_solenoid_valve1:是否关闭吸取电磁阀1 - :param absorb_solenoid_valve2:是否关闭吸取电磁阀2 - :param absorb_solenoid_valve3:是否关闭吸取电磁阀3 - :param absorb_solenoid_valve4:是否关闭吸取电磁阀4 - :param absorb_solenoid_valve5:是否关闭吸取电磁阀5 - - :return: - """ - global valve1_open_flag - - status = self.get_all_device_status() - - if solenoid_valve1 and status.get(SOLENOID_VALVE1, True): - print("关闭电磁阀1") - self.send_command(valve_commands[SOLENOID_VALVE1]['close']) - # 重置电磁阀1打开标志 - with fiber_lock: - valve1_open_flag = False - - if solenoid_valve2 and status.get(SOLENOID_VALVE2, True): - print("关闭电磁阀2") - self.send_command(valve_commands[SOLENOID_VALVE2]['close']) - - if absorb_solenoid_valve1 and status.get(ABSORB_SOLENOID_VALVE1, True): - print("关闭吸取装置电磁阀1") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['close']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve2 and status.get(ABSORB_SOLENOID_VALVE2, True): - print("关闭吸取装置电磁阀2") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['close']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve3 and status.get(ABSORB_SOLENOID_VALVE3, True): - print("关闭吸取装置电磁阀3") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['close']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve4 and status.get(ABSORB_SOLENOID_VALVE4, True): - print("关闭吸取装置电磁阀4") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['close']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - if absorb_solenoid_valve5 and status.get(ABSORB_SOLENOID_VALVE5, True): - print("关闭吸取装置电磁阀5") - self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['close']) - time.sleep(1) # 实际测试需要考虑这个延时是否合适 - - def fiber_sensor_monitor(self): - """ - 光纤传感器监听线程,专门检测电磁阀打开后的触发状态 - """ - global fiber_triggered, valve1_open_flag - logging.info("光纤传感器监听线程已启动") - - while self.fiber_monitor_running: - try: - # 增加短休眠,降低CPU占用,避免错过信号 - time.sleep(0.005) - - # 获取光纤传感器状态 - fiber_status = self.get_device_status(FIBER_SENSOR, 'sensors') - # 检测是否检测到信号 - if fiber_status: - with fiber_lock: - # 检查电磁阀1是否处于打开状态 - if valve1_open_flag: - fiber_triggered.set() - # 防止重复触发 - time.sleep(0.1) - fiber_triggered.clear() - except Exception as e: - logging.info(f"光纤传感器监听异常:{e}") - time.sleep(0.1) # 异常时增加休眠 - - def start_fiber_monitor(self): - """启动光纤传感器监听线程""" - # 检查线程是否已在运行 - if self.fiber_monitor_running or (self.fiber_thread and self.fiber_thread.is_alive()): - logging.warning("光纤传感器监听线程已在运行,无需重复启动") - return - - # 启动线程 - self.fiber_monitor_running = True - self.fiber_thread = threading.Thread( - target=self.fiber_sensor_monitor, - daemon=True - ) - self.fiber_thread.start() - logging.info("光纤传感器监听线程启动成功") - - def stop_fiber_monitor(self): - """停止光纤传感器监听线程""" - self.fiber_monitor_running = False - # 等待线程完全退出 - if self.fiber_thread and self.fiber_thread.is_alive(): - self.fiber_thread.join(timeout=1) - logging.info("光纤传感器监听线程已停止") - - def press_sensors_monitor(self, check_interval=0.1): - """ - 双压传感器监听线程 - :param check_interval: 检测间隔 - :return: - """ - global press_sensors_triggered - logging.info("双压传感器监听线程已启动") - - while self.press_sensors_monitor_running: - try: - # 检测两个传感器任意一个是否触发 - press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors') - press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors') - current_sensor_state = press_sensor1_status or press_sensor2_status - - # 上升沿触发(仅从无信号-->有信号时,才触发事件) - if current_sensor_state and not self.last_press_sensor_status: - press_sensors_triggered.set() # 触发事件,通知主线程 - logging.info("双压传感器触发:线条已落到传送带") - - # 更新上一次传感器状态,为下一次上升沿检测做准备 - self.last_press_sensor_status = current_sensor_state - # 传感器检测间隔 - time.sleep(check_interval) - except Exception as e: - logging.info(f"双压传感器监听异常: {e}") - time.sleep(0.1) # 异常时增加休眠 - - def start_press_sensors_monitor(self): - """启动双压传感器监听线程""" - # 检查线程是否已在运行 - if self.press_sensors_monitor_running or (self.press_sensors_thread and self.press_sensors_thread.is_alive()): - logging.warning("双压传感器监听线程已在运行,无需重复启动") - return - - # 启动线程 - self.press_sensors_monitor_running = True - self.press_sensors_thread = threading.Thread( - target=self.press_sensors_monitor, - daemon=True - ) - self.press_sensors_thread.start() - logging.info("双压传感器监听线程启动成功") - - def stop_press_sensors_monitor(self): - """停止光纤传感器监听线程""" - self.press_sensors_monitor_running = False - # 等待线程完全退出 - if self.press_sensors_thread and self.press_sensors_thread.is_alive(): - self.press_sensors_thread.join(timeout=1) - logging.info("双压传感器监听线程已停止") + except Exception as e: + raise RuntimeError(f"设置设备 '{device_name}' 状态失败: {e}") from e -# 全局继电器实例 -global_relay = RelayController() +# 全局变量 +GLOBAL_RELAY = None -# ------------对外接口---------- -def control_solenoid(): - """ - 线条不合格场景专用:控制电磁阀+监听光纤传感器 - 执行流程:启动监听-->打开电磁阀-->等待检测-->关闭电磁阀-->停止监听 - """ - global fiber_triggered, global_relay +# --------对外接口-------- +def init_relay(): + """初始化网络继电器实例+建立长连接""" + global GLOBAL_RELAY try: - # 启动光纤传感器监听线程 - global_relay.start_fiber_monitor() - - # 重置光纤传感器触发事件,准备检测 - fiber_triggered.clear() - - # 同时打开电磁阀1、2 - global_relay.open(solenoid_valve1=True, solenoid_valve2=True) - logging.info("电磁阀1、2已打开") - - # 等待线条掉落(最多等待2秒) - timeout = 3.0 - start_time = time.time() - fiber_detected = False - # 等待光纤传感器触发或超时 - while time.time() - start_time < timeout: - if fiber_triggered.is_set(): - fiber_detected = True - logging.info("该NG线条掉入费料区") - break - time.sleep(0.01) # 降低CPU空转 - - if not fiber_detected: - logging.info("出问题!!!,红外传感器未检测到线条") - - # 关闭电磁阀1、2 - time.sleep(0.2) # 等待线条掉落 - global_relay.close(solenoid_valve1=True, solenoid_valve2=True) - logging.info("电磁阀1、2已关闭") - - # 停止光纤传感器监听线程 - global_relay.stop_fiber_monitor() + GLOBAL_RELAY = RelayController() + GLOBAL_RELAY.connect() except Exception as e: - logging.info(f"操作电磁阀失败:{str(e)}") - # 异常时也要停止线程,避免残留 - global_relay.stop_fiber_monitor() + raise ValueError(f"初始化失败{e}") + +def deinit_relay(): + """断开长连接""" + global GLOBAL_RELAY + + if GLOBAL_RELAY is not None: + GLOBAL_RELAY.disconnect() + GLOBAL_RELAY = None + +def ng_push(): + """NG推料流程""" + try: + # 同时打开电磁阀1、2 + write_do(SOLENOID_VALVE1, True) + write_do(SOLENOID_VALVE2, True) + print(f"电磁阀1、2已打开") + + # 等待线条掉落 + time.sleep(0.5) + + write_do(SOLENOID_VALVE1, False) + write_do(SOLENOID_VALVE2, False) + print(f"电磁阀1、2已关闭") + + except Exception as e: + print(f"NG推料失败:{e}") + raise RuntimeError("NG推料流程异常") from e + +def write_do(device_name: str, state: bool): + """ + 控制单个数字输出设备(DO)的开关状态 + :param device_name: 设备名称 + :param state: True:打开 False:关闭 + """ + global GLOBAL_RELAY + if GLOBAL_RELAY is None: + raise ValueError("未初始化实例") + + # 验证设备是否存在 + if device_name not in device_bit_map: + valid_devices = list(device_bit_map.keys()) + raise ValueError(f"无效的设备名 '{device_name}'。有效设备: {valid_devices}") + + # 确保已连接 + if not GLOBAL_RELAY.is_connected: + if not GLOBAL_RELAY.connect(): + raise RuntimeError("无法连接到网络继电器") + + try: + GLOBAL_RELAY.set_device(device_name, state) + + except Exception as e: + raise RuntimeError(f"控制设备 '{device_name}' 失败: {e}") + +def read_all_io() -> dict[str, dict[str, bool]]: + """ + 读取所有DI(传感器)和DO(设备)状态 + :return: {'devices': {...}, 'sensors': {...}} + """ + global GLOBAL_RELAY + if GLOBAL_RELAY is None: + raise ValueError("未初始化") + + try: + devices = GLOBAL_RELAY.get_all_device_status('devices') + sensors = GLOBAL_RELAY.get_all_device_status('sensors') + return {'devices': devices, 'sensors': sensors} + + except Exception as e: + print(f"读取IO状态失败:{e}") + raise RuntimeError("读取IO失败") from e # ------------测试接口------------- if __name__ == '__main__': - control_solenoid() \ No newline at end of file + init_relay() + + write_do(SOLENOID_VALVE1, True) + + time.sleep(5) + + io_status = read_all_io() + for name, status in io_status['devices'].items(): + status_str = "开启" if status else "关闭" + print(f"{device_name_map.get(name, name)}: {status_str}") + + deinit_relay() + + + + + diff --git a/RK1106/RK1106_server.py b/RK1106/RK1106_server.py index 6cbae98..6466deb 100644 --- a/RK1106/RK1106_server.py +++ b/RK1106/RK1106_server.py @@ -4,15 +4,17 @@ # @Time : 2026/1/9 10:45 # @Author : reenrr # @File : RK1106_server.py -# @Desc : RK1106服务端,等待工控机调用 +# @Desc : RK1106服务端,等待工控机调用 通信为JSON格式 ''' import socket import logging import sys -from test import motor_demo +import json + +from stepper_motor import motor_start, align_wire, cleanup, motor_stop -# --------日志配置(终端+文件双输出)-------------- +# ------------日志配置(终端+文件双输出)-------------- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', @@ -30,88 +32,177 @@ logging.basicConfig( ) # --------配置TCP服务端---------- -HOST = "127.0.0.1" +HOST = "192.168.0.100" PORT = 8888 -# 程序映射表(指令表示 -> 执行函数) -PROG_MAP = { - # "STEPPER_TEST": motor_test_demo, - "test": motor_demo +# 全局参数缓存 +MOTOR_CONFIG = { + "speed": 2500, # 默认速度 + "cycle": 10.0, # 默认旋转圈数 } -def parse_command(cmd_str: str) ->tuple[str, dict]: +def handle_setting(para_type: str, para_value: str) ->dict: """ - 解析工控机发送的指令字符串 - :param cmd_str: 指令字符串 - :return: 指令名称,参数字典 + 处理客户端发送的参数设置指令(cmd:"setting"),更新全局电机配置 + :param para_type: 要设置的参数类型,仅支持"speed"或"cycle" + :param para_value: 参数值字符串,将尝试转换为int(speed)或float(cycle) + :return: 标准化相应字典dict,如: + { + "Result": "1"表示成功,"0"表示失败, + "ErrMsg": str # 成功提示或错误详情 + } """ - # 空指令处理 - if not cmd_str or cmd_str.strip() == "": - return "", {} + try: + if para_type == "speed": + MOTOR_CONFIG["speed"] = int(para_value) + elif para_type == "cycle": + MOTOR_CONFIG["cycle"] = float(para_value) + else: + return {"Result": "0", "ErrMsg": f"不支持的参数类型: {para_type}"} + return {"Result": "1", "ErrMsg": "设置成功"} + except ValueError as e: + return {"Result": "0", "ErrMsg": f"参数值格式错误: {str(e)}"} - # 分割指令标识和参数 - cmd_parts = cmd_str.strip().split("|", 1) - prog_id = cmd_parts[0].strip() - params = {} +def handle_start(para_type: str, para_value: str) -> dict: + """ + 处理启动电机指令(cmd: "start"),使用当前MOTOR_CONFIG配置运行电机 + :param para_type:为"direction"时,使用"para_value"作为临时方向 + :param para_value:为0或1 + :return: 标准化相应字典dict,如: + { + "Result": "1"表示成功,"0"表示失败, + "ErrMsg": str #执行结果或异常信息 + } + """ + try: + if para_type == "direction": + direction = int(para_value) + if direction not in (0,1): + return {"Result": "0", "ErrMsg": "方向必须为0或1"} - # 解析参数(格式:param1=val1¶m2=val2) - if len(cmd_parts) > 1 and cmd_parts[1].strip() != "": - param_str = cmd_parts[1].strip() - param_pairs = param_str.split("&") - for pair in param_pairs: - if "=" in pair: - key, value = pair.split("=", 1) # 处理值中含=的情况 - # 类型自动转换(数字/字符串) - try: - params[key.strip()] = int(value.strip()) - except ValueError: - try: - params[key.strip()] = float(value.strip()) - except ValueError: - params[key.strip()] = value.strip() + motor_start(speed=MOTOR_CONFIG["speed"], + cycle=MOTOR_CONFIG["cycle"], + direction=direction) + dir_str = "正向" if direction == 1 else "负向" + return {"Result": "1", "ErrMsg": f"电机启动成功({dir_str})"} + else: + return {"Result": "0", "ErrMsg": "start 指令仅支持'direction'"} + except Exception as e: + return {"Result": "0", "ErrMsg": f"电机启动失败:{str(e)}"} + +def handle_stop() -> dict: + """ + 处理停止电机指令(cmd: "stop") + :return: 标准化相应字典dict,如: + { + "Result": "1"表示成功,"0"表示失败, + "ErrMsg": str #执行结果或异常信息 + } + """ + try: + motor_stop() + return {"Result": "1", "ErrMsg": "电机已停止"} + except Exception as e: + return {"Result": "0", "ErrMsg": f"电机停止失败:{str(e)}"} + +def handle_align() -> dict: + """ + 处理线条对齐(挡板一来一回) + :return: dict + """ + try: + align_wire(MOTOR_CONFIG['speed'], MOTOR_CONFIG['cycle']) + return {"Result": "1", "ErrMsg": "处理线条对齐"} + except Exception as e: + return {"Result": "0", "ErrMsg": "线条对齐失败"} + +def parse_json_command(data: str) -> dict: + """ + 解析客户端发送的原始JSON字符串指令,并分发至对应处理函数 + :param data: 客户端发送的原始JSON字符串 + :return dict:标准化响应字典 + """ + try: + cmd_obj = json.loads(data.strip()) + except json.JSONDecodeError as e: + return {"Result": "0", "ErrMsg": f"JSON 格式错误: {str(e)}"} + + cmd = cmd_obj.get("cmd", "").strip() + para_type = cmd_obj.get("para_type", "").strip() + para_value = cmd_obj.get("para_value", "").strip() + + if cmd == "setting": + return handle_setting(para_type, para_value) + elif cmd == "start": + return handle_start(para_type, para_value) + elif cmd == "stop": + if para_type == "none" and para_value == "none": + return handle_stop() + else: + return {"Result": "0", "ErrMsg": "stop指令参数必须为none"} + elif cmd == "alignment": + if para_type == "none" and para_value == "none": + return handle_align() + else: + return {"Result": "0", "ErrMsg": "alignment指令参数必须为none"} + else: + return {"Result": "0", "ErrMsg": f"未知指令:{cmd}"} - return prog_id, params # ----------对外接口---------- def server(): + """启动TCP服务端,监听指定端口,接收工控机连接并循环处理JSON指令""" # 创建TCP socket - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - # 允许端口复用 + server_socket = None + conn = None + + try: + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((HOST, PORT)) server_socket.listen(1) # 只允许1个工控机连接 logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...") - # 等待工控机连接 - conn, addr = server_socket.accept() - with conn: - logging.info(f"[1106] 工控机已连接:{addr}") - # 循环接收指令 - while True: - # 接收指令(最大1024字节) - data = conn.recv(1024).decode() - logging.info(f"\n[1106] 收到工控机指令:{data}") - # 解析指令 - prog_id, params = parse_command(data) - logging.info(f"[1106] 解析结果 - 指令:{prog_id},参数:{params}") + while True: # 持续接受新连接 + try: + # 等待工控机连接 + conn, addr = server_socket.accept() + logging.info(f"[1106] 工控机已连接:{addr}") - # 执行对应程序 - responses = "" - if prog_id in PROG_MAP: - try: - result = PROG_MAP[prog_id](**params) - response = f"SUCCESS|步进电机测试执行完成,结果:{result}" - logging.info(f"[1106] {response}") - except Exception as e: - response = f"FAIL|步进电机测试执行失败:{str(e)}" - logging.error(f"[1106] {response}", exc_info=True) - else: - response = f"FAIL|未知指令:{prog_id},支持指令:{list(PROG_MAP.keys())}" - logging.warning(f"[1106] {response}") + # 循环接收指令 + while True: + # 接收指令(最大1024字节) + data = conn.recv(1024).decode() + if not data: + logging.warning("客户端断开连接") + break - # 发送响应给工控机 - conn.sendall(response.encode("utf-8")) - logging.info(f"[1106] 已发送响应:{response}") + logging.info(f"\n[1106] 收到工控机指令:{data}") + + # 解析指令 + response_dict = parse_json_command(data) + response_json = json.dumps(response_dict, ensure_ascii=False) + "\n" # 看需不需要加换行符\n + + # 发送响应给工控机 + conn.sendall(response_json.encode("utf-8")) + logging.info(f"[1106] 已发送响应:{response_json}") + + except ConnectionError: + logging.info("客户端异常断开") + except Exception as e: + logging.info(f"处理连接时发生错误: {e}") + finally: + if conn is not None: + conn.close() + conn = None # 重置,避免重复关闭 + logging.info("客户端连接已关闭,等待新连接...") + + except KeyboardInterrupt: + logging.info("\n收到 Ctrl+C,正在关闭服务...") + finally: + if server_socket: + server_socket.close() + logging.info("服务已停止,监听 socket 已释放") # ----------测试接口---------- if __name__ == "__main__": diff --git a/RK1106/stepper_motor.py b/RK1106/stepper_motor.py index f9dcbeb..c0f272f 100644 --- a/RK1106/stepper_motor.py +++ b/RK1106/stepper_motor.py @@ -1,15 +1,14 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -''' -# @Time : 2026/1/5 15:50 +""" +# @Time : 2026/1/4 19:13 # @Author : reenrr -# @File : stepper_motor.py -# @Desc : 控制步进电机从初始位置移动10cm,移动后回到初始位置 -''' +# @File : stepper_motor_test1.py +# @Desc : 线条厂控制步进电机测试 应该不会丢步 +""" import time + from periphery import GPIO -import logging -import os # ------------参数配置------------- # 1. 脉冲(PUL)引脚配置 → GPIO32 @@ -20,33 +19,15 @@ DIR_Pin = 33 # 3. 驱动器参数(根据拨码调整,默认不变) PULSES_PER_ROUND = 400 # 每圈脉冲数(SW5~SW8拨码,默认400) -PULSE_FREQUENCY = 2500 # 脉冲频率(Hz) +PULSE_FREQUENCY = 2500 # 脉冲频率(Hz,新手建议500~2000,最大200KHz) -# ------------ 日志+参数配置 ------------ -script_dir = os.path.dirname(os.path.abspath(__file__)) -log_file_path = os.path.join(script_dir, "stepper_motor.log") - -logging.basicConfig( - level=logging.INFO, - format='[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - handlers=[ - logging.StreamHandler(), - logging.FileHandler(log_file_path, encoding='utf-8') - ] -) class StepperMotor: """新力川MA860H驱动器步进电机控制类""" - # 方向常量定义 - CLOCKWISE = "clockwise" # 顺时针 - COUNTER_CLOCKWISE = "counterclockwise" # 逆时针 - def __init__(self, pul_pin: int = PUL_Pin, dir_pin: int = DIR_Pin, pulses_per_round: int = PULSES_PER_ROUND, - pulse_frequency: int = PULSE_FREQUENCY, clockwise_level: bool = True, counter_clockwise_level: bool = False): """ @@ -54,7 +35,6 @@ class StepperMotor: :param pul_pin: 脉冲引脚 :param dir_pin: 方向引脚 :param pulses_per_round: 每圈脉冲数(SW5~SW8拨码,默认400) - :param pulse_frequency: 脉冲频率(Hz,) :param clockwise_level: 顺时针对应的DIR电平 :param counter_clockwise_level: 逆时针对应的DIR电平 """ @@ -64,7 +44,6 @@ class StepperMotor: # 驱动器参数 self.pulses_per_round = pulses_per_round - self.pulse_frequency = pulse_frequency self.clockwise_level = clockwise_level self.counter_clockwise_level = counter_clockwise_level @@ -76,7 +55,7 @@ class StepperMotor: self._init_gpio() def _init_gpio(self): - """初始化PUL和DIR引脚""" + """初始化PUL和DIR引脚(内部方法)""" try: # 初始化脉冲引脚(输出模式) self.pul_gpio = GPIO(self.pul_pin, "out") @@ -87,52 +66,48 @@ class StepperMotor: self.pul_gpio.write(False) self.dir_gpio.write(False) - logging.info(f"PUL引脚初始化完成:{self.pul_pin} 引脚") - logging.info(f"DIR引脚初始化完成:{self.dir_pin} 引脚") + print(f"✅ PUL引脚初始化完成:{self.pul_pin} 引脚") + print(f"✅ DIR引脚初始化完成:{self.dir_pin} 引脚") except PermissionError: raise RuntimeError("权限不足!请用sudo运行程序(sudo python xxx.py)") except Exception as e: raise RuntimeError(f"GPIO初始化失败:{str(e)}") from e - def _validate_rounds(self, rounds: float) -> bool: - """验证圈数是否合法(内部方法)""" + def _validate_params(self, rounds: float, direction: int) -> bool: if rounds <= 0: - logging.info("圈数必须为正数") + print("圈数必须为正数") + return False + if direction not in (0, 1): + print("方向必须为0(逆时针)或1(顺时针)") return False return True - def _validate_direction(self, direction: str) -> bool: - """验证方向参数是否合法(内部方法)""" - if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]: - logging.info(f"方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}") - return False - return True - - def rotate(self, rounds: float, direction: str = CLOCKWISE): + def rotate(self, pulse_frequency: int, rounds: float, direction: int): """ 控制电机旋转(支持正反转) + :param pulse_frequency: 脉冲频率(hz) :param rounds: 旋转圈数(可小数,如0.5=半圈) - :param direction: 方向(clockwise=顺时针,counterclockwise=逆时针) + :param direction: 方向(1=顺时针,0=逆时针) """ # 参数验证 - if not self._validate_rounds(rounds) or not self._validate_direction(direction): + if not self._validate_params(rounds, direction): return # 设置旋转方向(DIR电平) - if direction == self.CLOCKWISE: # 顺时针 + if direction == 1: # 顺时针 self.dir_gpio.write(self.clockwise_level) - logging.info(f"\n=== 顺时针旋转 {rounds} 圈 ===") + print(f"\n=== 顺时针旋转 {rounds} 圈 ===") else: # 逆时针 self.dir_gpio.write(self.counter_clockwise_level) - logging.info(f"\n=== 逆时针旋转 {rounds} 圈 ===") + print(f"\n=== 逆时针旋转 {rounds} 圈 ===") # 计算总脉冲数和时序(精准控制,避免丢步) total_pulses = int(rounds * self.pulses_per_round) - pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒) + pulse_period = 1.0 / pulse_frequency # 脉冲周期(秒) half_period = pulse_period / 2 # 占空比50%(MA860H最优) - logging.info(f"总脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms") + print(f"总脉冲数:{total_pulses} | 频率:{pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms") start_time = time.perf_counter() # 高精度计时(避免丢步) # 发送脉冲序列(核心:占空比50%的方波) @@ -147,13 +122,13 @@ class StepperMotor: # 更新下一个脉冲的起始时间 start_time += pulse_period - logging.info("旋转完成") + print("✅ 旋转完成") def stop(self): """紧急停止(置低脉冲引脚)""" if self.pul_gpio: self.pul_gpio.write(False) - logging.info("电机已停止") + print("🛑 电机已停止") def close(self): """释放GPIO资源""" @@ -161,12 +136,12 @@ class StepperMotor: if self.pul_gpio: self.pul_gpio.write(False) # 脉冲引脚置低 self.pul_gpio.close() - logging.info("\n PUL引脚已关闭(电平置低)") + print("\n✅ PUL引脚已关闭(电平置低)") if self.dir_gpio: self.dir_gpio.write(False) # 方向引脚置低 self.dir_gpio.close() - logging.info("DIR引脚已关闭(电平置低)") + print("✅ DIR引脚已关闭(电平置低)") # 重置GPIO对象 self.pul_gpio = None @@ -176,36 +151,65 @@ class StepperMotor: """析构函数:确保资源释放""" self.close() +# ------全局实例------- +GLOBAL_MOTOR = StepperMotor() -#---------控制步进电机外部接口-------------- -def stepper_motor_control(): - motor = None +# -------对外接口---------- +def motor_start(speed: int, cycle: float, direction: int): + """ + 开启电机,用于断电时电机恢复到起始位置 + :param speed: 脉冲频率(hz) + :param cycle: 旋转圈数 + :param direction: 0=负向(逆时针),1=正向(顺时针) + """ try: - # 创建电机实例(使用默认配置) - motor = StepperMotor() - logging.info("\n=== 步进电机控制程序启动 ===") + print("\n=== 启动步进电机 ===") + + GLOBAL_MOTOR.rotate(pulse_frequency=speed, rounds=cycle, direction=direction) + time.sleep(5) # 暂停5秒 + + except ImportError: + print("\n❌ 缺少依赖:请安装python-periphery") + print("命令:pip install python-periphery") + except Exception as e: + print(f"\n❌ 程序异常:{str(e)}") + +def motor_stop(): + """紧急停止(仅停止脉冲,保留实例)""" + try: + if GLOBAL_MOTOR: + GLOBAL_MOTOR.stop() + except Exception as e: + print("停止失败:{e}") + +def align_wire(speed: int, cycle: float): + """ + 使线条对齐 + :param speed: 脉冲频率(hz) + :param cycle: 旋转圈数 + """ + try: + print("\n=== 启动线条对齐 ===") # 靠近电机方向 逆时针 - motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE) + GLOBAL_MOTOR.rotate(pulse_frequency=speed, rounds=cycle, direction=0) time.sleep(5) # 暂停5秒 # 远离电机方向 顺时针 - motor.rotate(rounds=10.0, direction=motor.CLOCKWISE) + GLOBAL_MOTOR.rotate(pulse_frequency=speed,rounds=10.0, direction=1) time.sleep(5) # 暂停5秒 - except PermissionError: - logging.info("\n 权限不足:请用 sudo 运行!") - logging.info("命令:sudo python3 double_direction_motor.py") except ImportError: - logging.info("\n 缺少依赖:请安装python-periphery") - logging.info("命令:pip install python-periphery") + print("\n❌ 缺少依赖:请安装python-periphery") + print("命令:pip install python-periphery") except Exception as e: - logging.info(f"\n 程序异常:{str(e)}") - finally: - if motor: - motor.close() - logging.info("程序退出完成") + print(f"\n❌ 程序异常:{str(e)}") + +def cleanup(): + """程序退出时统一清理""" + if GLOBAL_MOTOR: + GLOBAL_MOTOR.close() if __name__ == '__main__': - stepper_motor_control() + motor_start(2500, 10.0, 1) diff --git a/RK1106/stepper_motor_test.py b/RK1106/stepper_motor_test.py deleted file mode 100644 index 3c113c6..0000000 --- a/RK1106/stepper_motor_test.py +++ /dev/null @@ -1,159 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -# @Time : 2026/1/4 15:06 -# @Author : reenrr -# @File : stepper_motor_test.py -# @Desc : 线条厂控制步进电机 -""" -import time -from periphery import GPIO -import sys -sys.setswitchinterval(0.000001) # 减小线程切换间隔 - -# -------------- -# 硬件参数配置(必须与你的设备匹配!) -# -------------- -# 1. 脉冲(PUL)引脚配置 → GPIO32 -PUL_Pin = 32 - -# 2. 方向(DIR)引脚配置 → GPIO33 -DIR_Pin = 33 - -# 3. LCDA257C驱动器核心参数(关键!与拨码/软件设置一致) -PULSES_PER_ROUND = 400 # 每圈脉冲数(按驱动器细分拨码调整) -PULSE_FREQUENCY = 5000 # 初始测试频率(建议从500开始逐步调高) - -# 4. LCDA257C时序参数(遵循手册要求,不可低于最小值) -MIN_DIR_DELAY_US = 10 # DIR提前PUL的最小延迟(>8us) -PULSE_LOW_MIN_US = 2 # 脉冲低电平最小宽度(≥2us) -PULSE_HIGH_MIN_US = 2 # 脉冲高电平最小宽度(≥2us) - -# 5. 方向电平定义(可根据实际测试调整) -CLOCKWISE_LEVEL = True # 顺时针→DIR高电平 -COUNTER_CLOCKWISE_LEVEL = False # 逆时针→DIR低电平 - - -def init_stepper() -> tuple[GPIO, GPIO]: - """初始化PUL和DIR引脚(输出模式,适配LCDA257C)""" - try: - # 初始化脉冲引脚(输出模式) - pul_gpio = GPIO(PUL_Pin, "out") - # 初始化方向引脚(输出模式) - dir_gpio = GPIO(DIR_Pin, "out") - - # 初始状态:脉冲低电平,方向低电平(驱动器空闲状态) - pul_gpio.write(False) - dir_gpio.write(False) - - print(f"✅ PUL引脚初始化完成:{PUL_Pin}引脚") - print(f"✅ DIR引脚初始化完成:{DIR_Pin}引脚") - return pul_gpio, dir_gpio - - except FileNotFoundError: - raise RuntimeError(f"GPIO芯片不存在!检查 {PUL_Pin} 是否存在(命令:ls /dev/gpiochip*)") - except PermissionError: - raise RuntimeError("GPIO权限不足!请用 sudo 运行程序(sudo python test.py)") - except Exception as e: - raise RuntimeError(f"GPIO初始化失败:{str(e)}") from e - - -def rotate(pul_gpio: GPIO, dir_gpio: GPIO, rounds: float, direction: str = "clockwise", - test_freq: int = PULSE_FREQUENCY): - """ - 控制LCDA257C驱动器旋转(优化版脉冲发送,解决频率不生效问题) - :param pul_gpio: PUL引脚GPIO对象 - :param dir_gpio: DIR引脚GPIO对象 - :param rounds: 旋转圈数(正数,可小数) - :param direction: 方向(clockwise/counterclockwise) - :param test_freq: 本次旋转使用的脉冲频率(覆盖全局默认值) - """ - # 1. 参数合法性校验 - if rounds <= 0: - print("❌ 圈数必须为正数!") - return - if test_freq < 100: - print("❌ 频率过低(≥100Hz),请调整test_freq参数!") - return - - # 2. 设置旋转方向(严格遵循DIR提前时序) - if direction == "clockwise": - dir_gpio.write(CLOCKWISE_LEVEL) - print(f"\n=== 顺时针旋转 {rounds} 圈(目标频率:{test_freq}Hz)===") - elif direction == "counterclockwise": - dir_gpio.write(COUNTER_CLOCKWISE_LEVEL) - print(f"\n=== 逆时针旋转 {rounds} 圈(目标频率:{test_freq}Hz)===") - else: - print("❌ 方向参数错误!仅支持 clockwise/counterclockwise") - return - - # DIR电平设置后,延迟≥8us(满足LCDA257C时序要求) - time.sleep(MIN_DIR_DELAY_US / 1_000_000) - - # 3. 计算脉冲参数(确保高低电平≥最小宽度) - total_pulses = int(rounds * PULSES_PER_ROUND) - pulse_period = 1.0 / test_freq # 脉冲周期(秒) - - # 确保高低电平宽度不低于驱动器要求(避免脉冲识别失败) - high_period = max(pulse_period / 2, PULSE_HIGH_MIN_US / 1_000_000) - low_period = max(pulse_period / 2, PULSE_LOW_MIN_US / 1_000_000) - - # 打印参数(便于调试) - print(f"总脉冲数:{total_pulses} | 理论高电平:{high_period * 1e6:.1f}us | 理论低电平:{low_period * 1e6:.1f}us") - - # 4. 优化版脉冲发送(解决Python高频延时不准问题) - start_total = time.perf_counter() # 高精度计时(统计实际频率) - for _ in range(total_pulses): - # 高电平(直接sleep,避免while循环的调度延迟) - pul_gpio.write(True) - time.sleep(high_period) - - # 低电平 - pul_gpio.write(False) - time.sleep(low_period) - end_total = time.perf_counter() - - # 5. 计算实际频率(验证是否达到目标) - actual_duration = end_total - start_total - actual_freq = total_pulses / actual_duration if actual_duration > 0 else 0 - print(f"✅ 旋转完成 | 实际频率:{actual_freq:.0f}Hz | 耗时:{actual_duration:.2f}秒") - - -def motor_test_demo(): - """电机测试示例(逐步提升频率,验证转速变化)""" - pul_gpio = None - dir_gpio = None - try: - # 初始化引脚 - pul_gpio, dir_gpio = init_stepper() - print("\n=== 步进电机频率测试程序启动 ===") - - while True: - print(f"\n===== 测试频率:{PULSE_FREQUENCY}Hz =====") - - # 远离电机方向 顺时针 - rotate(pul_gpio, dir_gpio, rounds=10.0, direction="clockwise") - time.sleep(5) # - # 靠近电机方向 逆时针 - rotate(pul_gpio, dir_gpio, rounds=10.0, direction="counterclockwise") - time.sleep(5) # - - except Exception as e: - print(f"\n❌ 程序异常:{str(e)}") - finally: - # 安全释放GPIO资源(必须执行,避免引脚电平残留) - if pul_gpio: - pul_gpio.write(False) - pul_gpio.close() - print("\n✅ PUL引脚已关闭(低电平)") - if dir_gpio: - dir_gpio.write(False) - dir_gpio.close() - print("✅ DIR引脚已关闭(低电平)") - print("✅ 程序退出完成") - - -if __name__ == '__main__': - # 运行测试demo(必须sudo执行) - motor_test_demo() - diff --git a/RK1106/stepper_motor_test1.py b/RK1106/stepper_motor_test1.py deleted file mode 100644 index 38a4c1e..0000000 --- a/RK1106/stepper_motor_test1.py +++ /dev/null @@ -1,198 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -# @Time : 2026/1/4 19:13 -# @Author : reenrr -# @File : stepper_motor_test1.py -# @Desc : 线条厂控制步进电机测试 应该不会丢步 -""" -import time -from periphery import GPIO # 若未安装:pip install python-periphery - -# ------------参数配置------------- -# 1. 脉冲(PUL)引脚配置 → GPIO32 -PUL_Pin = 32 - -# 2. 方向(DIR)引脚配置 → GPIO33 -DIR_Pin = 33 - -# 3. 驱动器参数(根据拨码调整,默认不变) -PULSES_PER_ROUND = 400 # 每圈脉冲数(SW5~SW8拨码,默认400) -PULSE_FREQUENCY = 2500 # 脉冲频率(Hz,新手建议500~2000,最大200KHz) - -class StepperMotor: - """新力川MA860H驱动器步进电机控制类""" - # 方向常量定义 - CLOCKWISE = "clockwise" # 顺时针 - COUNTER_CLOCKWISE = "counterclockwise" # 逆时针 - - def __init__(self, - pul_pin: int = PUL_Pin, - dir_pin: int = DIR_Pin, - pulses_per_round: int = PULSES_PER_ROUND, - pulse_frequency: int = PULSE_FREQUENCY, - clockwise_level: bool = True, - counter_clockwise_level: bool = False): - """ - 初始化步进电机控制器 - :param pul_pin: 脉冲引脚 - :param dir_pin: 方向引脚 - :param pulses_per_round: 每圈脉冲数(SW5~SW8拨码,默认400) - :param pulse_frequency: 脉冲频率(Hz,新手建议500~2000,最大200KHz) - :param clockwise_level: 顺时针对应的DIR电平 - :param counter_clockwise_level: 逆时针对应的DIR电平 - """ - # 硬件配置参数 - self.pul_pin = pul_pin - self.dir_pin = dir_pin - - # 驱动器参数 - self.pulses_per_round = pulses_per_round - self.pulse_frequency = pulse_frequency - self.clockwise_level = clockwise_level - self.counter_clockwise_level = counter_clockwise_level - - # GPIO对象初始化 - self.pul_gpio = None - self.dir_gpio = None - - # 初始化GPIO - self._init_gpio() - - def _init_gpio(self): - """初始化PUL和DIR引脚(内部方法)""" - try: - # 初始化脉冲引脚(输出模式) - self.pul_gpio = GPIO(self.pul_pin, "out") - # 初始化方向引脚(输出模式) - self.dir_gpio = GPIO(self.dir_pin, "out") - - # 初始电平置低(避免电机误动作) - self.pul_gpio.write(False) - self.dir_gpio.write(False) - - print(f"✅ PUL引脚初始化完成:{self.pul_pin} 引脚") - print(f"✅ DIR引脚初始化完成:{self.dir_pin} 引脚") - - except PermissionError: - raise RuntimeError("权限不足!请用sudo运行程序(sudo python xxx.py)") - except Exception as e: - raise RuntimeError(f"GPIO初始化失败:{str(e)}") from e - - def _validate_rounds(self, rounds: float) -> bool: - """验证圈数是否合法(内部方法)""" - if rounds <= 0: - print("❌ 圈数必须为正数") - return False - return True - - def _validate_direction(self, direction: str) -> bool: - """验证方向参数是否合法(内部方法)""" - if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]: - print(f"❌ 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}") - return False - return True - - def rotate(self, rounds: float, direction: str = CLOCKWISE): - """ - 控制电机旋转(支持正反转) - :param rounds: 旋转圈数(可小数,如0.5=半圈) - :param direction: 方向(clockwise=顺时针,counterclockwise=逆时针) - """ - # 参数验证 - if not self._validate_rounds(rounds) or not self._validate_direction(direction): - return - - # 设置旋转方向(DIR电平) - if direction == self.CLOCKWISE: # 顺时针 - self.dir_gpio.write(self.clockwise_level) - print(f"\n=== 顺时针旋转 {rounds} 圈 ===") - else: # 逆时针 - self.dir_gpio.write(self.counter_clockwise_level) - print(f"\n=== 逆时针旋转 {rounds} 圈 ===") - - # 计算总脉冲数和时序(精准控制,避免丢步) - total_pulses = int(rounds * self.pulses_per_round) - pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒) - half_period = pulse_period / 2 # 占空比50%(MA860H最优) - - print(f"总脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms") - start_time = time.perf_counter() # 高精度计时(避免丢步) - - # 发送脉冲序列(核心:占空比50%的方波) - for _ in range(total_pulses): - # 高电平 - self.pul_gpio.write(True) - # 精准延时(比time.sleep稳定,适配高频脉冲) - while time.perf_counter() - start_time < half_period: - pass - # 低电平 - self.pul_gpio.write(False) - # 更新下一个脉冲的起始时间 - start_time += pulse_period - - print("✅ 旋转完成") - - def stop(self): - """紧急停止(置低脉冲引脚)""" - if self.pul_gpio: - self.pul_gpio.write(False) - print("🛑 电机已停止") - - def close(self): - """释放GPIO资源""" - # 安全释放GPIO资源(关键:避免引脚电平残留) - if self.pul_gpio: - self.pul_gpio.write(False) # 脉冲引脚置低 - self.pul_gpio.close() - print("\n✅ PUL引脚已关闭(电平置低)") - - if self.dir_gpio: - self.dir_gpio.write(False) # 方向引脚置低 - self.dir_gpio.close() - print("✅ DIR引脚已关闭(电平置低)") - - # 重置GPIO对象 - self.pul_gpio = None - self.dir_gpio = None - - def __del__(self): - """析构函数:确保资源释放""" - self.close() - - -# 使用示例 -def motor_demo(): - """电机控制示例""" - motor = None - try: - # 创建电机实例(使用默认配置) - motor = StepperMotor() - print("\n=== 步进电机控制程序启动 ===") - - while True: - # 靠近电机方向 逆时针 - motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE) - time.sleep(5) # 暂停5秒 - # 远离电机方向 顺时针 - motor.rotate(rounds=10.0, direction=motor.CLOCKWISE) - time.sleep(5) # 暂停5秒 - - - except PermissionError: - print("\n❌ 权限不足:请用 sudo 运行!") - print("命令:sudo python3 double_direction_motor.py") - except ImportError: - print("\n❌ 缺少依赖:请安装python-periphery") - print("命令:pip install python-periphery") - except Exception as e: - print(f"\n❌ 程序异常:{str(e)}") - finally: - if motor: - motor.close() - print("✅ 程序退出完成") - - -if __name__ == '__main__': - motor_demo() - diff --git a/RK1106/test.py b/RK1106/test.py deleted file mode 100644 index 2b0c77d..0000000 --- a/RK1106/test.py +++ /dev/null @@ -1,291 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -# @Time : 2026/1/5 16:18 -# @Author : reenrr -# @File : test.py -# @Desc : 步进电机添加记录脉冲数量(防止断电情况) -''' -import time -import json -import os -from periphery import GPIO - -# ------------参数配置------------- -# 1. 脉冲(PUL)引脚配置 → GPIO32 -PUL_Pin = 32 - -# 2. 方向(DIR)引脚配置 → GPIO33 -DIR_Pin = 33 - -# 3. 驱动器参数(根据拨码调整,默认不变) -PULSES_PER_ROUND = 400 # 每圈脉冲数(SW5~SW8拨码,默认400) -PULSE_FREQUENCY = 2500 # 脉冲频率(Hz,新手建议500~2000,最大200KHz) - -# 4. 计数持久化配置 -COUNT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "motor_pulse_count.json") -SAVE_INTERVAL = 10 # 每发送10个脉冲保存一次计数(减少IO开销,可根据需求调整) - -class StepperMotor: - """新力川MA860H驱动器步进电机控制类""" - # 方向常量定义 - CLOCKWISE = "clockwise" # 顺时针 - COUNTER_CLOCKWISE = "counterclockwise" # 逆时针 - - def __init__(self, - pul_pin: int = PUL_Pin, - dir_pin: int = DIR_Pin, - pulses_per_round: int = PULSES_PER_ROUND, - pulse_frequency: int = PULSE_FREQUENCY, - clockwise_level: bool = True, - counter_clockwise_level: bool = False): - """ - 初始化步进电机控制器 - :param pul_pin: 脉冲引脚 - :param dir_pin: 方向引脚 - :param pulses_per_round: 每圈脉冲数(SW5~SW8拨码,默认400) - :param pulse_frequency: 脉冲频率(Hz,新手建议500~2000,最大200KHz) - :param clockwise_level: 顺时针对应的DIR电平 - :param counter_clockwise_level: 逆时针对应的DIR电平 - """ - # 硬件配置参数 - self.pul_pin = pul_pin - self.dir_pin = dir_pin - - # 驱动器参数 - self.pulses_per_round = pulses_per_round - self.pulse_frequency = pulse_frequency - self.clockwise_level = clockwise_level - self.counter_clockwise_level = counter_clockwise_level - - # GPIO对象初始化 - self.pul_gpio = None - self.dir_gpio = None - - # 脉冲计数相关(核心:记录已发送的脉冲数) - self.current_pulse_count = 0 # 本次旋转已发送的脉冲数 - self.total_pulse_history = self._load_pulse_count() # 历史总脉冲数(从文件加载) - self.current_rotate_target = 0 # 本次旋转的目标脉冲数 - - # 初始化GPIO - self._init_gpio() - - def _load_pulse_count(self): - """加载历史脉冲计数(程序启动时执行)""" - try: - if os.path.exists(COUNT_FILE): - with open(COUNT_FILE, "r", encoding="utf-8") as f: - data = json.load(f) - # 返回历史总脉冲数(默认0) - return int(data.get("total_pulses", 0)) - except Exception as e: - print(f"[WARN] 加载历史计数失败:{e},将从0开始计数") - return 0 - - def _save_pulse_count(self): - """保存当前总脉冲数到文件(持久化)""" - try: - with open(COUNT_FILE, "w", encoding="utf-8") as f: - json.dump({ - "total_pulses": self.total_pulse_history, - "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - }, f, ensure_ascii=False, indent=2) - except Exception as e: - print(f"[ERROR] 保存计数失败:{e}") - - def _init_gpio(self): - """初始化PUL和DIR引脚(内部方法)""" - try: - # 初始化脉冲引脚(输出模式) - self.pul_gpio = GPIO(self.pul_pin, "out") - # 初始化方向引脚(输出模式) - self.dir_gpio = GPIO(self.dir_pin, "out") - - # 初始电平置低(避免电机误动作) - self.pul_gpio.write(False) - self.dir_gpio.write(False) - - print(f"[OK] PUL引脚初始化完成:{self.pul_pin} 引脚") - print(f"[OK] DIR引脚初始化完成:{self.dir_pin} 引脚") - print( - f"[INFO] 历史累计脉冲数:{self.total_pulse_history} → 对应圈数:{self.total_pulse_history / self.pulses_per_round:.2f} 圈") - - except PermissionError: - raise RuntimeError("权限不足!请用sudo运行程序(sudo python xxx.py)") - except Exception as e: - raise RuntimeError(f"GPIO初始化失败:{str(e)}") from e - - def _validate_rounds(self, rounds: float) -> bool: - """验证圈数是否合法(内部方法)""" - if rounds <= 0: - print("[ERROR] 圈数必须为正数") - return False - return True - - def _validate_direction(self, direction: str) -> bool: - """验证方向参数是否合法(内部方法)""" - if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]: - print(f"[ERROR] 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}") - return False - return True - - def rotate(self, rounds: float, direction: str = CLOCKWISE): - """ - 控制电机旋转(支持正反转,实时记录脉冲数) - :param rounds: 旋转圈数(可小数,如0.5=半圈) - :param direction: 方向(clockwise=顺时针,counterclockwise=逆时针) - """ - # 参数验证 - if not self._validate_rounds(rounds) or not self._validate_direction(direction): - return - - # 重置本次旋转的计数 - self.current_pulse_count = 0 - # 计算本次旋转的目标脉冲数 - self.current_rotate_target = int(rounds * self.pulses_per_round) - - # 设置旋转方向(DIR电平) - if direction == self.CLOCKWISE: # 顺时针 - self.dir_gpio.write(self.clockwise_level) - print(f"\n=== 顺时针旋转 {rounds} 圈 ===") - else: # 逆时针 - self.dir_gpio.write(self.counter_clockwise_level) - print(f"\n=== 逆时针旋转 {rounds} 圈 ===") - - # 计算总脉冲数和时序(精准控制,避免丢步) - total_pulses = self.current_rotate_target - pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒) - half_period = pulse_period / 2 # 占空比50%(MA860H最优) - - print(f"目标脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms") - start_time = time.perf_counter() # 高精度计时(避免丢步) - - try: - # 发送脉冲序列(核心:占空比50%的方波,实时计数) - for _ in range(total_pulses): - # 高电平 - self.pul_gpio.write(True) - # 精准延时(比time.sleep稳定,适配高频脉冲) - while time.perf_counter() - start_time < half_period: - pass - # 低电平 - self.pul_gpio.write(False) - # 更新下一个脉冲的起始时间 - start_time += pulse_period - - # 🌟 核心:累加本次脉冲计数 - self.current_pulse_count += 1 - self.total_pulse_history += 1 - - # 每发送SAVE_INTERVAL个脉冲,保存一次计数(减少IO开销) - if self.current_pulse_count % SAVE_INTERVAL == 0: - self._save_pulse_count() - - print( - f"[OK] 旋转完成 → 本次发送脉冲:{self.current_pulse_count} | 累计脉冲:{self.total_pulse_history} | 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}") - - except Exception as e: - # 🌟 关键:异常发生时(如断电前的程序崩溃),立即保存当前计数 - self._save_pulse_count() - # 计算已完成的圈数 - completed_rounds = self.current_pulse_count / self.pulses_per_round - remaining_pulses = self.current_rotate_target - self.current_pulse_count - remaining_rounds = remaining_pulses / self.pulses_per_round - print(f"\n[ERROR] 旋转过程中异常:{e}") - print(f"[INFO] 异常时已发送脉冲:{self.current_pulse_count} → 已完成圈数:{completed_rounds:.2f}") - print(f"[INFO] 剩余未发送脉冲:{remaining_pulses} → 剩余圈数:{remaining_rounds:.2f}") - print( - f"[INFO] 累计总脉冲:{self.total_pulse_history} → 累计总圈数:{self.total_pulse_history / self.pulses_per_round:.2f}") - raise # 抛出异常,让上层处理 - - def get_current_status(self): - """获取当前电机状态(脉冲数、圈数)""" - return { - "total_pulses": self.total_pulse_history, - "total_rounds": self.total_pulse_history / self.pulses_per_round, - "last_rotate_completed_pulses": self.current_pulse_count, - "last_rotate_completed_rounds": self.current_pulse_count / self.pulses_per_round, - "last_rotate_target_pulses": self.current_rotate_target, - "last_rotate_target_rounds": self.current_rotate_target / self.pulses_per_round - } - - def reset_count(self): - """重置累计计数(按需使用,如电机归位后)""" - self.total_pulse_history = 0 - self._save_pulse_count() - print("[INFO] 累计脉冲计数已重置为0") - - def stop(self): - """紧急停止(置低脉冲引脚)""" - if self.pul_gpio: - self.pul_gpio.write(False) - # 停止时保存当前计数 - self._save_pulse_count() - print("[STOP] 电机已停止,当前计数已保存") - - def close(self): - """释放GPIO资源""" - # 安全释放GPIO资源(关键:避免引脚电平残留) - if self.pul_gpio: - self.pul_gpio.write(False) # 脉冲引脚置低 - self.pul_gpio.close() - print("\n[OK] PUL引脚已关闭(电平置低)") - - if self.dir_gpio: - self.dir_gpio.write(False) # 方向引脚置低 - self.dir_gpio.close() - print("[OK] DIR引脚已关闭(电平置低)") - - # 关闭时保存最终计数 - self._save_pulse_count() - print( - f"[INFO] 最终累计脉冲:{self.total_pulse_history} → 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}") - - # 重置GPIO对象 - self.pul_gpio = None - self.dir_gpio = None - - def __del__(self): - """析构函数:确保资源释放""" - self.close() - -# ----------对外接口----------- -def motor_demo(): - """电机控制示例""" - motor = None - try: - # 创建电机实例(使用默认配置) - motor = StepperMotor() - print("\n=== 步进电机控制程序启动 ===") - # 打印初始状态 - init_status = motor.get_current_status() - print(f"[INIT] 初始状态 → 累计圈数:{init_status['total_rounds']:.2f} 圈") - - while True: - # 靠近电机方向 逆时针 - motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE) - time.sleep(5) # 暂停5秒 - # 远离电机方向 顺时针 - motor.rotate(rounds=10.0, direction=motor.CLOCKWISE) - time.sleep(5) # 暂停5秒 - - except PermissionError: - print("\n[ERROR] 权限不足:请用 sudo 运行!") - print("命令:sudo python3 double_direction_motor.py") - except ImportError: - print("\n[ERROR] 缺少依赖:请安装python-periphery") - print("命令:pip install python-periphery") - except KeyboardInterrupt: - print("\n[INFO] 用户手动停止程序") - except Exception as e: - print(f"\n[ERROR] 程序异常:{str(e)}") - finally: - if motor: - # 打印最终状态 - final_status = motor.get_current_status() - print(f"\n[FINAL] 程序退出状态 → 累计脉冲:{final_status['total_pulses']} | 累计圈数:{final_status['total_rounds']:.2f} 圈") - motor.close() - print("[OK] 程序退出完成") - -if __name__ == '__main__': - motor_demo() \ No newline at end of file