#!/usr/bin/env python # -*- coding: utf-8 -*- import socket import binascii import time import threading import logging from PySide6.QtCore import Signal, QObject import numpy as np from pandas.core.arrays import boolean import Constant class RelayController(QObject): log_signal = Signal(int, str) take_robot_signal = Signal() def __init__(self, host='192.168.0.18', port=50000): super().__init__() # ===================== 全局线程延时参数 ===================== self.sensor1_loop_delay = 0.1 # SENSOR1 线程轮询间隔(秒) self.sensor1_error_delay = 1.0 # SENSOR1 出错或暂停时延时(秒) self.sensor1_post_action_delay = 0.2 # SENSOR1 每次循环后延时(秒) self.sensor2_loop_delay = 0.2 # SENSOR2 线程轮询间隔(秒) self.sensor2_loop_lost=0.1 # SENSOR2 线程轮询间隔(秒) # self.sensor2_loop_delay = 0.5 # SENSOR2 线程轮询间隔(秒) self.sensor2_error_delay = 0.5 # SENSOR2 出错时延时(秒) self.sensor2_post_action_delay = 0.2 # SENSOR2 每次循环后延时(秒) # ===================== 全局动作延时参数 ===================== self.delay_conveyor = 0.5 # 传送带开/关动作延时(一半时间,我在控制程序和线程都加了一样的延时) self.delay_pusher = 0.05 # 推板开/关动作延时 self.delay_clamp = 0.5 # 夹爪动作延时 self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间 # ===================== 传感器稳定检测参数 ===================== self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒) self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效 self.sensor1_debounce_time = 1.0 # 传感器1防抖时间(秒) self.sensor2_debounce_time = 3.0 # 袋尾检测3秒有效信号 # ===================== 网络与设备映射 ===================== self.host = host self.port = port self.CONVEYOR1 = 'conveyor1' self.PUSHER = 'pusher' self.CONVEYOR2 = 'conveyor2' self.CONVEYOR2_REVERSE = 'conveyor2_reverse' self.CLAMP = 'clamp' self.PUSHER1 = 'pusher1' self.SENSOR1 = 'sensor1' self.SENSOR2 = 'sensor2' self.valve_commands = { self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'}, # self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, #滚筒,2000 0012正转,2000 0022 2001变频器频率调整 2000正反转。 self.CONVEYOR2: {'open': '000100000006020620000012', 'close': '000100000006020620000001'}, #DO4 self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, #DO5 回 DO2推 self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}, #滚筒反转 self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'} } #devices:读取继点器的状态 #sensors 传感器的状态 D12 self.read_status_command = { 'devices': '000000000006010100000008', 'sensors': '000000000006010200000008' } self.device_bit_map = { self.CONVEYOR1: 0, self.PUSHER: 1, self.CONVEYOR2: 2, self.CLAMP: 3, self.PUSHER1: 4, self.CONVEYOR2_REVERSE: 5 } self.sensor_bit_map = { self.SENSOR1: 0, self.SENSOR2: 1, } self.device_name_map = { self.CONVEYOR1: "传送带1", self.PUSHER: "推板开", self.CONVEYOR2: "传送带2", self.CLAMP: "机械臂夹爪", self.PUSHER1: "推板关", self.CONVEYOR2_REVERSE: "传送带2反转" } self.sensor_name_map = { self.SENSOR1: '位置传感器1', self.SENSOR2: '位置传感器2', } # ===================== 状态控制变量 ===================== self._running = False #线程运行标识 self._ispause = False #线程暂停标识 self._sensor1_thread = None self._sensor2_thread = None self.required_codes = {'0101', '0103'} # 有效状态码(传感器1) self.required_codes_1 = {'0102', '0103'} # 有效状态码(传感器2) self.sensor1_triggered = False self.sensor1_last_time = 0 self.sensor2_ready = False #默认不打开 self.motor_stopped_by_sensor2 = True self.is_drop_35=False #是否是35码 # ===================== 基础通信方法 ===================== def send_command(self, command_hex, retry_count=2, source='unknown'): if Constant.DebugPosition: # print(f"[发送命令] {command_hex} ({source})") return None byte_data = binascii.unhexlify(command_hex) for attempt in range(retry_count): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(10) sock.connect((self.host, self.port)) sock.send(byte_data) response = sock.recv(1024) # hex_response = binascii.hexlify(response).decode('utf-8') #if source == 'sensor': #print(f"[传感器响应] {hex_response}") #elif source == 'device': #print(f"[设备控制响应] {hex_response}") #else: #print(f"[通信响应] {hex_response}") return response except Exception as e: # print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") self.log_signal.emit(logging.INFO,f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") time.sleep(5) self.trigger_alarm() return None def trigger_alarm(self): self.log_signal.emit(logging.ERROR,"警告:网络继电器连续多次通信失败,请检查设备连接!") # ===================== 状态读取方法 ===================== def get_all_device_status(self, command_type='devices'): if Constant.DebugPosition: return {self.SENSOR2:True} command = self.read_status_command.get(command_type) if not command: print(f"未知的网络继电器读取类型: {command_type}") return {} source = 'sensor' if command_type == 'sensors' else 'device' response = self.send_command(command, source=source) status_dict = {} if response and len(response) >= 10: status_byte = response[9] status_bin = f"{status_byte:08b}"[::-1] bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map name_map = self.device_name_map if command_type == 'devices' else self.sensor_name_map for key, bit_index in bit_map.items(): status_dict[key] = status_bin[bit_index] == '1' else: print(f"网络继电器[{command_type}] 读取状态失败或响应无效") return status_dict def get_all_sensor_responses(self, command_type='sensors'): """ 获取所有传感器的原始 Modbus 响应字符串 示例:{'sensor1': '00000000000401020101', 'sensor2': '00000000000401020100'} """ command = self.read_status_command.get(command_type) if not command: print(f"未知的读取类型: {command_type}") return {} source = 'sensor' if command_type == 'sensors' else 'device' response = self.send_command(command, source=source) responses = {} if response and len(response) >= 10: hex_response = binascii.hexlify(response).decode('utf-8') # print(f"[原始响应][{command_type}] {hex_response}") # 假设传感器数据从第 9 字节开始,长度为 2 字节 for name, bit_index in self.sensor_bit_map.items(): offset = 9 + (bit_index // 8) bit_pos = bit_index % 8 byte = response[offset] status = (byte >> bit_pos) & 1 responses[name] = hex_response else: print(f"[{command_type}] 无法获取响应数据") return responses def parse_status_code(self, response): """ 从 Modbus 响应字符串中提取状态码(后两位) 示例:00000000000401020101 -> '01' """ if isinstance(response, str) and len(response) >= 18: return response[16:20] return None def is_valid_sensor_status(self, sensor_name): stable_count = 0 for _ in range(int(self.sensor_stable_duration / self.sensor1_loop_delay)): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: stable_count = 0 else: status_code = self.parse_status_code(response) if status_code in self.required_codes: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor1_loop_delay) return False def is_valid_sensor_status_1(self, sensor_name): stable_count = 0 for _ in range(int(self.sensor_stable_duration / self.sensor2_loop_delay)): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") stable_count = 0 else: status_code = self.parse_status_code(response) if status_code in self.required_codes_1: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor2_loop_delay) return False def is_valid_sensor(self,sensor_name): """ 检查传感器状态是否有效 参数: sensor_name: 传感器名称 返回: True: 传感器状态有效 False: 传感器状态无效 """ responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") return False else: temp_status_code = self.parse_status_code(response) if temp_status_code in self.required_codes_1: return True else: return False def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1): """ 检测在指定时间窗口内是否存在持续稳定的有效信号 参数: sensor_name: 传感器名称 detection_duration: 总检测时间窗口(秒),默认为3秒 stability_duration: 信号需要持续稳定的时间(秒),默认为2.5秒 check_interval: 检测间隔(秒),默认为0.1秒 返回: True: 在时间窗口内检测到持续稳定的有效信号 False: 未检测到持续稳定的有效信号 """ stable_start_time = None # 记录首次检测到有效信号的时间 start_time = time.time() if not self.is_valid_sensor(sensor_name): return False # 传感器状态无效,返回 else: stable_start_time = time.time() # 首次检测到有效信号 time.sleep(check_interval) while time.time() - start_time < detection_duration: temp_is_valid = self.is_valid_sensor(sensor_name) if temp_is_valid: if time.time() - stable_start_time >= stability_duration: return True # 信号持续稳定达到要求时间 else: stable_start_time = None # 信号不稳定,重置计时 time.sleep(check_interval) return False def is_valid_sensor2_status_lost(self, sensor_name): """ 检查传感器2是否丢失信号 """ stable_count = 0 _try_nums=5 # 尝试次数 for _ in range(_try_nums): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") stable_count = 0 else: status_code = self.parse_status_code(response) if status_code not in self.required_codes_1: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor2_loop_lost) return False # ===================== 动作控制方法 ===================== def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False)->bool: """将if改成elif,一次只能打开一个设备,否则会造成延时sleep时间不一致问题。并返回成功核验""" loc_ret=False loc_reponse=None loc_send_command=None status = self.get_all_device_status() if conveyor1: if not status.get(self.CONVEYOR1, False): loc_send_command=self.valve_commands[self.CONVEYOR1]['open'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_conveyor) else: loc_ret=True elif pusher: if not status.get(self.PUSHER, False): loc_send_command=self.valve_commands[self.PUSHER]['open'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_pusher) else: loc_ret=True elif conveyor2: if not status.get(self.CONVEYOR2, False): loc_send_command=self.valve_commands[self.CONVEYOR2]['open'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_conveyor) else: loc_ret=True elif clamp: if not status.get(self.CLAMP, False): loc_send_command=self.valve_commands[self.CLAMP]['open'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_clamp) else: loc_ret=True elif pusher1: if not status.get(self.PUSHER1, False): loc_send_command=self.valve_commands[self.PUSHER1]['open'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_pusher) else: loc_ret=True elif conveyor2_reverse: if not status.get(self.CONVEYOR2_REVERSE, False): loc_send_command=self.valve_commands[self.CONVEYOR2_REVERSE]['open'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_conveyor) else: loc_ret=True if loc_reponse and len(loc_reponse) >= 10: lol_hex_response = binascii.hexlify(response).decode('utf-8') if lol_hex_response == loc_send_command: loc_ret=True if Constant.DebugPosition: loc_ret=True return loc_ret def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False)->bool: loc_ret=False loc_reponse=None loc_send_command=None if conveyor1: loc_send_command=self.valve_commands[self.CONVEYOR1]['close'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_conveyor) if pusher: loc_send_command=self.valve_commands[self.PUSHER]['close'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_pusher) if conveyor2: loc_send_command=self.valve_commands[self.CONVEYOR2]['close'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_conveyor) if clamp: loc_send_command=self.valve_commands[self.CLAMP]['close'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_clamp) if pusher1: loc_send_command=self.valve_commands[self.PUSHER1]['close'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_pusher) if conveyor2_reverse: loc_send_command=self.valve_commands[self.CONVEYOR2_REVERSE]['close'] loc_reponse=self.send_command(loc_send_command) time.sleep(self.delay_conveyor) if loc_reponse and len(loc_reponse) >= 10: lol_hex_response = binascii.hexlify(response).decode('utf-8') if lol_hex_response == loc_send_command: loc_ret=True if Constant.DebugPosition: loc_ret=True return loc_ret # ===================== 传感器处理线程 ===================== def handle_sensor1(self): while self._running: if self._ispause and not self.sensor1_triggered: #暂停线程, 保证线程1执行完传感器操作 time.sleep(self.sensor1_error_delay) continue try: if self.is_valid_sensor_status(self.SENSOR1): current_time = time.time() if not self.sensor1_triggered and ( current_time - self.sensor1_last_time) > self.sensor1_debounce_time: self.sensor1_triggered = True self.sensor1_last_time = current_time # 1.停止包装机皮带电机:关闭 conveyor1 self.close(conveyor1=True) time.sleep(self.delay_conveyor) # 2.推板开启:推出去动作 self.open(pusher=True) time.sleep(self.delay_pusher) self.close(pusher=True) # 推板推出后重新启动电机时间传送带1延时5秒 time.sleep(self.delay_after_pusher) # 4.推板关闭:推板收回来动作 self.open(pusher1=True) time.sleep(self.delay_pusher) self.close(pusher1=True) # 3.重新开启包装机皮带电机: 开启conveyor1 time.sleep(self.delay_conveyor) self.open(conveyor1=True) time.sleep(1) # 5. 状态检查(可选) status = self.get_all_device_status() if status.get('conveyor1') and not status.get('pusher'): print("流程完成1:皮带运行中,推板已收回") else: print("状态异常,请检查设备") # 流程结束,重置触发标志 self.sensor1_triggered = False time.sleep(self.sensor1_loop_delay) except Exception as e: print(f"SENSOR1 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"SENSOR1 处理错误: {e}") self.sensor1_triggered = False time.sleep(self.sensor1_error_delay) def handle_sensor2(self): _is_pause_close=True #是否料袋尾部(有信号--》无信号) _is_signal=False #正发转用 _is_reverse_2=False #是否反转后正转信号消息 _is_signal_2=False #是否首次正转信号 _is_first_signal_2=False while self._running: if self._ispause: #暂停 if _is_pause_close: self.close(conveyor2=True) self.motor_stopped_by_sensor2 = True # self.sensor2_ready = True #初始值 _is_pause_close=False time.sleep(self.sensor2_error_delay) continue #开启线程 _is_pause_close=True if self.is_drop_35: #region 35kg 正反转打平 try: if _is_signal_2 or self.is_valid_sensor_status_1(self.SENSOR2): #反转要加个防抖动时间 if _is_reverse_2: print('回退后检查到sensor2 35KG信号,正转') self.open(conveyor2=True) _is_reverse_2=False _is_signal_2=True elif _is_signal_2 and self.is_valid_sensor2_status_lost(self.SENSOR2): print('检查到sensor2正转35KG信号消失') self.close(conveyor2=True) #滚筒关闭标志 self.motor_stopped_by_sensor2 = True # 发送信号通知机器人取走物品 self.take_robot_signal.emit() _is_signal_2=False #停止后即使有信号了,也不能转,直到self.sensor2_ready=True # _is_first_signal=False self.sensor2_ready=False #打开滚洞标识 elif not _is_first_signal_2: print('检查到正转sensor2 35KG信号') #检测到信号,5秒 time.sleep(6) self.open(conveyor2_reverse=True) _is_reverse_2=True _is_first_signal_2=True time.sleep(0.1) continue elif self.sensor2_ready: #sensor2_ready:通过Feeding:FPhoto处控制是否启动 if self.motor_stopped_by_sensor2: print('开滚筒') self.open(conveyor2=True) self.motor_stopped_by_sensor2 = False _is_first_signal=False # _is_reverse=False # _is_first_signal=False # _is_signal=False if _is_reverse_2: time.sleep(0.1) else: time.sleep(2) except Exception as e: print(f"SENSOR3 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"SENSOR3 处理错误: {e}") time.sleep(self.sensor2_error_delay) #endregion else: try: if self.sensor2_ready: #sensor2_ready:通过Feeding:FPhoto处控制是否启动,到了,先启动 if self.motor_stopped_by_sensor2: print('开滚筒') self.open(conveyor2=True) self.motor_stopped_by_sensor2 = False elif _is_signal or self.is_valid_sensor_signal_stable(self.SENSOR2,detection_duration=3,stability_duration=2.5,check_interval=0.5): #检测到信号,如果之前是没有信号,关闭滚筒 print('检查到sensor2信号') if _is_signal and self.is_valid_sensor2_status_lost(self.SENSOR2): print('检查到sensor2信号消失') self.close(conveyor2=True) #滚筒关闭标志 self.motor_stopped_by_sensor2 = True # 发送信号通知机器人取走物品 self.take_robot_signal.emit() _is_signal=False self.sensor2_ready=False #打开滚洞标识 else: _is_signal=True # time.sleep(0.1) continue time.sleep(2) except Exception as e: print(f"SENSOR2 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"SENSOR2 处理错误: {e}") time.sleep(self.sensor2_error_delay) def handle_sensor3(self): """ 正转--》反转--》正转 """ _is_pause_close=True #是否反转 _is_reverse=False #是否反转后正转信号消息 _is_signal=False #是否首次正转信号 _is_first_signal=False #是否料袋尾部(有信号--》无信号) while self._running: if self._ispause: #暂停 if _is_pause_close: self.close(conveyor2=True) self.motor_stopped_by_sensor2 = True # self.sensor2_ready = True #初始值 _is_pause_close=False time.sleep(self.sensor2_error_delay) continue #开启线程 _is_pause_close=True #滚动次数 try: if _is_signal or self.is_valid_sensor_status_1(self.SENSOR2): #反转要加个防抖动时间 if _is_reverse: print('回退后检查到sensor2信号,正转') self.open(conveyor2=True) _is_reverse=False _is_signal=True elif _is_signal and self.is_valid_sensor2_status_lost(self.SENSOR2): print('检查到sensor2正转信号消失') self.close(conveyor2=True) #滚筒关闭标志 self.motor_stopped_by_sensor2 = True # 发送信号通知机器人取走物品 self.take_robot_signal.emit() _is_signal=False #停止后即使有信号了,也不能转,直到self.sensor2_ready=True # _is_first_signal=False self.sensor2_ready=False #打开滚洞标识 elif not _is_first_signal: print('检查到正转sensor2信号') #检测到信号,5秒 time.sleep(6) self.open(conveyor2_reverse=True) _is_reverse=True _is_first_signal=True time.sleep(0.1) continue elif self.sensor2_ready: #sensor2_ready:通过Feeding:FPhoto处控制是否启动 if self.motor_stopped_by_sensor2: print('开滚筒') self.open(conveyor2=True) self.motor_stopped_by_sensor2 = False _is_first_signal=False # _is_reverse=False # _is_first_signal=False # _is_signal=False if _is_reverse: time.sleep(0.1) else: time.sleep(2) except Exception as e: print(f"SENSOR3 处理错误: {e}") self.log_signal.emit(logging.ERROR,f"SENSOR3 处理错误: {e}") time.sleep(self.sensor2_error_delay) def pause_start_sensor(self,is_pause): """ 暂停或开启传感器线程 is_pause:True是,False否 """ self._ispause = is_pause def set_drop_35(self,is_drop_35): """ 设置是否是35码 is_drop_35:True是,False否 """ self.is_drop_35=is_drop_35 def stop_sensor(self,sensor1_thread,sensor2_thread): if not self._running: print("线程未在运行") return print("停止传感器线程") self._running = False if sensor1_thread and sensor1_thread.is_alive(): sensor1_thread.join() if sensor2_thread and sensor2_thread.is_alive(): sensor2_thread.join() print("传感器线程已终止。")