#!/usr/bin/env python # -*- coding: utf-8 -*- import socket import binascii import time import threading import logging from PySide6.QtCore import Signal, QObject import numpy as np from pandas.core.arrays import boolean import Constant class RelayController(QObject): log_signal = Signal(int, str) take_robot_signal = Signal() emergency_signal = Signal(bool) def __init__(self, host='192.168.0.18', port=50000): super().__init__() # ===================== 全局线程延时参数 ===================== self.sensor1_loop_delay = 0.1 # SENSOR1 线程轮询间隔(秒) self.sensor1_error_delay = 1.0 # SENSOR1 出错或暂停时延时(秒) self.sensor1_post_action_delay = 0.2 # SENSOR1 每次循环后延时(秒) self.sensor2_loop_delay = 0.2 # SENSOR2 线程轮询间隔(秒) self.sensor2_loop_lost=0.1 # SENSOR2 线程轮询间隔(秒) # self.sensor2_loop_delay = 0.5 # SENSOR2 线程轮询间隔(秒) self.sensor2_error_delay = 0.5 # SENSOR2 出错时延时(秒) self.sensor2_post_action_delay = 0.2 # SENSOR2 每次循环后延时(秒) # ===================== 全局动作延时参数 ===================== self.delay_conveyor = 0.5 # 传送带开/关动作延时(一半时间,我在控制程序和线程都加了一样的延时) self.delay_pusher = 0.05 # 推板开/关动作延时 self.delay_clamp = 0.5 # 夹爪动作延时 self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间 self.emergency_is_pressed=False # ===================== 传感器稳定检测参数 ===================== self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒) self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效 self.sensor1_debounce_time = 1.0 # 传感器1防抖时间(秒) # ===================== 网络与设备映射 ===================== self.host = host self.port = port self.CONVEYOR1 = 'conveyor1' self.PUSHER = 'pusher' self.CONVEYOR2 = 'conveyor2' self.CONVEYOR2_REVERSE = 'conveyor2_reverse' self.CLAMP = 'clamp' self.PUSHER1 = 'pusher1' self.SENSOR1 = 'sensor1' self.SENSOR2 = 'sensor2' self.BELT = 'belt' self.ALARM = 'alarm' self.BLOW_SENSOR2 = 'blow_sensor2' self.valve_commands = { #包装机皮带 self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'}, # self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, #滚筒,2000 0012正转,2000 0022 2001变频器频率调整 2000正反转。 self.CONVEYOR2: {'open': '000100000006020620000012', 'close': '000100000006020620000001'}, #DO4 self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, #DO5 回 DO2推 self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}, #D07 长皮带 self.BELT: {'open': '00000000000601050006FF00', 'close': '000000000006010500060000'}, #D01 声控报警 self.ALARM: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, #DO6 吹传感器2 self.BLOW_SENSOR2: {'open': '00000000000601050005FF00', 'close': '000000000006010500050000'}, #滚筒反转 self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'} } #devices:读取继点器的状态 #sensors 传感器的状态 D12 self.read_status_command = { 'devices': '000000000006010100000008', 'sensors': '000000000006010200000008' } self.device_bit_map = { self.CONVEYOR1: 0, self.PUSHER: 1, self.CONVEYOR2: 2, self.CLAMP: 3, self.PUSHER1: 4, self.CONVEYOR2_REVERSE: 5, self.BELT: 6, self.ALARM: 7, #self.BLOW_SENSOR2: 8 } self.sensor_bit_map = { self.SENSOR1: 0, self.SENSOR2: 1, } self.device_name_map = { self.CONVEYOR1: "包装机皮带", self.PUSHER: "推板开", self.CONVEYOR2: "滚筒", self.CLAMP: "机械臂夹爪", self.PUSHER1: "推板关", self.CONVEYOR2_REVERSE: "滚筒反转", self.BELT: "皮带", self.ALARM: "声控报警", self.BLOW_SENSOR2: "吹传感器2" } self.sensor_name_map = { self.SENSOR1: '位置传感器1', self.SENSOR2: '位置传感器2', } # ===================== 状态控制变量 ===================== self._running = False #线程运行标识 self._ispause = False #线程暂停标识 self._sensor1_thread = None self._sensor2_thread = None self.required_codes = {'0101', '0103','0105','0107'} # 有效状态码(传感器1) self.required_codes_1 = {'0102', '0103','0106','0107'} # 有效状态码(传感器2) self.sensor1_triggered = False self.sensor1_last_time = 0 self.sensor2_ready = False #默认不打开 self.motor_stopped_by_sensor2 = True # ===================== 基础通信方法 ===================== def send_command(self, command_hex, retry_count=2, source='unknown'): if Constant.DebugPosition: print(f"[发送命令] {command_hex} ({source})") return None byte_data = binascii.unhexlify(command_hex) for attempt in range(retry_count): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(10) sock.connect((self.host, self.port)) sock.send(byte_data) response = sock.recv(1024) # hex_response = binascii.hexlify(response).decode('utf-8') #if source == 'sensor': #print(f"[传感器响应] {hex_response}") #elif source == 'device': #print(f"[设备控制响应] {hex_response}") #else: #print(f"[通信响应] {hex_response}") return response except Exception as e: self.log_signal.emit(logging.INFO,f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") time.sleep(5) self.trigger_alarm() return None def trigger_alarm(self): self.log_signal.emit(logging.ERROR,"警告:网络继电器连续多次通信失败,请检查设备连接!") print("警告:网络继电器连续多次通信失败,请检查设备连接!") # ===================== 状态读取方法 ===================== def get_all_device_status(self, command_type='devices'): # if Constant.DebugPosition: # return {self.SENSOR2:True} command = self.read_status_command.get(command_type) if not command: print(f"未知的网络继电器读取类型: {command_type}") return {} source = 'sensor' if command_type == 'sensors' else 'device' response = self.send_command(command, source=source) status_dict = {} if response and len(response) >= 10: status_byte = response[9] status_bin = f"{status_byte:08b}"[::-1] bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map # name_map = self.device_name_map if command_type == 'devices' else self.sensor_name_map for key, bit_index in bit_map.items(): status_dict[key] = status_bin[bit_index] == '1' else: print(f"网络继电器[{command_type}] 读取状态失败或响应无效") return status_dict def get_all_sensor_responses(self, command_type='sensors'): """ 获取所有传感器的原始 Modbus 响应字符串 示例:{'sensor1': '00000000000401020101', 'sensor2': '00000000000401020100'} """ command = self.read_status_command.get(command_type) if not command: print(f"未知的读取类型: {command_type}") return {} source = 'sensor' if command_type == 'sensors' else 'device' response = self.send_command(command, source=source) responses = {} if response and len(response) >= 10: hex_response = binascii.hexlify(response).decode('utf-8') # print(f"[原始响应][{command_type}] {hex_response}") # 假设传感器数据从第 9 字节开始,长度为 2 字节 for name, bit_index in self.sensor_bit_map.items(): offset = 9 + (bit_index // 8) bit_pos = bit_index % 8 byte = response[offset] status = (byte >> bit_pos) & 1 responses[name] = hex_response else: print(f"[{command_type}] 无法获取响应数据") return responses def get_emergency_is_pressed(self)->bool: "获取急停信号,DI 3 位为1,正常,DI 3为0时为按下急停状态,00000000000401020107 后四位01表示一个字节,最后一位(07)二进制对应开关量" "按下急停为" command = self.read_status_command.get("sensors") response = self.send_command(command) loc_is_pressed=False if response and len(response) >= 10: status_byte = response[9] #简单验证 status_crc=response[8] loc_is_pressed =status_crc==1 and (status_byte & 0b100) == 0 # 0b100 表示第三位为1 else: self.log_signal.emit(logging.ERROR,f"网络继电器[急停] 读取状态失败或响应无效") print(f"网络继电器[急停] 读取状态失败或响应无效") return loc_is_pressed def parse_status_code(self, response): """ 从 Modbus 响应字符串中提取状态码(后两位) 示例:00000000000401020101 -> '01' """ if isinstance(response, str) and len(response) >= 18: return response[16:20] return None def is_valid_sensor_status(self, sensor_name): stable_count = 0 for _ in range(int(self.sensor_stable_duration / self.sensor1_loop_delay)): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: stable_count = 0 else: status_code = self.parse_status_code(response) if status_code in self.required_codes: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor1_loop_delay) return False def is_valid_sensor_status_1(self, sensor_name): stable_count = 0 for _ in range(int(self.sensor_stable_duration / self.sensor2_loop_delay)): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") stable_count = 0 else: status_code = self.parse_status_code(response) if status_code in self.required_codes_1: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor2_loop_delay) return False def is_valid_sensor2_status_lost(self, sensor_name): stable_count = 0 _try_nums=5 # 尝试次数 for _ in range(_try_nums): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") stable_count = 0 else: status_code = self.parse_status_code(response) if status_code not in self.required_codes_1: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor2_loop_lost) return False def is_valid_sensor(self,sensor_name): """ 检查传感器状态是否有效 参数: sensor_name: 传感器名称 返回: True: 传感器状态有效 False: 传感器状态无效 """ stable_count = 0 _try_nums=5 # 尝试次数 for _ in range(_try_nums): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") return False else: temp_status_code = self.parse_status_code(response) if temp_status_code in self.required_codes_1: stable_count += 1 if stable_count >= 3: return True else: stable_count = 0 time.sleep(self.sensor2_loop_lost) return False def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1): """ 检测在指定时间窗口内是否存在持续稳定的有效信号 参数: sensor_name: 传感器名称 detection_duration: 总检测时间窗口(秒),默认为3秒 stability_duration: 信号需要持续稳定的时间(秒),默认为2.5秒 check_interval: 检测间隔(秒),默认为0.1秒 返回: True: 在时间窗口内检测到持续稳定的有效信号 False: 未检测到持续稳定的有效信号 """ stable_start_time = None # 记录首次检测到有效信号的时间 start_time = time.time() if not self.is_valid_sensor(sensor_name): return False # 传感器状态无效,返回 else: stable_start_time = time.time() # 首次检测到有效信号 time.sleep(check_interval) while time.time() - start_time < detection_duration: temp_is_valid = self.is_valid_sensor(sensor_name) if temp_is_valid: if time.time() - stable_start_time >= stability_duration: return True # 信号持续稳定达到要求时间 else: stable_start_time = None # 信号不稳定,重置计时 time.sleep(check_interval) return False # ===================== 动作控制方法 ===================== def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False,belt=False,alarm=False,blow_sensor2=False): # if Constant.DebugPosition: # return # status = self.get_all_device_status() if conveyor1: self.send_command(self.valve_commands[self.CONVEYOR1]['open']) time.sleep(self.delay_conveyor) if pusher: self.send_command(self.valve_commands[self.PUSHER]['open']) time.sleep(self.delay_pusher) if conveyor2: self.send_command(self.valve_commands[self.CONVEYOR2]['open']) time.sleep(self.delay_conveyor) if clamp: self.send_command(self.valve_commands[self.CLAMP]['open']) time.sleep(self.delay_clamp) if pusher1: self.send_command(self.valve_commands[self.PUSHER1]['open']) time.sleep(self.delay_pusher) if conveyor2_reverse: self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['open']) time.sleep(self.delay_conveyor) if belt: self.send_command(self.valve_commands[self.BELT]['open']) # time.sleep(self.delay_belt) if alarm: self.send_command(self.valve_commands[self.ALARM]['open']) # time.sleep(self.delay_alarm) if blow_sensor2: self.send_command(self.valve_commands[self.BLOW_SENSOR2]['open']) # time.sleep(self.delay_blow_sensor2) def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False,belt=False,alarm=False,blow_sensor2=False): if conveyor1: self.send_command(self.valve_commands[self.CONVEYOR1]['close']) time.sleep(self.delay_conveyor) if pusher: self.send_command(self.valve_commands[self.PUSHER]['close']) time.sleep(self.delay_pusher) if conveyor2: self.send_command(self.valve_commands[self.CONVEYOR2]['close']) time.sleep(self.delay_conveyor) if clamp: self.send_command(self.valve_commands[self.CLAMP]['close']) time.sleep(self.delay_clamp) if pusher1: self.send_command(self.valve_commands[self.PUSHER1]['close']) time.sleep(self.delay_pusher) if conveyor2_reverse: self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['close']) time.sleep(self.delay_conveyor) if belt: self.send_command(self.valve_commands[self.BELT]['close']) # time.sleep(self.delay_belt) if alarm: self.send_command(self.valve_commands[self.ALARM]['close']) # time.sleep(self.delay_alarm) if blow_sensor2: self.send_command(self.valve_commands[self.BLOW_SENSOR2]['close']) # time.sleep(self.delay_blow_sensor2) # ===================== 传感器处理线程 ===================== def handle_sensor1(self): while self._running: if self._ispause and not self.sensor1_triggered: #暂停线程, 保证线程1执行完传感器操作 time.sleep(self.sensor1_error_delay) continue try: if self.is_valid_sensor_status(self.SENSOR1): current_time = time.time() if not self.sensor1_triggered and ( current_time - self.sensor1_last_time) > self.sensor1_debounce_time: self.sensor1_triggered = True self.sensor1_last_time = current_time # 1.停止包装机皮带电机:关闭 conveyor1 self.close(conveyor1=True) time.sleep(self.delay_conveyor) # 2.推板开启:推出去动作 self.open(pusher=True) time.sleep(self.delay_pusher) self.close(pusher=True) # 推板推出后重新启动电机时间传送带1延时5秒 time.sleep(self.delay_after_pusher) # 4.推板关闭:推板收回来动作 self.open(pusher1=True) time.sleep(self.delay_pusher) self.close(pusher1=True) # 3.重新开启包装机皮带电机: 开启conveyor1 time.sleep(self.delay_conveyor) self.open(conveyor1=True) time.sleep(1) # 5. 状态检查(可选) # status = self.get_all_device_status() # if status.get('conveyor1') and not status.get('pusher'): # print("流程完成1:皮带运行中,推板已收回") # else: # print("状态异常,请检查设备") # 流程结束,重置触发标志 self.sensor1_triggered = False time.sleep(self.sensor1_loop_delay) except Exception as e: print(f"SENSOR1 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"SENSOR1 处理错误: {e}") self.sensor1_triggered = False time.sleep(self.sensor1_error_delay) def handle_sensor2(self): _is_pause_close=True #是否料袋尾部(有信号--》无信号) _is_signal=False while self._running: if self._ispause: #暂停 if _is_pause_close: self.close(conveyor2=True) self.motor_stopped_by_sensor2 = True # self.sensor2_ready = True #初始值 _is_pause_close=False time.sleep(self.sensor2_error_delay) continue #开启线程 _is_pause_close=True try: if _is_signal or self.is_valid_sensor_status_1(self.SENSOR2): #检测到信号,如果之前是没有信号,关闭滚筒 print('检查到sensor2信号') if _is_signal and self.is_valid_sensor2_status_lost(self.SENSOR2): print('检查到sensor2信号消失') self.close(conveyor2=True) #滚筒关闭标志 self.motor_stopped_by_sensor2 = True # 发送信号通知机器人取走物品 self.take_robot_signal.emit() _is_signal=False self.sensor2_ready=False #打开滚洞标识,FPhoto控制打开 else: if self.sensor2_ready: #只有在FPhoto处才有效 _is_signal=True if self.motor_stopped_by_sensor2: print('开滚筒') self.open(conveyor2=True) self.motor_stopped_by_sensor2 = False # time.sleep(0.1) continue elif self.sensor2_ready: #sensor2_ready:通过Feeding:FPhoto处控制是否启动 if self.motor_stopped_by_sensor2: print('开滚筒') self.open(conveyor2=True) self.motor_stopped_by_sensor2 = False time.sleep(2) except Exception as e: print(f"SENSOR2 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"SENSOR2 处理错误: {e}") time.sleep(self.sensor2_error_delay) def pause_start_sensor(self,is_pause): """ 暂停或开启传感器线程 is_pause:True是,False否 """ self._ispause = is_pause if is_pause: """暂停皮带""" self.close(belt=True) else: """开启皮带""" self.open(belt=True) def stop_sensor(self,sensor1_thread,sensor2_thread): if not self._running: print("线程未在运行") return print("停止传感器线程") self._running = False if sensor1_thread and sensor1_thread.is_alive(): sensor1_thread.join() if sensor2_thread and sensor2_thread.is_alive(): sensor2_thread.join() print("传感器线程已终止。") def handle_emergency_pressed(self): "处理急停按钮信号状态线程" print('检查急停按钮状态1') while self._running: try: print('检查急停按钮状态') loc_is_pressed =self.get_emergency_is_pressed() if loc_is_pressed: # 处理急停按钮信号状态 if not self.emergency_is_pressed: print('急停按钮被按下') self.log_signal.emit(logging.INFO,f"急停按钮被按下") self.emergency_is_pressed=True self.emergency_signal.emit(True) else: print('急停按钮未被按下') self.emergency_is_pressed=False self.emergency_signal.emit(False) time.sleep(0.5) except Exception as e: print(f"急停 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"急停线程 处理错误: {e}") time.sleep(2)