修改步进电机接口、通讯方式和EMV的接口

This commit is contained in:
2026-02-06 15:16:17 +08:00
parent 17df13c08e
commit 5c9946dfc7
7 changed files with 432 additions and 1111 deletions

View File

@ -49,19 +49,19 @@ valve_commands = {
'close': '000000000006010500020000', 'close': '000000000006010500020000',
}, },
ABSORB_SOLENOID_VALVE2: { ABSORB_SOLENOID_VALVE2: {
'open': '00000000000601050002FF00', 'open': '00000000000601050003FF00',
'close': '000000000006010500030000', 'close': '000000000006010500030000',
}, },
ABSORB_SOLENOID_VALVE3: { ABSORB_SOLENOID_VALVE3: {
'open': '00000000000601050002FF00', 'open': '00000000000601050004FF00',
'close': '000000000006010500040000', 'close': '000000000006010500040000',
}, },
ABSORB_SOLENOID_VALVE4: { ABSORB_SOLENOID_VALVE4: {
'open': '00000000000601050002FF00', 'open': '00000000000601050005FF00',
'close': '000000000006010500050000', 'close': '000000000006010500050000',
}, },
ABSORB_SOLENOID_VALVE5: { ABSORB_SOLENOID_VALVE5: {
'open': '00000000000601050002FF00', 'open': '00000000000601050006FF00',
'close': '000000000006010500060000', 'close': '000000000006010500060000',
} }
} }

View File

