#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Controller for a networked relay board driving two conveyors, a pusher and a clamp.

Commands are sent as raw hex frames over short-lived TCP connections. The frames
look like Modbus-TCP requests (function 0x05 to write, 0x01/0x02 to read); that
interpretation is inferred from the byte layout and should be confirmed against
the relay board's manual.
"""

import socket
import binascii
import time
import threading


class RelayController:
    """Drive relay outputs and poll sensor inputs on a TCP relay board.

    Two optional background threads watch SENSOR1 and SENSOR2 and run a fixed
    actuator sequence whenever the corresponding sensor reports a signal.
    """

    # Logical names of the outputs and inputs (also reachable as ``self.X``).
    CONVEYOR1 = 'conveyor1'
    PUSHER = 'pusher'
    CONVEYOR2 = 'conveyor2'
    CLAMP = 'clamp'
    SENSOR1 = 'sensor1'
    SENSOR2 = 'sensor2'

    SOCKET_TIMEOUT = 10   # seconds allowed for connect / send / receive
    RETRY_DELAY = 5       # seconds to wait between failed attempts
    POLL_PAUSE = 0.5      # seconds between two sensor polls in the worker loops

    # (device key, message printed, settle time in seconds) in switching order.
    _OPEN_STEPS = (
        (CONVEYOR1, "打开传送带1", 1),
        (PUSHER, "打开推板", 0.05),
        (CONVEYOR2, "打开传送带2", 1),
        (CLAMP, "启动机械臂抓夹", 0.5),
    )
    _CLOSE_STEPS = (
        (CONVEYOR1, "关闭传送带1", 1),
        (PUSHER, "关闭推板", 0.05),
        (CONVEYOR2, "关闭传送带2", 1),
        (CLAMP, "停止机械臂抓夹", 0.5),
    )

    def __init__(self, host='192.168.0.18', port=50000):
        """Store the board address and build the command / lookup tables.

        No network traffic happens here.

        Args:
            host: IP address or host name of the relay board.
            port: TCP port of the relay board.
        """
        self.host = host
        self.port = port

        # Hex frames that switch each output on ('open') or off ('close').
        self.valve_commands = {
            self.CONVEYOR1: {'open': '00000000000601050000FF00',
                             'close': '000000000006010500000000'},
            self.PUSHER: {'open': '00000000000601050001FF00',
                          'close': '000000000006010500010000'},
            self.CONVEYOR2: {'open': '00000000000601050002FF00',
                             'close': '000000000006010500020000'},
            self.CLAMP: {'open': '00000000000601050003FF00',
                         'close': '000000000006010500030000'},
        }

        # Hex frames that read back the eight output / input bits.
        self.read_status_command = {
            'devices': '000000000006010100000008',
            'sensors': '000000000006010200000008',
        }

        # Bit position (least significant bit = 0) of each item in the status byte.
        self.device_bit_map = {
            self.CONVEYOR1: 0,
            self.PUSHER: 1,
            self.CONVEYOR2: 2,
            self.CLAMP: 3,
        }
        self.sensor_bit_map = {
            self.SENSOR1: 0,
            self.SENSOR2: 1,
        }

        # Human-readable names (not used inside this class).
        self.device_name_map = {
            self.CONVEYOR1: "传送带1",
            self.PUSHER: "推板",
            self.CONVEYOR2: "传送带2",
            self.CLAMP: "机械臂",
        }
        self.sensor_name_map = {
            self.SENSOR1: '位置传感器1',
            self.SENSOR2: '位置传感器2',
        }

        self._running = False
        self._sensor1_thread = None
        self._sensor2_thread = None

    # ------------------------------------------------------------------
    # Communication
    # ------------------------------------------------------------------
    def send_command_old(self, command_hex):
        """Legacy single-shot sender without timeout or retry.

        Kept for backward compatibility; prefer :meth:`send_command`.

        Args:
            command_hex: Frame to send, as a hex string.

        Returns:
            The raw response bytes, or ``None`` if communication failed.
        """
        byte_data = binascii.unhexlify(command_hex)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.connect((self.host, self.port))
                # sendall: plain send() may transmit only part of the frame.
                sock.sendall(byte_data)
                response = sock.recv(1024)
                print(f"收到响应: {binascii.hexlify(response)}")
                return response
            except Exception as e:
                print(f"通信错误: {e}")
                return None

    def send_command(self, command_hex, retry_count=5, source='unknown'):
        """Send one frame over a fresh TCP connection, retrying on failure.

        Args:
            command_hex: Frame to send, as a hex string.
            retry_count: Maximum number of connection attempts.
            source: ``'sensor'``, ``'device'`` or anything else; only selects
                the prefix of the printed response line.

        Returns:
            The raw response bytes of the first successful attempt, or ``None``
            when every attempt failed (after :meth:`trigger_alarm` was called)
            or ``retry_count`` is not positive.
        """
        byte_data = binascii.unhexlify(command_hex)
        prefixes = {'sensor': '[传感器响应]', 'device': '[设备控制响应]'}

        for attempt in range(retry_count):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(self.SOCKET_TIMEOUT)
                    sock.connect((self.host, self.port))
                    # sendall: plain send() may transmit only part of the frame.
                    sock.sendall(byte_data)
                    response = sock.recv(1024)
            except OSError as e:
                # OSError covers refused connections, timeouts and DNS errors.
                print(f"通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
                if attempt == retry_count - 1:
                    self.trigger_alarm()
                else:
                    # Back off only when another attempt will follow.
                    time.sleep(self.RETRY_DELAY)
            else:
                # Logging happens outside the try block so that a logging
                # problem can never cause the command to be sent twice.
                hex_response = binascii.hexlify(response).decode('utf-8')
                print(f"{prefixes.get(source, '[通信响应]')} {hex_response}")
                return response
        return None

    def trigger_alarm(self):
        """Report that communication failed on every allowed attempt."""
        print("警告:连续多次通信失败,请检查设备连接!")
        # Further alarm actions (e-mail, SMS, buzzer, ...) can be added here.

    # ------------------------------------------------------------------
    # Status queries
    # ------------------------------------------------------------------
    def get_all_device_status(self, command_type='devices'):
        """Read the on/off state of all outputs or all sensors.

        Args:
            command_type: ``'devices'`` for relay outputs, ``'sensors'`` for
                sensor inputs.

        Returns:
            A dict mapping each known name to ``True``/``False``. The dict is
            empty when ``command_type`` is unknown or the read failed.
        """
        command = self.read_status_command.get(command_type)
        if not command:
            print(f"未知的读取类型: {command_type}")
            return {}

        source = 'sensor' if command_type == 'sensors' else 'device'
        response = self.send_command(command, source=source)

        if not response or len(response) < 10:
            print(f"[{command_type}] 读取状态失败或响应无效")
            return {}

        # Byte 9 holds the state bits (first data byte after the header,
        # assuming a Modbus-TCP style reply - confirm with the board manual).
        status_byte = response[9]
        bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map
        return {key: bool((status_byte >> bit) & 1) for key, bit in bit_map.items()}

    def get_device_status_old(self, name, command_type='devices'):
        """Return a single raw reading of ``name`` (``None`` if unavailable)."""
        return self.get_all_device_status(command_type).get(name, None)

    def get_device_status(self, name, command_type='devices', stable_duration=1.0,
                          max_attempts=3, interval=0.2):
        """Return ``True`` only after several consecutive active readings.

        The state is polled every ``interval`` seconds. A single inactive
        reading ends the check immediately with ``False``; a failed reading
        resets the streak of active readings.

        Args:
            name: Device or sensor key, e.g. ``SENSOR1``.
            command_type: ``'devices'`` or ``'sensors'``.
            stable_duration: Time window in seconds to observe the signal.
            max_attempts: Number of consecutive active readings required.
            interval: Pause in seconds between two readings.

        Returns:
            ``True`` when the signal was stably active, ``False`` otherwise
            (including when readings kept failing).
        """
        if interval > 0:
            # round() instead of int(): 0.6 / 0.2 is 2.999... in floating point.
            # Never poll fewer times than the streak needed to return True.
            polls = max(int(round(stable_duration / interval)), max_attempts)
        else:
            polls = max_attempts

        stable_count = 0
        for _ in range(polls):
            status = self.get_all_device_status(command_type).get(name)
            if status is True:
                stable_count += 1
                if stable_count >= max_attempts:
                    return True
            elif status is False:
                return False
            else:
                # None means the reading failed or the name is unknown.
                print(f"[警告] 读取 {name} 状态失败,尝试重试...")
                stable_count = 0
            time.sleep(max(interval, 0))

        return False  # default to False so nothing is triggered by mistake

    # ------------------------------------------------------------------
    # Actuator control
    # ------------------------------------------------------------------
    def _switch(self, action, steps, requested):
        """Send ``action`` ('open'/'close') to every requested device that needs it."""
        target_state = action == 'open'
        status = self.get_all_device_status()
        for name, message, settle in steps:
            if not requested[name]:
                continue
            # If the state is unknown, assume the device still needs switching.
            if status.get(name, not target_state) == target_state:
                continue
            print(message)
            self.send_command(self.valve_commands[name][action], source='device')
            time.sleep(settle)

    def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False):
        """Switch on the selected devices that are not already reported on.

        Each switched device is followed by a short device-specific pause.
        """
        self._switch('open', self._OPEN_STEPS, {
            self.CONVEYOR1: conveyor1,
            self.PUSHER: pusher,
            self.CONVEYOR2: conveyor2,
            self.CLAMP: clamp,
        })

    def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False):
        """Switch off the selected devices that are not already reported off.

        Each switched device is followed by a short device-specific pause.
        """
        self._switch('close', self._CLOSE_STEPS, {
            self.CONVEYOR1: conveyor1,
            self.PUSHER: pusher,
            self.CONVEYOR2: conveyor2,
            self.CLAMP: clamp,
        })

    # ------------------------------------------------------------------
    # Sensor workers
    # ------------------------------------------------------------------
    def _watch_sensor(self, sensor, label, sequence):
        """Poll ``sensor`` until stopped and run ``sequence`` on every stable signal."""
        while self._running:
            try:
                if self.get_device_status(sensor, 'sensors'):
                    print(f"{label} 检测到信号,执行流程")
                    sequence()
            except Exception as e:
                # Keep the worker alive whatever goes wrong in one cycle.
                print(f"{label} 处理错误: {e}")
            # Pace the loop on every path so it never spins at full speed.
            time.sleep(self.POLL_PAUSE)

    def _sensor1_sequence(self):
        """Stop conveyor 1, push for five seconds, then restart conveyor 1."""
        self.close(conveyor1=True)
        time.sleep(2)
        self.open(pusher=True)
        time.sleep(5)
        self.close(pusher=True)
        time.sleep(2)
        self.open(conveyor1=True)

    def _sensor2_sequence(self):
        """Stop conveyor 2, start the clamp, then restart conveyor 2."""
        self.close(conveyor2=True)
        time.sleep(2)
        self.open(clamp=True)
        time.sleep(2)
        self.open(conveyor2=True)

    def handle_sensor1(self):
        """Worker loop for SENSOR1; runs until ``stop()`` clears the run flag."""
        self._watch_sensor(self.SENSOR1, "SENSOR1", self._sensor1_sequence)

    def handle_sensor2(self):
        """Worker loop for SENSOR2; runs until ``stop()`` clears the run flag."""
        self._watch_sensor(self.SENSOR2, "SENSOR2", self._sensor2_sequence)

    # ------------------------------------------------------------------
    # Thread life cycle
    # ------------------------------------------------------------------
    def start_sensor1_only(self):
        """Start only the SENSOR1 worker thread (no-op if already running)."""
        if self._running:
            print("线程已经在运行")
            return

        print("启动传感器1监听线程")
        self._running = True
        self._sensor1_thread = threading.Thread(target=self.handle_sensor1, daemon=True)
        self._sensor1_thread.start()

    def start(self):
        """Start both sensor worker threads (no-op if already running)."""
        if self._running:
            print("线程已经在运行")
            return

        print("启动传感器线程")
        self._running = True
        self._sensor1_thread = threading.Thread(target=self.handle_sensor1, daemon=True)
        self._sensor2_thread = threading.Thread(target=self.handle_sensor2, daemon=True)
        self._sensor1_thread.start()
        self._sensor2_thread.start()

    def stop(self):
        """Ask the worker threads to finish and wait until they have exited.

        A worker only notices the request between steps, so this call can
        block until the sequence currently in progress has completed.
        """
        if not self._running:
            print("线程未在运行")
            return

        print("停止传感器线程")
        self._running = False  # workers check this flag at the top of each cycle

        for attr in ('_sensor1_thread', '_sensor2_thread'):
            worker = getattr(self, attr)
            if worker is not None:
                worker.join()
                # Drop the finished thread so a later stop() never sees it again.
                setattr(self, attr, None)

        print("传感器线程已终止。")