diff --git a/EMV/EMV.py b/EMV/EMV.py index f384ddb..06a0e03 100644 --- a/EMV/EMV.py +++ b/EMV/EMV.py @@ -14,7 +14,7 @@ import threading import logging # 网络继电器的 IP 和端口 -HOST = '192.168.1.18' +HOST = '192.168.5.18' PORT = 50000 # 控件命名映射 @@ -312,12 +312,12 @@ def init_sensor_monitor(): ) # press_sensor_thread.start() - # 启动红外传感器监听线程 - infrared_sensor_thread = threading.Thread( + # 启动光纤传感器监听线程 + fiber_sensor_thread = threading.Thread( target=relay.fiber_sensor_monitor, daemon=True ) - # infrared_sensor_thread.start() + fiber_sensor_thread.start() return relay @@ -329,17 +329,14 @@ def control_solenoid(): """ 控制电磁阀,并检测光纤传感器触发状态 """ - # 创建控制器实例 - controller = RelayController() - - global fiber_triggered + global fiber_triggered, global_relay try: # 重置光纤传感器触发事件 fiber_triggered.clear() # 同时打开 - controller.open(solenoid_valve1=True, solenoid_valve2=True) + global_relay.open(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已打开") # 等待线条掉落(最多等待1秒) timeout = 2.0 @@ -355,7 +352,7 @@ def control_solenoid(): logging.info("出问题!!!,红外传感器未检测到线条") time.sleep(0.2) # 等待线条掉落 - controller.close(solenoid_valve1=True, solenoid_valve2=True) + global_relay.close(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已关闭") except Exception as e: logging.info(f"操作电磁阀失败:{str(e)}") diff --git a/EMV/EMV_test.py b/EMV/EMV_test.py new file mode 100644 index 0000000..f7d94aa --- /dev/null +++ b/EMV/EMV_test.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2026/1/8 16:52 +# @Author : reenrr +# @File : test.py +# @Desc : 网络继电器控制输入、输出设备测试程序 +''' +import socket +import binascii +import time +from threading import Event, Lock +import threading +import logging + +# 网络继电器的 IP 和端口 +HOST = '192.168.5.18' +PORT = 50000 + +# 控件命名映射 +SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 +SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 +SOLENOID_VALVE3 = 'solenoid_valve3' # 控制吸取设备的电磁阀3 + +# 传感器命名映射 +CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 +CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 +PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 +PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 +FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 + +# 控件控制报文 +valve_commands = { + SOLENOID_VALVE1: { + 'open': '00000000000601050000FF00', + 'close': '000000000006010500000000', + }, + SOLENOID_VALVE2: { + 'open': '00000000000601050001FF00', + 'close': '000000000006010500010000', + }, + SOLENOID_VALVE3: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500020000', + } +} + +# 读取状态命令 +read_status_command = { + 'devices': '000000000006010100000008', + 'sensors': '000000000006010200000008' +} + +# 控件对应 DO 位(从低到高) +device_bit_map = { + SOLENOID_VALVE1: 0, + SOLENOID_VALVE2: 1, + SOLENOID_VALVE3: 2, +} + +device_name_map = { + SOLENOID_VALVE1: "电磁阀1", + SOLENOID_VALVE2: "电磁阀2", + SOLENOID_VALVE3: "电磁阀3", +} + +# 传感器对应位(从低到高) +sensor_bit_map = { + CONVEYOR1_SENSOR: 0, + CONVEYOR2_SENSOR: 1, + PRESS_SENSOR1: 2, + PRESS_SENSOR2: 3, + FIBER_SENSOR: 6 + # 根据你继电器的配置,继续添加更多传感器 +} + +sensor_name_map = { + CONVEYOR1_SENSOR: '传送带1开关', + CONVEYOR2_SENSOR: '传送带2开关', + PRESS_SENSOR1: '按压开关1', + PRESS_SENSOR2: '按压开关2', + FIBER_SENSOR: '光纤传感器' +} + +# -------------全局事件------------- +press_sensors_triggered = Event() +fiber_triggered = Event() # 光纤传感器触发事件 +fiber_lock = Lock() # 线程锁,保护共享变量 +valve1_open_flag = False # 电磁阀1打开标志 + + +class RelayController: + def __init__(self): + """初始化继电器控制器""" + self.socket = None + + # 线程相关状态 + self.fiber_thread = None # 保存光纤传感器线程对象 + self.fiber_monitor_running = False # 光纤传感器监听线程运行标志 + + self.press_sensors_thread = None # 保存按压开关线程对象 + self.press_sensors_monitor_running = False # 按压传感器监听线程运行标志 + + def send_command(self, command): + """ + 将十六进制字符串转换为字节数据并发送 + :param command: 十六进制字符串 + :return: 响应字节数据 / False + """ + try: + byte_data = binascii.unhexlify(command) + + # 创建套接字并连接到继电器 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect((HOST, PORT)) + sock.send(byte_data) + # 接收响应 + response = sock.recv(1024) + # logging.info(f"收到响应: {binascii.hexlify(response)}") + # 校验响应 + return response + except Exception as e: + logging.info(f"通信错误: {e}") + return False + + def get_all_device_status(self, command_type='devices'): + """ + 获取所有设备/传感器状态 + :param command_type: 'devices'(控件) / 'sensors'(传感器) + :return: 状态字典 {设备名: 状态(bool)} + """ + command = read_status_command.get(command_type) + if not command: + logging.info(f"未知的读取类型: {command_type}") + return {} + + response = self.send_command(command) + status_dict = {} + + if response and len(response) >= 10: + status_byte = response[9] # 状态在第10字节 + status_bin = f"{status_byte:08b}"[::-1] + + if command_type == 'devices': + bit_map = device_bit_map + name_map = device_name_map + elif command_type == 'sensors': + bit_map = sensor_bit_map + name_map = sensor_name_map + else: + logging.info("不支持的映射类型") + return{} + + for key, bit_index in bit_map.items(): + state = status_bin[bit_index] == '1' + status_dict[key] = state + # readable = "开启" if state else "关闭" + # logging.info(f"{device.capitalize()} 状态: {readable}") + else: + logging.info("读取状态失败或响应无效") + + return status_dict + + def get_device_status(self, device_name, command_type='devices'): + """ + 获取单个控件/传感器状态 + :param device_name:设备名称 + :param command_type: 'devices'/'sensors' + :return:True(开启) / False(关闭) / None(无法读取) + """ + status = self.get_all_device_status(command_type) + return status.get(device_name, None) + + def open(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + """ + 根据状态决定是否执行开操作 + :param solenoid_valve1:是否打开电磁阀1 + :param solenoid_valve2:是否打开电磁阀2 + :param solenoid_valve3:是否打开电磁阀3 + :return: + """ + global valve1_open_flag + status = self.get_all_device_status() + + if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False): + logging.info("打开电磁阀1") + self.send_command(valve_commands[SOLENOID_VALVE1]['open']) + # 记录电磁阀1打开时的时间戳和标志 + with fiber_lock: + valve1_open_flag = True + + if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False): + logging.info("打开电磁阀2") + self.send_command(valve_commands[SOLENOID_VALVE2]['open']) + + if solenoid_valve3 and not status.get(SOLENOID_VALVE3, False): + logging.info("打开电磁阀3") + self.send_command(valve_commands[SOLENOID_VALVE3]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + def close(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + """ + 根据状态决定是否执行关操作 + :param solenoid_valve1:是否关闭电磁阀1 + :param solenoid_valve2:是否关闭电磁阀2 + :param solenoid_valve3:是否关闭电磁阀3 + :return: + """ + global valve1_open_flag + + status = self.get_all_device_status() + + if solenoid_valve1 and status.get(SOLENOID_VALVE1, True): + logging.info("关闭电磁阀1") + self.send_command(valve_commands[SOLENOID_VALVE1]['close']) + # 重置电磁阀1打开标志 + with fiber_lock: + valve1_open_flag = False + + if solenoid_valve2 and status.get(SOLENOID_VALVE2, True): + logging.info("关闭电磁阀2") + self.send_command(valve_commands[SOLENOID_VALVE2]['close']) + + if solenoid_valve2 and status.get(SOLENOID_VALVE3, True): + logging.info("关闭电磁阀3") + self.send_command(valve_commands[SOLENOID_VALVE3]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + def fiber_sensor_monitor(self): + """ + 光纤传感器监听线程,专门检测电磁阀打开后的触发状态 + """ + global fiber_triggered, valve1_open_flag + logging.info("光纤传感器监听线程已启动") + + while self.fiber_monitor_running: + try: + # 增加短休眠,降低CPU占用,避免错过信号 + time.sleep(0.005) + + # 获取光纤传感器状态 + fiber_status = self.get_device_status(FIBER_SENSOR, 'sensors') + # 检测是否检测到信号 + if fiber_status: + with fiber_lock: + # 检查电磁阀1是否处于打开状态 + if valve1_open_flag: + fiber_triggered.set() + # 防止重复触发 + time.sleep(0.1) + fiber_triggered.clear() + except Exception as e: + logging.info(f"光纤传感器监听异常:{e}") + time.sleep(0.1) # 异常时增加休眠 + + def start_fiber_monitor(self): + """启动光纤传感器监听线程""" + # 检查线程是否已在运行 + if self.fiber_monitor_running or (self.fiber_thread and self.fiber_thread.is_alive()): + logging.warning("光纤传感器监听线程已在运行,无需重复启动") + return + + # 启动线程 + self.fiber_monitor_running = True + self.fiber_thread = threading.Thread( + target=self.fiber_sensor_monitor, + daemon=True + ) + self.fiber_thread.start() + logging.info("光纤传感器监听线程启动成功") + + def stop_fiber_monitor(self): + """停止光纤传感器监听线程""" + self.fiber_monitor_running = False + # 等待线程完全退出 + if self.fiber_thread and self.fiber_thread.is_alive(): + self.fiber_thread.join(timeout=1) + logging.info("光纤传感器监听线程已停止") + + def press_sensors_monitor(self, check_interval=0.1): + """ + 双压传感器监听线程 + :param check_interval: 检测间隔 + :return: + """ + global press_sensors_triggered + logging.info("双压传感器监听线程已启动") + + while self.press_sensors_monitor_running: + try: + # 检测两个传感器任意一个是否触发 + press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors') + press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors') + if press_sensor1_status or press_sensor2_status: + press_sensors_triggered.set() # 触发事件,通知主线程 + logging.info("双压传感器触发:线条已落到传送带") + # 重置事件(等待下一次触发) + time.sleep(0.1) # 防重复触发 + press_sensors_triggered.clear() + time.sleep(check_interval) + except Exception as e: + logging.info(f"双压传感器监听异常: {e}") + time.sleep(0.1) # 异常时增加休眠 + + def start_press_sensors_monitor(self): + """启动双压传感器监听线程""" + # 检查线程是否已在运行 + if self.press_sensors_monitor_running or (self.press_sensors_thread and self.press_sensors_thread.is_alive()): + logging.warning("双压传感器监听线程已在运行,无需重复启动") + return + + # 启动线程 + self.press_sensors_monitor_running = True + self.press_sensors_thread = threading.Thread( + target=self.press_sensors_monitor, + daemon=True + ) + self.press_sensors_thread.start() + logging.info("双压传感器监听线程启动成功") + + def stop_press_sensors_monitor(self): + """停止光纤传感器监听线程""" + self.press_sensors_monitor_running = False + # 等待线程完全退出 + if self.press_sensors_thread and self.press_sensors_thread.is_alive(): + self.press_sensors_thread.join(timeout=1) + logging.info("双压传感器监听线程已停止") + + +# 全局继电器实例 +global_relay = RelayController() + +# ------------对外接口---------- +def control_solenoid(): + """ + 线条不合格场景专用:控制电磁阀+监听光纤传感器 + 执行流程:启动监听-->打开电磁阀-->等待检测-->关闭电磁阀-->停止监听 + """ + global fiber_triggered, global_relay + + try: + # 启动光纤传感器监听线程 + global_relay.start_fiber_monitor() + + # 重置光纤传感器触发事件,准备检测 + fiber_triggered.clear() + + # 同时打开电磁阀1、2 + global_relay.open(solenoid_valve1=True, solenoid_valve2=True) + logging.info("电磁阀1、2已打开") + + # 等待线条掉落(最多等待2秒) + timeout = 2.0 + start_time = time.time() + fiber_detected = False + # 等待光纤传感器触发或超时 + while time.time() - start_time < timeout: + if fiber_triggered.is_set(): + fiber_detected = True + logging.info("该NG线条掉入费料区") + break + time.sleep(0.01) # 降低CPU空转 + + if not fiber_detected: + logging.info("出问题!!!,红外传感器未检测到线条") + + # 关闭电磁阀1、2 + time.sleep(0.2) # 等待线条掉落 + global_relay.close(solenoid_valve1=True, solenoid_valve2=True) + logging.info("电磁阀1、2已关闭") + + # 停止光纤传感器监听线程 + global_relay.stop_fiber_monitor() + + except Exception as e: + logging.info(f"操作电磁阀失败:{str(e)}") + # 异常时也要停止线程,避免残留 + global_relay.stop_fiber_monitor() + +# ------------测试接口------------- +if __name__ == '__main__': + control_solenoid() \ No newline at end of file diff --git a/RK1106/stepper_motor.py b/RK1106/stepper_motor.py index 2098eff..f9dcbeb 100644 --- a/RK1106/stepper_motor.py +++ b/RK1106/stepper_motor.py @@ -9,6 +9,7 @@ import time from periphery import GPIO import logging +import os # ------------参数配置------------- # 1. 脉冲(PUL)引脚配置 → GPIO32 diff --git a/main_control.py b/main_control.py index 95d4500..da24a4c 100644 --- a/main_control.py +++ b/main_control.py @@ -10,41 +10,72 @@ import multiprocessing # 多进程模块 import threading from threading import Event import time -from EMV import sensor_triggered, global_relay, control_solenoid -from visual_algorithm import flaw_detection +from EMV.EMV_test import press_sensors_triggered, control_solenoid, global_relay +from visual_algorithm.visual_algorithm import flaw_detection +import logging +import os + +# ------------ 日志+参数配置 ------------ +script_dir = os.path.dirname(os.path.abspath(__file__)) +log_file_path = os.path.join(script_dir, "main_control.log") + +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + handlers=[ + logging.StreamHandler(), + logging.FileHandler(log_file_path, encoding='utf-8') + ] +) # ------------全局事件------------- manger = multiprocessing.Manager() conveyor_start_event = manger.Event() +def start_thread(): + """开启各种线程""" + global_relay.start_press_sensors_monitor() # 双压传感器监听线程 + +def stop_thread(): + """关闭各种线程""" + global_relay.stop_press_sensors_monitor() # 双压传感器监听线程 + def quality_testing(): - print("线条开始质量检测:") + logging.info("线条开始质量检测:") # 执行质量检测 result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08}) if result == "qualified": result = "合格" - print("该线条是否合格:", result) - print("等待线条落到传送带(双压传感器触发)...") + logging.info("该线条是否合格:", result) + logging.info("等待线条落到传送带(双压传感器触发)...") # 等待时间触发,超时时间设为10秒(避免无限等待) - if sensor_triggered.wait(timeout=10): - print("线条已落到传送带,控制两个传送带向前移动") + if press_sensors_triggered.wait(timeout=10): + logging.info("线条已落到传送带,控制两个传送带向前移动") # 触发传送带启动事件 conveyor_start_event.set() else: - print("超时警告:线条未落到传送带,请检查") + logging.info("超时警告:线条未落到传送带,请检查") elif result == "unqualified": result = "不合格" - print("该线条是否合格:", result) - print("进入NG动作") + logging.info("该线条是否合格:", result) + logging.info("等待线条落到传送带上") + # 等待时间触发,超时时间设为10秒(避免无限等待) + if press_sensors_triggered.wait(timeout=10): + logging.info("线条已落到传送带") + logging.info("进入NG动作") control_solenoid() # 执行NG动作,控制电磁阀 - print("NG动作结束") - # print("判断NG线条是否落入肥料区:") + logging.info("NG动作结束") + # logging.info("判断NG线条是否落入肥料区:") # -----------对外接口------------- def main_control(): - print("开始摆放线条") + logging.info("开启各种线程") + start_thread() + + logging.info("开始摆放线条") # 质量检测 quality_testing() diff --git a/readme.md b/readme.md index 3b89d97..b2764b6 100644 --- a/readme.md +++ b/readme.md @@ -13,4 +13,7 @@ sudo chmod 666 /dev/ttyACM0 # python版本 3.9 +# 过年之后调试,需要注意 +## 1.修改网络继电器的IP,将1网段改成5网段 +