#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Modbus-TCP relay controller with two sensor-polling worker threads.

The module talks to a relay board over raw Modbus-TCP frames (hex strings
sent through a short-lived TCP socket) and runs two background threads that
poll the discrete inputs and drive the conveyors / pusher accordingly.
"""

import binascii
import logging
import socket
import threading
import time

import numpy as np
from PySide6.QtCore import QObject, Signal


class RelayController(QObject):
    """Drive relay outputs and react to two position sensors.

    The class derives from ``QObject`` so that the ``Signal`` attributes
    declared below are bound per instance and can be emitted / connected;
    on a plain Python class they are inert descriptors without ``emit``.
    """

    need_origin_signal = Signal(str)
    take_no_photo_sigal = Signal()  # (sic) name kept for existing callers
    update_detect_image = Signal(np.ndarray)
    log_signal = Signal(int, str)

    def __init__(self, host='192.168.0.18', port=50000):
        """Store the connection target and build the command tables.

        Args:
            host: IP address of the Modbus-TCP relay board.
            port: TCP port of the relay board.
        """
        super().__init__()
        self.host = host
        self.port = port

        # Logical device / sensor keys.
        self.CONVEYOR1 = 'conveyor1'
        self.PUSHER = 'pusher'
        self.CONVEYOR2 = 'conveyor2'
        self.CLAMP = 'clamp'
        self.PUSHER1 = 'pusher1'
        self.SENSOR1 = 'sensor1'
        self.SENSOR2 = 'sensor2'

        # Raw Modbus-TCP frames (MBAP header + PDU) as hex strings.
        # Most entries are function 0x05 (write single coil) on unit 0x01.
        # CONVEYOR2 is different: function 0x06 (write single register) on
        # unit 0x02, register 0x2000. The previous coil-based command for it
        # was open '00000000000601050002FF00' / close
        # '000000000006010500020000'.
        self.valve_commands = {
            self.CONVEYOR1: {'open': '00000000000601050000FF00',
                             'close': '000000000006010500000000'},
            self.PUSHER: {'open': '00000000000601050001FF00',
                          'close': '000000000006010500010000'},
            self.CONVEYOR2: {'open': '000100000006020620000012',
                             'close': '000100000006020620000001'},
            self.CLAMP: {'open': '00000000000601050003FF00',
                         'close': '000000000006010500030000'},
            self.PUSHER1: {'open': '00000000000601050004FF00',
                           'close': '000000000006010500040000'},
        }

        # Function 0x01 (read coils) / 0x02 (read discrete inputs), 8 bits.
        self.read_status_command = {
            'devices': '000000000006010100000008',
            'sensors': '000000000006010200000008',
        }

        # Bit position of every device / sensor inside the status byte.
        self.device_bit_map = {
            self.CONVEYOR1: 0,
            self.PUSHER: 1,
            self.CONVEYOR2: 2,
            self.CLAMP: 3,
            self.PUSHER1: 4,
        }
        self.sensor_bit_map = {
            self.SENSOR1: 0,
            self.SENSOR2: 1,
        }

        # Human-readable display names.
        self.device_name_map = {
            self.CONVEYOR1: "传送带1",
            self.PUSHER: "推板开",
            self.CONVEYOR2: "传送带2",
            self.CLAMP: "机械臂夹爪",
            self.PUSHER1: "推板关",
        }
        self.sensor_name_map = {
            self.SENSOR1: '位置传感器1',
            self.SENSOR2: '位置传感器2',
        }

        # Worker-thread state.
        self._running = False
        self._sensor1_thread = None
        self._sensor2_thread = None

        # Configuration.
        self.required_codes = {'0101', '0103'}    # valid codes for sensor 1
        # TODO: valid codes for sensor 2 (marked "needs modification").
        self.required_codes_1 = {'0102', '0103'}
        self.stable_duration = 1.0  # stability window in seconds
        self.max_attempts = 3       # consecutive valid reads required
        self.poll_interval = 0.2    # seconds between polls

        # Sensor 1 re-trigger lock and debounce.
        self.sensor1_triggered = False
        self.sensor1_last_time = 0
        self.sensor1_debounce = 2.0

        # Sensor 2 state.
        self.sensor2_ready = True
        self.motor_stopped_by_sensor2 = False

    # ------------------------------------------------------------------
    # Communication
    # ------------------------------------------------------------------
    def send_command(self, command_hex, retry_count=2, source='unknown'):
        """Send one hex-encoded frame and return the raw reply.

        A new TCP connection is opened for every attempt.

        Args:
            command_hex: Frame to send, as a hex string.
            retry_count: Maximum number of connection attempts.
            source: Label used only in the error message.

        Returns:
            The bytes returned by the first successful ``recv``, or ``None``
            when every attempt failed (``trigger_alarm`` is called first).

        Raises:
            binascii.Error: If ``command_hex`` is not valid hex.
        """
        byte_data = binascii.unhexlify(command_hex)
        for attempt in range(retry_count):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(10)
                    sock.connect((self.host, self.port))
                    # sendall: plain send() may transmit only part of the frame.
                    sock.sendall(byte_data)
                    return sock.recv(1024)
            except OSError as e:  # includes socket.timeout
                print(f"通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
                # Back off only when another attempt follows; there is no
                # point delaying the alarm after the last failure.
                if attempt + 1 < retry_count:
                    time.sleep(5)
        self.trigger_alarm()
        return None

    def trigger_alarm(self):
        """Report that communication failed on every attempt."""
        print("警告:连续多次通信失败,请检查设备连接!")

    # ------------------------------------------------------------------
    # Status reading
    # ------------------------------------------------------------------
    def get_all_device_status(self, command_type='devices') -> dict:
        """Read one status byte and decode it into per-name booleans.

        Args:
            command_type: ``'devices'`` (read coils) or ``'sensors'``
                (read discrete inputs).

        Returns:
            ``{name: bool}`` for every entry of the matching bit map, or an
            empty dict when the type is unknown or the reply is unusable.
        """
        command = self.read_status_command.get(command_type)
        if not command:
            print(f"未知的读取类型: {command_type}")
            return {}

        source = 'sensor' if command_type == 'sensors' else 'device'
        response = self.send_command(command, source=source)
        if not response or len(response) < 10:
            print(f"[{command_type}] 读取状态失败或响应无效")
            return {}

        # Byte 9 is the first data byte (7-byte MBAP header, function code,
        # byte count); bit n of that byte is the state of channel n.
        status_byte = response[9]
        bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map
        return {
            key: bool((status_byte >> bit_index) & 1)
            for key, bit_index in bit_map.items()
        }

    def get_all_sensor_responses(self, command_type='sensors') -> dict:
        """Return the raw hex reply of a status read, keyed by sensor name.

        Every sensor name maps to the *same* full hex string; callers pick
        the status code out of it with ``parse_status_code``.

        Example: ``{'sensor1': '00000000000401020101',
        'sensor2': '00000000000401020101'}``

        Args:
            command_type: Key into ``read_status_command``.

        Returns:
            ``{sensor_name: hex_string}``, or an empty dict on failure.
        """
        command = self.read_status_command.get(command_type)
        if not command:
            print(f"未知的读取类型: {command_type}")
            return {}

        source = 'sensor' if command_type == 'sensors' else 'device'
        response = self.send_command(command, source=source)
        if not response or len(response) < 10:
            print(f"[{command_type}] 无法获取响应数据")
            return {}

        hex_response = binascii.hexlify(response).decode('utf-8')
        print(f"[原始响应][{command_type}] {hex_response}")
        return {name: hex_response for name in self.sensor_bit_map}

    def parse_status_code(self, response):
        """Extract the 4-character status code from a hex reply string.

        The code is hex characters 16..19, i.e. the byte-count byte followed
        by the first data byte.

        Example: ``'00000000000401020101'`` -> ``'0101'``

        Args:
            response: Hex string as produced by ``get_all_sensor_responses``.

        Returns:
            The 4-character code, or ``None`` when ``response`` is not a
            string of at least 20 characters.
        """
        if isinstance(response, str) and len(response) >= 20:
            return response[16:20]
        return None

    def _is_stable(self, sensor_name, valid_codes) -> bool:
        """Poll until ``max_attempts`` consecutive reads hit ``valid_codes``.

        At most ``int(stable_duration / poll_interval)`` polls are made; any
        missing or invalid read resets the consecutive counter.
        """
        stable_count = 0
        for _ in range(int(self.stable_duration / self.poll_interval)):
            response = self.get_all_sensor_responses('sensors').get(sensor_name)
            if not response:
                print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
                stable_count = 0
            else:
                status_code = self.parse_status_code(response)
                if status_code in valid_codes:
                    stable_count += 1
                    if stable_count >= self.max_attempts:
                        return True
                else:
                    stable_count = 0
                    print(f"[警告] {sensor_name} 状态码无效: {status_code}")
            time.sleep(self.poll_interval)
        return False

    def is_valid_sensor_status(self, sensor_name) -> bool:
        """Return True when the sensor stably reports a code in ``required_codes``."""
        return self._is_stable(sensor_name, self.required_codes)

    def is_valid_sensor_status_1(self, sensor_name) -> bool:
        """Return True when the sensor stably reports a code in ``required_codes_1``."""
        return self._is_stable(sensor_name, self.required_codes_1)

    # ------------------------------------------------------------------
    # Actuation
    # ------------------------------------------------------------------
    def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False,
             pusher1=False):
        """Send the 'open' command to every requested device not already on.

        The current state is read once up front; a device whose status bit is
        already set is skipped. Each command is followed by a fixed delay.

        NOTE(review): the CONVEYOR2 state is taken from coil bit 2, while its
        command is a register write to a different unit — confirm that bit
        really reflects conveyor 2.
        """
        status = self.get_all_device_status()
        steps = (
            (conveyor1, self.CONVEYOR1, "打开传送带1", 1),
            (pusher, self.PUSHER, "打开推板", 0.05),
            (conveyor2, self.CONVEYOR2, "打开传送带2", 1),
            (clamp, self.CLAMP, "启动机械臂抓夹", 0.5),
            (pusher1, self.PUSHER1, "关闭推板", 0.05),
        )
        for requested, device, message, settle in steps:
            if requested and not status.get(device, False):
                print(message)
                self.send_command(self.valve_commands[device]['open'], source='device')
                time.sleep(settle)

    def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False,
              pusher1=False):
        """Send the 'close' command to every requested device.

        The command is sent unconditionally (no status check), so no status
        read is performed; this keeps the stop path as short as possible.
        """
        steps = (
            (conveyor1, self.CONVEYOR1, "关闭传送带1", 1),
            (pusher, self.PUSHER, "关闭推板", 0.05),
            (conveyor2, self.CONVEYOR2, "关闭传送带2", 1),
            (clamp, self.CLAMP, "停止机械臂抓夹", 0.5),
            (pusher1, self.PUSHER1, "关闭推板_1", 0.05),
        )
        for requested, device, message, settle in steps:
            if requested:
                print(message)
                self.send_command(self.valve_commands[device]['close'], source='device')
                time.sleep(settle)

    # ------------------------------------------------------------------
    # Sensor worker loops
    # ------------------------------------------------------------------
    def _run_push_cycle(self):
        """Run one push sequence: stop belt, pulse pusher, restart belt, pulse pusher1."""
        # 1. Stop conveyor 1.
        self.close(conveyor1=True)
        time.sleep(0.5)

        # 2. Pulse the 'pusher' output, then hold for 3 seconds.
        self.open(pusher=True)
        time.sleep(0.1)
        self.close(pusher=True)
        time.sleep(3)

        # 3. Restart conveyor 1.
        self.open(conveyor1=True)
        time.sleep(0.5)

        # 4. Pulse the 'pusher1' output.
        self.open(pusher1=True)
        time.sleep(0.1)
        self.close(pusher1=True)
        time.sleep(1)

        # 5. Sanity check of the resulting state.
        status = self.get_all_device_status()
        if status.get(self.CONVEYOR1) and not status.get(self.PUSHER):
            print("🟢流程完成1:皮带运行中,推板已收回")
        else:
            print("⚠️ 状态异常,请检查设备")

    def handle_sensor1(self):
        """Worker loop: run the push cycle whenever sensor 1 is stably active.

        A cycle starts only if none is in progress and more than
        ``sensor1_debounce`` seconds have passed since the last one started.
        Runs until ``_running`` becomes False.
        """
        while self._running:
            try:
                if self.is_valid_sensor_status(self.SENSOR1):
                    current_time = time.time()
                    debounced = (current_time - self.sensor1_last_time) > self.sensor1_debounce
                    if not self.sensor1_triggered and debounced:
                        print("✅ SENSOR1 检测到有效信号,开始执行推料流程")
                        # Mark as triggered to prevent re-entry.
                        self.sensor1_triggered = True
                        self.sensor1_last_time = current_time
                        self._run_push_cycle()
                        # Cycle finished; allow the next trigger.
                        self.sensor1_triggered = False
                time.sleep(0.2)
            except Exception as e:
                # Thread boundary: report and keep the loop alive.
                print(f"SENSOR1 处理错误: {e}")
                self.sensor1_triggered = False
                time.sleep(1)

    def handle_sensor2(self):
        """Worker loop: stop conveyor 2 when sensor 2 is active, restart after.

        Runs until ``_running`` becomes False. Errors are reported through
        ``log_signal`` at ``logging.ERROR`` level.
        """
        while self._running:
            try:
                if self.is_valid_sensor_status_1(self.SENSOR2):
                    print("✅ SENSOR2 检测到有效信号,开始执行关闭滚筒电机流程")
                    if not self.sensor2_ready:
                        # Stop the motor right away.
                        self.close(conveyor2=True)
                        print("执行关闭")
                        # Remember that this stop was caused by sensor 2.
                        self.motor_stopped_by_sensor2 = True
                        self.sensor2_ready = True
                else:
                    if self.sensor2_ready and self.motor_stopped_by_sensor2:
                        # Sensor cleared after a sensor-triggered stop: restart.
                        self.open(conveyor2=True)
                        self.motor_stopped_by_sensor2 = False
                    # NOTE(review): reset on every inactive read so that the
                    # next detection can trigger a stop; with the initial
                    # state (ready=True, stopped=False) the stop branch would
                    # otherwise never fire — confirm intended placement.
                    self.sensor2_ready = False
                time.sleep(0.5)
            except Exception as e:
                # Thread boundary: report and keep the loop alive.
                self.log_signal.emit(logging.ERROR, f"🔴 SENSOR2 处理错误: {e}")
                time.sleep(1)

    # ------------------------------------------------------------------
    # Thread lifecycle
    # ------------------------------------------------------------------
    def start(self):
        """Start both sensor worker threads (no-op if already running)."""
        if self._running:
            print("线程已经在运行")
            return
        print("启动传感器线程")
        self._running = True
        self._sensor1_thread = threading.Thread(target=self.handle_sensor1, daemon=True)
        self._sensor2_thread = threading.Thread(target=self.handle_sensor2, daemon=True)
        self._sensor1_thread.start()
        self._sensor2_thread.start()

    def stop(self):
        """Signal the worker threads to exit and wait for them to finish."""
        if not self._running:
            print("线程未在运行")
            return
        print("停止传感器线程")
        self._running = False
        if self._sensor1_thread:
            self._sensor1_thread.join()
        if self._sensor2_thread:
            self._sensor2_thread.join()
        # Drop finished threads so a later start does not see stale handles.
        self._sensor1_thread = None
        self._sensor2_thread = None
        print("传感器线程已终止。")

    def start_sensor1_only(self):
        """Start only the sensor 1 worker thread (no-op if already running)."""
        if self._running:
            print("传感器线程已经在运行")
            return
        print("启动传感器1监听线程...")
        self._running = True
        self._sensor1_thread = threading.Thread(target=self.handle_sensor1, daemon=True)
        self._sensor1_thread.start()


if __name__ == '__main__':
    controller = RelayController()
    controller.start()
    try:
        # Keep the main thread alive; the workers are daemon threads.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        controller.stop()