@ -1,37 +1,37 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
''' '''
# @Time : 2026/1/8 16:52 # @Time : 2025/12/12 14:39
# @Author : reenrr # @Author : reenrr
# @File : EMV_test.py # @File : EMV.py
# @Desc : 网络继电器控制输入、输出设备测试程序 # @Desc : 网络继电器控制输入、输出设备 修改了接口,需测试
''' '''
import socket import socket
import binascii import binascii
import time import time
from threading import Event, Lock
import threading
import logging import logging
from typing import Union
# 网络继电器的 IP 和端口 # 网络继电器的 IP 和端口
HOST = '192.168.5.18' HOST = '192.168.5.18'
PORT = 50000 PORT = 50000
# 控件命名映射 # 控件命名映射
SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1
SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2
ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1 ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1
ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2 ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2
ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3 ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3
ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4 ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4
ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5 ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5
# 传感器命名映射 # 传感器命名映射
CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关
CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关
PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1
PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2
FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器
# 控件控制报文 # 控件控制报文
valve_commands = { valve_commands = {
@ -48,19 +48,19 @@ valve_commands = {
'close': '000000000006010500020000', 'close': '000000000006010500020000',
}, },
ABSORB_SOLENOID_VALVE2: { ABSORB_SOLENOID_VALVE2: {
'open': '00000000000601050002FF00', 'open': '00000000000601050003FF00',
'close': '000000000006010500030000', 'close': '000000000006010500030000',
}, },
ABSORB_SOLENOID_VALVE3: { ABSORB_SOLENOID_VALVE3: {
'open': '00000000000601050002FF00', 'open': '00000000000601050004FF00',
'close': '000000000006010500040000', 'close': '000000000006010500040000',
}, },
ABSORB_SOLENOID_VALVE4: { ABSORB_SOLENOID_VALVE4: {
'open': '00000000000601050002FF00', 'open': '00000000000601050005FF00',
'close': '000000000006010500050000', 'close': '000000000006010500050000',
}, },
ABSORB_SOLENOID_VALVE5: { ABSORB_SOLENOID_VALVE5: {
'open': '00000000000601050002FF00', 'open': '00000000000601050006FF00',
'close': '000000000006010500060000', 'close': '000000000006010500060000',
} }
} }
@ -80,7 +80,6 @@ device_bit_map = {
ABSORB_SOLENOID_VALVE3: 4, ABSORB_SOLENOID_VALVE3: 4,
ABSORB_SOLENOID_VALVE4: 5, ABSORB_SOLENOID_VALVE4: 5,
ABSORB_SOLENOID_VALVE5: 6 ABSORB_SOLENOID_VALVE5: 6
} }
device_name_map = { device_name_map = {
@ -111,50 +110,77 @@ sensor_name_map = {
CONVEYOR2_SENSOR: '传送带2开关' CONVEYOR2_SENSOR: '传送带2开关'
} }
# -------------全局事件-------------
press_sensors_triggered = Event()
fiber_triggered = Event() # 光纤传感器触发事件
fiber_lock = Lock() # 线程锁,保护共享变量
valve1_open_flag = False # 电磁阀1打开标志
class RelayController: class RelayController:
def __init__(self): def __init__(self):
"""初始化继电器控制器""" """初始化继电器控制器"""
self.socket = None self.sock = None
self.is_connected = False # 长连接状态标记
# 线程相关状态 def connect(self) -> bool:
self.fiber_thread = None # 保存光纤传感器线程对象 """
self.fiber_monitor_running = False # 光纤传感器监听线程运行标志 建立长连接
:return: True-连接成功False-连接失败
"""
if not self.is_connected:
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.sock.connect((HOST, PORT))
self.is_connected = True
print(f"长连接建立成功:{HOST}:{PORT}")
return True
except Exception as e:
print(f"长连接建立失败:{e}")
self.sock = None
self.is_connected = False
return False
return True
self.press_sensors_thread = None # 保存按压开关线程对象 def disconnect(self):
self.press_sensors_monitor_running = False # 按压传感器监听线程运行标志 """断开长连接"""
self.last_press_sensor_status = False # 记录传感器上一次状态,初始为无信号(用于上升沿检测) if self.sock and self.is_connected:
try:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
print("长连接已正常断开")
except Exception as e:
print(f"断开长连接时出现异常:{e}")
finally:
self.sock = None
self.is_connected = False
def send_command(self, command): def send_command(self, command: str) -> Union[bytes, bool]:
""" """
将十六进制字符串转换为字节数据并发送 将十六进制字符串转换为字节数据并发送
:param command: 十六进制字符串 :param command: 十六进制字符串
:return: 响应字节数据 / False :return: 响应字节数据 / False
""" """
try: if not self.is_connected or not self.sock:
byte_data = binascii.unhexlify(command) print("长连接未建立,无法发送指令")
# 创建套接字并连接到继电器
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((HOST, PORT))
sock.send(byte_data)
# 接收响应
response = sock.recv(1024)
# logging.info(f"收到响应: {binascii.hexlify(response)}")
# 校验响应
return response
except Exception as e:
logging.info(f"通信错误: {e}")
return False return False
def get_all_device_status(self, command_type='devices'): try:
byte_data = binascii.unhexlify(command)
self.sock.sendall(byte_data) # sendall确保数据全部发送
# 接收响应
response = self.sock.recv(1024)
if not response:
logging.warning("长连接接收空响应,可能连接已断开")
self.is_connected = False
return False
# print(f"收到响应: {binascii.hexlify(response)}")
# 校验响应
return response
except socket.timeout:
print("长连接发送/接收超时")
self.is_connected = False
return False
except Exception as e:
print(f"通信错误: {e}")
self.is_connected = False
return False
def get_all_device_status(self, command_type: str='devices') -> dict[str, bool]:
""" """
获取所有设备/传感器状态 获取所有设备/传感器状态
:param command_type: 'devices'(控件) / 'sensors'(传感器) :param command_type: 'devices'(控件) / 'sensors'(传感器)
@ -162,7 +188,7 @@ class RelayController:
""" """
command = read_status_command.get(command_type) command = read_status_command.get(command_type)
if not command: if not command:
logging.info(f"未知的读取类型: {command_type}") print(f"未知的读取类型: {command_type}")
return {} return {}
response = self.send_command(command) response = self.send_command(command)
@ -179,295 +205,143 @@ class RelayController:
bit_map = sensor_bit_map bit_map = sensor_bit_map
name_map = sensor_name_map name_map = sensor_name_map
else: else:
logging.info("不支持的映射类型") print("不支持的映射类型")
return{} return {}
for key, bit_index in bit_map.items(): for key, bit_index in bit_map.items():
state = status_bin[bit_index] == '1' state = status_bin[bit_index] == '1'
status_dict[key] = state status_dict[key] = state
# readable = "开启" if state else "关闭" # readable = "开启" if state else "关闭"
# logging.info(f"{device.capitalize()} 状态: {readable}") # print(f"{device.capitalize()} 状态: {readable}")
else: else:
logging.info("读取状态失败或响应无效") print("读取状态失败或响应无效")
return status_dict return status_dict
def get_device_status(self, device_name, command_type='devices'): def set_device(self, device_name: str, state: bool):
""" """
获取单个控件/传感器状态 设置指定输出设备DO的开关状态
:param device_name:设备名称 :param device_name: 设备名称
:param command_type: 'devices'/'sensors' :param state: 目标状态 True--打开 FALSE--关闭
:return:True(开启) / False(关闭) / None(无法读取)
""" """
status = self.get_all_device_status(command_type) if device_name not in valve_commands:
return status.get(device_name, None) raise ValueError(f"未知设备:{device_name}")
def open(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, try:
absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, current = self.get_all_device_status()
absorb_solenoid_valve5=False): is_on = current.get(device_name, False)
"""
根据状态决定是否执行开操作
:param solenoid_valve1:是否打开电磁阀1
:param solenoid_valve2:是否打开电磁阀2
:param absorb_solenoid_valve1:是否打开吸取装置电磁阀1
:param absorb_solenoid_valve2:是否打开吸取装置电磁阀2
:param absorb_solenoid_valve3:是否打开吸取装置电磁阀3
:param absorb_solenoid_valve4:是否打开吸取装置电磁阀4
:param absorb_solenoid_valve5:是否打开吸取装置电磁阀5
:return: if state and not is_on:
""" self.send_command(valve_commands[device_name]['open'])
global valve1_open_time, valve1_open_flag elif not state and is_on:
status = self.get_all_device_status() self.send_command(valve_commands[device_name]['close'])
if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False): except Exception as e:
print("打开电磁阀1") raise RuntimeError(f"设置设备 '{device_name}' 状态失败: {e}") from e
self.send_command(valve_commands[SOLENOID_VALVE1]['open'])
# 记录电磁阀1打开时的时间戳和标志
with fiber_lock:
valve1_open_time = time.time()
valve1_open_flag = True
if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False):
print("打开电磁阀2")
self.send_command(valve_commands[SOLENOID_VALVE2]['open'])
if absorb_solenoid_valve1 and not status.get(ABSORB_SOLENOID_VALVE1, False):
print("打开吸取装置电磁阀1")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['open'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve2 and not status.get(ABSORB_SOLENOID_VALVE2, False):
print("打开吸取装置电磁阀2")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['open'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve3 and not status.get(ABSORB_SOLENOID_VALVE3, False):
print("打开吸取装置电磁阀3")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['open'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve4 and not status.get(ABSORB_SOLENOID_VALVE4, False):
print("打开吸取装置电磁阀4")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['open'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve5 and not status.get(ABSORB_SOLENOID_VALVE5, False):
print("打开吸取装置电磁阀5")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['open'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
# 根据状态决定是否执行关操作
def close(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False,
absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False,
absorb_solenoid_valve5=False):
"""
根据状态决定是否执行关操作
:param solenoid_valve1:是否关闭电磁阀1
:param solenoid_valve2:是否关闭电磁阀2
:param absorb_solenoid_valve1:是否关闭吸取电磁阀1
:param absorb_solenoid_valve2:是否关闭吸取电磁阀2
:param absorb_solenoid_valve3:是否关闭吸取电磁阀3
:param absorb_solenoid_valve4:是否关闭吸取电磁阀4
:param absorb_solenoid_valve5:是否关闭吸取电磁阀5
:return:
"""
global valve1_open_flag
status = self.get_all_device_status()
if solenoid_valve1 and status.get(SOLENOID_VALVE1, True):
print("关闭电磁阀1")
self.send_command(valve_commands[SOLENOID_VALVE1]['close'])
# 重置电磁阀1打开标志
with fiber_lock:
valve1_open_flag = False
if solenoid_valve2 and status.get(SOLENOID_VALVE2, True):
print("关闭电磁阀2")
self.send_command(valve_commands[SOLENOID_VALVE2]['close'])
if absorb_solenoid_valve1 and status.get(ABSORB_SOLENOID_VALVE1, True):
print("关闭吸取装置电磁阀1")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['close'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve2 and status.get(ABSORB_SOLENOID_VALVE2, True):
print("关闭吸取装置电磁阀2")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['close'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve3 and status.get(ABSORB_SOLENOID_VALVE3, True):
print("关闭吸取装置电磁阀3")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['close'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve4 and status.get(ABSORB_SOLENOID_VALVE4, True):
print("关闭吸取装置电磁阀4")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['close'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
if absorb_solenoid_valve5 and status.get(ABSORB_SOLENOID_VALVE5, True):
print("关闭吸取装置电磁阀5")
self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['close'])
time.sleep(1) # 实际测试需要考虑这个延时是否合适
def fiber_sensor_monitor(self):
"""
光纤传感器监听线程,专门检测电磁阀打开后的触发状态
"""
global fiber_triggered, valve1_open_flag
logging.info("光纤传感器监听线程已启动")
while self.fiber_monitor_running:
try:
# 增加短休眠降低CPU占用避免错过信号
time.sleep(0.005)
# 获取光纤传感器状态
fiber_status = self.get_device_status(FIBER_SENSOR, 'sensors')
# 检测是否检测到信号
if fiber_status:
with fiber_lock:
# 检查电磁阀1是否处于打开状态
if valve1_open_flag:
fiber_triggered.set()
# 防止重复触发
time.sleep(0.1)
fiber_triggered.clear()
except Exception as e:
logging.info(f"光纤传感器监听异常:{e}")
time.sleep(0.1) # 异常时增加休眠
def start_fiber_monitor(self):
"""启动光纤传感器监听线程"""
# 检查线程是否已在运行
if self.fiber_monitor_running or (self.fiber_thread and self.fiber_thread.is_alive()):
logging.warning("光纤传感器监听线程已在运行,无需重复启动")
return
# 启动线程
self.fiber_monitor_running = True
self.fiber_thread = threading.Thread(
target=self.fiber_sensor_monitor,
daemon=True
)
self.fiber_thread.start()
logging.info("光纤传感器监听线程启动成功")
def stop_fiber_monitor(self):
"""停止光纤传感器监听线程"""
self.fiber_monitor_running = False
# 等待线程完全退出
if self.fiber_thread and self.fiber_thread.is_alive():
self.fiber_thread.join(timeout=1)
logging.info("光纤传感器监听线程已停止")
def press_sensors_monitor(self, check_interval=0.1):
"""
双压传感器监听线程
:param check_interval: 检测间隔
:return:
"""
global press_sensors_triggered
logging.info("双压传感器监听线程已启动")
while self.press_sensors_monitor_running:
try:
# 检测两个传感器任意一个是否触发
press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors')
press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors')
current_sensor_state = press_sensor1_status or press_sensor2_status
# 上升沿触发(仅从无信号-->有信号时,才触发事件)
if current_sensor_state and not self.last_press_sensor_status:
press_sensors_triggered.set() # 触发事件,通知主线程
logging.info("双压传感器触发:线条已落到传送带")
# 更新上一次传感器状态,为下一次上升沿检测做准备
self.last_press_sensor_status = current_sensor_state
# 传感器检测间隔
time.sleep(check_interval)
except Exception as e:
logging.info(f"双压传感器监听异常: {e}")
time.sleep(0.1) # 异常时增加休眠
def start_press_sensors_monitor(self):
"""启动双压传感器监听线程"""
# 检查线程是否已在运行
if self.press_sensors_monitor_running or (self.press_sensors_thread and self.press_sensors_thread.is_alive()):
logging.warning("双压传感器监听线程已在运行,无需重复启动")
return
# 启动线程
self.press_sensors_monitor_running = True
self.press_sensors_thread = threading.Thread(
target=self.press_sensors_monitor,
daemon=True
)
self.press_sensors_thread.start()
logging.info("双压传感器监听线程启动成功")
def stop_press_sensors_monitor(self):
"""停止光纤传感器监听线程"""
self.press_sensors_monitor_running = False
# 等待线程完全退出
if self.press_sensors_thread and self.press_sensors_thread.is_alive():
self.press_sensors_thread.join(timeout=1)
logging.info("双压传感器监听线程已停止")
# 全局继电器实例 # 全局变量
global_relay = RelayController() GLOBAL_RELAY = None
# ------------对外接口---------- # --------对外接口--------
def control_solenoid(): def init_relay():
""" """初始化网络继电器实例+建立长连接"""
线条不合格场景专用:控制电磁阀+监听光纤传感器 global GLOBAL_RELAY
执行流程:启动监听-->打开电磁阀-->等待检测-->关闭电磁阀-->停止监听
"""
global fiber_triggered, global_relay
try: try:
# 启动光纤传感器监听线程 GLOBAL_RELAY = RelayController()
global_relay.start_fiber_monitor() GLOBAL_RELAY.connect()
# 重置光纤传感器触发事件,准备检测
fiber_triggered.clear()
# 同时打开电磁阀1、2
global_relay.open(solenoid_valve1=True, solenoid_valve2=True)
logging.info("电磁阀1、2已打开")
# 等待线条掉落最多等待2秒
timeout = 3.0
start_time = time.time()
fiber_detected = False
# 等待光纤传感器触发或超时
while time.time() - start_time < timeout:
if fiber_triggered.is_set():
fiber_detected = True
logging.info("该NG线条掉入费料区")
break
time.sleep(0.01) # 降低CPU空转
if not fiber_detected:
logging.info("出问题!!!,红外传感器未检测到线条")
# 关闭电磁阀1、2
time.sleep(0.2) # 等待线条掉落
global_relay.close(solenoid_valve1=True, solenoid_valve2=True)
logging.info("电磁阀1、2已关闭")
# 停止光纤传感器监听线程
global_relay.stop_fiber_monitor()
except Exception as e: except Exception as e:
logging.info(f"操作电磁阀失败{str(e)}") raise ValueError(f"初始化失败{e}")
# 异常时也要停止线程,避免残留
global_relay.stop_fiber_monitor() def deinit_relay():
"""断开长连接"""
global GLOBAL_RELAY
if GLOBAL_RELAY is not None:
GLOBAL_RELAY.disconnect()
GLOBAL_RELAY = None
def ng_push():
"""NG推料流程"""
try:
# 同时打开电磁阀1、2
write_do(SOLENOID_VALVE1, True)
write_do(SOLENOID_VALVE2, True)
print(f"电磁阀1、2已打开")
# 等待线条掉落
time.sleep(0.5)
write_do(SOLENOID_VALVE1, False)
write_do(SOLENOID_VALVE2, False)
print(f"电磁阀1、2已关闭")
except Exception as e:
print(f"NG推料失败:{e}")
raise RuntimeError("NG推料流程异常") from e
def write_do(device_name: str, state: bool):
"""
控制单个数字输出设备DO的开关状态
:param device_name: 设备名称
:param state: True:打开 False关闭
"""
global GLOBAL_RELAY
if GLOBAL_RELAY is None:
raise ValueError("未初始化实例")
# 验证设备是否存在
if device_name not in device_bit_map:
valid_devices = list(device_bit_map.keys())
raise ValueError(f"无效的设备名 '{device_name}'。有效设备: {valid_devices}")
# 确保已连接
if not GLOBAL_RELAY.is_connected:
if not GLOBAL_RELAY.connect():
raise RuntimeError("无法连接到网络继电器")
try:
GLOBAL_RELAY.set_device(device_name, state)
except Exception as e:
raise RuntimeError(f"控制设备 '{device_name}' 失败: {e}")
def read_all_io() -> dict[str, dict[str, bool]]:
"""
读取所有DI传感器和DO设备状态
:return: {'devices': {...}, 'sensors': {...}}
"""
global GLOBAL_RELAY
if GLOBAL_RELAY is None:
raise ValueError("未初始化")
try:
devices = GLOBAL_RELAY.get_all_device_status('devices')
sensors = GLOBAL_RELAY.get_all_device_status('sensors')
return {'devices': devices, 'sensors': sensors}
except Exception as e:
print(f"读取IO状态失败:{e}")
raise RuntimeError("读取IO失败") from e
# ------------测试接口------------- # ------------测试接口-------------
if __name__ == '__main__': if __name__ == '__main__':
control_solenoid() init_relay()
write_do(SOLENOID_VALVE1, True)
time.sleep(5)
io_status = read_all_io()
for name, status in io_status['devices'].items():
status_str = "开启" if status else "关闭"
print(f"{device_name_map.get(name, name)}: {status_str}")
deinit_relay()

View File

@ -4,15 +4,17 @@
# @Time : 2026/1/9 10:45 # @Time : 2026/1/9 10:45
# @Author : reenrr # @Author : reenrr
# @File : RK1106_server.py # @File : RK1106_server.py
# @Desc : RK1106服务端等待工控机调用 # @Desc : RK1106服务端等待工控机调用 通信为JSON格式
''' '''
import socket import socket
import logging import logging
import sys import sys
from test import motor_demo import json
from stepper_motor import motor_start, align_wire, cleanup, motor_stop
# --------日志配置(终端+文件双输出)-------------- # ------------日志配置(终端+文件双输出)--------------
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s', format='%(asctime)s - %(levelname)s - %(message)s',
@ -30,88 +32,177 @@ logging.basicConfig(
) )
# --------配置TCP服务端---------- # --------配置TCP服务端----------
HOST = "127.0.0.1" HOST = "192.168.0.100"
PORT = 8888 PORT = 8888
# 程序映射表(指令表示 -> 执行函数) # 全局参数缓存
PROG_MAP = { MOTOR_CONFIG = {
# "STEPPER_TEST": motor_test_demo, "speed": 2500, # 默认速度
"test": motor_demo "cycle": 10.0, # 默认旋转圈数
} }
def parse_command(cmd_str: str) ->tuple[str, dict]: def handle_setting(para_type: str, para_value: str) ->dict:
""" """
解析工控机发送的指令字符串 处理客户端发送的参数设置指令cmd:"setting"),更新全局电机配置
:param cmd_str: 指令字符串 :param para_type: 要设置的参数类型,仅支持"speed""cycle"
:return: 指令名称,参数字典 :param para_value: 参数值字符串将尝试转换为int(speed)或float(cycle)
:return: 标准化相应字典dict,如:
{
"Result": "1"表示成功,"0"表示失败,
"ErrMsg": str # 成功提示或错误详情
}
""" """
# 空指令处理 try:
if not cmd_str or cmd_str.strip() == "": if para_type == "speed":
return "", {} MOTOR_CONFIG["speed"] = int(para_value)
elif para_type == "cycle":
MOTOR_CONFIG["cycle"] = float(para_value)
else:
return {"Result": "0", "ErrMsg": f"不支持的参数类型: {para_type}"}
return {"Result": "1", "ErrMsg": "设置成功"}
except ValueError as e:
return {"Result": "0", "ErrMsg": f"参数值格式错误: {str(e)}"}
# 分割指令标识和参数 def handle_start(para_type: str, para_value: str) -> dict:
cmd_parts = cmd_str.strip().split("|", 1) """
prog_id = cmd_parts[0].strip() 处理启动电机指令cmd: "start"),使用当前MOTOR_CONFIG配置运行电机
params = {} :param para_type:为"direction"时,使用"para_value"作为临时方向
:param para_value:为0或1
:return: 标准化相应字典dict,如:
{
"Result": "1"表示成功,"0"表示失败,
"ErrMsg": str #执行结果或异常信息
}
"""
try:
if para_type == "direction":
direction = int(para_value)
if direction not in (0,1):
return {"Result": "0", "ErrMsg": "方向必须为0或1"}
# 解析参数格式param1=val1&param2=val2 motor_start(speed=MOTOR_CONFIG["speed"],
if len(cmd_parts) > 1 and cmd_parts[1].strip() != "": cycle=MOTOR_CONFIG["cycle"],
param_str = cmd_parts[1].strip() direction=direction)
param_pairs = param_str.split("&") dir_str = "正向" if direction == 1 else "负向"
for pair in param_pairs: return {"Result": "1", "ErrMsg": f"电机启动成功({dir_str}"}
if "=" in pair: else:
key, value = pair.split("=", 1) # 处理值中含=的情况 return {"Result": "0", "ErrMsg": "start 指令仅支持'direction'"}
# 类型自动转换(数字/字符串) except Exception as e:
try: return {"Result": "0", "ErrMsg": f"电机启动失败:{str(e)}"}
params[key.strip()] = int(value.strip())
except ValueError: def handle_stop() -> dict:
try: """
params[key.strip()] = float(value.strip()) 处理停止电机指令cmd: "stop")
except ValueError: :return: 标准化相应字典dict,如:
params[key.strip()] = value.strip() {
"Result": "1"表示成功,"0"表示失败,
"ErrMsg": str #执行结果或异常信息
}
"""
try:
motor_stop()
return {"Result": "1", "ErrMsg": "电机已停止"}
except Exception as e:
return {"Result": "0", "ErrMsg": f"电机停止失败:{str(e)}"}
def handle_align() -> dict:
"""
处理线条对齐(挡板一来一回)
:return: dict
"""
try:
align_wire(MOTOR_CONFIG['speed'], MOTOR_CONFIG['cycle'])
return {"Result": "1", "ErrMsg": "处理线条对齐"}
except Exception as e:
return {"Result": "0", "ErrMsg": "线条对齐失败"}
def parse_json_command(data: str) -> dict:
"""
解析客户端发送的原始JSON字符串指令并分发至对应处理函数
:param data: 客户端发送的原始JSON字符串
:return dict:标准化响应字典
"""
try:
cmd_obj = json.loads(data.strip())
except json.JSONDecodeError as e:
return {"Result": "0", "ErrMsg": f"JSON 格式错误: {str(e)}"}
cmd = cmd_obj.get("cmd", "").strip()
para_type = cmd_obj.get("para_type", "").strip()
para_value = cmd_obj.get("para_value", "").strip()
if cmd == "setting":
return handle_setting(para_type, para_value)
elif cmd == "start":
return handle_start(para_type, para_value)
elif cmd == "stop":
if para_type == "none" and para_value == "none":
return handle_stop()
else:
return {"Result": "0", "ErrMsg": "stop指令参数必须为none"}
elif cmd == "alignment":
if para_type == "none" and para_value == "none":
return handle_align()
else:
return {"Result": "0", "ErrMsg": "alignment指令参数必须为none"}
else:
return {"Result": "0", "ErrMsg": f"未知指令:{cmd}"}
return prog_id, params
# ----------对外接口---------- # ----------对外接口----------
def server(): def server():
"""启动TCP服务端监听指定端口接收工控机连接并循环处理JSON指令"""
# 创建TCP socket # 创建TCP socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: server_socket = None
# 允许端口复用 conn = None
try:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((HOST, PORT)) server_socket.bind((HOST, PORT))
server_socket.listen(1) # 只允许1个工控机连接 server_socket.listen(1) # 只允许1个工控机连接
logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...") logging.info(f"[1106] 服务已启动,监听端口:{PORT},等待工控机连接...")
# 等待工控机连接 while True: # 持续接受新连接
conn, addr = server_socket.accept() try:
with conn: # 等待工控机连接
logging.info(f"[1106] 工控机已连接:{addr}") conn, addr = server_socket.accept()
# 循环接收指令 logging.info(f"[1106] 工控机已连接:{addr}")
while True:
# 接收指令最大1024字节
data = conn.recv(1024).decode()
logging.info(f"\n[1106] 收到工控机指令:{data}")
# 解析指令
prog_id, params = parse_command(data)
logging.info(f"[1106] 解析结果 - 指令:{prog_id},参数:{params}")
# 执行对应程序 # 循环接收指令
responses = "" while True:
if prog_id in PROG_MAP: # 接收指令最大1024字节
try: data = conn.recv(1024).decode()
result = PROG_MAP[prog_id](**params) if not data:
response = f"SUCCESS|步进电机测试执行完成,结果:{result}" logging.warning("客户端断开连接")
logging.info(f"[1106] {response}") break
except Exception as e:
response = f"FAIL|步进电机测试执行失败:{str(e)}"
logging.error(f"[1106] {response}", exc_info=True)
else:
response = f"FAIL|未知指令:{prog_id},支持指令:{list(PROG_MAP.keys())}"
logging.warning(f"[1106] {response}")
# 发送响应给工控机 logging.info(f"\n[1106] 收到工控机指令:{data}")
conn.sendall(response.encode("utf-8"))
logging.info(f"[1106] 已发送响应:{response}") # 解析指令
response_dict = parse_json_command(data)
response_json = json.dumps(response_dict, ensure_ascii=False) + "\n" # 看需不需要加换行符\n
# 发送响应给工控机
conn.sendall(response_json.encode("utf-8"))
logging.info(f"[1106] 已发送响应:{response_json}")
except ConnectionError:
logging.info("客户端异常断开")
except Exception as e:
logging.info(f"处理连接时发生错误: {e}")
finally:
if conn is not None:
conn.close()
conn = None # 重置,避免重复关闭
logging.info("客户端连接已关闭,等待新连接...")
except KeyboardInterrupt:
logging.info("\n收到 Ctrl+C正在关闭服务...")
finally:
if server_socket:
server_socket.close()
logging.info("服务已停止,监听 socket 已释放")
# ----------测试接口---------- # ----------测试接口----------
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,15 +1,14 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
''' """
# @Time : 2026/1/5 15:50 # @Time : 2026/1/4 19:13
# @Author : reenrr # @Author : reenrr
# @File : stepper_motor.py # @File : stepper_motor_test1.py
# @Desc : 控制步进电机从初始位置移动10cm,移动后回到初始位置 # @Desc : 线条厂控制步进电机测试 应该不会丢步
''' """
import time import time
from periphery import GPIO from periphery import GPIO
import logging
import os
# ------------参数配置------------- # ------------参数配置-------------
# 1. 脉冲PUL引脚配置 → GPIO32 # 1. 脉冲PUL引脚配置 → GPIO32
@ -20,33 +19,15 @@ DIR_Pin = 33
# 3. 驱动器参数(根据拨码调整,默认不变) # 3. 驱动器参数(根据拨码调整,默认不变)
PULSES_PER_ROUND = 400 # 每圈脉冲数SW5~SW8拨码默认400 PULSES_PER_ROUND = 400 # 每圈脉冲数SW5~SW8拨码默认400
PULSE_FREQUENCY = 2500 # 脉冲频率Hz PULSE_FREQUENCY = 2500 # 脉冲频率Hz新手建议500~2000最大200KHz
# ------------ 日志+参数配置 ------------
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file_path = os.path.join(script_dir, "stepper_motor.log")
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_file_path, encoding='utf-8')
]
)
class StepperMotor: class StepperMotor:
"""新力川MA860H驱动器步进电机控制类""" """新力川MA860H驱动器步进电机控制类"""
# 方向常量定义
CLOCKWISE = "clockwise" # 顺时针
COUNTER_CLOCKWISE = "counterclockwise" # 逆时针
def __init__(self, def __init__(self,
pul_pin: int = PUL_Pin, pul_pin: int = PUL_Pin,
dir_pin: int = DIR_Pin, dir_pin: int = DIR_Pin,
pulses_per_round: int = PULSES_PER_ROUND, pulses_per_round: int = PULSES_PER_ROUND,
pulse_frequency: int = PULSE_FREQUENCY,
clockwise_level: bool = True, clockwise_level: bool = True,
counter_clockwise_level: bool = False): counter_clockwise_level: bool = False):
""" """
@ -54,7 +35,6 @@ class StepperMotor:
:param pul_pin: 脉冲引脚 :param pul_pin: 脉冲引脚
:param dir_pin: 方向引脚 :param dir_pin: 方向引脚
:param pulses_per_round: 每圈脉冲数SW5~SW8拨码默认400 :param pulses_per_round: 每圈脉冲数SW5~SW8拨码默认400
:param pulse_frequency: 脉冲频率Hz
:param clockwise_level: 顺时针对应的DIR电平 :param clockwise_level: 顺时针对应的DIR电平
:param counter_clockwise_level: 逆时针对应的DIR电平 :param counter_clockwise_level: 逆时针对应的DIR电平
""" """
@ -64,7 +44,6 @@ class StepperMotor:
# 驱动器参数 # 驱动器参数
self.pulses_per_round = pulses_per_round self.pulses_per_round = pulses_per_round
self.pulse_frequency = pulse_frequency
self.clockwise_level = clockwise_level self.clockwise_level = clockwise_level
self.counter_clockwise_level = counter_clockwise_level self.counter_clockwise_level = counter_clockwise_level
@ -76,7 +55,7 @@ class StepperMotor:
self._init_gpio() self._init_gpio()
def _init_gpio(self): def _init_gpio(self):
"""初始化PUL和DIR引脚""" """初始化PUL和DIR引脚(内部方法)"""
try: try:
# 初始化脉冲引脚(输出模式) # 初始化脉冲引脚(输出模式)
self.pul_gpio = GPIO(self.pul_pin, "out") self.pul_gpio = GPIO(self.pul_pin, "out")
@ -87,52 +66,48 @@ class StepperMotor:
self.pul_gpio.write(False) self.pul_gpio.write(False)
self.dir_gpio.write(False) self.dir_gpio.write(False)
logging.info(f"PUL引脚初始化完成{self.pul_pin} 引脚") print(f"PUL引脚初始化完成{self.pul_pin} 引脚")
logging.info(f"DIR引脚初始化完成{self.dir_pin} 引脚") print(f"DIR引脚初始化完成{self.dir_pin} 引脚")
except PermissionError: except PermissionError:
raise RuntimeError("权限不足请用sudo运行程序sudo python xxx.py") raise RuntimeError("权限不足请用sudo运行程序sudo python xxx.py")
except Exception as e: except Exception as e:
raise RuntimeError(f"GPIO初始化失败{str(e)}") from e raise RuntimeError(f"GPIO初始化失败{str(e)}") from e
def _validate_rounds(self, rounds: float) -> bool: def _validate_params(self, rounds: float, direction: int) -> bool:
"""验证圈数是否合法(内部方法)"""
if rounds <= 0: if rounds <= 0:
logging.info("圈数必须为正数") print("圈数必须为正数")
return False
if direction not in (0, 1):
print("方向必须为0逆时针或1顺时针")
return False return False
return True return True
def _validate_direction(self, direction: str) -> bool: def rotate(self, pulse_frequency: int, rounds: float, direction: int):
"""验证方向参数是否合法(内部方法)"""
if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]:
logging.info(f"方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}")
return False
return True
def rotate(self, rounds: float, direction: str = CLOCKWISE):
""" """
控制电机旋转(支持正反转) 控制电机旋转(支持正反转)
:param pulse_frequency: 脉冲频率hz)
:param rounds: 旋转圈数可小数如0.5=半圈) :param rounds: 旋转圈数可小数如0.5=半圈)
:param direction: 方向(clockwise=顺时针counterclockwise=逆时针) :param direction: 方向(1=顺时针0=逆时针)
""" """
# 参数验证 # 参数验证
if not self._validate_rounds(rounds) or not self._validate_direction(direction): if not self._validate_params(rounds, direction):
return return
# 设置旋转方向DIR电平 # 设置旋转方向DIR电平
if direction == self.CLOCKWISE: # 顺时针 if direction == 1: # 顺时针
self.dir_gpio.write(self.clockwise_level) self.dir_gpio.write(self.clockwise_level)
logging.info(f"\n=== 顺时针旋转 {rounds} 圈 ===") print(f"\n=== 顺时针旋转 {rounds} 圈 ===")
else: # 逆时针 else: # 逆时针
self.dir_gpio.write(self.counter_clockwise_level) self.dir_gpio.write(self.counter_clockwise_level)
logging.info(f"\n=== 逆时针旋转 {rounds} 圈 ===") print(f"\n=== 逆时针旋转 {rounds} 圈 ===")
# 计算总脉冲数和时序(精准控制,避免丢步) # 计算总脉冲数和时序(精准控制,避免丢步)
total_pulses = int(rounds * self.pulses_per_round) total_pulses = int(rounds * self.pulses_per_round)
pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒) pulse_period = 1.0 / pulse_frequency # 脉冲周期(秒)
half_period = pulse_period / 2 # 占空比50%MA860H最优 half_period = pulse_period / 2 # 占空比50%MA860H最优
logging.info(f"总脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms") print(f"总脉冲数:{total_pulses} | 频率:{pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
start_time = time.perf_counter() # 高精度计时(避免丢步) start_time = time.perf_counter() # 高精度计时(避免丢步)
# 发送脉冲序列核心占空比50%的方波) # 发送脉冲序列核心占空比50%的方波)
@ -147,13 +122,13 @@ class StepperMotor:
# 更新下一个脉冲的起始时间 # 更新下一个脉冲的起始时间
start_time += pulse_period start_time += pulse_period
logging.info("旋转完成") print("旋转完成")
def stop(self): def stop(self):
"""紧急停止(置低脉冲引脚)""" """紧急停止(置低脉冲引脚)"""
if self.pul_gpio: if self.pul_gpio:
self.pul_gpio.write(False) self.pul_gpio.write(False)
logging.info("电机已停止") print("🛑 电机已停止")
def close(self): def close(self):
"""释放GPIO资源""" """释放GPIO资源"""
@ -161,12 +136,12 @@ class StepperMotor:
if self.pul_gpio: if self.pul_gpio:
self.pul_gpio.write(False) # 脉冲引脚置低 self.pul_gpio.write(False) # 脉冲引脚置低
self.pul_gpio.close() self.pul_gpio.close()
logging.info("\n PUL引脚已关闭电平置低") print("\n PUL引脚已关闭电平置低")
if self.dir_gpio: if self.dir_gpio:
self.dir_gpio.write(False) # 方向引脚置低 self.dir_gpio.write(False) # 方向引脚置低
self.dir_gpio.close() self.dir_gpio.close()
logging.info("DIR引脚已关闭电平置低") print("DIR引脚已关闭电平置低")
# 重置GPIO对象 # 重置GPIO对象
self.pul_gpio = None self.pul_gpio = None
@ -176,36 +151,65 @@ class StepperMotor:
"""析构函数:确保资源释放""" """析构函数:确保资源释放"""
self.close() self.close()
# ------全局实例-------
GLOBAL_MOTOR = StepperMotor()
#---------控制步进电机外部接口-------------- # -------对外接口----------
def stepper_motor_control(): def motor_start(speed: int, cycle: float, direction: int):
motor = None """
开启电机,用于断电时电机恢复到起始位置
:param speed: 脉冲频率hz)
:param cycle: 旋转圈数
:param direction: 0=负向逆时针1=正向(顺时针)
"""
try: try:
# 创建电机实例(使用默认配置) print("\n=== 启动步进电机 ===")
motor = StepperMotor()
logging.info("\n=== 步进电机控制程序启动 ===") GLOBAL_MOTOR.rotate(pulse_frequency=speed, rounds=cycle, direction=direction)
time.sleep(5) # 暂停5秒
except ImportError:
print("\n❌ 缺少依赖请安装python-periphery")
print("命令pip install python-periphery")
except Exception as e:
print(f"\n❌ 程序异常:{str(e)}")
def motor_stop():
"""紧急停止(仅停止脉冲,保留实例)"""
try:
if GLOBAL_MOTOR:
GLOBAL_MOTOR.stop()
except Exception as e:
print("停止失败:{e}")
def align_wire(speed: int, cycle: float):
"""
使线条对齐
:param speed: 脉冲频率hz)
:param cycle: 旋转圈数
"""
try:
print("\n=== 启动线条对齐 ===")
# 靠近电机方向 逆时针 # 靠近电机方向 逆时针
motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE) GLOBAL_MOTOR.rotate(pulse_frequency=speed, rounds=cycle, direction=0)
time.sleep(5) # 暂停5秒 time.sleep(5) # 暂停5秒
# 远离电机方向 顺时针 # 远离电机方向 顺时针
motor.rotate(rounds=10.0, direction=motor.CLOCKWISE) GLOBAL_MOTOR.rotate(pulse_frequency=speed,rounds=10.0, direction=1)
time.sleep(5) # 暂停5秒 time.sleep(5) # 暂停5秒
except PermissionError:
logging.info("\n 权限不足:请用 sudo 运行!")
logging.info("命令sudo python3 double_direction_motor.py")
except ImportError: except ImportError:
logging.info("\n 缺少依赖请安装python-periphery") print("\n 缺少依赖请安装python-periphery")
logging.info("命令pip install python-periphery") print("命令pip install python-periphery")
except Exception as e: except Exception as e:
logging.info(f"\n 程序异常:{str(e)}") print(f"\n 程序异常:{str(e)}")
finally:
if motor: def cleanup():
motor.close() """程序退出时统一清理"""
logging.info("程序退出完成") if GLOBAL_MOTOR:
GLOBAL_MOTOR.close()
if __name__ == '__main__': if __name__ == '__main__':
stepper_motor_control() motor_start(2500, 10.0, 1)

View File

@ -1,159 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2026/1/4 15:06
# @Author : reenrr
# @File : stepper_motor_test.py
# @Desc : 线条厂控制步进电机
"""
import time
from periphery import GPIO
import sys
sys.setswitchinterval(0.000001) # 减小线程切换间隔
# --------------
# 硬件参数配置(必须与你的设备匹配!)
# --------------
# 1. 脉冲PUL引脚配置 → GPIO32
PUL_Pin = 32
# 2. 方向DIR引脚配置 → GPIO33
DIR_Pin = 33
# 3. LCDA257C驱动器核心参数关键与拨码/软件设置一致)
PULSES_PER_ROUND = 400 # 每圈脉冲数(按驱动器细分拨码调整)
PULSE_FREQUENCY = 5000 # 初始测试频率建议从500开始逐步调高
# 4. LCDA257C时序参数遵循手册要求不可低于最小值
MIN_DIR_DELAY_US = 10 # DIR提前PUL的最小延迟>8us
PULSE_LOW_MIN_US = 2 # 脉冲低电平最小宽度≥2us
PULSE_HIGH_MIN_US = 2 # 脉冲高电平最小宽度≥2us
# 5. 方向电平定义(可根据实际测试调整)
CLOCKWISE_LEVEL = True # 顺时针→DIR高电平
COUNTER_CLOCKWISE_LEVEL = False # 逆时针→DIR低电平
def init_stepper() -> tuple[GPIO, GPIO]:
"""初始化PUL和DIR引脚输出模式适配LCDA257C"""
try:
# 初始化脉冲引脚(输出模式)
pul_gpio = GPIO(PUL_Pin, "out")
# 初始化方向引脚(输出模式)
dir_gpio = GPIO(DIR_Pin, "out")
# 初始状态:脉冲低电平,方向低电平(驱动器空闲状态)
pul_gpio.write(False)
dir_gpio.write(False)
print(f"✅ PUL引脚初始化完成{PUL_Pin}引脚")
print(f"✅ DIR引脚初始化完成{DIR_Pin}引脚")
return pul_gpio, dir_gpio
except FileNotFoundError:
raise RuntimeError(f"GPIO芯片不存在检查 {PUL_Pin} 是否存在命令ls /dev/gpiochip*")
except PermissionError:
raise RuntimeError("GPIO权限不足请用 sudo 运行程序sudo python test.py")
except Exception as e:
raise RuntimeError(f"GPIO初始化失败{str(e)}") from e
def rotate(pul_gpio: GPIO, dir_gpio: GPIO, rounds: float, direction: str = "clockwise",
test_freq: int = PULSE_FREQUENCY):
"""
控制LCDA257C驱动器旋转优化版脉冲发送解决频率不生效问题
:param pul_gpio: PUL引脚GPIO对象
:param dir_gpio: DIR引脚GPIO对象
:param rounds: 旋转圈数(正数,可小数)
:param direction: 方向clockwise/counterclockwise
:param test_freq: 本次旋转使用的脉冲频率(覆盖全局默认值)
"""
# 1. 参数合法性校验
if rounds <= 0:
print("❌ 圈数必须为正数!")
return
if test_freq < 100:
print("❌ 频率过低≥100Hz请调整test_freq参数")
return
# 2. 设置旋转方向严格遵循DIR提前时序
if direction == "clockwise":
dir_gpio.write(CLOCKWISE_LEVEL)
print(f"\n=== 顺时针旋转 {rounds} 圈(目标频率:{test_freq}Hz===")
elif direction == "counterclockwise":
dir_gpio.write(COUNTER_CLOCKWISE_LEVEL)
print(f"\n=== 逆时针旋转 {rounds} 圈(目标频率:{test_freq}Hz===")
else:
print("❌ 方向参数错误!仅支持 clockwise/counterclockwise")
return
# DIR电平设置后延迟≥8us满足LCDA257C时序要求
time.sleep(MIN_DIR_DELAY_US / 1_000_000)
# 3. 计算脉冲参数(确保高低电平≥最小宽度)
total_pulses = int(rounds * PULSES_PER_ROUND)
pulse_period = 1.0 / test_freq # 脉冲周期(秒)
# 确保高低电平宽度不低于驱动器要求(避免脉冲识别失败)
high_period = max(pulse_period / 2, PULSE_HIGH_MIN_US / 1_000_000)
low_period = max(pulse_period / 2, PULSE_LOW_MIN_US / 1_000_000)
# 打印参数(便于调试)
print(f"总脉冲数:{total_pulses} | 理论高电平:{high_period * 1e6:.1f}us | 理论低电平:{low_period * 1e6:.1f}us")
# 4. 优化版脉冲发送解决Python高频延时不准问题
start_total = time.perf_counter() # 高精度计时(统计实际频率)
for _ in range(total_pulses):
# 高电平直接sleep避免while循环的调度延迟
pul_gpio.write(True)
time.sleep(high_period)
# 低电平
pul_gpio.write(False)
time.sleep(low_period)
end_total = time.perf_counter()
# 5. 计算实际频率(验证是否达到目标)
actual_duration = end_total - start_total
actual_freq = total_pulses / actual_duration if actual_duration > 0 else 0
print(f"✅ 旋转完成 | 实际频率:{actual_freq:.0f}Hz | 耗时:{actual_duration:.2f}")
def motor_test_demo():
"""电机测试示例(逐步提升频率,验证转速变化)"""
pul_gpio = None
dir_gpio = None
try:
# 初始化引脚
pul_gpio, dir_gpio = init_stepper()
print("\n=== 步进电机频率测试程序启动 ===")
while True:
print(f"\n===== 测试频率:{PULSE_FREQUENCY}Hz =====")
# 远离电机方向 顺时针
rotate(pul_gpio, dir_gpio, rounds=10.0, direction="clockwise")
time.sleep(5) #
# 靠近电机方向 逆时针
rotate(pul_gpio, dir_gpio, rounds=10.0, direction="counterclockwise")
time.sleep(5) #
except Exception as e:
print(f"\n❌ 程序异常:{str(e)}")
finally:
# 安全释放GPIO资源必须执行避免引脚电平残留
if pul_gpio:
pul_gpio.write(False)
pul_gpio.close()
print("\n✅ PUL引脚已关闭低电平")
if dir_gpio:
dir_gpio.write(False)
dir_gpio.close()
print("✅ DIR引脚已关闭低电平")
print("✅ 程序退出完成")
if __name__ == '__main__':
# 运行测试demo必须sudo执行
motor_test_demo()

View File

@ -1,198 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2026/1/4 19:13
# @Author : reenrr
# @File : stepper_motor_test1.py
# @Desc : 线条厂控制步进电机测试 应该不会丢步
"""
import time
from periphery import GPIO # 若未安装pip install python-periphery
# ------------参数配置-------------
# 1. 脉冲PUL引脚配置 → GPIO32
PUL_Pin = 32
# 2. 方向DIR引脚配置 → GPIO33
DIR_Pin = 33
# 3. 驱动器参数(根据拨码调整,默认不变)
PULSES_PER_ROUND = 400 # 每圈脉冲数SW5~SW8拨码默认400
PULSE_FREQUENCY = 2500 # 脉冲频率Hz新手建议500~2000最大200KHz
class StepperMotor:
"""新力川MA860H驱动器步进电机控制类"""
# 方向常量定义
CLOCKWISE = "clockwise" # 顺时针
COUNTER_CLOCKWISE = "counterclockwise" # 逆时针
def __init__(self,
pul_pin: int = PUL_Pin,
dir_pin: int = DIR_Pin,
pulses_per_round: int = PULSES_PER_ROUND,
pulse_frequency: int = PULSE_FREQUENCY,
clockwise_level: bool = True,
counter_clockwise_level: bool = False):
"""
初始化步进电机控制器
:param pul_pin: 脉冲引脚
:param dir_pin: 方向引脚
:param pulses_per_round: 每圈脉冲数SW5~SW8拨码默认400
:param pulse_frequency: 脉冲频率Hz新手建议500~2000最大200KHz
:param clockwise_level: 顺时针对应的DIR电平
:param counter_clockwise_level: 逆时针对应的DIR电平
"""
# 硬件配置参数
self.pul_pin = pul_pin
self.dir_pin = dir_pin
# 驱动器参数
self.pulses_per_round = pulses_per_round
self.pulse_frequency = pulse_frequency
self.clockwise_level = clockwise_level
self.counter_clockwise_level = counter_clockwise_level
# GPIO对象初始化
self.pul_gpio = None
self.dir_gpio = None
# 初始化GPIO
self._init_gpio()
def _init_gpio(self):
"""初始化PUL和DIR引脚内部方法"""
try:
# 初始化脉冲引脚(输出模式)
self.pul_gpio = GPIO(self.pul_pin, "out")
# 初始化方向引脚(输出模式)
self.dir_gpio = GPIO(self.dir_pin, "out")
# 初始电平置低(避免电机误动作)
self.pul_gpio.write(False)
self.dir_gpio.write(False)
print(f"✅ PUL引脚初始化完成{self.pul_pin} 引脚")
print(f"✅ DIR引脚初始化完成{self.dir_pin} 引脚")
except PermissionError:
raise RuntimeError("权限不足请用sudo运行程序sudo python xxx.py")
except Exception as e:
raise RuntimeError(f"GPIO初始化失败{str(e)}") from e
def _validate_rounds(self, rounds: float) -> bool:
"""验证圈数是否合法(内部方法)"""
if rounds <= 0:
print("❌ 圈数必须为正数")
return False
return True
def _validate_direction(self, direction: str) -> bool:
"""验证方向参数是否合法(内部方法)"""
if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]:
print(f"❌ 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}")
return False
return True
def rotate(self, rounds: float, direction: str = CLOCKWISE):
"""
控制电机旋转(支持正反转)
:param rounds: 旋转圈数可小数如0.5=半圈)
:param direction: 方向clockwise=顺时针counterclockwise=逆时针)
"""
# 参数验证
if not self._validate_rounds(rounds) or not self._validate_direction(direction):
return
# 设置旋转方向DIR电平
if direction == self.CLOCKWISE: # 顺时针
self.dir_gpio.write(self.clockwise_level)
print(f"\n=== 顺时针旋转 {rounds} 圈 ===")
else: # 逆时针
self.dir_gpio.write(self.counter_clockwise_level)
print(f"\n=== 逆时针旋转 {rounds} 圈 ===")
# 计算总脉冲数和时序(精准控制,避免丢步)
total_pulses = int(rounds * self.pulses_per_round)
pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒)
half_period = pulse_period / 2 # 占空比50%MA860H最优
print(f"总脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
start_time = time.perf_counter() # 高精度计时(避免丢步)
# 发送脉冲序列核心占空比50%的方波)
for _ in range(total_pulses):
# 高电平
self.pul_gpio.write(True)
# 精准延时比time.sleep稳定适配高频脉冲
while time.perf_counter() - start_time < half_period:
pass
# 低电平
self.pul_gpio.write(False)
# 更新下一个脉冲的起始时间
start_time += pulse_period
print("✅ 旋转完成")
def stop(self):
"""紧急停止(置低脉冲引脚)"""
if self.pul_gpio:
self.pul_gpio.write(False)
print("🛑 电机已停止")
def close(self):
"""释放GPIO资源"""
# 安全释放GPIO资源关键避免引脚电平残留
if self.pul_gpio:
self.pul_gpio.write(False) # 脉冲引脚置低
self.pul_gpio.close()
print("\n✅ PUL引脚已关闭电平置低")
if self.dir_gpio:
self.dir_gpio.write(False) # 方向引脚置低
self.dir_gpio.close()
print("✅ DIR引脚已关闭电平置低")
# 重置GPIO对象
self.pul_gpio = None
self.dir_gpio = None
def __del__(self):
"""析构函数:确保资源释放"""
self.close()
# 使用示例
def motor_demo():
"""电机控制示例"""
motor = None
try:
# 创建电机实例(使用默认配置)
motor = StepperMotor()
print("\n=== 步进电机控制程序启动 ===")
while True:
# 靠近电机方向 逆时针
motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE)
time.sleep(5) # 暂停5秒
# 远离电机方向 顺时针
motor.rotate(rounds=10.0, direction=motor.CLOCKWISE)
time.sleep(5) # 暂停5秒
except PermissionError:
print("\n❌ 权限不足:请用 sudo 运行!")
print("命令sudo python3 double_direction_motor.py")
except ImportError:
print("\n❌ 缺少依赖请安装python-periphery")
print("命令pip install python-periphery")
except Exception as e:
print(f"\n❌ 程序异常:{str(e)}")
finally:
if motor:
motor.close()
print("✅ 程序退出完成")
if __name__ == '__main__':
motor_demo()

View File

@ -1,291 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2026/1/5 16:18
# @Author : reenrr
# @File : test.py
# @Desc : 步进电机添加记录脉冲数量(防止断电情况)
'''
import time
import json
import os
from periphery import GPIO
# ------------参数配置-------------
# 1. 脉冲PUL引脚配置 → GPIO32
PUL_Pin = 32
# 2. 方向DIR引脚配置 → GPIO33
DIR_Pin = 33
# 3. 驱动器参数(根据拨码调整,默认不变)
PULSES_PER_ROUND = 400 # 每圈脉冲数SW5~SW8拨码默认400
PULSE_FREQUENCY = 2500 # 脉冲频率Hz新手建议500~2000最大200KHz
# 4. 计数持久化配置
COUNT_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "motor_pulse_count.json")
SAVE_INTERVAL = 10 # 每发送10个脉冲保存一次计数减少IO开销可根据需求调整
class StepperMotor:
"""新力川MA860H驱动器步进电机控制类"""
# 方向常量定义
CLOCKWISE = "clockwise" # 顺时针
COUNTER_CLOCKWISE = "counterclockwise" # 逆时针
def __init__(self,
pul_pin: int = PUL_Pin,
dir_pin: int = DIR_Pin,
pulses_per_round: int = PULSES_PER_ROUND,
pulse_frequency: int = PULSE_FREQUENCY,
clockwise_level: bool = True,
counter_clockwise_level: bool = False):
"""
初始化步进电机控制器
:param pul_pin: 脉冲引脚
:param dir_pin: 方向引脚
:param pulses_per_round: 每圈脉冲数SW5~SW8拨码默认400
:param pulse_frequency: 脉冲频率Hz新手建议500~2000最大200KHz
:param clockwise_level: 顺时针对应的DIR电平
:param counter_clockwise_level: 逆时针对应的DIR电平
"""
# 硬件配置参数
self.pul_pin = pul_pin
self.dir_pin = dir_pin
# 驱动器参数
self.pulses_per_round = pulses_per_round
self.pulse_frequency = pulse_frequency
self.clockwise_level = clockwise_level
self.counter_clockwise_level = counter_clockwise_level
# GPIO对象初始化
self.pul_gpio = None
self.dir_gpio = None
# 脉冲计数相关(核心:记录已发送的脉冲数)
self.current_pulse_count = 0 # 本次旋转已发送的脉冲数
self.total_pulse_history = self._load_pulse_count() # 历史总脉冲数(从文件加载)
self.current_rotate_target = 0 # 本次旋转的目标脉冲数
# 初始化GPIO
self._init_gpio()
def _load_pulse_count(self):
"""加载历史脉冲计数(程序启动时执行)"""
try:
if os.path.exists(COUNT_FILE):
with open(COUNT_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
# 返回历史总脉冲数默认0
return int(data.get("total_pulses", 0))
except Exception as e:
print(f"[WARN] 加载历史计数失败:{e}将从0开始计数")
return 0
def _save_pulse_count(self):
"""保存当前总脉冲数到文件(持久化)"""
try:
with open(COUNT_FILE, "w", encoding="utf-8") as f:
json.dump({
"total_pulses": self.total_pulse_history,
"update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"[ERROR] 保存计数失败:{e}")
def _init_gpio(self):
"""初始化PUL和DIR引脚内部方法"""
try:
# 初始化脉冲引脚(输出模式)
self.pul_gpio = GPIO(self.pul_pin, "out")
# 初始化方向引脚(输出模式)
self.dir_gpio = GPIO(self.dir_pin, "out")
# 初始电平置低(避免电机误动作)
self.pul_gpio.write(False)
self.dir_gpio.write(False)
print(f"[OK] PUL引脚初始化完成{self.pul_pin} 引脚")
print(f"[OK] DIR引脚初始化完成{self.dir_pin} 引脚")
print(
f"[INFO] 历史累计脉冲数:{self.total_pulse_history} → 对应圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
except PermissionError:
raise RuntimeError("权限不足请用sudo运行程序sudo python xxx.py")
except Exception as e:
raise RuntimeError(f"GPIO初始化失败{str(e)}") from e
def _validate_rounds(self, rounds: float) -> bool:
"""验证圈数是否合法(内部方法)"""
if rounds <= 0:
print("[ERROR] 圈数必须为正数")
return False
return True
def _validate_direction(self, direction: str) -> bool:
"""验证方向参数是否合法(内部方法)"""
if direction not in [self.CLOCKWISE, self.COUNTER_CLOCKWISE]:
print(f"[ERROR] 方向参数错误:仅支持 {self.CLOCKWISE}/{self.COUNTER_CLOCKWISE}")
return False
return True
def rotate(self, rounds: float, direction: str = CLOCKWISE):
"""
控制电机旋转(支持正反转,实时记录脉冲数)
:param rounds: 旋转圈数可小数如0.5=半圈)
:param direction: 方向clockwise=顺时针counterclockwise=逆时针)
"""
# 参数验证
if not self._validate_rounds(rounds) or not self._validate_direction(direction):
return
# 重置本次旋转的计数
self.current_pulse_count = 0
# 计算本次旋转的目标脉冲数
self.current_rotate_target = int(rounds * self.pulses_per_round)
# 设置旋转方向DIR电平
if direction == self.CLOCKWISE: # 顺时针
self.dir_gpio.write(self.clockwise_level)
print(f"\n=== 顺时针旋转 {rounds} 圈 ===")
else: # 逆时针
self.dir_gpio.write(self.counter_clockwise_level)
print(f"\n=== 逆时针旋转 {rounds} 圈 ===")
# 计算总脉冲数和时序(精准控制,避免丢步)
total_pulses = self.current_rotate_target
pulse_period = 1.0 / self.pulse_frequency # 脉冲周期(秒)
half_period = pulse_period / 2 # 占空比50%MA860H最优
print(f"目标脉冲数:{total_pulses} | 频率:{self.pulse_frequency}Hz | 周期:{pulse_period * 1000:.2f}ms")
start_time = time.perf_counter() # 高精度计时(避免丢步)
try:
# 发送脉冲序列核心占空比50%的方波,实时计数)
for _ in range(total_pulses):
# 高电平
self.pul_gpio.write(True)
# 精准延时比time.sleep稳定适配高频脉冲
while time.perf_counter() - start_time < half_period:
pass
# 低电平
self.pul_gpio.write(False)
# 更新下一个脉冲的起始时间
start_time += pulse_period
# 🌟 核心:累加本次脉冲计数
self.current_pulse_count += 1
self.total_pulse_history += 1
# 每发送SAVE_INTERVAL个脉冲保存一次计数减少IO开销
if self.current_pulse_count % SAVE_INTERVAL == 0:
self._save_pulse_count()
print(
f"[OK] 旋转完成 → 本次发送脉冲:{self.current_pulse_count} | 累计脉冲:{self.total_pulse_history} | 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
except Exception as e:
# 🌟 关键:异常发生时(如断电前的程序崩溃),立即保存当前计数
self._save_pulse_count()
# 计算已完成的圈数
completed_rounds = self.current_pulse_count / self.pulses_per_round
remaining_pulses = self.current_rotate_target - self.current_pulse_count
remaining_rounds = remaining_pulses / self.pulses_per_round
print(f"\n[ERROR] 旋转过程中异常:{e}")
print(f"[INFO] 异常时已发送脉冲:{self.current_pulse_count} → 已完成圈数:{completed_rounds:.2f}")
print(f"[INFO] 剩余未发送脉冲:{remaining_pulses} → 剩余圈数:{remaining_rounds:.2f}")
print(
f"[INFO] 累计总脉冲:{self.total_pulse_history} → 累计总圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
raise # 抛出异常,让上层处理
def get_current_status(self):
"""获取当前电机状态(脉冲数、圈数)"""
return {
"total_pulses": self.total_pulse_history,
"total_rounds": self.total_pulse_history / self.pulses_per_round,
"last_rotate_completed_pulses": self.current_pulse_count,
"last_rotate_completed_rounds": self.current_pulse_count / self.pulses_per_round,
"last_rotate_target_pulses": self.current_rotate_target,
"last_rotate_target_rounds": self.current_rotate_target / self.pulses_per_round
}
def reset_count(self):
"""重置累计计数(按需使用,如电机归位后)"""
self.total_pulse_history = 0
self._save_pulse_count()
print("[INFO] 累计脉冲计数已重置为0")
def stop(self):
"""紧急停止(置低脉冲引脚)"""
if self.pul_gpio:
self.pul_gpio.write(False)
# 停止时保存当前计数
self._save_pulse_count()
print("[STOP] 电机已停止,当前计数已保存")
def close(self):
"""释放GPIO资源"""
# 安全释放GPIO资源关键避免引脚电平残留
if self.pul_gpio:
self.pul_gpio.write(False) # 脉冲引脚置低
self.pul_gpio.close()
print("\n[OK] PUL引脚已关闭电平置低")
if self.dir_gpio:
self.dir_gpio.write(False) # 方向引脚置低
self.dir_gpio.close()
print("[OK] DIR引脚已关闭电平置低")
# 关闭时保存最终计数
self._save_pulse_count()
print(
f"[INFO] 最终累计脉冲:{self.total_pulse_history} → 累计圈数:{self.total_pulse_history / self.pulses_per_round:.2f}")
# 重置GPIO对象
self.pul_gpio = None
self.dir_gpio = None
def __del__(self):
"""析构函数:确保资源释放"""
self.close()
# ----------对外接口-----------
def motor_demo():
"""电机控制示例"""
motor = None
try:
# 创建电机实例(使用默认配置)
motor = StepperMotor()
print("\n=== 步进电机控制程序启动 ===")
# 打印初始状态
init_status = motor.get_current_status()
print(f"[INIT] 初始状态 → 累计圈数:{init_status['total_rounds']:.2f}")
while True:
# 靠近电机方向 逆时针
motor.rotate(rounds=10.0, direction=motor.COUNTER_CLOCKWISE)
time.sleep(5) # 暂停5秒
# 远离电机方向 顺时针
motor.rotate(rounds=10.0, direction=motor.CLOCKWISE)
time.sleep(5) # 暂停5秒
except PermissionError:
print("\n[ERROR] 权限不足:请用 sudo 运行!")
print("命令sudo python3 double_direction_motor.py")
except ImportError:
print("\n[ERROR] 缺少依赖请安装python-periphery")
print("命令pip install python-periphery")
except KeyboardInterrupt:
print("\n[INFO] 用户手动停止程序")
except Exception as e:
print(f"\n[ERROR] 程序异常:{str(e)}")
finally:
if motor:
# 打印最终状态
final_status = motor.get_current_status()
print(f"\n[FINAL] 程序退出状态 → 累计脉冲:{final_status['total_pulses']} | 累计圈数:{final_status['total_rounds']:.2f}")
motor.close()
print("[OK] 程序退出完成")
if __name__ == '__main__':
motor_demo()