import socket import binascii import time import threading import struct import cv2 import os from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusException # 添加视觉模块路径 import sys sys.path.append(os.path.join(os.path.dirname(__file__), 'src', 'vision')) # 导入视觉处理模块 from src.vision.anger_caculate import predict_obb_best_angle from src.vision.resize_tuili_image_main import classify_image_weighted, load_global_rois, crop_and_resize, YOLO class FeedingControlSystem: def __init__(self, relay_host='192.168.0.18', relay_port=50000): # 网络继电器配置 self.relay_host = relay_host self.relay_port = relay_port self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port) # 继电器映射 self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动 self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 上料斗出砼门 self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 下料斗出砼门 self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱 self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱 # 继电器命令(原始Socket)mudbus TCP模式 self.relay_commands = { self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'}, self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'} } # 读取状态命令 self.read_status_command = '000000000006010100000008' # 设备位映射 self.device_bit_map = { self.DOOR_UPPER: 0, self.DOOR_LOWER_1: 1, self.DOOR_LOWER_2: 2, self.BREAK_ARCH_UPPER: 3, self.BREAK_ARCH_LOWER: 4 } # 变频器配置(Modbus RTU 协议) self.inverter_config = { 'slave_id': 1, 'frequency_register': 0x01, # 2001H 'start_register': 0x00, # 2000H 'stop_register': 0x00, # 2000H(用于停机) 'start_command': 0x0013, # 正转点动运行 'stop_command': 0x0001 # 停机 } # 变送器配置(Modbus RTU) self.transmitter_config = { 1: { # 上料斗 'slave_id': 1, 'weight_register': 0x01, 'register_count': 2 },#发出去的内容01 03 00 01 00 02 2: { # 下料斗 'slave_id': 2, 'weight_register': 0x01, 'register_count': 2 }#发出去的内容02 03 00 01 00 02 } # 系统状态 self._running = False self._monitor_thread = None self._visual_control_thread = None self._alignment_check_thread = None # 下料控制相关 self.min_required_weight = 500 # 模具车最小需要重量(kg) self.upper_door_position = 'default' # default(在搅拌楼下接料), over_lower(在下料斗上方), returning(返回中) self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:等待上料, 4:等待模具车对齐 self.lower_feeding_cycle = 0 # 下料斗下料循环次数 (1, 2, 3) self.upper_feeding_count = 0 # 上料斗已下料次数 (0, 1, 2) self.last_upper_weight = 0 self.last_lower_weight = 0 self.last_weight_time = time.time() self.target_vehicle_weight = 5000 # 目标模具车重量(kg) self.upper_buffer_weight = 500 # 上料斗缓冲重量(kg),每次下料多下这么多 self.single_batch_weight = 2500 # 单次下料重量(kg) #夹角状态 self.angle_control_mode = "normal" # 角度控制模式: normal, reducing, maintaining, recovery # 错误计数 self.upper_weight_error_count = 0 self.lower_weight_error_count = 0 self.max_error_count = 3 # 下料阶段频率(Hz) self.inverter_max_frequency = 400.0#频率最大值 self.frequencies = [220.0, 230.0, 240.0] # 视觉系统接口 self.overflow_detected = False # 堆料检测 self.door_opening_large = False # 夹角 self.vehicle_aligned = False # 模具车是否对齐 # 视觉控制参数 self.angle_threshold = 60.0 # 角度阈值,超过此值认为开口过大 self.target_angle = 20.0 # 目标角度 self.min_angle = 10.0 # 最小角度 self.max_angle = 80.0 # 最大角度 self.angle_tolerance = 5.0 # 角度容差 self.visual_control_enabled = True # 视觉控制使能 self.last_angle = None # 上次检测角度 self.visual_check_interval = 1.0 # 视觉检查间隔(秒) self.alignment_check_interval = 0.5 # 对齐检查间隔(秒) # 模型路径配置 self.angle_model_path = "src/vision/angle.pt" self.overflow_model_path = "src/vision/overflow.pt" self.alignment_model_path = "src/vision/alig.pt" # 模具车对齐检测模型 self.roi_file_path = "src/vision/roi_coordinates/1_rois.txt" # 模型实例 self.angle_model = None # 夹角检测模型实例 self.overflow_model = None # 堆料检测模型实例 self.alignment_model = None # 对齐检测模型实例 # 摄像头相关配置 self.camera = None self.camera_type = "ip" self.camera_ip = "192.168.1.51" self.camera_port = 554 self.camera_username = "admin" self.camera_password = "XJ123456" self.camera_channel = 1 self.current_image_path = "current_frame.jpg" def set_camera_config(self, camera_type="ip", ip=None, port=None, username=None, password=None, channel=1): """ 设置摄像头配置 :param camera_type: 摄像头类型 "usb" 或 "ip" :param ip: 网络摄像头IP地址 :param port: 网络摄像头端口 :param username: 网络摄像头用户名 :param password: 网络摄像头密码 :param channel: 摄像头通道号 """ self.camera_type = camera_type if ip: self.camera_ip = ip if port: self.camera_port = port if username: self.camera_username = username if password: self.camera_password = password self.camera_channel = channel def set_angle_parameters(self, target_angle=20.0, min_angle=10.0, max_angle=80.0, threshold=60.0): """ 设置角度控制参数 """ self.target_angle = target_angle self.min_angle = min_angle self.max_angle = max_angle self.angle_threshold = threshold def set_feeding_parameters(self, target_vehicle_weight=5000, upper_buffer_weight=500, single_batch_weight=2500): """ 设置下料参数 :param target_vehicle_weight: 目标模具车重量(kg) :param upper_buffer_weight: 上料斗缓冲重量(kg) :param single_batch_weight: 单次下料重量(kg) """ self.target_vehicle_weight = target_vehicle_weight self.upper_buffer_weight = upper_buffer_weight self.single_batch_weight = single_batch_weight def load_all_models(self): """ 加载所有视觉检测模型 """ success = True # 加载夹角检测模型 try: if not os.path.exists(self.angle_model_path): print(f"夹角检测模型不存在: {self.angle_model_path}") success = False else: # 注意:angle.pt模型通过predict_obb_best_angle函数使用,不需要预加载 print(f"夹角检测模型路径: {self.angle_model_path}") except Exception as e: print(f"检查夹角检测模型失败: {e}") success = False # 加载堆料检测模型 try: if not os.path.exists(self.overflow_model_path): print(f"堆料检测模型不存在: {self.overflow_model_path}") success = False else: self.overflow_model = YOLO(self.overflow_model_path) print(f"成功加载堆料检测模型: {self.overflow_model_path}") except Exception as e: print(f"加载堆料检测模型失败: {e}") success = False # 加载对齐检测模型 try: if not os.path.exists(self.alignment_model_path): print(f"对齐检测模型不存在: {self.alignment_model_path}") success = False else: self.alignment_model = YOLO(self.alignment_model_path) print(f"成功加载对齐检测模型: {self.alignment_model_path}") except Exception as e: print(f"加载对齐检测模型失败: {e}") success = False return success def send_relay_command(self, command_hex): """发送原始Socket命令""" try: byte_data = binascii.unhexlify(command_hex) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.connect((self.relay_host, self.relay_port)) sock.send(byte_data) response = sock.recv(1024) print(f"收到继电器响应: {binascii.hexlify(response)}") return response except Exception as e: print(f"继电器通信错误: {e}") return None def get_relay_status(self): """获取继电器状态""" response = self.send_relay_command(self.read_status_command) status_dict = {} if response and len(response) >= 10: status_byte = response[9] status_bin = f"{status_byte:08b}"[::-1] for key, bit_index in self.device_bit_map.items(): status_dict[key] = status_bin[bit_index] == '1' else: print("读取继电器状态失败") return status_dict def control_relay(self, device, action): """控制继电器""" if device in self.relay_commands and action in self.relay_commands[device]: print(f"控制继电器 {device} {action}") self.send_relay_command(self.relay_commands[device][action]) time.sleep(0.1) else: print(f"无效设备或动作: {device}, {action}") def read_transmitter_data_via_relay(self, transmitter_id): """读取变送器数据(Modbus TCP 转 RS485)""" try: if transmitter_id not in self.transmitter_config: print(f"无效变送器ID: {transmitter_id}") return None config = self.transmitter_config[transmitter_id] if not self.relay_modbus_client.connect(): print("无法连接网络继电器Modbus服务") return None result = self.relay_modbus_client.read_holding_registers( address=config['weight_register'], count=config['register_count'], slave=config['slave_id']#转发给哪台变送器 ) if isinstance(result, Exception): print(f"读取变送器 {transmitter_id} 失败: {result}") return None # 根据图片示例,正确解析数据 if config['register_count'] == 2:#读两个寄存器 # 获取原始字节数组 raw_data = result.registers # 组合成32位整数 weight = (raw_data[0] << 16) + raw_data[1] weight = weight / 1000.0 # 单位转换为千克 elif config['register_count'] == 1: weight = float(result.registers[0]) else: print(f"不支持的寄存器数量: {config['register_count']}") return None print(f"变送器 {transmitter_id} 读取重量: {weight}kg") return weight except ModbusException as e: print(f"Modbus通信错误: {e}") return None except Exception as e: print(f"数据解析错误: {e}") return None finally: self.relay_modbus_client.close() def set_inverter_frequency_via_relay(self, frequency): """设置变频器频率""" try: if not self.relay_modbus_client.connect(): print("无法连接网络继电器Modbus服务") return False # 使用最大频率变量计算百分比 percentage = frequency / self.inverter_max_frequency # 得到 0~1 的比例 value = int(percentage * 10000) # 转换为 -10000 ~ 10000 的整数 # 限制范围 value = max(-10000, min(10000, value)) result = self.relay_modbus_client.write_register( self.inverter_config['frequency_register'], value, slave=self.inverter_config['slave_id'] ) if isinstance(result, Exception): print(f"设置频率失败: {result}") return False print(f"设置变频器频率为 {frequency}Hz") return True except ModbusException as e: print(f"变频器Modbus通信错误: {e}") return False finally: self.relay_modbus_client.close() def control_inverter_via_relay(self, action): try: if not self.relay_modbus_client.connect(): print("无法连接网络继电器Modbus服务") return False if action == 'start': result = self.relay_modbus_client.write_register( address=self.inverter_config['start_register'], value=self.inverter_config['start_command'], slave=self.inverter_config['slave_id'] ) print("启动变频器") elif action == 'stop': result = self.relay_modbus_client.write_register( address=self.inverter_config['start_register'], value=self.inverter_config['stop_command'], slave=self.inverter_config['slave_id'] ) print("停止变频器") else: print(f"无效操作: {action}") return False if isinstance(result, Exception): print(f"控制失败: {result}") return False return True except ModbusException as e: print(f"变频器控制错误: {e}") return False finally: self.relay_modbus_client.close() def check_upper_material_request(self): """检查是否需要要料""" current_weight = self.read_transmitter_data_via_relay(1) if current_weight is None: self.upper_weight_error_count += 1 print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}") if self.upper_weight_error_count >= self.max_error_count: print("警告:上料斗传感器连续读取失败,请检查连接") return False self.upper_weight_error_count = 0 # 判断是否需要要料:当前重量 < 目标重量 + 缓冲重量 if current_weight < (self.single_batch_weight + self.min_required_weight): print("上料斗重量不足,通知搅拌楼要料") self.request_material_from_mixing_building() # 请求搅拌楼下料 return True return False def request_material_from_mixing_building(self): """ 请求搅拌楼下料(待完善) TODO: 与同事对接通信协议 """ print("发送要料请求至搅拌楼...") self.return_upper_door_to_default() # 这里需要与同事对接具体的通信方式 # 可能是Modbus写寄存器、TCP通信、HTTP请求等 pass def wait_for_mixing_building_material(self): """ 等待搅拌楼下料完成(待完善) TODO: 与同事对接信号接收 """ print("等待搅拌楼下料完成...") # 这里需要与同事对接具体的信号接收方式 # 可能是Modbus读寄存器、TCP通信、HTTP请求等 # 模拟等待 time.sleep(5) print("搅拌楼下料完成") self.move_upper_door_over_lower() return True def move_upper_door_over_lower(self): """移动上料斗到下料斗上方""" print("移动上料斗到下料斗上方") self.control_relay(self.DOOR_UPPER, 'open') self.upper_door_position = 'over_lower' def return_upper_door(self): """返回上料斗到搅拌楼""" print("上料斗返回搅拌楼") self.control_relay(self.DOOR_UPPER, 'close') self.upper_door_position = 'returning' def return_upper_door_to_default(self): """上料斗回到默认位置(搅拌楼下接料位置)""" print("上料斗回到默认位置") self.control_relay(self.DOOR_UPPER, 'close') self.upper_door_position = 'default' def start_lower_feeding(self): """开始分步下料""" if self.lower_feeding_stage != 0: print("下料已在进行中") return # 检查关键设备是否可连接 if not self._check_device_connectivity(): print("关键设备连接失败,无法开始下料") return print("开始分步下料过程") # 重置计数器 self.lower_feeding_cycle = 0 # 用于记录三阶段下料次数 self.upper_feeding_count = 0 # 用于记录上料次数 # 第一次上料(总共需要上料2次) self.transfer_material_from_upper_to_lower() # 等待模具车对齐并开始第一轮下料 self.lower_feeding_stage = 4 # 从等待模具车对齐开始 self.wait_for_vehicle_alignment() def _check_device_connectivity(self): """检查关键设备连接状态""" try: # 检查网络继电器连接 test_response = self.send_relay_command(self.read_status_command) if not test_response: print("网络继电器连接失败") return False # 检查变频器连接 if not self.relay_modbus_client.connect(): print("无法连接到网络继电器Modbus服务") return False # 尝试读取变频器一个寄存器(测试连接) test_result = self.relay_modbus_client.read_holding_registers( address=0x00, count=1, slave=self.inverter_config['slave_id'] ) if isinstance(test_result, Exception): print("变频器连接测试失败") return False # 检查下料斗变送器连接 test_weight = self.read_transmitter_data_via_relay(2) if test_weight is None: print("下料斗变送器连接失败") return False self.relay_modbus_client.close() return True except Exception as e: print(f"设备连接检查失败: {e}") return False def transfer_material_from_upper_to_lower(self): """上料斗向下料斗下料(基于上料斗重量传感器控制)""" print(f"上料斗向下料斗下料 (第 {self.upper_feeding_count + 1} 次)") # 记录下料前的重量 initial_upper_weight = self.read_transmitter_data_via_relay(1) # 如果无法读取重量,直接报错 if initial_upper_weight is None: raise Exception("无法读取上料斗重量传感器数据,下料操作终止") target_upper_weight = initial_upper_weight - self.single_batch_weight target_upper_weight = max(target_upper_weight, 0) # 确保不低于0 print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg") # 确保下料斗出砼门关闭 self.control_relay(self.DOOR_LOWER_2, 'close') # 打开上料斗出砼门 self.control_relay(self.DOOR_LOWER_1, 'open') # 等待物料流入下料斗,基于上料斗重量变化控制 start_time = time.time() timeout = 30 # 30秒超时 while time.time() - start_time < timeout: current_upper_weight = self.read_transmitter_data_via_relay(1) # 如果无法读取重量,继续尝试 if current_upper_weight is None: print("无法读取上料斗重量,继续尝试...") time.sleep(1) continue print(f"上料斗当前重量: {current_upper_weight:.2f}kg") # 如果达到目标重量,则关闭上料斗出砼门 if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围 print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg") break elif time.time() - start_time > 25: # 如果25秒后重量变化过小 weight_change = initial_upper_weight - current_upper_weight if weight_change < 100: # 如果重量变化小于100kg print("重量变化过小,可能存在堵塞,交由监控系统处理...") # 不再在这里直接处理破拱,而是依靠监控系统处理 break time.sleep(1) # 关闭上料斗出砼门 self.control_relay(self.DOOR_LOWER_1, 'close') # 验证下料结果 final_upper_weight = self.read_transmitter_data_via_relay(1) if final_upper_weight is not None: actual_transferred = initial_upper_weight - final_upper_weight print(f"实际下料重量: {actual_transferred:.2f}kg") # 增加上料计数 self.upper_feeding_count += 1 print("上料斗下料完成") # 关闭上料斗出砼门 self.control_relay(self.DOOR_LOWER_1, 'close') # 验证下料结果 final_upper_weight = self.read_transmitter_data_via_relay(1) if final_upper_weight is not None: actual_transferred = initial_upper_weight - final_upper_weight print(f"实际下料重量: {actual_transferred:.2f}kg") # 增加上料计数 self.upper_feeding_count += 1 print("上料斗下料完成") def wait_for_vehicle_alignment(self): """等待模具车对齐""" print("等待模具车对齐...") self.lower_feeding_stage = 4 while self.lower_feeding_stage == 4 and self._running: if self.vehicle_aligned: print("模具车已对齐,开始下料") self.lower_feeding_stage = 1 self.feeding_stage_one() break time.sleep(self.alignment_check_interval) def feeding_stage_one(self): """第一阶段下料:下料斗向模具车下料(低速)""" print("开始第一阶段下料:下料斗低速下料") self.set_inverter_frequency_via_relay(self.frequencies[0]) self.control_inverter_via_relay('start') # 确保上料斗出砼门关闭 self.control_relay(self.DOOR_LOWER_1, 'close') # 打开下料斗出砼门 self.control_relay(self.DOOR_LOWER_2, 'open') start_time = time.time() initial_weight = self.read_transmitter_data_via_relay(2) if initial_weight is None: print("无法获取初始重量,取消下料") self.finish_feeding_process() # 直接结束整个流程,而不是当前批次 return target_weight = initial_weight + self.single_batch_weight while self.lower_feeding_stage == 1: current_weight = self.read_transmitter_data_via_relay(2) if current_weight is None: self.lower_weight_error_count += 1 if self.lower_weight_error_count >= self.max_error_count: print("下料斗传感器连续读取失败,停止下料") self.finish_feeding_process() # 直接结束整个流程 return else: self.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: self.lower_feeding_stage = 2 self.feeding_stage_two() break time.sleep(2) def feeding_stage_two(self): """第二阶段下料:下料斗向模具车下料(中速)""" print("开始第二阶段下料:下料斗中速下料") self.set_inverter_frequency_via_relay(self.frequencies[1]) # 保持下料斗出砼门打开 self.control_relay(self.DOOR_LOWER_2, 'open') # 确保上料斗出砼门关闭 self.control_relay(self.DOOR_LOWER_1, 'close') start_time = time.time() initial_weight = self.read_transmitter_data_via_relay(2) if initial_weight is None: print("无法获取初始重量,取消下料") self.finish_feeding_process() # 直接结束整个流程 return target_weight = initial_weight + self.single_batch_weight while self.lower_feeding_stage == 2: current_weight = self.read_transmitter_data_via_relay(2) if current_weight is None: self.lower_weight_error_count += 1 if self.lower_weight_error_count >= self.max_error_count: print("下料斗传感器连续读取失败,停止下料") self.finish_feeding_process() # 直接结束整个流程 return else: self.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: self.lower_feeding_stage = 3 self.feeding_stage_three() break time.sleep(2) def feeding_stage_three(self): """第三阶段下料:下料斗向模具车下料(高速)""" print("开始第三阶段下料:下料斗高速下料") self.set_inverter_frequency_via_relay(self.frequencies[2]) # 保持下料斗出砼门打开 self.control_relay(self.DOOR_LOWER_2, 'open') # 确保上料斗出砼门关闭 self.control_relay(self.DOOR_LOWER_1, 'close') start_time = time.time() initial_weight = self.read_transmitter_data_via_relay(2) if initial_weight is None: print("无法获取初始重量,取消下料") self.finish_feeding_process() # 直接结束整个流程 return target_weight = initial_weight + self.single_batch_weight while self.lower_feeding_stage == 3: current_weight = self.read_transmitter_data_via_relay(2) if current_weight is None: self.lower_weight_error_count += 1 if self.lower_weight_error_count >= self.max_error_count: print("下料斗传感器连续读取失败,停止下料") self.finish_feeding_process() # 直接结束整个流程 return else: self.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: self.lower_feeding_stage = 4 self.finish_current_batch() break time.sleep(2) def finish_current_batch(self): """完成当前批次下料""" print("当前批次下料完成,关闭出砼门") self.control_inverter_via_relay('stop') self.control_relay(self.DOOR_LOWER_1, 'close') self.control_relay(self.DOOR_LOWER_2, 'close') # 增加三阶段下料轮次计数 self.lower_feeding_cycle += 1 # 检查是否完成两轮三阶段下料(总共5吨) if self.lower_feeding_cycle >= 2: # 完成整个5吨下料任务 print("完成两轮三阶段下料,5吨下料任务完成") self.finish_feeding_process() return # 如果只完成一轮三阶段下料,进行第二次上料 print("第一轮三阶段下料完成,准备第二次上料") # 上料斗第二次向下料斗下料 try: self.transfer_material_from_upper_to_lower() except Exception as e: print(f"第二次上料失败: {e}") print("停止下料流程") self.finish_feeding_process() # 出现严重错误时结束整个流程 return # 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车) print("第二次上料完成,继续三阶段下料") self.lower_feeding_stage = 1 # 直接进入第一阶段下料 self.feeding_stage_one() # 开始第二轮第一阶段下料 def finish_feeding_process(self): """完成整个下料流程""" print("整个下料流程完成") self.lower_feeding_stage = 0 self.lower_feeding_cycle = 0 self.upper_feeding_count = 0 self.return_upper_door_to_default() def handle_overflow_control(self, overflow_detected, door_opening_large): """处理溢料控制""" if overflow_detected and door_opening_large: print("检测到溢料且出砼门开口较大,调小出砼门") self.control_relay(self.DOOR_LOWER_1, 'close') time.sleep(0.1) self.control_relay(self.DOOR_LOWER_1, 'open') time.sleep(0.1) def is_lower_door_open(self): """检查出砼门是否打开""" return self.lower_feeding_stage in [1, 2] # 只有在下料阶段才认为门是打开的 def check_arch_blocking(self): """检查是否需要破拱""" current_time = time.time() # 检查下料斗破拱(只有在下料过程中才检查) if self.lower_feeding_stage in [1, 2, 3]: # 在所有下料阶段检查 lower_weight = self.read_transmitter_data_via_relay(2) if lower_weight is not None: # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) if (abs(lower_weight - self.last_lower_weight) < 0.1) and \ (current_time - self.last_weight_time) > 10: print("下料斗可能堵塞,启动破拱") self.control_relay(self.BREAK_ARCH_LOWER, 'open') time.sleep(2) self.control_relay(self.BREAK_ARCH_LOWER, 'close') self.last_lower_weight = lower_weight # 检查上料斗破拱(在上料斗向下料斗下料时检查) if (self.upper_door_position == 'over_lower' and self.lower_feeding_stage in [0, 1, 2, 3, 4]): # 在任何阶段都可能需要上料斗破拱 upper_weight = self.read_transmitter_data_via_relay(1) if upper_weight is not None: # 检查重量变化是否过慢(小于0.1kg变化且时间超过10秒) if (abs(upper_weight - self.last_upper_weight) < 0.1) and \ (current_time - self.last_weight_time) > 10: print("上料斗可能堵塞,启动破拱") self.control_relay(self.BREAK_ARCH_UPPER, 'open') time.sleep(2) self.control_relay(self.BREAK_ARCH_UPPER, 'close') self.last_upper_weight = upper_weight # 更新最后读取时间 if (self.read_transmitter_data_via_relay(1) is not None or self.read_transmitter_data_via_relay(2) is not None): self.last_weight_time = current_time def monitor_system(self): """监控系统状态""" while self._running: try: self.check_upper_material_request() self.check_arch_blocking() time.sleep(1) except Exception as e: print(f"监控线程错误: {e}") def setup_camera_capture(self, camera_index=0): """ 设置摄像头捕获 :param camera_index: USB摄像头索引或IP摄像头配置 """ try: rtsp_url = f"rtsp://{self.camera_username}:{self.camera_password}@{self.camera_ip}:{self.camera_port}/streaming/channels/{self.camera_channel}01" self.camera = cv2.VideoCapture(rtsp_url) if not self.camera.isOpened(): print(f"无法打开网络摄像头: {rtsp_url}") return False print(f"网络摄像头初始化成功,地址: {rtsp_url}") return True except Exception as e: print(f"摄像头设置失败: {e}") return False def capture_current_frame(self): """捕获当前帧并返回numpy数组""" try: if self.camera is None: print("摄像头未初始化") return None ret, frame = self.camera.read() if ret: return frame else: print("无法捕获图像帧") return None except Exception as e: print(f"图像捕获失败: {e}") return None def detect_overflow_from_image(self, image_array): """通过图像检测是否溢料(接受numpy数组)""" try: # 检查模型是否已加载 if self.overflow_model is None: print("堆料检测模型未加载") return False rois = load_global_rois(self.roi_file_path) if not rois: print("没有有效的ROI配置") return False if image_array is None: print("输入图像为空") return False crops = crop_and_resize(image_array, rois, 640) for roi_resized, _ in crops: final_class, _, _, _ = classify_image_weighted(roi_resized, self.overflow_model, threshold=0.4) if "大堆料" in final_class or "小堆料" in final_class: return True return False except Exception as e: print(f"溢料检测失败: {e}") return False def detect_vehicle_alignment(self, image_array): """通过图像检测模具车是否对齐(接受numpy数组)""" try: # 检查模型是否已加载 if self.alignment_model is None: print("对齐检测模型未加载") return False if image_array is None: print("输入图像为空") return False # 直接使用模型进行推理 results = self.alignment_model(image_array) pared_probs = results[0].probs.data.cpu().numpy().flatten() # 类别0: 未对齐, 类别1: 对齐 class_id = int(pared_probs.argmax()) confidence = float(pared_probs[class_id]) # 只有当对齐且置信度>95%时才认为对齐 if class_id == 1 and confidence > 0.95: return True return False except Exception as e: print(f"对齐检测失败: {e}") return False def get_current_door_angle(self, image=None, image_path=None): """ 通过视觉系统获取当前出砼门角度 :param image: 图像数组(numpy array) :param image_path: 图片路径 """ try: # 检查模型是否已加载 if self.angle_model is None: print("夹角检测模型未加载") return None angle_deg, _ = predict_obb_best_angle( model=self.angle_model, # 传递预加载的模型实例 image=image, # 传递图像数组 image_path=image_path # 或传递图像路径 ) return angle_deg except Exception as e: print(f"角度检测失败: {e}") return None def alignment_check_loop(self): """ 模具车对齐检查循环 """ while self._running: try: # 只在需要检查对齐时才检查 if self.lower_feeding_stage == 4: current_frame = self.capture_current_frame() if current_frame is not None: self.vehicle_aligned = self.detect_vehicle_alignment(current_frame) if self.vehicle_aligned: print("检测到模具车对齐") else: print("模具车未对齐") time.sleep(self.alignment_check_interval) except Exception as e: print(f"对齐检查循环错误: {e}") time.sleep(self.alignment_check_interval) def visual_control_loop(self): """ 视觉控制主循环 """ while self._running and self.visual_control_enabled: try: current_frame = self.capture_current_frame() if current_frame is None: print("无法获取当前图像,跳过本次调整") time.sleep(self.visual_check_interval) continue # 检测是否溢料 overflow = self.detect_overflow_from_image(current_frame) # 获取当前角度 current_angle = self.get_current_door_angle(image=current_frame) if current_angle is None: print("无法获取当前角度,跳过本次调整") time.sleep(self.visual_check_interval) continue print(f"当前角度: {current_angle:.2f}°, 溢料状态: {overflow}, 控制模式: {self.angle_control_mode}") # 状态机控制逻辑 if self.angle_control_mode == "normal": # 正常模式 if overflow and current_angle > self.angle_threshold: # 检测到堆料且角度过大,进入角度减小模式 print("检测到堆料且角度过大,关闭出砼门开始减小角度") self.control_relay(self.DOOR_LOWER_2, 'close') self.angle_control_mode = "reducing" else: # 保持正常开门 self.control_relay(self.DOOR_LOWER_2, 'open') elif self.angle_control_mode == "reducing": # 角度减小模式 if current_angle <= self.target_angle + self.angle_tolerance: # 角度已达到目标范围 if overflow: # 仍有堆料,进入维持模式 print(f"角度已降至{current_angle:.2f}°,仍有堆料,进入维持模式") self.angle_control_mode = "maintaining" self.control_relay(self.DOOR_LOWER_2, 'open') # 先打开门 else: # 无堆料,恢复正常模式 print(f"角度已降至{current_angle:.2f}°,无堆料,恢复正常模式") self.control_relay(self.DOOR_LOWER_2, 'open') self.angle_control_mode = "recovery" elif self.angle_control_mode == "maintaining": # 维持模式 - 使用脉冲控制 if not overflow: # 堆料已消除,恢复正常模式 print("堆料已消除,恢复正常模式") self.control_relay(self.DOOR_LOWER_2, 'open') self.angle_control_mode = "recovery" else: # 继续维持角度控制 self.pulse_control_door_for_maintaining() elif self.angle_control_mode == "recovery":#打开夹爪的过程中又堆料了 # 恢复模式 - 逐步打开门 if overflow: # 又出现堆料,回到角度减小模式 print("恢复过程中又检测到堆料,回到角度减小模式") self.angle_control_mode = "maintaining" else: # 堆料已消除,恢复正常模式 print("堆料已消除,恢复正常模式") self.control_relay(self.DOOR_LOWER_2, 'open') self.angle_control_mode = "normal" self.last_angle = current_angle time.sleep(self.visual_check_interval) except Exception as e: print(f"视觉控制循环错误: {e}") time.sleep(self.visual_check_interval) def pulse_control_door_for_maintaining(self): """ 用于维持模式的脉冲控制 保持角度在目标范围内 """ print("执行维持脉冲控制") # 关门1秒 self.control_relay(self.DOOR_LOWER_2, 'close') time.sleep(1.0) # 开门1秒 self.control_relay(self.DOOR_LOWER_2, 'open') time.sleep(1.0) def start_visual_control(self): """ 启动视觉控制线程 """ if not self.visual_control_enabled: print("视觉控制未启用") return print("启动视觉控制线程") self._visual_control_thread = threading.Thread( target=self.visual_control_loop, daemon=True ) self._visual_control_thread.start() return self._visual_control_thread def start_alignment_check(self): """ 启动模具车对齐检查线程 """ print("启动模具车对齐检查线程") self._alignment_check_thread = threading.Thread( target=self.alignment_check_loop, daemon=True ) self._alignment_check_thread.start() return self._alignment_check_thread def start(self): """启动系统""" if self._running: print("系统已在运行") return print("启动控制系统") self._running = True self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True) self._monitor_thread.start() def stop(self): """停止系统""" if not self._running: print("系统未在运行") return print("停止控制系统") self._running = False if self._monitor_thread is not None: self._monitor_thread.join() if self._visual_control_thread is not None: self._visual_control_thread.join() if self._alignment_check_thread is not None: self._alignment_check_thread.join() if self.camera is not None: self.camera.release() print("控制系统已停止") # 使用示例 if __name__ == "__main__": system = FeedingControlSystem(relay_host='192.168.0.18', relay_port=50000) # 设置角度控制参数 system.set_angle_parameters( target_angle=20.0, min_angle=10.0, max_angle=80.0, threshold=60.0 ) # 设置下料参数 system.set_feeding_parameters( target_vehicle_weight=5000, # 5吨 upper_buffer_weight=500, # 0.5吨缓冲 single_batch_weight=2500 # 每次下2.5吨 ) # 设置摄像头配置 system.set_camera_config( camera_type="ip", ip="192.168.1.51", port=554, username="admin", password="XJ123456", channel=1 ) # 初始化摄像头 if not system.setup_camera_capture(): print("摄像头初始化失败") exit(1) # 加载所有模型 if not system.load_all_models(): print("模型加载失败") exit(1) # 启动系统监控 system.start() # 启动视觉控制 system.start_visual_control() # 启动对齐检查 system.start_alignment_check() print("系统准备就绪,5秒后开始下料...") time.sleep(5) system.start_lower_feeding() # 启动下料流程 try: while True: time.sleep(1) except KeyboardInterrupt: print("收到停止信号") except Exception as e: print(f"系统错误: {e}") finally: system.stop()