#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/12/12 14:39 # @Author : reenrr # @File : EMV.py # @Desc : 网络继电器控制输入、输出设备 ''' import socket import binascii import time from threading import Event, Lock import threading import logging # 网络继电器的 IP 和端口 HOST = '192.168.1.18' PORT = 50000 # 控件命名映射 SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 SOLENOID_VALVE3 = 'solenoid_valve3' # 控制吸取设备的电磁阀3 # 传感器命名映射 CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 # 控件控制报文 valve_commands = { SOLENOID_VALVE1: { 'open': '00000000000601050000FF00', 'close': '000000000006010500000000', }, SOLENOID_VALVE2: { 'open': '00000000000601050001FF00', 'close': '000000000006010500010000', }, SOLENOID_VALVE3: { 'open': '00000000000601050002FF00', 'close': '000000000006010500020000', } } # 读取状态命令 read_status_command = { 'devices': '000000000006010100000008', 'sensors': '000000000006010200000008' } # 控件对应 DO 位(从低到高) device_bit_map = { SOLENOID_VALVE1: 0, SOLENOID_VALVE2: 1, SOLENOID_VALVE3: 2, } device_name_map = { SOLENOID_VALVE1: "电磁阀1", SOLENOID_VALVE2: "电磁阀2", SOLENOID_VALVE3: "电磁阀3", } # 传感器对应位(从低到高) sensor_bit_map = { CONVEYOR1_SENSOR: 0, CONVEYOR2_SENSOR: 1, PRESS_SENSOR1: 2, PRESS_SENSOR2: 3, FIBER_SENSOR: 6 # 根据你继电器的配置,继续添加更多传感器 } sensor_name_map = { CONVEYOR1_SENSOR: '传送带1开关', CONVEYOR2_SENSOR: '传送带2开关', PRESS_SENSOR1: '按压开关1', PRESS_SENSOR2: '按压开关2', FIBER_SENSOR: '光纤传感器' } # -------------全局事件------------- sensor_triggered = Event() fiber_triggered = Event() # 光纤传感器触发事件 fiber_lock = Lock() # 线程锁,保护共享变量 valve1_open_time = 0.0 # 电磁阀1打开时间戳 valve1_open_flag = False # 电磁阀1打开标志 class RelayController: def __init__(self): """初始化继电器控制器""" self.socket = None def send_command(self, command): """ 将十六进制字符串转换为字节数据并发送 :param command: 十六进制字符串 :return: 响应字节数据 / False """ try: byte_data = binascii.unhexlify(command) # 创建套接字并连接到继电器 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((HOST, PORT)) sock.send(byte_data) # 接收响应 response = sock.recv(1024) # logging.info(f"收到响应: {binascii.hexlify(response)}") # 校验响应 return response except Exception as e: logging.info(f"通信错误: {e}") return False def get_all_device_status(self, command_type='devices'): """ 获取所有设备/传感器状态 :param command_type: 'devices'(控件) / 'sensors'(传感器) :return: 状态字典 {设备名: 状态(bool)} """ command = read_status_command.get(command_type) if not command: logging.info(f"未知的读取类型: {command_type}") return {} response = self.send_command(command) status_dict = {} if response and len(response) >= 10: status_byte = response[9] # 状态在第10字节 status_bin = f"{status_byte:08b}"[::-1] if command_type == 'devices': bit_map = device_bit_map name_map = device_name_map elif command_type == 'sensors': bit_map = sensor_bit_map name_map = sensor_name_map else: logging.info("不支持的映射类型") return{} for key, bit_index in bit_map.items(): state = status_bin[bit_index] == '1' status_dict[key] = state # readable = "开启" if state else "关闭" # logging.info(f"{device.capitalize()} 状态: {readable}") else: logging.info("读取状态失败或响应无效") return status_dict def get_device_status(self, device_name, command_type='devices'): """ 获取单个控件/传感器状态 :param device_name:设备名称 :param command_type: 'devices'/'sensors' :return:True(开启) / False(关闭) / None(无法读取) """ status = self.get_all_device_status(command_type) return status.get(device_name, None) def open(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): """ 根据状态决定是否执行开操作 :param solenoid_valve1:是否打开电磁阀1 :param solenoid_valve2:是否打开电磁阀2 :param solenoid_valve3:是否打开电磁阀3 :return: """ global valve1_open_time, valve1_open_flag status = self.get_all_device_status() if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False): logging.info("打开电磁阀1") self.send_command(valve_commands[SOLENOID_VALVE1]['open']) # 记录电磁阀1打开时的时间戳和标志 with fiber_lock: valve1_open_time = time.time() valve1_open_flag = True if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False): logging.info("打开电磁阀2") self.send_command(valve_commands[SOLENOID_VALVE2]['open']) if solenoid_valve3 and not status.get(SOLENOID_VALVE3, False): logging.info("打开电磁阀3") self.send_command(valve_commands[SOLENOID_VALVE3]['open']) time.sleep(1) # 实际测试需要考虑这个延时是否合适 # 根据状态决定是否执行关操作 def close(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): """ 根据状态决定是否执行关操作 :param solenoid_valve1:是否关闭电磁阀1 :param solenoid_valve2:是否关闭电磁阀2 :param solenoid_valve3:是否关闭电磁阀3 :return: """ global valve1_open_flag status = self.get_all_device_status() if solenoid_valve1 and status.get(SOLENOID_VALVE1, True): logging.info("关闭电磁阀1") self.send_command(valve_commands[SOLENOID_VALVE1]['close']) # 重置电磁阀1打开标志 with fiber_lock: valve1_open_flag = False if solenoid_valve2 and status.get(SOLENOID_VALVE2, True): logging.info("关闭电磁阀2") self.send_command(valve_commands[SOLENOID_VALVE2]['close']) if solenoid_valve2 and status.get(SOLENOID_VALVE3, True): logging.info("关闭电磁阀3") self.send_command(valve_commands[SOLENOID_VALVE3]['close']) time.sleep(1) # 实际测试需要考虑这个延时是否合适 def control_solenoid(self): """ 控制电磁阀,并检测光纤传感器触发状态 """ global fiber_triggered try: # 重置光纤传感器触发事件 fiber_triggered.clear() # 同时打开 self.open(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已打开") # 等待线条掉落(最多等待1秒) timeout = 2.0 start_time = time.time() fiber_detected = False # 等待红外传感器触发或超时 while time.time() - start_time < timeout: if fiber_triggered.is_set(): fiber_detected = True logging.info("该NG线条掉入费料区") break else: logging.info("出问题!!!,红外传感器未检测到线条") time.sleep(0.2) # 等待线条掉落 self.close(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已关闭") except Exception as e: logging.info(f"操作电磁阀失败:{str(e)}") def fiber_sensor_monitor(self): """ 光纤传感器监听线程,专门检测电磁阀打开后的触发状态 """ global fiber_triggered, valve_open_time, valve_open_flag logging.info("光纤传感器监听线程已启动") while True: try: # 增加短休眠,降低CPU占用,避免错过信号 time.sleep(0.005) # 获取光纤传感器状态 fiber_status = self.get_device_status(FIBER_SENSOR, 'sensors') # 检测是否检测到信号 if fiber_status: with fiber_lock: # 检查电磁阀1是否处于打开状态 if valve1_open_flag: fiber_triggered.set() # 防止重复触发 time.sleep(0.1) fiber_triggered.clear() except Exception as e: logging.info(f"光纤传感器监听异常:{e}") time.sleep(0.1) # 异常时增加休眠 def press_sensors_monitor(self, check_interval=0.1): """ 双压传感器监听线程 :param check_interval: 检测间隔 :return: """ global sensor_triggered logging.info("双压传感器监听线程已启动") while True: # 检测两个传感器任意一个是否触发 press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors') press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors') if press_sensor1_status or press_sensor2_status: sensor_triggered.set() # 触发事件,通知主线程 logging.info("双压传感器触发:线条已落到传送带") # 重置事件(等待下一次触发) time.sleep(1) # 防重复触发 sensor_triggered.clear() time.sleep(check_interval) # 全局初始化:启动传感器监听线程 def init_sensor_monitor(): relay = RelayController() press_sensor_thread = threading.Thread( target=relay.press_sensors_monitor, args=(0.1,), daemon=True ) # press_sensor_thread.start() # 启动红外传感器监听线程 infrared_sensor_thread = threading.Thread( target=relay.fiber_sensor_monitor, daemon=True ) # infrared_sensor_thread.start() return relay # 全局继电器实例 global_relay = init_sensor_monitor() # ------------对外接口---------- def control_solenoid(): """ 控制电磁阀,并检测光纤传感器触发状态 """ # 创建控制器实例 controller = RelayController() global fiber_triggered try: # 重置光纤传感器触发事件 fiber_triggered.clear() # 同时打开 controller.open(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已打开") # 等待线条掉落(最多等待1秒) timeout = 2.0 start_time = time.time() fiber_detected = False # 等待光纤传感器触发或超时 while time.time() - start_time < timeout: if fiber_triggered.is_set(): fiber_detected = True logging.info("该NG线条掉入费料区") break else: logging.info("出问题!!!,红外传感器未检测到线条") time.sleep(0.2) # 等待线条掉落 controller.close(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已关闭") except Exception as e: logging.info(f"操作电磁阀失败:{str(e)}") # ------------测试接口------------- if __name__ == '__main__': control_solenoid()