import socket import binascii import time from datetime import datetime import logging class RelayController: """ 测试传感器2信号情况 """ def __init__(self, host='192.168.0.18', port=50000): super().__init__() # ===================== 全局线程延时参数 ===================== self.sensor1_loop_delay = 0.1 # SENSOR1 线程轮询间隔(秒) self.sensor1_error_delay = 1.0 # SENSOR1 出错或暂停时延时(秒) self.sensor1_post_action_delay = 0.2 # SENSOR1 每次循环后延时(秒) self.sensor2_loop_delay = 0.2 # SENSOR2 线程轮询间隔(秒) self.sensor2_loop_lost=0.1 # SENSOR2 线程轮询间隔(秒) # self.sensor2_loop_delay = 0.5 # SENSOR2 线程轮询间隔(秒) self.sensor2_error_delay = 0.5 # SENSOR2 出错时延时(秒) self.sensor2_post_action_delay = 0.2 # SENSOR2 每次循环后延时(秒) # ===================== 全局动作延时参数 ===================== self.delay_conveyor = 0.5 # 传送带开/关动作延时(一半时间,我在控制程序和线程都加了一样的延时) self.delay_pusher = 0.05 # 推板开/关动作延时 self.delay_clamp = 0.5 # 夹爪动作延时 self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间 self.emergency_is_pressed=False # ===================== 传感器稳定检测参数 ===================== self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒) self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效 self.sensor1_debounce_time = 1.0 # 传感器1防抖时间(秒) # ===================== 网络与设备映射 ===================== self.host = host self.port = port self.CONVEYOR1 = 'conveyor1' self.PUSHER = 'pusher' self.CONVEYOR2 = 'conveyor2' self.CONVEYOR2_REVERSE = 'conveyor2_reverse' self.CLAMP = 'clamp' self.PUSHER1 = 'pusher1' self.SENSOR1 = 'sensor1' self.SENSOR2 = 'sensor2' self.BELT = 'belt' self.ALARM = 'alarm' self.BLOW_SENSOR2 = 'blow_sensor2' self.valve_commands = { #包装机皮带 self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'}, # self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, #滚筒,2000 0012正转,2000 0022 2001变频器频率调整 2000正反转。 self.CONVEYOR2: {'open': '000100000006020620000012', 'close': '000100000006020620000001'}, #DO4 self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, #DO5 回 DO2推 self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}, #D07 长皮带 self.BELT: {'open': '00000000000601050006FF00', 'close': '000000000006010500060000'}, #D01 声控报警 self.ALARM: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, #DO6 吹传感器2 self.BLOW_SENSOR2: {'open': '00000000000601050005FF00', 'close': '000000000006010500050000'}, #滚筒反转 self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'} } #devices:读取继点器的状态 #sensors 传感器的状态 D12 self.read_status_command = { 'devices': '000000000006010100000008', 'sensors': '000000000006010200000008' } self.device_bit_map = { self.CONVEYOR1: 0, self.PUSHER: 1, self.CONVEYOR2: 2, self.CLAMP: 3, self.PUSHER1: 4, self.CONVEYOR2_REVERSE: 5, self.BELT: 6, self.ALARM: 7, #self.BLOW_SENSOR2: 8 } self.sensor_bit_map = { self.SENSOR1: 0, self.SENSOR2: 1, } self.device_name_map = { self.CONVEYOR1: "包装机皮带", self.PUSHER: "推板开", self.CONVEYOR2: "滚筒", self.CLAMP: "机械臂夹爪", self.PUSHER1: "推板关", self.CONVEYOR2_REVERSE: "滚筒反转", self.BELT: "皮带", self.ALARM: "声控报警", self.BLOW_SENSOR2: "吹传感器2" } self.sensor_name_map = { self.SENSOR1: '位置传感器1', self.SENSOR2: '位置传感器2', } # ===================== 状态控制变量 ===================== self._running = False #线程运行标识 self._ispause = False #线程暂停标识 self._sensor1_thread = None self._sensor2_thread = None self.required_codes = {'0101', '0103','0105','0107'} # 有效状态码(传感器1) self.required_codes_1 = {'0102', '0103','0106','0107'} # 有效状态码(传感器2) self.sensor1_triggered = False self.sensor1_last_time = 0 self.sensor2_ready = False #默认不打开 self.motor_stopped_by_sensor2 = True # ===================== 基础通信方法 ===================== def send_command(self, command_hex, retry_count=2, source='unknown'): byte_data = binascii.unhexlify(command_hex) for attempt in range(retry_count): try: # begin_time=time.time() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(1) sock.connect((self.host, self.port)) sock.send(byte_data) response = sock.recv(1024) # end_time=time.time() # print(f"({source}) 耗时: {end_time-begin_time:.3f}秒") # hex_response = binascii.hexlify(response).decode('utf-8') #if source == 'sensor': #print(f"[传感器响应] {hex_response}") #elif source == 'device': #print(f"[设备控制响应] {hex_response}") #else: #print(f"[通信响应] {hex_response}") return response except Exception as e: self.log_signal.emit(logging.INFO,f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") time.sleep(1) self.trigger_alarm() return None def trigger_alarm(self): self.log_signal.emit(logging.ERROR,"警告:网络继电器连续多次通信失败,请检查设备连接!") print("警告:网络继电器连续多次通信失败,请检查设备连接!") # ===================== 状态读取方法 ===================== def get_all_device_status(self, command_type='devices'): # if Constant.DebugPosition: # return {self.SENSOR2:True} command = self.read_status_command.get(command_type) if not command: print(f"未知的网络继电器读取类型: {command_type}") return {} source = 'sensor' if command_type == 'sensors' else 'device' response = self.send_command(command, source=source) status_dict = {} if response and len(response) >= 10: status_byte = response[9] status_bin = f"{status_byte:08b}"[::-1] bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map # name_map = self.device_name_map if command_type == 'devices' else self.sensor_name_map for key, bit_index in bit_map.items(): status_dict[key] = status_bin[bit_index] == '1' else: print(f"网络继电器[{command_type}] 读取状态失败或响应无效") return status_dict def get_all_sensor_responses(self, command_type='sensors'): """ 获取所有传感器的原始 Modbus 响应字符串 示例:{'sensor1': '00000000000401020101', 'sensor2': '00000000000401020100'} """ command = self.read_status_command.get(command_type) if not command: print(f"未知的读取类型: {command_type}") return {} source = 'sensor' if command_type == 'sensors' else 'device' response = self.send_command(command, source=source) responses = {} if response and len(response) >= 10: hex_response = binascii.hexlify(response).decode('utf-8') # print(f"[原始响应][{command_type}] {hex_response}") # 假设传感器数据从第 9 字节开始,长度为 2 字节 for name, bit_index in self.sensor_bit_map.items(): offset = 9 + (bit_index // 8) bit_pos = bit_index % 8 byte = response[offset] status = (byte >> bit_pos) & 1 responses[name] = hex_response else: print(f"[{command_type}] 无法获取响应数据") return responses def parse_status_code(self, response): """ 从 Modbus 响应字符串中提取状态码(后两位) 示例:00000000000401020101 -> '01' """ if isinstance(response, str) and len(response) >= 18: return response[16:20] return None def is_valid_sensor_status(self, sensor_name): stable_count = 0 for _ in range(int(self.sensor_stable_duration / self.sensor1_loop_delay)): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: stable_count = 0 else: status_code = self.parse_status_code(response) if status_code in self.required_codes: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor1_loop_delay) return False def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1): """ 检测在指定时间窗口内是否存在持续稳定的有效信号 参数: sensor_name: 传感器名称 detection_duration: 总检测时间窗口(秒),默认为3秒 stability_duration: 信号需要持续稳定的时间(秒),默认为2.5秒 check_interval: 检测间隔(秒),默认为0.1秒 返回: True: 在时间窗口内检测到持续稳定的有效信号 False: 未检测到持续稳定的有效信号 """ stable_start_time = None # 记录首次检测到有效信号的时间 start_time = time.time() if not self.is_valid_sensor(sensor_name): return False # 传感器状态无效,返回 else: stable_start_time = time.time() # 首次检测到有效信号 time.sleep(check_interval) while time.time() - start_time < detection_duration: temp_is_valid = self.is_valid_sensor(sensor_name) if temp_is_valid: if time.time() - stable_start_time >= stability_duration: return True # 信号持续稳定达到要求时间 else: stable_start_time = time.time() # 信号不稳定,重置计时 time.sleep(check_interval) return False def is_valid_sensor_status_1(self, sensor_name): stable_count = 0 for _ in range(int(self.sensor_stable_duration / self.sensor2_loop_delay)): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") stable_count = 0 else: status_code = self.parse_status_code(response) if status_code in self.required_codes_1: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor2_loop_delay) return False def is_valid_sensor2_status_lost(self, sensor_name): stable_count = 0 _try_nums=5 # 尝试次数 for _ in range(_try_nums): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") stable_count = 0 else: status_code = self.parse_status_code(response) if status_code not in self.required_codes_1: stable_count += 1 if stable_count >= self.sensor_max_attempts: return True else: stable_count = 0 time.sleep(self.sensor2_loop_lost) return False def is_valid_sensor(self,sensor_name): """ 检查传感器状态是否有效 参数: sensor_name: 传感器名称 返回: True: 传感器状态有效 False: 传感器状态无效 """ stable_count = 0 _try_nums=5 # 尝试次数 for _ in range(_try_nums): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if not response: print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...") return False else: temp_status_code = self.parse_status_code(response) if temp_status_code in self.required_codes_1: stable_count += 1 if stable_count >= 3: return True else: stable_count = 0 time.sleep(self.sensor2_loop_lost) return False def is_valid_sensor_stable(self,sensor_name): """ 检查传感器状态是否有效 参数: sensor_name: 传感器名称 返回: True: 传感器状态有效 False: 传感器状态无效 """ if not self.is_valid_sensor(sensor_name): return False #需要增加超时时间,否则会一直等待 stable_count = 0 _try_nums=10 # 尝试次数 for _ in range(_try_nums): responses = self.get_all_sensor_responses('sensors') response = responses.get(sensor_name) if response: temp_status_code = self.parse_status_code(response) if temp_status_code in self.required_codes_1: stable_count += 1 print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}:检测到信号,已检测 {stable_count} 次") if stable_count >= 8: return True else: print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}:未检测到信号,已检测 {stable_count} 次") else: print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}:[警告] 无法获取 {sensor_name} 的响应,尝试重试...") # else: # stable_count = 0 time.sleep(self.sensor2_loop_delay) return False if __name__ == "__main__": # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 配置日志同时输出到控制台与文件 log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler = logging.FileHandler('relay_controller.log', encoding='utf-8') file_handler.setLevel(logging.INFO) file_handler.setFormatter(log_formatter) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(log_formatter) logger = logging.getLogger() logger.setLevel(logging.INFO) logger.addHandler(file_handler) logger.addHandler(console_handler) relay_controller = RelayController() _count=0 while True: responses = relay_controller.get_all_sensor_responses('sensors') response = responses.get('sensor2') if response: temp_status_code = relay_controller.parse_status_code(response) if temp_status_code in relay_controller.required_codes_1: _count+=1 logger.info(f"检测到信号,连续 {_count} 次") else: _count=0 logger.info(f"未检测到信号") else: logger.info(f"[警告] 无法获取响应") time.sleep(0.2)