From cc4bdbb965c20017e5d11df3720d8b3b454b883d Mon Sep 17 00:00:00 2001 From: pengqi Date: Tue, 6 Jan 2026 16:01:15 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BA=BF=E6=9D=A1=E5=8E=82=E5=90=84=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E6=8E=A7=E5=88=B6=E4=BB=A3=E7=A0=81V1.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/.gitignore | 3 + .idea/inspectionProfiles/Project_Default.xml | 18 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + .idea/wire_controlsystem.iml | 12 + DM/DM_Motor_test.py | 257 +++++++++ EMV/EMV.py | 369 +++++++++++++ .../conveyor_master_controller1.py | 501 ++++++++++++++++++ .../conveyor_master_controller2.py | 381 +++++++++++++ .../conveyor_master_controller2_test.py | 431 +++++++++++++++ main_control.py | 61 +++ readme.md | 14 + visual_algorithm/visual_algorithm.py | 50 ++ 14 files changed, 2117 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 .idea/wire_controlsystem.iml create mode 100644 DM/DM_Motor_test.py create mode 100644 EMV/EMV.py create mode 100644 conveyor_controller/conveyor_master_controller1.py create mode 100644 conveyor_controller/conveyor_master_controller2.py create mode 100644 conveyor_controller/conveyor_master_controller2_test.py create mode 100644 main_control.py create mode 100644 readme.md create mode 100644 visual_algorithm/visual_algorithm.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..78dfc6d --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,18 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..808fc8e --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/wire_controlsystem.iml b/.idea/wire_controlsystem.iml new file mode 100644 index 0000000..8b8c395 --- /dev/null +++ b/.idea/wire_controlsystem.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/DM/DM_Motor_test.py b/DM/DM_Motor_test.py new file mode 100644 index 0000000..512cdeb --- /dev/null +++ b/DM/DM_Motor_test.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2026/1/6 13:55 +# @Author : reenrr +# @File : DM_Motor_test.py +# @Desc : 达妙电机测试 +''' +from DM_CAN import * +import serial +import time + +# -------------------------- 电机参数配置 -------------------------- +SLAVE_ID = 0x01 +MASTER_ID = 0x11 +PORT = 'COM10' +BAUDRATE = 921600 + + +class DMMotorController: + """达妙电机控制器类""" + def __init__(self, slave_id=None, master_id=None, port=None, baudrate=None): + """ + 初始化电机控制器 + :param slave_id: 从机ID,默认0x01 + :param master_id: 主机ID,默认0x11 + :param port: 串口端口,默认COM6 + :param baudrate: 波特率,默认921600 + """ + # 初始化参数 + self.slave_id = slave_id if slave_id is not None else SLAVE_ID + self.master_id = master_id if master_id is not None else MASTER_ID + self.port = port if port is not None else PORT + self.baudrate = baudrate if baudrate is not None else BAUDRATE + + # 核心属性初始化 + self.serial_device = None # 串口设备 + self.motor = None # 电机实例 + self.motor_control = None # 电机控制器实例 + + # 初始化电机和串口 + self.init_motor() + + def init_motor(self): + """初始化电机和串口""" + try: + # 1.初始化串口 + self.serial_device = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=0.5 + ) + + # 2.创建电机实例 + self.motor = Motor(DM_Motor_Type.DM4310, self.slave_id, self.master_id) + # 3.创建电机控制器实例 + self.motor_control = MotorControl(self.serial_device) + # 4.添加电机 + self.motor_control.addMotor(self.motor) + + print(f"✅ 电机初始化成功") + print(f" - 串口:{self.port} | 波特率:{self.baudrate}") + print(f" - 从机ID:{hex(self.slave_id)} | 主机ID:{hex(self.master_id)}") + + except Exception as e: + raise RuntimeError(f"❌ 电机初始化失败:{e}") + + def switch_control_mode(self, control_type): + """ + 切换电机控制模式 + :param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT) + :return: 切换成功返回True,否则返回False + """ + try: + result = self.motor_control.switchControlMode(self.motor, control_type) + mode_name = self._get_mode_name(control_type) + if result: + print(f"✅ 切换到{mode_name}模式成功") + # 切换模式后保存参数 + self.motor_control.save_motor_param(self.motor) + else: + print(f"❌ 切换到{mode_name}模式失败") + return result + except Exception as e: + print(f"❌ 切换模式出错:{str(e)}") + return False + + def enable_motor(self): + """使能电机""" + try: + self.motor_control.enable(self.motor) + print("✅ 电机使能成功") + return True + except Exception as e: + print(f"❌ 电机使能失败:{str(e)}") + return False + + def disable_motor(self): + """失能电机""" + try: + self.motor_control.disable(self.motor) + print("✅ 电机失能成功") + return True + except Exception as e: + print(f"❌ 电机失能失败:{str(e)}") + return False + + def control_pos_vel(self, p_desired, v_desired): + """ + 位置-速度模式控制 + :param p_desired: 目标位置(rad, 范围[-300, 300]) + :param v_desired: 目标速度(rad/s, 范围[-30, 30]) + """ + try: + # 归零 + 发送运动指令 + self.motor_control.set_zero_position(self.motor) + self.motor_control.control_Pos_Vel(self.motor, p_desired, v_desired) + time.sleep(0.1) + print(f"✅ 位置-速度控制:位置={p_desired}rad | 速度={v_desired}rad/s") + return True + + except Exception as e: + print(f"❌ 位置-速度控制出错:{str(e)}") + return False + + def close_serial(self): + """关闭串口""" + try: + if self.serial_device and self.serial_device.is_open: + self.serial_device.close() + print("✅ 串口关闭成功") + return True + except Exception as e: + print(f"❌ 串口关闭失败:{str(e)}") + return False + + def _get_mode_name(self, control_type): + """ + 获取模式名称 + :param control_type: 控制模式(Control_Type.POS_VEl/VEL/MIT) + """ + # 1.定义[枚举值--中文名称]的映射字典 + mode_map = { + Control_Type.POS_VEL: "位置-速度模式", + Control_Type.VEL: "速度模式", + Control_Type.MIT: "MIT模式" + } + # 2.根据控制模式值获取中文名称 字典方法 + return mode_map.get(control_type, "未知模式") # “未知模式”默认值 + + def save_param(self): + """保存所有电机参数""" + try: + if self.motor is None: + raise ValueError("电机实例为None,无法保存参数") + + self.motor_control.save_motor_param(self.motor) + print("电机参数保存成功") + except Exception as e: + print(f"❌ 电机参数保存失败:{str(e)}") + + def refresh_motor_status(self): + """获得电机状态""" + try: + if self.motor is None: + raise ValueError("电机实例为None,无法保存参数") + + self.motor_control.refresh_motor_status(self.motor) + print("电机状态刷新成功") + except Exception as e: + print(f"❌ 电机状态刷新失败:{str(e)}") + + def get_position(self): + """获取电机位置""" + try: + if self.motor is None: + raise ValueError("电机实例为None,无法保存参数") + + position = self.motor.getPosition() + print(f"获取电机位置成功,当前位置: {position}") + return position + except Exception as e: + print(f"获取电机位置失败: {str(e)}") + + def change_limit_param(self, motor_type, pmax, vmax, tmax): + """ + 改变电机的PMAX VMAX TMAX + :param motor_type: 电机的类型 + :param pmax: 电机的PMAX + :param vmax: 电机的VMAX + :param tmax: 电机的TAMX + """ + try: + self.motor_control.change_limit_param(motor_type, pmax, vmax, tmax) + print( + f"电机限位参数修改成功 | 类型: {motor_type} | PMAX: {pmax} | VMAX: {vmax} | TMAX: {tmax}" + ) + except Exception as e: + print(f"修改电机限位参数失败: {str(e)}") + + def __del__(self): + """析构函数:确保程序退出时失能电机、关闭串口""" + try: + # 先检查串口是否打开,避免重复操作 + if self.serial_device and self.serial_device.is_open: + self.disable_motor() + self.close_serial() + else: + # 串口已关闭,无需重复操作,仅打印日志 + print("ℹ️ 串口已关闭,析构函数无需重复释放资源") + except Exception as e: + print(f"ℹ️ 析构函数执行警告:{str(e)}") + + +def dm_motor_control(): + # 1.创建电机控制器实例 + motor_controller = DMMotorController( + slave_id=SLAVE_ID, + master_id=MASTER_ID, + port=PORT, + baudrate=BAUDRATE + ) + + try: + # 切换到位置-速度模式 + motor_controller.switch_control_mode(Control_Type.POS_VEL) + + # 使能电机 + motor_controller.enable_motor() + + # 循环控制电机 + while True: + print("运动前的位置", motor_controller.get_position()) # 需要测试断电后是否能读取得到 + motor_controller.control_pos_vel(p_desired=282.6, v_desired=30) # 450mm 665-215 + time.sleep(20) + + motor_controller.refresh_motor_status() + print("运动1的位置", motor_controller.get_position()) # 刷新的比较慢,可以等位置不变一段时间之后,再获取位置 + motor_controller.refresh_motor_status() + + motor_controller.control_pos_vel(p_desired=-282.6, v_desired=30) + time.sleep(20) + + + except KeyboardInterrupt: + print("\n⚠️ 用户手动停止程序") + except Exception as e: + print(f"\n❌ 程序运行出错:{str(e)}") + finally: + # 5. 无论是否出错,最终都要失能电机、关闭串口 + motor_controller.disable_motor() + motor_controller.close_serial() + print("✅ 程序正常退出") + +# ---------调试接口---------- +if __name__ == '__main__': + dm_motor_control() \ No newline at end of file diff --git a/EMV/EMV.py b/EMV/EMV.py new file mode 100644 index 0000000..f384ddb --- /dev/null +++ b/EMV/EMV.py @@ -0,0 +1,369 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/12/12 14:39 +# @Author : reenrr +# @File : EMV.py +# @Desc : 网络继电器控制输入、输出设备 +''' +import socket +import binascii +import time +from threading import Event, Lock +import threading +import logging + +# 网络继电器的 IP 和端口 +HOST = '192.168.1.18' +PORT = 50000 + +# 控件命名映射 +SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 +SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 +SOLENOID_VALVE3 = 'solenoid_valve3' # 控制吸取设备的电磁阀3 + +# 传感器命名映射 +CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 +CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 +PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 +PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 +FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 + +# 控件控制报文 +valve_commands = { + SOLENOID_VALVE1: { + 'open': '00000000000601050000FF00', + 'close': '000000000006010500000000', + }, + SOLENOID_VALVE2: { + 'open': '00000000000601050001FF00', + 'close': '000000000006010500010000', + }, + SOLENOID_VALVE3: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500020000', + } +} + +# 读取状态命令 +read_status_command = { + 'devices': '000000000006010100000008', + 'sensors': '000000000006010200000008' +} + +# 控件对应 DO 位(从低到高) +device_bit_map = { + SOLENOID_VALVE1: 0, + SOLENOID_VALVE2: 1, + SOLENOID_VALVE3: 2, +} + +device_name_map = { + SOLENOID_VALVE1: "电磁阀1", + SOLENOID_VALVE2: "电磁阀2", + SOLENOID_VALVE3: "电磁阀3", +} + +# 传感器对应位(从低到高) +sensor_bit_map = { + CONVEYOR1_SENSOR: 0, + CONVEYOR2_SENSOR: 1, + PRESS_SENSOR1: 2, + PRESS_SENSOR2: 3, + FIBER_SENSOR: 6 + # 根据你继电器的配置,继续添加更多传感器 +} + +sensor_name_map = { + CONVEYOR1_SENSOR: '传送带1开关', + CONVEYOR2_SENSOR: '传送带2开关', + PRESS_SENSOR1: '按压开关1', + PRESS_SENSOR2: '按压开关2', + FIBER_SENSOR: '光纤传感器' +} + +# -------------全局事件------------- +sensor_triggered = Event() +fiber_triggered = Event() # 光纤传感器触发事件 +fiber_lock = Lock() # 线程锁,保护共享变量 +valve1_open_time = 0.0 # 电磁阀1打开时间戳 +valve1_open_flag = False # 电磁阀1打开标志 + + +class RelayController: + def __init__(self): + """初始化继电器控制器""" + self.socket = None + + def send_command(self, command): + """ + 将十六进制字符串转换为字节数据并发送 + :param command: 十六进制字符串 + :return: 响应字节数据 / False + """ + try: + byte_data = binascii.unhexlify(command) + + # 创建套接字并连接到继电器 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect((HOST, PORT)) + sock.send(byte_data) + # 接收响应 + response = sock.recv(1024) + # logging.info(f"收到响应: {binascii.hexlify(response)}") + # 校验响应 + return response + except Exception as e: + logging.info(f"通信错误: {e}") + return False + + def get_all_device_status(self, command_type='devices'): + """ + 获取所有设备/传感器状态 + :param command_type: 'devices'(控件) / 'sensors'(传感器) + :return: 状态字典 {设备名: 状态(bool)} + """ + command = read_status_command.get(command_type) + if not command: + logging.info(f"未知的读取类型: {command_type}") + return {} + + response = self.send_command(command) + status_dict = {} + + if response and len(response) >= 10: + status_byte = response[9] # 状态在第10字节 + status_bin = f"{status_byte:08b}"[::-1] + + if command_type == 'devices': + bit_map = device_bit_map + name_map = device_name_map + elif command_type == 'sensors': + bit_map = sensor_bit_map + name_map = sensor_name_map + else: + logging.info("不支持的映射类型") + return{} + + for key, bit_index in bit_map.items(): + state = status_bin[bit_index] == '1' + status_dict[key] = state + # readable = "开启" if state else "关闭" + # logging.info(f"{device.capitalize()} 状态: {readable}") + else: + logging.info("读取状态失败或响应无效") + + return status_dict + + def get_device_status(self, device_name, command_type='devices'): + """ + 获取单个控件/传感器状态 + :param device_name:设备名称 + :param command_type: 'devices'/'sensors' + :return:True(开启) / False(关闭) / None(无法读取) + """ + status = self.get_all_device_status(command_type) + return status.get(device_name, None) + + def open(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + """ + 根据状态决定是否执行开操作 + :param solenoid_valve1:是否打开电磁阀1 + :param solenoid_valve2:是否打开电磁阀2 + :param solenoid_valve3:是否打开电磁阀3 + :return: + """ + global valve1_open_time, valve1_open_flag + status = self.get_all_device_status() + + if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False): + logging.info("打开电磁阀1") + self.send_command(valve_commands[SOLENOID_VALVE1]['open']) + # 记录电磁阀1打开时的时间戳和标志 + with fiber_lock: + valve1_open_time = time.time() + valve1_open_flag = True + + if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False): + logging.info("打开电磁阀2") + self.send_command(valve_commands[SOLENOID_VALVE2]['open']) + + if solenoid_valve3 and not status.get(SOLENOID_VALVE3, False): + logging.info("打开电磁阀3") + self.send_command(valve_commands[SOLENOID_VALVE3]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + # 根据状态决定是否执行关操作 + def close(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + """ + 根据状态决定是否执行关操作 + :param solenoid_valve1:是否关闭电磁阀1 + :param solenoid_valve2:是否关闭电磁阀2 + :param solenoid_valve3:是否关闭电磁阀3 + :return: + """ + global valve1_open_flag + + status = self.get_all_device_status() + + if solenoid_valve1 and status.get(SOLENOID_VALVE1, True): + logging.info("关闭电磁阀1") + self.send_command(valve_commands[SOLENOID_VALVE1]['close']) + # 重置电磁阀1打开标志 + with fiber_lock: + valve1_open_flag = False + + if solenoid_valve2 and status.get(SOLENOID_VALVE2, True): + logging.info("关闭电磁阀2") + self.send_command(valve_commands[SOLENOID_VALVE2]['close']) + + if solenoid_valve2 and status.get(SOLENOID_VALVE3, True): + logging.info("关闭电磁阀3") + self.send_command(valve_commands[SOLENOID_VALVE3]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + def control_solenoid(self): + """ + 控制电磁阀,并检测光纤传感器触发状态 + """ + global fiber_triggered + + try: + # 重置光纤传感器触发事件 + fiber_triggered.clear() + + # 同时打开 + self.open(solenoid_valve1=True, solenoid_valve2=True) + logging.info("电磁阀1、2已打开") + # 等待线条掉落(最多等待1秒) + timeout = 2.0 + start_time = time.time() + fiber_detected = False + # 等待红外传感器触发或超时 + while time.time() - start_time < timeout: + if fiber_triggered.is_set(): + fiber_detected = True + logging.info("该NG线条掉入费料区") + break + else: + logging.info("出问题!!!,红外传感器未检测到线条") + + time.sleep(0.2) # 等待线条掉落 + self.close(solenoid_valve1=True, solenoid_valve2=True) + logging.info("电磁阀1、2已关闭") + except Exception as e: + logging.info(f"操作电磁阀失败:{str(e)}") + + def fiber_sensor_monitor(self): + """ + 光纤传感器监听线程,专门检测电磁阀打开后的触发状态 + """ + global fiber_triggered, valve_open_time, valve_open_flag + logging.info("光纤传感器监听线程已启动") + + while True: + try: + # 增加短休眠,降低CPU占用,避免错过信号 + time.sleep(0.005) + + # 获取光纤传感器状态 + fiber_status = self.get_device_status(FIBER_SENSOR, 'sensors') + # 检测是否检测到信号 + if fiber_status: + with fiber_lock: + # 检查电磁阀1是否处于打开状态 + if valve1_open_flag: + fiber_triggered.set() + # 防止重复触发 + time.sleep(0.1) + fiber_triggered.clear() + except Exception as e: + logging.info(f"光纤传感器监听异常:{e}") + time.sleep(0.1) # 异常时增加休眠 + + def press_sensors_monitor(self, check_interval=0.1): + """ + 双压传感器监听线程 + :param check_interval: 检测间隔 + :return: + """ + global sensor_triggered + logging.info("双压传感器监听线程已启动") + while True: + # 检测两个传感器任意一个是否触发 + press_sensor1_status = self.get_device_status(PRESS_SENSOR1, 'sensors') + press_sensor2_status = self.get_device_status(PRESS_SENSOR2, 'sensors') + if press_sensor1_status or press_sensor2_status: + sensor_triggered.set() # 触发事件,通知主线程 + logging.info("双压传感器触发:线条已落到传送带") + # 重置事件(等待下一次触发) + time.sleep(1) # 防重复触发 + sensor_triggered.clear() + time.sleep(check_interval) + + +# 全局初始化:启动传感器监听线程 +def init_sensor_monitor(): + relay = RelayController() + press_sensor_thread = threading.Thread( + target=relay.press_sensors_monitor, + args=(0.1,), + daemon=True + ) + # press_sensor_thread.start() + + # 启动红外传感器监听线程 + infrared_sensor_thread = threading.Thread( + target=relay.fiber_sensor_monitor, + daemon=True + ) + # infrared_sensor_thread.start() + + return relay + +# 全局继电器实例 +global_relay = init_sensor_monitor() + +# ------------对外接口---------- +def control_solenoid(): + """ + 控制电磁阀,并检测光纤传感器触发状态 + """ + # 创建控制器实例 + controller = RelayController() + + global fiber_triggered + + try: + # 重置光纤传感器触发事件 + fiber_triggered.clear() + + # 同时打开 + controller.open(solenoid_valve1=True, solenoid_valve2=True) + logging.info("电磁阀1、2已打开") + # 等待线条掉落(最多等待1秒) + timeout = 2.0 + start_time = time.time() + fiber_detected = False + # 等待光纤传感器触发或超时 + while time.time() - start_time < timeout: + if fiber_triggered.is_set(): + fiber_detected = True + logging.info("该NG线条掉入费料区") + break + else: + logging.info("出问题!!!,红外传感器未检测到线条") + + time.sleep(0.2) # 等待线条掉落 + controller.close(solenoid_valve1=True, solenoid_valve2=True) + logging.info("电磁阀1、2已关闭") + except Exception as e: + logging.info(f"操作电磁阀失败:{str(e)}") + +# ------------测试接口------------- +if __name__ == '__main__': + control_solenoid() + + + + diff --git a/conveyor_controller/conveyor_master_controller1.py b/conveyor_controller/conveyor_master_controller1.py new file mode 100644 index 0000000..b10ea76 --- /dev/null +++ b/conveyor_controller/conveyor_master_controller1.py @@ -0,0 +1,501 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/12/22 +# @Author : reenrr +# @File : conveyor_master_controller.py +# @Desc : 传送带1和2协同控制 使用一个串口两个轴地址(485总线) 每隔4秒两个传送带同步走一个挡板的距离 +''' +import logging +import threading +import time +from datetime import datetime +import serial +from EMV import RelayController + +# 彻底屏蔽pymodbus所有日志 +logging.getLogger("pymodbus").setLevel(logging.CRITICAL) +logging.basicConfig(level=logging.INFO) + +# --- 全局参数配置 --- +SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线) +BAUD_RATE = 115200 +ACTION_DELAY = 5 +SLAVE_ID_1 = 1 # 传送带1轴地址 +SLAVE_ID_2 = 2 # 传送带2轴地址 +SERIAL_TIMEOUT = 0.05 # 串口超时50ms +SYNC_STOP_DURATION = 4.0 # 同步停止时间(4秒) +SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms) +DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms) +RELEASE_WAIT_TIMEOUT = 5.0 # 等待挡板离开超时(5秒) + +# 全局串口锁(485总线必须单指令发送,避免冲突) +GLOBAL_SERIAL_LOCK = threading.Lock() + + +class SingleMotorController: + """单个电机控制器(按轴地址自动匹配指令集,复用全局串口)""" + + def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock): + self.slave_id = slave_id # 轴地址(1/2) + self.conveyor_id = conveyor_id # 传送带编号(1/2) + self.action_delay = action_delay + self.ser = serial_obj # 复用主控制器的串口实例 + self.global_serial_lock = global_serial_lock # 全局串口锁 + + # 初始化指令 + self._init_commands() + + # 核心状态标志 + self.status_thread_is_running = False + self.conveyor_thread_is_running = False + self.sensor_triggered = False # 传感器触发标志 + self.sensor_locked = False # 传感器锁定(防抖) + self.stop_flag = False # 本地停止标志 + self.wait_sensor_release = False # 重启后等待挡板离开标志 + self.last_sensor_trigger = 0 # 上次触发时间(防抖) + + # 线程对象 + self.monitor_thread = None + self.run_speed_thread = None + self.relay_controller = RelayController() + + # 锁 + self.sensor_lock = threading.Lock() + self.state_lock = threading.Lock() + + def _init_commands(self): + """根据轴地址初始化指令集""" + if self.slave_id == 1: + # --------传送带1(轴地址1)指令集-------- + self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令 + self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令 + self.speed_commands = [ + bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式 + bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30 + bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度 + bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度 + ] + elif self.slave_id == 2: + # --------传送带2(轴地址2)指令集-------- + self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令 + self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令 + self.speed_commands = [ + bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式 + bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30 + bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度 + bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度 + ] + else: + raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2") + + # 打印指令(调试用) + logging.info(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:") + logging.info(f" 启动指令: {self.start_command.hex(' ')}") + logging.info(f" 停止指令: {self.stop_command.hex(' ')}") + + def send_command_list(self, command_list, delay=0.05): + """批量发送指令列表(加全局锁,指令间延时避免总线冲突)""" + if not (self.ser and self.ser.is_open): + logging.info(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送") + return + for idx, cmd in enumerate(command_list): + self.send_and_receive_raw(cmd, f"指令{idx + 1}") + time.sleep(delay) # 485总线指令间必须加延时 + + def clear_buffer(self): + """清空串口缓冲区(加全局锁)""" + if self.ser and self.ser.is_open: + with self.global_serial_lock: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + while self.ser.in_waiting > 0: + self.ser.read(self.ser.in_waiting) + time.sleep(0.001) + + def send_and_receive_raw(self, command, description): + """底层串口通信(核心:使用全局锁保证单指令发送)""" + if not (self.ser and self.ser.is_open): + logging.info(f"传送带{self.conveyor_id}串口未打开,跳过发送") + return None + + try: + self.clear_buffer() + # 全局锁:同一时间仅一个设备发送指令 + with self.global_serial_lock: + send_start = time.perf_counter() + self.ser.write(command) + self.ser.flush() + send_cost = (time.perf_counter() - send_start) * 1000 + + # 接收响应(仅对应轴地址的设备会回复) + recv_start = time.perf_counter() + response = b"" + while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT: + if self.ser.in_waiting > 0: + chunk = self.ser.read(8) + response += chunk + if len(response) >= 8: + break + time.sleep(0.001) + recv_cost = (time.perf_counter() - recv_start) * 1000 + + # 处理响应 + valid_resp = response[:8] if len(response) >= 8 else response + logging.info(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]") + logging.info(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)") + logging.info(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)") + + return valid_resp + + except Exception as e: + logging.info(f"传送带{self.conveyor_id}通信异常 ({description}): {e}") + return None + + def emergency_stop(self): + """紧急停止(调用对应轴地址的停止指令)""" + with self.state_lock: + if self.stop_flag: + return + self.stop_flag = True + + self.send_and_receive_raw(self.stop_command, "停止寄存器") + logging.info(f"[传送带{self.conveyor_id}] 电机(轴地址{self.slave_id})已紧急停止") + + def resume_motor(self): + """恢复电机运行(调用对应轴地址的启动指令)""" + with self.state_lock: + self.stop_flag = False + self.sensor_triggered = False + self.sensor_locked = False + self.last_sensor_trigger = 0 + self.wait_sensor_release = True # 标记需要等待挡板离开 + + # 主动发送运行指令 + self.send_and_receive_raw(self.start_command, "重启寄存器") + logging.info(f"[传送带{self.conveyor_id}] 电机(轴地址{self.slave_id})已重启,等待挡板离开传感器...") + + def wait_for_sensor_release(self): + """等待挡板离开传感器""" + start_time = time.time() + logging.info(f"[传送带{self.conveyor_id}] 开始等待挡板离开(超时{RELEASE_WAIT_TIMEOUT}秒)") + + while time.time() - start_time < RELEASE_WAIT_TIMEOUT: + # 读取传感器状态(True=无遮挡,False=有遮挡) + if self.conveyor_id == 1: + sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors') + else: + sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors') + + # 传感器恢复为无遮挡(True),说明挡板已离开 + if sensor_status: + logging.info(f"[传送带{self.conveyor_id}] 挡板已离开传感器,恢复正常检测") + with self.state_lock: + self.wait_sensor_release = False + return True + + time.sleep(DETECTION_INTERVAL) + + # 超时处理 + logging.info(f"[传送带{self.conveyor_id}] 等待挡板离开超时!强制恢复检测") + with self.state_lock: + self.wait_sensor_release = False + return False + + def monitor_conveyors_sensor_status(self, master_controller): + """传感器检测线程""" + logging.info(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)") + + while self.status_thread_is_running and not master_controller.global_stop_flag: + try: + with self.sensor_lock: + # 1. 全局停止/电机停止时跳过检测 + if master_controller.global_stop_flag or self.stop_flag: + time.sleep(DETECTION_INTERVAL) + continue + + # 2. 重启后先等待挡板离开 + if self.wait_sensor_release: + self.wait_for_sensor_release() + continue + + # 3. 读取传感器状态 + if self.conveyor_id == 1: + sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors') + else: + sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors') + + current_time = time.time() + + # 4. 检测到挡板且满足防抖条件 + if (not sensor_status) and (not self.sensor_locked) and \ + (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: + logging.info(f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即响应") + self.last_sensor_trigger = current_time + + with self.state_lock: + self.sensor_triggered = True + self.sensor_locked = True + + # 立即通知主控制器 + threading.Thread( + target=master_controller.on_sensor_triggered, + args=(self.conveyor_id,), + daemon=True + ).start() + + time.sleep(DETECTION_INTERVAL) + + except Exception as e: + logging.info(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}") + time.sleep(0.1) + + logging.info(f"[传送带{self.conveyor_id}] 传感器检测线程已停止") + + def run_speed_mode(self, master_controller): + """电机速度模式线程""" + logging.info(f"[传送带{self.conveyor_id}] 电机速度模式线程已启动(轴地址{self.slave_id})") + self.conveyor_thread_is_running = True + + while self.conveyor_thread_is_running and not master_controller.global_stop_flag: + try: + # 串口未打开则退出循环(由主控制器管理串口连接) + if not (self.ser and self.ser.is_open): + logging.info(f"[传送带{self.conveyor_id}] 串口已关闭,停止发送运行指令") + time.sleep(1) + continue + + # 仅当未停止时发送运行指令 + if not self.stop_flag: + self.send_and_receive_raw(self.start_command, "持续运行指令") + + # 启动传感器线程(仅一次) + if not self.status_thread_is_running and ( + self.monitor_thread is None or not self.monitor_thread.is_alive()): + self.status_thread_is_running = True + self.monitor_thread = threading.Thread( + target=self.monitor_conveyors_sensor_status, + args=(master_controller,), + daemon=True + ) + self.monitor_thread.start() + + time.sleep(0.5) + + except Exception as e: + logging.info(f"[传送带{self.conveyor_id}] 电机运行异常: {e}") + time.sleep(1) + + self.emergency_stop() + self.conveyor_thread_is_running = False + logging.info(f"[传送带{self.conveyor_id}] 电机速度模式线程已停止") + + def start_run_speed_thread(self, master_controller): + """启动电机线程""" + if self.run_speed_thread and self.run_speed_thread.is_alive(): + with self.state_lock: + self.conveyor_thread_is_running = False + self.run_speed_thread.join(timeout=2) + + with self.state_lock: + self.stop_flag = False + self.sensor_triggered = False + self.sensor_locked = False + self.wait_sensor_release = True + self.last_sensor_trigger = 0 + + self.run_speed_thread = threading.Thread( + target=self.run_speed_mode, + args=(master_controller,), + daemon=True + ) + self.run_speed_thread.start() + + +class MasterConveyorController: + """主控制器 - 单串口管理两个传送带(485总线)""" + + def __init__(self): + self.global_stop_flag = False + self.sync_running = False + + # 1. 初始化单串口(485总线) + self.ser = None + self._init_serial() + + # 2. 初始化两个电机控制器(复用同一个串口) + self.conveyor1 = SingleMotorController( + slave_id=SLAVE_ID_1, + conveyor_id=1, + action_delay=ACTION_DELAY, + serial_obj=self.ser, + global_serial_lock=GLOBAL_SERIAL_LOCK + ) + self.conveyor2 = SingleMotorController( + slave_id=SLAVE_ID_2, + conveyor_id=2, + action_delay=ACTION_DELAY, + serial_obj=self.ser, + global_serial_lock=GLOBAL_SERIAL_LOCK + ) + + # 同步锁 + self.sync_lock = threading.Lock() + self.sync_condition = threading.Condition(self.sync_lock) + + def _init_serial(self): + """初始化485总线串口(主控制器统一管理)""" + try: + self.ser = serial.Serial( + port=SERIAL_PORT, + baudrate=BAUD_RATE, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=SERIAL_TIMEOUT, + write_timeout=SERIAL_TIMEOUT, + xonxoff=False, + rtscts=False, + dsrdtr=False + ) + if self.ser.is_open: + logging.info(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})") + # 初始化时清空缓冲区 + with GLOBAL_SERIAL_LOCK: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + else: + raise RuntimeError("串口初始化失败:无法打开串口") + except Exception as e: + raise RuntimeError(f"485串口初始化失败: {e}") + + def on_sensor_triggered(self, conveyor_id): + """传感器触发回调""" + with self.sync_condition: + if self.sync_running: + return + + if conveyor_id == 1: + self.conveyor1.sensor_triggered = True + logging.info(f"\n[主控制器] 传送带1检测到挡板,等待传送带2...") + else: + self.conveyor2.sensor_triggered = True + logging.info(f"\n[主控制器] 传送带2检测到挡板,等待传送带1...") + + # 立即停止当前触发的传送带 + if conveyor_id == 1: + self.conveyor1.emergency_stop() + else: + self.conveyor2.emergency_stop() + + # 两个都触发后同步停止 + if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered: + self.sync_running = True + logging.info(f"\n[主控制器] 两个传送带都检测到挡板!开始同步停止 {SYNC_STOP_DURATION} 秒") + + self.conveyor1.emergency_stop() + self.conveyor2.emergency_stop() + + # 同步停止+重启逻辑 + def sync_stop_and_resume(): + try: + time.sleep(SYNC_STOP_DURATION) + + with self.sync_condition: + logging.info(f"\n[主控制器] 同步停止结束,重启所有传送带") + # 重启两个电机 + self.conveyor1.resume_motor() + self.conveyor2.resume_motor() + + # 重置所有状态 + self.conveyor1.sensor_triggered = False + self.conveyor2.sensor_triggered = False + self.conveyor1.sensor_locked = False + self.conveyor2.sensor_locked = False + self.conveyor1.last_sensor_trigger = 0 + self.conveyor2.last_sensor_trigger = 0 + + # 强制重启电机线程 + self.conveyor1.start_run_speed_thread(self) + self.conveyor2.start_run_speed_thread(self) + + self.sync_running = False + self.sync_condition.notify_all() + except Exception as e: + logging.info(f"[主控制器] 同步恢复异常: {e}") + self.sync_running = False + + threading.Thread(target=sync_stop_and_resume, daemon=True).start() + + def start_all_conveyors(self): + """启动所有传送带""" + logging.info("\n=== 主控制器:启动所有传送带(485总线)===") + + # 检查串口是否正常 + if not (self.ser and self.ser.is_open): + logging.info("[主控制器] 485串口未打开,无法启动") + return False + + # 发送速度模式指令(按轴地址发送) + self.conveyor1.send_command_list(self.conveyor1.speed_commands[:4]) + self.conveyor2.send_command_list(self.conveyor2.speed_commands[:4]) + + # 启动电机线程 + self.conveyor1.start_run_speed_thread(self) + self.conveyor2.start_run_speed_thread(self) + + logging.info("[主控制器] 所有传送带已启动,等待挡板离开后开始检测...") + return True + + def stop_all_conveyors(self): + """停止所有传送带并关闭串口""" + logging.info("\n=== 主控制器:停止所有传送带 ===") + self.global_stop_flag = True + + with self.sync_condition: + self.sync_running = False + self.conveyor1.emergency_stop() + self.conveyor2.emergency_stop() + + # 关闭串口 + time.sleep(1) + with GLOBAL_SERIAL_LOCK: + if self.ser and self.ser.is_open: + self.ser.close() + logging.info(f"485总线串口 {SERIAL_PORT} 已关闭") + + logging.info("[主控制器] 所有传送带已停止并关闭串口") + + def run(self): + """主运行函数""" + try: + if not self.start_all_conveyors(): + return + + # 主线程保持运行 + while not self.global_stop_flag: + time.sleep(0.5) + + except KeyboardInterrupt: + logging.info("\n\n[主控制器] 检测到退出指令,正在停止系统...") + except Exception as e: + logging.info(f"\n[主控制器] 程序异常: {e}") + finally: + self.stop_all_conveyors() + logging.info("\n=== 主控制器:程序执行完毕 ===") + + +def main(): + """主函数""" + try: + master = MasterConveyorController() + master.run() + except RuntimeError as e: + logging.info(f"系统启动失败: {e}") + except Exception as e: + logging.info(f"未知异常: {e}") + + +if __name__ == '__main__': + main() + diff --git a/conveyor_controller/conveyor_master_controller2.py b/conveyor_controller/conveyor_master_controller2.py new file mode 100644 index 0000000..8e63462 --- /dev/null +++ b/conveyor_controller/conveyor_master_controller2.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2026/1/6 09:41 +# @Author : reenrr +# @File : conveyor_master_controller2.py +# @Desc : 传送带1和2协同控制 两个传送带同步走一个挡板的距离 +''' +import logging +import threading +import time +from datetime import datetime +import serial +from EMV import RelayController + +logging.getLogger("pymodbus").setLevel(logging.CRITICAL) + +# --- 全局参数配置 --- +SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线) +BAUD_RATE = 115200 +ACTION_DELAY = 5 +SLAVE_ID_1 = 1 # 传送带1轴地址 +SLAVE_ID_2 = 2 # 传送带2轴地址 +SERIAL_TIMEOUT = 0.05 # 串口超时50ms +SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms) +DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms) + +# 全局串口锁(485总线必须单指令发送,避免冲突) +GLOBAL_SERIAL_LOCK = threading.Lock() + +class SingleMotorController: + """单个电机控制器(按轴地址自动匹配指令集,复用全局串口)""" + def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock): + self.slave_id = slave_id # 轴地址(1/2) + self.conveyor_id = conveyor_id # 传送带编号(1/2) + self.action_delay = action_delay + self.ser = serial_obj # 复用主控制器的串口实例 + self.global_serial_lock = global_serial_lock # 全局串口锁 + + # 初始化指令 + self._init_commands() + + # 核心状态标志(简化版) + self.status_thread_is_running = False # 传感器线程运行标志 + self.is_running = False # 电机是否已启动(一次性) + self.is_stopped = False # 电机是否已停止(触发传感器后) + self.sensor_triggered = False # 传感器触发标志 + self.sensor_locked = False # 传感器锁定(防抖) + self.last_sensor_trigger = 0 # 上次触发时间(防抖) + + # 线程对象 + self.monitor_thread = None + self.relay_controller = RelayController() + + # 锁 + self.sensor_lock = threading.Lock() + self.state_lock = threading.Lock() + + def _init_commands(self): + """根据轴地址初始化指令集""" + if self.slave_id == 1: + # --------传送带1(轴地址1)指令集-------- + self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令 + self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令 + self.speed_commands = [ + bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式 + bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30 + bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度 + bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度 + ] + elif self.slave_id == 2: + # --------传送带2(轴地址2)指令集-------- + self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令 + self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令 + self.speed_commands = [ + bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式 + bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30 + bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度 + bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度 + ] + else: + raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2") + + # 打印指令(调试用) + print(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:") + print(f" 启动指令: {self.start_command.hex(' ')}") + print(f" 停止指令: {self.stop_command.hex(' ')}") + + def send_command_list(self, command_list, delay=0.05): + """ + 批量发送指令列表(加全局锁,指令间延时避免总线冲突) + :param command_list: 指令列表 + :param delay: 指令间延时(默认0.05s) + """ + if not (self.ser and self.ser.is_open): + print(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送") + return + for idx, cmd in enumerate(command_list): + self.send_and_receive_raw(cmd, f"指令{idx + 1}") + time.sleep(delay) # 485总线指令间必须加延时 + + def clear_buffer(self): + """清空串口缓冲区(加全局锁)""" + if self.ser and self.ser.is_open: + with self.global_serial_lock: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + while self.ser.in_waiting > 0: + self.ser.read(self.ser.in_waiting) + time.sleep(0.001) + + def send_and_receive_raw(self, command, description): + """ + 底层串口通信(核心:使用全局锁保证单指令发送) + :param command: 指令 + :param description: 指令描述 + :return: 返回响应 + """ + if not (self.ser and self.ser.is_open): + print(f"传送带{self.conveyor_id}串口未打开,跳过发送") + return None + + try: + self.clear_buffer() + # 全局锁:同一时间仅一个设备发送指令 + with self.global_serial_lock: + send_start = time.perf_counter() + self.ser.write(command) + self.ser.flush() + send_cost = (time.perf_counter() - send_start) * 1000 + + # 接收响应(仅对应轴地址的设备会回复) + recv_start = time.perf_counter() + response = b"" + while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT: + if self.ser.in_waiting > 0: + chunk = self.ser.read(8) + response += chunk + if len(response) >= 8: + break + time.sleep(0.001) + recv_cost = (time.perf_counter() - recv_start) * 1000 + + # 处理响应 + valid_resp = response[:8] if len(response) >= 8 else response + print(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]") + print(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)") + print(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)") + + return valid_resp + + except Exception as e: + print(f"传送带{self.conveyor_id}通信异常 ({description}): {e}") + return None + + def start_motor_once(self): + """一次性启动电机""" + with self.state_lock: + if self.is_running or self.is_stopped: + print(f"[传送带{self.conveyor_id}] 电机已启动/停止,无需重复启动") + return + + # 1. 发送速度模式配置指令 + print(f"[传送带{self.conveyor_id}] 配置速度模式...") + self.send_command_list(self.speed_commands[:4]) + + # 2. 发送启动指令 + print(f"[传送带{self.conveyor_id}] 发送启动指令") + self.send_and_receive_raw(self.start_command, "启动传送带") + + with self.state_lock: + self.is_running = True + + def stop_motor(self): + """停止电机""" + with self.state_lock: + if self.is_stopped: + return + self.is_stopped = True + self.is_running = False + + # 发送停止指令 + print(f"[传送带{self.conveyor_id}] 发送停止指令") + self.send_and_receive_raw(self.stop_command, "停止传送带") + + def monitor_conveyors_sensor_status(self, master_controller): + """传感器检测线程(仅检测,触发后停止电机)""" + print(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)") + + while self.status_thread_is_running and not master_controller.global_stop_flag: + try: + with self.sensor_lock: + # 1. 全局停止/电机已停止时跳过检测 + if master_controller.global_stop_flag or self.is_stopped: + time.sleep(DETECTION_INTERVAL) + continue + + # 2. 读取传感器状态 + if self.conveyor_id == 1: + sensor_status = self.relay_controller.get_device_status('conveyor1_sensor', 'sensors') + else: + sensor_status = self.relay_controller.get_device_status('conveyor2_sensor', 'sensors') + + current_time = time.time() + + # 3. 检测到挡板且满足防抖条件 + if (not sensor_status) and (not self.sensor_locked) and \ + (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: + print( + f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即停止") + self.last_sensor_trigger = current_time + self.sensor_triggered = True + self.sensor_locked = True + + # 立即停止电机 + self.stop_motor() + + # 通知主控制器(可选:用于同步两个传送带停止) + threading.Thread( + target=master_controller.on_sensor_triggered, + args=(self.conveyor_id,), + daemon=True + ).start() + + time.sleep(DETECTION_INTERVAL) + + except Exception as e: + print(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}") + time.sleep(0.1) + + print(f"[传送带{self.conveyor_id}] 传感器检测线程已停止") + + def start_sensor_thread(self, master_controller): + """启动传感器检测线程""" + if self.monitor_thread and self.monitor_thread.is_alive(): + return + + self.status_thread_is_running = True + self.monitor_thread = threading.Thread( + target=self.monitor_conveyors_sensor_status, + args=(master_controller,), + daemon=True + ) + self.monitor_thread.start() + +class MasterConveyorController: + """主控制器 - 单串口管理两个传送带(485总线)""" + def __init__(self): + self.global_stop_flag = False + self.both_stopped = False # 两个传送带是否都已停止 + + # 1. 初始化单串口(485总线) + self.ser = None + self._init_serial() + + # 2. 初始化两个电机控制器(复用同一个串口) + self.conveyor1 = SingleMotorController( + slave_id=SLAVE_ID_1, + conveyor_id=1, + action_delay=ACTION_DELAY, + serial_obj=self.ser, + global_serial_lock=GLOBAL_SERIAL_LOCK + ) + self.conveyor2 = SingleMotorController( + slave_id=SLAVE_ID_2, + conveyor_id=2, + action_delay=ACTION_DELAY, + serial_obj=self.ser, + global_serial_lock=GLOBAL_SERIAL_LOCK + ) + + # 同步锁 + self.sync_lock = threading.Lock() + + def _init_serial(self): + """初始化485总线串口(主控制器统一管理)""" + try: + self.ser = serial.Serial( + port=SERIAL_PORT, + baudrate=BAUD_RATE, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=SERIAL_TIMEOUT, + write_timeout=SERIAL_TIMEOUT, + xonxoff=False, + rtscts=False, + dsrdtr=False + ) + if self.ser.is_open: + print(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})") + # 初始化时清空缓冲区 + with GLOBAL_SERIAL_LOCK: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + else: + raise RuntimeError("串口初始化失败:无法打开串口") + except Exception as e: + raise RuntimeError(f"485串口初始化失败: {e}") + + def on_sensor_triggered(self): + """传感器触发回调(同步两个传送带停止)""" + with self.sync_lock: + # 检查是否两个传送带都已触发 + if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered: + self.both_stopped = True + print(f"\n[主控制器] 两个传送带都已检测到挡板并停止!任务完成") + self.global_stop_flag = True # 标记全局停止 + + def start_all_conveyors(self): + """启动所有传送带""" + print("\n=== 主控制器:启动所有传送带===") + + # 检查串口是否正常 + if not (self.ser and self.ser.is_open): + print("[主控制器] 485串口未打开,无法启动") + return False + # 1. 一次性启动两个电机 + self.conveyor1.start_motor_once() + self.conveyor2.start_motor_once() + + # 2. 启动传感器检测线程 + self.conveyor1.start_sensor_thread(self) + self.conveyor2.start_sensor_thread(self) + + print("[主控制器] 所有传送带已一次性启动,传感器开始检测...") + return True + + def stop_all_conveyors(self): + """停止所有传送带并关闭串口""" + print("\n=== 主控制器:停止所有传送带 ===") + self.global_stop_flag = True + + # 强制停止两个电机 + self.conveyor1.stop_motor() + self.conveyor2.stop_motor() + + # 关闭串口 + time.sleep(1) + with GLOBAL_SERIAL_LOCK: + if self.ser and self.ser.is_open: + self.ser.close() + print(f"485总线串口 {SERIAL_PORT} 已关闭") + + print("[主控制器] 所有传送带已停止并关闭串口") + + def run(self): + """主运行函数""" + try: + if not self.start_all_conveyors(): + return + + # 主线程等待:直到两个传送带都停止或手动退出 + while not self.global_stop_flag: + if self.both_stopped: + break + time.sleep(0.5) + + except KeyboardInterrupt: + print("\n\n[主控制器] 检测到退出指令,正在停止系统...") + except Exception as e: + print(f"\n[主控制器] 程序异常: {e}") + finally: + self.stop_all_conveyors() + print("\n=== 主控制器:程序执行完毕 ===") + +# -----------传送带对外接口-------------- +def conveyor_control(): + """主函数""" + try: + master = MasterConveyorController() + master.run() + except RuntimeError as e: + print(f"系统启动失败: {e}") + except Exception as e: + print(f"未知异常: {e}") + + +# ------------测试接口-------------- +if __name__ == '__main__': + conveyor_control() diff --git a/conveyor_controller/conveyor_master_controller2_test.py b/conveyor_controller/conveyor_master_controller2_test.py new file mode 100644 index 0000000..3bcc63a --- /dev/null +++ b/conveyor_controller/conveyor_master_controller2_test.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2026/1/6 10:41 +# @Author : reenrr +# @File : conveyor_master_controller2_test.py +# @Desc : 传送带1和2协同控制 两个传送带同步走一个挡板的距离--测试代码 +''' +import logging +import threading +import time +from datetime import datetime +import serial +from EMV import RelayController + +logging.getLogger("pymodbus").setLevel(logging.CRITICAL) + +# --- 全局参数配置 --- +SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线) +BAUD_RATE = 115200 +ACTION_DELAY = 5 +SLAVE_ID_1 = 1 # 传送带1轴地址 +SLAVE_ID_2 = 2 # 传送带2轴地址 +SERIAL_TIMEOUT = 0.05 # 串口超时50ms +SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms) +DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms) +WAIT_INIT_RELEASE_TIMEOUT = 3.0 # 等待初始挡板离开超时(10秒) + +# 全局串口锁(485总线必须单指令发送,避免冲突) +GLOBAL_SERIAL_LOCK = threading.Lock() + +class SingleMotorController: + """单个电机控制器(按轴地址自动匹配指令集,复用全局串口)""" + def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock): + self.slave_id = slave_id # 轴地址(1/2) + self.conveyor_id = conveyor_id # 传送带编号(1/2) + self.action_delay = action_delay + self.ser = serial_obj # 复用主控制器的串口实例 + self.global_serial_lock = global_serial_lock # 全局串口锁 + + # 初始化指令 + self._init_commands() + + # 核心状态标志 + self.status_thread_is_running = False # 传感器线程运行标志 + self.is_running = False # 电机是否已启动 + self.is_stopped = False # 电机是否已停止 + self.sensor_triggered = False # 传感器检测到挡板(无信号) + self.sensor_locked = False # 传感器锁定(防抖) + self.last_sensor_trigger = 0 # 上次触发时间(防抖) + self.init_release_done = False # 初始挡板是否已离开 + + # 线程对象 + self.monitor_thread = None + self.relay_controller = RelayController() + + # 锁 + self.sensor_lock = threading.Lock() + self.state_lock = threading.Lock() + + def _init_commands(self): + """根据轴地址初始化指令集""" + if self.slave_id == 1: + # --------传送带1(轴地址1)指令集-------- + self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令 + self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令 + self.speed_commands = [ + bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式 + bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30 + bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度 + bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度 + ] + elif self.slave_id == 2: + # --------传送带2(轴地址2)指令集-------- + self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令 + self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令 + self.speed_commands = [ + bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式 + bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30 + bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度 + bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度 + ] + else: + raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2") + + # 打印指令(调试用) + print(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:") + print(f" 启动指令: {self.start_command.hex(' ')}") + print(f" 停止指令: {self.stop_command.hex(' ')}") + + def send_command_list(self, command_list, delay=0.05): + """ + 批量发送指令列表(加全局锁,指令间延时避免总线冲突) + :param command_list: 待发送的指令列表 + :param delay: 指令间延时(默认0.05s) + """ + if not (self.ser and self.ser.is_open): + print(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送") + return + for idx, cmd in enumerate(command_list): + self.send_and_receive_raw(cmd, f"指令{idx + 1}") + time.sleep(delay) # 485总线指令间必须加延时 + + def clear_buffer(self): + """清空串口缓冲区(加全局锁)""" + if self.ser and self.ser.is_open: + with self.global_serial_lock: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + while self.ser.in_waiting > 0: + self.ser.read(self.ser.in_waiting) + time.sleep(0.001) + + def send_and_receive_raw(self, command, description): + """ + 底层串口通信(核心:使用全局锁保证单指令发送) + :param command: 待发送的指令 + :param description: 指令描述 + :return: 接收到的响应 + """ + if not (self.ser and self.ser.is_open): + print(f"传送带{self.conveyor_id}串口未打开,跳过发送") + return None + + try: + self.clear_buffer() + # 全局锁:同一时间仅一个设备发送指令 + with self.global_serial_lock: + send_start = time.perf_counter() + self.ser.write(command) + self.ser.flush() + send_cost = (time.perf_counter() - send_start) * 1000 + + # 接收响应(仅对应轴地址的设备会回复) + recv_start = time.perf_counter() + response = b"" + while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT: + if self.ser.in_waiting > 0: + chunk = self.ser.read(8) + response += chunk + if len(response) >= 8: + break + time.sleep(0.001) + recv_cost = (time.perf_counter() - recv_start) * 1000 + + # 处理响应 + valid_resp = response[:8] if len(response) >= 8 else response + print(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]") + print(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)") + print(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)") + + return valid_resp + + except Exception as e: + print(f"传送带{self.conveyor_id}通信异常 ({description}): {e}") + return None + + def start_motor(self): + """启动电机""" + with self.state_lock: + if self.is_running or self.is_stopped: + print(f"[传送带{self.conveyor_id}] 电机已启动/停止,无需重复启动") + return + + # 1. 发送速度模式配置指令 + print(f"[传送带{self.conveyor_id}] 配置速度模式...") + self.send_command_list(self.speed_commands[:4]) + + # 2. 发送启动指令 + print(f"[传送带{self.conveyor_id}] 发送启动指令") + self.send_and_receive_raw(self.start_command, "启动传送带") + + with self.state_lock: + self.is_running = True + + def stop_motor(self): + """停止电机""" + with self.state_lock: + if self.is_stopped: + return + self.is_stopped = True + self.is_running = False + + # 发送停止指令 + print(f"[传送带{self.conveyor_id}] 发送停止指令") + self.send_and_receive_raw(self.stop_command, "停止传送带") + + def wait_init_sensor_release(self): + """等待初始挡板离开传感器(传感器从无信号→有信号)""" + print(f"[传送带{self.conveyor_id}] 等待初始挡板离开传感器(超时{WAIT_INIT_RELEASE_TIMEOUT}秒)") + start_time = time.time() + + while time.time() - start_time < WAIT_INIT_RELEASE_TIMEOUT: + # 读取传感器状态:True=有信号(无遮挡),False=无信号(遮挡) + sensor_status = self.get_sensor_status() + + if sensor_status: # 挡板离开,传感器有信号 + print(f"[传送带{self.conveyor_id}] 初始挡板已离开传感器") + with self.state_lock: + self.init_release_done = True + return True + + time.sleep(DETECTION_INTERVAL) + + # 超时处理 + print(f"[传送带{self.conveyor_id}] 等待初始挡板离开超时!强制标记为已离开") + with self.state_lock: + self.init_release_done = True + return False + + def get_sensor_status(self): + """读取传感器状态""" + if self.conveyor_id == 1: + return self.relay_controller.get_device_status('conveyor1_sensor', 'sensors') + else: + return self.relay_controller.get_device_status('conveyor2_sensor', 'sensors') + + def monitor_conveyors_sensor_status(self, master_controller): + """ + 传感器检测线程(仅检测运行中挡板遮挡) + :param master_controller: 主控制器对象 + """ + print(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)") + + while self.status_thread_is_running and not master_controller.global_stop_flag: + try: + with self.sensor_lock: + # 1. 全局停止/电机已停止时跳过检测 + if master_controller.global_stop_flag or self.is_stopped: + time.sleep(DETECTION_INTERVAL) + continue + + # 2. 等待初始挡板离开后再开始检测 + if not self.init_release_done: + time.sleep(DETECTION_INTERVAL) + continue + + # 3. 读取传感器状态 + sensor_status = self.get_sensor_status() + current_time = time.time() + + # 4. 检测到挡板遮挡(无信号)且满足防抖条件 → 触发停止 + if (not sensor_status) and (not self.sensor_locked) and \ + (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: + print( + f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板遮挡!准备停止") + self.last_sensor_trigger = current_time + self.sensor_triggered = True + self.sensor_locked = True + + # 立即停止当前传送带 + self.stop_motor() + + # 通知主控制器同步状态 + master_controller.on_sensor_triggered() + + time.sleep(DETECTION_INTERVAL) + + except Exception as e: + print(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}") + time.sleep(0.1) + + print(f"[传送带{self.conveyor_id}] 传感器检测线程已停止") + + def start_sensor_thread(self, master_controller): + """ + 启动传感器检测线程 + :param master_controller: 主控制器对象 + """ + if self.monitor_thread and self.monitor_thread.is_alive(): + return + + self.status_thread_is_running = True + self.monitor_thread = threading.Thread( + target=self.monitor_conveyors_sensor_status, + args=(master_controller,), + daemon=True + ) + self.monitor_thread.start() + +class MasterConveyorController: + """主控制器 - 单串口管理两个传送带(485总线)""" + def __init__(self): + self.global_stop_flag = False + self.both_stopped = False # 两个传送带是否都已停止 + + # 1. 初始化单串口(485总线) + self.ser = None + self._init_serial() + + # 2. 初始化两个电机控制器(复用同一个串口) + self.conveyor1 = SingleMotorController( + slave_id=SLAVE_ID_1, + conveyor_id=1, + action_delay=ACTION_DELAY, + serial_obj=self.ser, + global_serial_lock=GLOBAL_SERIAL_LOCK + ) + self.conveyor2 = SingleMotorController( + slave_id=SLAVE_ID_2, + conveyor_id=2, + action_delay=ACTION_DELAY, + serial_obj=self.ser, + global_serial_lock=GLOBAL_SERIAL_LOCK + ) + + # 同步锁 + self.sync_lock = threading.Lock() + + def _init_serial(self): + """初始化485总线串口(主控制器统一管理)""" + try: + self.ser = serial.Serial( + port=SERIAL_PORT, + baudrate=BAUD_RATE, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=SERIAL_TIMEOUT, + write_timeout=SERIAL_TIMEOUT, + xonxoff=False, + rtscts=False, + dsrdtr=False + ) + if self.ser.is_open: + print(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})") + # 初始化时清空缓冲区 + with GLOBAL_SERIAL_LOCK: + self.ser.reset_input_buffer() + self.ser.reset_output_buffer() + else: + raise RuntimeError("串口初始化失败:无法打开串口") + except Exception as e: + raise RuntimeError(f"485串口初始化失败: {e}") + + def on_sensor_triggered(self): + """传感器触发回调(同步两个传送带停止)""" + with self.sync_lock: + # 检查是否两个传送带都检测到挡板遮挡 + if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered: + # 停止未停止的传送带 + if not self.conveyor1.is_stopped: + self.conveyor1.stop_motor() + if not self.conveyor2.is_stopped: + self.conveyor2.stop_motor() + + self.both_stopped = True + print(f"\n[主控制器] 两个传送带都已检测到挡板并停止!任务完成") + self.global_stop_flag = True # 标记全局停止 + + def start_all_conveyors(self): + """启动所有传送带(按需求顺序:启动电机→等待初始挡板离开→开启传感器检测)""" + print("\n=== 主控制器:启动所有传送带===") + + # 检查串口是否正常 + if not (self.ser and self.ser.is_open): + print("[主控制器] 485串口未打开,无法启动") + return False + + # 1. 第一步:同步启动两个电机 + print("[主控制器] 同步启动两个传送带(初始挡板遮挡传感器)") + self.conveyor1.start_motor() + self.conveyor2.start_motor() + + # 2. 第二步:等待两个传送带的初始挡板都离开传感器 + print("[主控制器] 等待两个传送带的初始挡板离开传感器...") + self.conveyor1.wait_init_sensor_release() + self.conveyor2.wait_init_sensor_release() + + # 3. 第三步:初始挡板都离开后,启动传感器检测线程 + print("[主控制器] 初始挡板已全部离开,启动传感器检测线程") + self.conveyor1.start_sensor_thread(self) + self.conveyor2.start_sensor_thread(self) + + print("[主控制器] 传送带启动流程完成,等待检测挡板遮挡...") + return True + + def stop_all_conveyors(self): + """停止所有传送带并关闭串口""" + print("\n=== 主控制器:停止所有传送带 ===") + self.global_stop_flag = True + + # 强制停止两个电机 + self.conveyor1.stop_motor() + self.conveyor2.stop_motor() + + # 关闭串口 + time.sleep(1) + with GLOBAL_SERIAL_LOCK: + if self.ser and self.ser.is_open: + self.ser.close() + print(f"485总线串口 {SERIAL_PORT} 已关闭") + + print("[主控制器] 所有传送带已停止并关闭串口") + + def run(self): + """主运行函数""" + try: + if not self.start_all_conveyors(): + return + + # 主线程等待:直到两个传送带都停止或手动退出 + while not self.global_stop_flag: + if self.both_stopped: + break + time.sleep(0.5) + + except KeyboardInterrupt: + print("\n\n[主控制器] 检测到退出指令,正在停止系统...") + except Exception as e: + print(f"\n[主控制器] 程序异常: {e}") + finally: + self.stop_all_conveyors() + print("\n=== 主控制器:程序执行完毕 ===") + +# -----------传送带对外接口-------------- +def conveyor_control(): + """主函数""" + try: + master = MasterConveyorController() + master.run() + except RuntimeError as e: + print(f"系统启动失败: {e}") + except Exception as e: + print(f"未知异常: {e}") + +# ------------测试接口-------------- +if __name__ == '__main__': + conveyor_control() + + diff --git a/main_control.py b/main_control.py new file mode 100644 index 0000000..95d4500 --- /dev/null +++ b/main_control.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/12/12 11:05 +# @Author : reenrr +# @File : main_control.py +# @Desc : 主控程序 +''' +import multiprocessing # 多进程模块 +import threading +from threading import Event +import time +from EMV import sensor_triggered, global_relay, control_solenoid +from visual_algorithm import flaw_detection + +# ------------全局事件------------- +manger = multiprocessing.Manager() +conveyor_start_event = manger.Event() + + +def quality_testing(): + print("线条开始质量检测:") + + # 执行质量检测 + result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08}) + if result == "qualified": + result = "合格" + print("该线条是否合格:", result) + print("等待线条落到传送带(双压传感器触发)...") + # 等待时间触发,超时时间设为10秒(避免无限等待) + if sensor_triggered.wait(timeout=10): + print("线条已落到传送带,控制两个传送带向前移动") + # 触发传送带启动事件 + conveyor_start_event.set() + else: + print("超时警告:线条未落到传送带,请检查") + elif result == "unqualified": + result = "不合格" + print("该线条是否合格:", result) + print("进入NG动作") + control_solenoid() # 执行NG动作,控制电磁阀 + print("NG动作结束") + # print("判断NG线条是否落入肥料区:") + +# -----------对外接口------------- +def main_control(): + print("开始摆放线条") + + # 质量检测 + quality_testing() + + while True: # 防止跳出循环 + time.sleep(1) + +# ------------测试接口------------- +if __name__ == '__main__': + main_control() + + + + diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..31bd38a --- /dev/null +++ b/readme.md @@ -0,0 +1,14 @@ +# 开发板上找串口号 +## 找串口号 +sudo ls /dev/tty{S*,ACM*,USB*,rfcomm*} +看看是哪儿个串口号,比如我的是/dev/ttyACM0 + +使用sudo dmesg | grep -i acm查看,如果出现: +ttyACM0: USB ACM device +说明是这个串口 + +## 给串口赋权限 +sudo chmod 666 /dev/ttyACM0 + +# python版本 +3.9 diff --git a/visual_algorithm/visual_algorithm.py b/visual_algorithm/visual_algorithm.py new file mode 100644 index 0000000..d31b776 --- /dev/null +++ b/visual_algorithm/visual_algorithm.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +# @Time : 2025/12/23 16:44 +# @Author : reenrr +# @File : visual_algorithm.py +# @Desc : 留给视觉--质量检测的接口 +""" +import random + + +# -------------------------- 核心算法接口(后续替换此处即可) -------------------------- +def visual_algorithm_core(line_data: dict) -> str: + """ + 视觉算法核心判定函数(模拟版本) + 后续接入真实视觉算法时,直接替换此函数的实现逻辑即可 + + param: line_data: 线条特征数据(字典格式,可根据实际需求扩展字段) + 示例:{"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08} + return: 判定结果,固定返回 "qualified" 或 "unqualified" + """ + # 模拟随机返回合格/不合格(真实算法时,替换为实际判定逻辑) + # return random.choice(["qualified", "unqualified"]) + return "qualified" + + +# --------------外部接口--------------- +def flaw_detection(line_data: dict) -> str: + """ + 视觉算法缺陷检测统一接口(对外调用入口) + 封装核心算法,保证接口格式统一,后续替换算法不影响调用方 + + :param line_data: 线条特征数据(需和核心算法入参一致) + :return: 检测结果,"qualified"(合格) / "unqualified"(不合格) + """ + # 调用核心算法(后续仅需修改 visual_algorithm_core 函数) + result = visual_algorithm_core(line_data) + # 结果校验(保证返回值符合规范) + if result not in ["qualified", "unqualified"]: + raise ValueError("视觉算法返回值异常,仅支持 'qualified' 或 'unqualified'") + return result + +# ------------测试接口--------------- +if __name__ == '__main__': + result = flaw_detection({"line_id": "L001", "straightness": 0.95, "noise_ratio": 0.08}) + print("线条质量检测") + + + +