Files
AutoControlSystem-G/EMV/EMV.py
2025-09-30 14:44:12 +08:00

685 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import binascii
import time
import threading
import logging
from PySide6.QtCore import Signal, QObject
import numpy as np
from pandas.core.arrays import boolean
import Constant
class RelayController(QObject):
log_signal = Signal(int, str)
take_robot_signal = Signal()
def __init__(self, host='192.168.0.18', port=50000):
super().__init__()
# ===================== 全局线程延时参数 =====================
self.sensor1_loop_delay = 0.1 # SENSOR1 线程轮询间隔(秒)
self.sensor1_error_delay = 1.0 # SENSOR1 出错或暂停时延时(秒)
self.sensor1_post_action_delay = 0.2 # SENSOR1 每次循环后延时(秒)
self.sensor2_loop_delay = 0.2 # SENSOR2 线程轮询间隔(秒)
self.sensor2_loop_lost=0.1 # SENSOR2 线程轮询间隔(秒)
# self.sensor2_loop_delay = 0.5 # SENSOR2 线程轮询间隔(秒)
self.sensor2_error_delay = 0.5 # SENSOR2 出错时延时(秒)
self.sensor2_post_action_delay = 0.2 # SENSOR2 每次循环后延时(秒)
# ===================== 全局动作延时参数 =====================
self.delay_conveyor = 0.5 # 传送带开/关动作延时(一半时间,我在控制程序和线程都加了一样的延时)
self.delay_pusher = 0.05 # 推板开/关动作延时
self.delay_clamp = 0.5 # 夹爪动作延时
self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间
# ===================== 传感器稳定检测参数 =====================
self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒)
self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效
self.sensor1_debounce_time = 1.0 # 传感器1防抖时间
self.sensor2_debounce_time = 3.0 # 袋尾检测3秒有效信号
# ===================== 网络与设备映射 =====================
self.host = host
self.port = port
self.CONVEYOR1 = 'conveyor1'
self.PUSHER = 'pusher'
self.CONVEYOR2 = 'conveyor2'
self.CONVEYOR2_REVERSE = 'conveyor2_reverse'
self.CLAMP = 'clamp'
self.PUSHER1 = 'pusher1'
self.SENSOR1 = 'sensor1'
self.SENSOR2 = 'sensor2'
self.valve_commands = {
self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'},
# self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
#滚筒2000 0012正转2000 0022 2001变频器频率调整 2000正反转。
self.CONVEYOR2: {'open': '000100000006020620000012', 'close': '000100000006020620000001'},
#DO4
self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
#DO5 回 DO2推
self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'},
#滚筒反转
self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'}
}
#devices:读取继点器的状态
#sensors 传感器的状态 D12
self.read_status_command = {
'devices': '000000000006010100000008',
'sensors': '000000000006010200000008'
}
self.device_bit_map = {
self.CONVEYOR1: 0,
self.PUSHER: 1,
self.CONVEYOR2: 2,
self.CLAMP: 3,
self.PUSHER1: 4,
self.CONVEYOR2_REVERSE: 5
}
self.sensor_bit_map = {
self.SENSOR1: 0,
self.SENSOR2: 1,
}
self.device_name_map = {
self.CONVEYOR1: "传送带1",
self.PUSHER: "推板开",
self.CONVEYOR2: "传送带2",
self.CLAMP: "机械臂夹爪",
self.PUSHER1: "推板关",
self.CONVEYOR2_REVERSE: "传送带2反转"
}
self.sensor_name_map = {
self.SENSOR1: '位置传感器1',
self.SENSOR2: '位置传感器2',
}
# ===================== 状态控制变量 =====================
self._running = False #线程运行标识
self._ispause = False #线程暂停标识
self._sensor1_thread = None
self._sensor2_thread = None
self.required_codes = {'0101', '0103'} # 有效状态码传感器1
self.required_codes_1 = {'0102', '0103'} # 有效状态码传感器2
self.sensor1_triggered = False
self.sensor1_last_time = 0
self.sensor2_ready = False #默认不打开
self.motor_stopped_by_sensor2 = True
self.is_drop_35=False #是否是35码
# ===================== 基础通信方法 =====================
def send_command(self, command_hex, retry_count=2, source='unknown'):
if Constant.DebugPosition:
# print(f"[发送命令] {command_hex} ({source})")
return None
byte_data = binascii.unhexlify(command_hex)
for attempt in range(retry_count):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(10)
sock.connect((self.host, self.port))
sock.send(byte_data)
response = sock.recv(1024)
# hex_response = binascii.hexlify(response).decode('utf-8')
#if source == 'sensor':
#print(f"[传感器响应] {hex_response}")
#elif source == 'device':
#print(f"[设备控制响应] {hex_response}")
#else:
#print(f"[通信响应] {hex_response}")
return response
except Exception as e:
# print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
self.log_signal.emit(logging.INFO,f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
time.sleep(5)
self.trigger_alarm()
return None
def trigger_alarm(self):
self.log_signal.emit(logging.ERROR,"警告:网络继电器连续多次通信失败,请检查设备连接!")
# ===================== 状态读取方法 =====================
def get_all_device_status(self, command_type='devices'):
if Constant.DebugPosition:
return {self.SENSOR2:True}
command = self.read_status_command.get(command_type)
if not command:
print(f"未知的网络继电器读取类型: {command_type}")
return {}
source = 'sensor' if command_type == 'sensors' else 'device'
response = self.send_command(command, source=source)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9]
status_bin = f"{status_byte:08b}"[::-1]
bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map
name_map = self.device_name_map if command_type == 'devices' else self.sensor_name_map
for key, bit_index in bit_map.items():
status_dict[key] = status_bin[bit_index] == '1'
else:
print(f"网络继电器[{command_type}] 读取状态失败或响应无效")
return status_dict
def get_all_sensor_responses(self, command_type='sensors'):
"""
获取所有传感器的原始 Modbus 响应字符串
示例:{'sensor1': '00000000000401020101', 'sensor2': '00000000000401020100'}
"""
command = self.read_status_command.get(command_type)
if not command:
print(f"未知的读取类型: {command_type}")
return {}
source = 'sensor' if command_type == 'sensors' else 'device'
response = self.send_command(command, source=source)
responses = {}
if response and len(response) >= 10:
hex_response = binascii.hexlify(response).decode('utf-8')
# print(f"[原始响应][{command_type}] {hex_response}")
# 假设传感器数据从第 9 字节开始,长度为 2 字节
for name, bit_index in self.sensor_bit_map.items():
offset = 9 + (bit_index // 8)
bit_pos = bit_index % 8
byte = response[offset]
status = (byte >> bit_pos) & 1
responses[name] = hex_response
else:
print(f"[{command_type}] 无法获取响应数据")
return responses
def parse_status_code(self, response):
"""
从 Modbus 响应字符串中提取状态码(后两位)
示例00000000000401020101 -> '01'
"""
if isinstance(response, str) and len(response) >= 18:
return response[16:20]
return None
def is_valid_sensor_status(self, sensor_name):
stable_count = 0
for _ in range(int(self.sensor_stable_duration / self.sensor1_loop_delay)):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
stable_count = 0
else:
status_code = self.parse_status_code(response)
if status_code in self.required_codes:
stable_count += 1
if stable_count >= self.sensor_max_attempts:
return True
else:
stable_count = 0
time.sleep(self.sensor1_loop_delay)
return False
def is_valid_sensor_status_1(self, sensor_name):
stable_count = 0
for _ in range(int(self.sensor_stable_duration / self.sensor2_loop_delay)):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
stable_count = 0
else:
status_code = self.parse_status_code(response)
if status_code in self.required_codes_1:
stable_count += 1
if stable_count >= self.sensor_max_attempts:
return True
else:
stable_count = 0
time.sleep(self.sensor2_loop_delay)
return False
def is_valid_sensor(self,sensor_name):
"""
检查传感器状态是否有效
参数:
sensor_name: 传感器名称
返回:
True: 传感器状态有效
False: 传感器状态无效
"""
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
return False
else:
temp_status_code = self.parse_status_code(response)
if temp_status_code in self.required_codes_1:
return True
else:
return False
def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1):
"""
检测在指定时间窗口内是否存在持续稳定的有效信号
参数:
sensor_name: 传感器名称
detection_duration: 总检测时间窗口(秒)默认为3秒
stability_duration: 信号需要持续稳定的时间(秒)默认为2.5秒
check_interval: 检测间隔(秒)默认为0.1秒
返回:
True: 在时间窗口内检测到持续稳定的有效信号
False: 未检测到持续稳定的有效信号
"""
stable_start_time = None # 记录首次检测到有效信号的时间
start_time = time.time()
if not self.is_valid_sensor(sensor_name):
return False # 传感器状态无效,返回
else:
stable_start_time = time.time() # 首次检测到有效信号
time.sleep(check_interval)
while time.time() - start_time < detection_duration:
temp_is_valid = self.is_valid_sensor(sensor_name)
if temp_is_valid:
if time.time() - stable_start_time >= stability_duration:
return True # 信号持续稳定达到要求时间
else:
stable_start_time = None # 信号不稳定,重置计时
time.sleep(check_interval)
return False
def is_valid_sensor2_status_lost(self, sensor_name):
"""
检查传感器2是否丢失信号
"""
stable_count = 0
_try_nums=5 # 尝试次数
for _ in range(_try_nums):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
stable_count = 0
else:
status_code = self.parse_status_code(response)
if status_code not in self.required_codes_1:
stable_count += 1
if stable_count >= self.sensor_max_attempts:
return True
else:
stable_count = 0
time.sleep(self.sensor2_loop_lost)
return False
# ===================== 动作控制方法 =====================
def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False)->bool:
"""将if改成elif,一次只能打开一个设备否则会造成延时sleep时间不一致问题。并返回成功核验"""
loc_ret=False
loc_reponse=None
loc_send_command=None
status = self.get_all_device_status()
if conveyor1:
if not status.get(self.CONVEYOR1, False):
loc_send_command=self.valve_commands[self.CONVEYOR1]['open']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_conveyor)
else:
loc_ret=True
elif pusher:
if not status.get(self.PUSHER, False):
loc_send_command=self.valve_commands[self.PUSHER]['open']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_pusher)
else:
loc_ret=True
elif conveyor2:
if not status.get(self.CONVEYOR2, False):
loc_send_command=self.valve_commands[self.CONVEYOR2]['open']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_conveyor)
else:
loc_ret=True
elif clamp:
if not status.get(self.CLAMP, False):
loc_send_command=self.valve_commands[self.CLAMP]['open']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_clamp)
else:
loc_ret=True
elif pusher1:
if not status.get(self.PUSHER1, False):
loc_send_command=self.valve_commands[self.PUSHER1]['open']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_pusher)
else:
loc_ret=True
elif conveyor2_reverse:
if not status.get(self.CONVEYOR2_REVERSE, False):
loc_send_command=self.valve_commands[self.CONVEYOR2_REVERSE]['open']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_conveyor)
else:
loc_ret=True
if loc_reponse and len(loc_reponse) >= 10:
lol_hex_response = binascii.hexlify(response).decode('utf-8')
if lol_hex_response == loc_send_command:
loc_ret=True
if Constant.DebugPosition:
loc_ret=True
return loc_ret
def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False)->bool:
loc_ret=False
loc_reponse=None
loc_send_command=None
if conveyor1:
loc_send_command=self.valve_commands[self.CONVEYOR1]['close']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_conveyor)
if pusher:
loc_send_command=self.valve_commands[self.PUSHER]['close']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_pusher)
if conveyor2:
loc_send_command=self.valve_commands[self.CONVEYOR2]['close']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_conveyor)
if clamp:
loc_send_command=self.valve_commands[self.CLAMP]['close']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_clamp)
if pusher1:
loc_send_command=self.valve_commands[self.PUSHER1]['close']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_pusher)
if conveyor2_reverse:
loc_send_command=self.valve_commands[self.CONVEYOR2_REVERSE]['close']
loc_reponse=self.send_command(loc_send_command)
time.sleep(self.delay_conveyor)
if loc_reponse and len(loc_reponse) >= 10:
lol_hex_response = binascii.hexlify(response).decode('utf-8')
if lol_hex_response == loc_send_command:
loc_ret=True
if Constant.DebugPosition:
loc_ret=True
return loc_ret
# ===================== 传感器处理线程 =====================
def handle_sensor1(self):
while self._running:
if self._ispause and not self.sensor1_triggered:
#暂停线程, 保证线程1执行完传感器操作
time.sleep(self.sensor1_error_delay)
continue
try:
if self.is_valid_sensor_status(self.SENSOR1):
current_time = time.time()
if not self.sensor1_triggered and (
current_time - self.sensor1_last_time) > self.sensor1_debounce_time:
self.sensor1_triggered = True
self.sensor1_last_time = current_time
# 1.停止包装机皮带电机:关闭 conveyor1
self.close(conveyor1=True)
time.sleep(self.delay_conveyor)
# 2.推板开启:推出去动作
self.open(pusher=True)
time.sleep(self.delay_pusher)
self.close(pusher=True)
# 推板推出后重新启动电机时间传送带1延时5秒
time.sleep(self.delay_after_pusher)
# 4.推板关闭:推板收回来动作
self.open(pusher1=True)
time.sleep(self.delay_pusher)
self.close(pusher1=True)
# 3.重新开启包装机皮带电机: 开启conveyor1
time.sleep(self.delay_conveyor)
self.open(conveyor1=True)
time.sleep(1)
# 5. 状态检查(可选)
status = self.get_all_device_status()
if status.get('conveyor1') and not status.get('pusher'):
print("流程完成1皮带运行中推板已收回")
else:
print("状态异常,请检查设备")
# 流程结束,重置触发标志
self.sensor1_triggered = False
time.sleep(self.sensor1_loop_delay)
except Exception as e:
print(f"SENSOR1 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR1 处理错误: {e}")
self.sensor1_triggered = False
time.sleep(self.sensor1_error_delay)
def handle_sensor2(self):
_is_pause_close=True
#是否料袋尾部(有信号--》无信号)
_is_signal=False
#正发转用
_is_reverse_2=False
#是否反转后正转信号消息
_is_signal_2=False
#是否首次正转信号
_is_first_signal_2=False
while self._running:
if self._ispause:
#暂停
if _is_pause_close:
self.close(conveyor2=True)
self.motor_stopped_by_sensor2 = True
# self.sensor2_ready = True #初始值
_is_pause_close=False
time.sleep(self.sensor2_error_delay)
continue
#开启线程
_is_pause_close=True
if self.is_drop_35:
#region 35kg 正反转打平
try:
if _is_signal_2 or self.is_valid_sensor_status_1(self.SENSOR2):
#反转要加个防抖动时间
if _is_reverse_2:
print('回退后检查到sensor2 35KG信号正转')
self.open(conveyor2=True)
_is_reverse_2=False
_is_signal_2=True
elif _is_signal_2 and self.is_valid_sensor2_status_lost(self.SENSOR2):
print('检查到sensor2正转35KG信号消失')
self.close(conveyor2=True)
#滚筒关闭标志
self.motor_stopped_by_sensor2 = True
# 发送信号通知机器人取走物品
self.take_robot_signal.emit()
_is_signal_2=False
#停止后即使有信号了也不能转直到self.sensor2_ready=True
# _is_first_signal=False
self.sensor2_ready=False #打开滚洞标识
elif not _is_first_signal_2:
print('检查到正转sensor2 35KG信号')
#检测到信号5秒
time.sleep(6)
self.open(conveyor2_reverse=True)
_is_reverse_2=True
_is_first_signal_2=True
time.sleep(0.1)
continue
elif self.sensor2_ready:
#sensor2_ready:通过Feeding:FPhoto处控制是否启动
if self.motor_stopped_by_sensor2:
print('开滚筒')
self.open(conveyor2=True)
self.motor_stopped_by_sensor2 = False
_is_first_signal=False
# _is_reverse=False
# _is_first_signal=False
# _is_signal=False
if _is_reverse_2:
time.sleep(0.1)
else:
time.sleep(2)
except Exception as e:
print(f"SENSOR3 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR3 处理错误: {e}")
time.sleep(self.sensor2_error_delay)
#endregion
else:
try:
if self.sensor2_ready:
#sensor2_ready:通过Feeding:FPhoto处控制是否启动,到了,先启动
if self.motor_stopped_by_sensor2:
print('开滚筒')
self.open(conveyor2=True)
self.motor_stopped_by_sensor2 = False
elif _is_signal or self.is_valid_sensor_signal_stable(self.SENSOR2,detection_duration=3,stability_duration=2.5,check_interval=0.5):
#检测到信号,如果之前是没有信号,关闭滚筒
print('检查到sensor2信号')
if _is_signal and self.is_valid_sensor2_status_lost(self.SENSOR2):
print('检查到sensor2信号消失')
self.close(conveyor2=True)
#滚筒关闭标志
self.motor_stopped_by_sensor2 = True
# 发送信号通知机器人取走物品
self.take_robot_signal.emit()
_is_signal=False
self.sensor2_ready=False #打开滚洞标识
else:
_is_signal=True
# time.sleep(0.1)
continue
time.sleep(2)
except Exception as e:
print(f"SENSOR2 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR2 处理错误: {e}")
time.sleep(self.sensor2_error_delay)
def handle_sensor3(self):
"""
正转--》反转--》正转
"""
_is_pause_close=True
#是否反转
_is_reverse=False
#是否反转后正转信号消息
_is_signal=False
#是否首次正转信号
_is_first_signal=False
#是否料袋尾部(有信号--》无信号)
while self._running:
if self._ispause:
#暂停
if _is_pause_close:
self.close(conveyor2=True)
self.motor_stopped_by_sensor2 = True
# self.sensor2_ready = True #初始值
_is_pause_close=False
time.sleep(self.sensor2_error_delay)
continue
#开启线程
_is_pause_close=True
#滚动次数
try:
if _is_signal or self.is_valid_sensor_status_1(self.SENSOR2):
#反转要加个防抖动时间
if _is_reverse:
print('回退后检查到sensor2信号正转')
self.open(conveyor2=True)
_is_reverse=False
_is_signal=True
elif _is_signal and self.is_valid_sensor2_status_lost(self.SENSOR2):
print('检查到sensor2正转信号消失')
self.close(conveyor2=True)
#滚筒关闭标志
self.motor_stopped_by_sensor2 = True
# 发送信号通知机器人取走物品
self.take_robot_signal.emit()
_is_signal=False
#停止后即使有信号了也不能转直到self.sensor2_ready=True
# _is_first_signal=False
self.sensor2_ready=False #打开滚洞标识
elif not _is_first_signal:
print('检查到正转sensor2信号')
#检测到信号5秒
time.sleep(6)
self.open(conveyor2_reverse=True)
_is_reverse=True
_is_first_signal=True
time.sleep(0.1)
continue
elif self.sensor2_ready:
#sensor2_ready:通过Feeding:FPhoto处控制是否启动
if self.motor_stopped_by_sensor2:
print('开滚筒')
self.open(conveyor2=True)
self.motor_stopped_by_sensor2 = False
_is_first_signal=False
# _is_reverse=False
# _is_first_signal=False
# _is_signal=False
if _is_reverse:
time.sleep(0.1)
else:
time.sleep(2)
except Exception as e:
print(f"SENSOR3 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR3 处理错误: {e}")
time.sleep(self.sensor2_error_delay)
def pause_start_sensor(self,is_pause):
"""
暂停或开启传感器线程
is_pause:True是False否
"""
self._ispause = is_pause
def set_drop_35(self,is_drop_35):
"""
设置是否是35码
is_drop_35:True是False否
"""
self.is_drop_35=is_drop_35
def stop_sensor(self,sensor1_thread,sensor2_thread):
if not self._running:
print("线程未在运行")
return
print("停止传感器线程")
self._running = False
if sensor1_thread and sensor1_thread.is_alive():
sensor1_thread.join()
if sensor2_thread and sensor2_thread.is_alive():
sensor2_thread.join()
print("传感器线程已终止。")