# core/system.py import threading import time import cv2 from config.settings import Settings from core.state import SystemState from hardware.relay import RelayController from hardware.inverter import InverterController from hardware.transmitter import TransmitterController from hardware.RFID.rfid_service import rfid_service from vision.camera import CameraController from vision.detector import VisionDetector from feeding.controller import FeedingController class FeedingControlSystem: def __init__(self, settings: Settings): self.settings = settings self.state = SystemState() # 初始化硬件控制器 self.relay_controller = RelayController( host=settings.relay_host, port=settings.relay_port ) self.inverter_controller = InverterController(self.relay_controller) self.transmitter_controller = TransmitterController(self.relay_controller) # 初始化视觉系统 self.camera_controller = CameraController() self.vision_detector = VisionDetector(settings) # 初始化下料控制器 self.feeding_controller = FeedingController( self.relay_controller, self.inverter_controller, self.transmitter_controller, self.vision_detector, self.camera_controller, self.rfid_controller, self.state, settings ) # 初始化RFID控制器 self.rfid_controller = rfid_service( host=settings.rfid_host, port=settings.rfid_port ) # 线程管理 self.monitor_thread = None self.visual_control_thread = None self.alignment_check_thread = None self.lower_feeding_thread = None def initialize(self): """初始化系统""" print("初始化控制系统...") # 设置摄像头配置 self.camera_controller.set_config( camera_type=self.settings.camera_type, ip=self.settings.camera_ip, port=self.settings.camera_port, username=self.settings.camera_username, password=self.settings.camera_password, channel=self.settings.camera_channel ) # 初始化摄像头 if not self.camera_controller.setup_capture(): raise Exception("摄像头初始化失败") # 加载视觉模型 if not self.vision_detector.load_models(): raise Exception("视觉模型加载失败") if not self.check_device_connectivity(): raise Exception("设备连接失败") # 启动系统监控 self.start_monitoring() # 启动视觉控制 self.start_visual_control() # 启动对齐检查 self.start_alignment_check() # 启动下料轮询线程 self.start_lower_feeding() print("控制系统初始化完成") def start_monitoring(self): """启动系统监控""" self.state.running = True self.monitor_thread = threading.Thread( target=self._monitor_loop, daemon=True ) self.monitor_thread.start() def _monitor_loop(self): """监控循环""" while self.state.running: try: self.feeding_controller.check_upper_material_request() self.feeding_controller.check_arch_blocking() time.sleep(1) except Exception as e: print(f"监控线程错误: {e}") def start_visual_control(self): """启动视觉控制""" self.visual_control_thread = threading.Thread( target=self._visual_control_loop, daemon=True ) self.visual_control_thread.start() def _visual_control_loop(self): """视觉控制循环""" while self.state.running: try: current_frame = self.camera_controller.capture_frame() if current_frame is not None: # 执行视觉控制逻辑 self.feeding_controller.visual_control(current_frame) time.sleep(self.settings.visual_check_interval) except Exception as e: print(f"视觉控制循环错误: {e}") time.sleep(self.settings.visual_check_interval) def start_alignment_check(self): """启动对齐检查""" self.alignment_check_thread = threading.Thread( target=self._alignment_check_loop, daemon=True ) self.alignment_check_thread.start() def _alignment_check_loop(self): """对齐检查循环""" while self.state.running: try: if self.state._lower_feeding_stage == 4: # 等待对齐阶段 current_frame = self.camera_controller.capture_frame() if current_frame is not None: self.state.vehicle_aligned = self.vision_detector.detect_vehicle_alignment(current_frame) if self.state.vehicle_aligned: print("检测到模具车对齐") else: print("模具车未对齐") time.sleep(self.settings.alignment_check_interval) except Exception as e: print(f"对齐检查循环错误: {e}") time.sleep(self.settings.alignment_check_interval) def start_lower_feeding(self): """启动下料流程""" self.lower_feeding_thread = threading.Thread( target=self._start_lower_feeding, name="Feeding", daemon=True ) self.lower_feeding_thread.start() def _start_lower_feeding(self): """启动下料流程""" while self.state.running: self.feeding_controller.start_feeding() time.sleep(self.settings.lower_feeding_interval) def check_device_connectivity(self) -> bool: """检查关键设备连接状态""" try: # 检查网络继电器连接 test_response = self.relay_controller.send_command(self.relay_controller.read_status_command) if not test_response: print("网络继电器连接失败") return False # 检查变频器连接 if not self.relay_controller.modbus_client.connect(): print("无法连接到网络继电器Modbus服务") return False # 尝试读取变频器一个寄存器(测试连接) test_result = self.relay_controller.modbus_client.read_holding_registers( address=0x00, count=1, slave=self.inverter_controller.config['slave_id'] ) if isinstance(test_result, Exception): print("变频器连接测试失败") return False # 检查下料斗变送器连接 test_weight = self.transmitter_controller.read_data(2) if test_weight is None: print("下料斗变送器连接失败") return False self.relay_controller.modbus_client.close() return True except Exception as e: print(f"设备连接检查失败: {e}") return False def stop(self): """停止系统""" print("停止控制系统...") self.state.running = False # 等待线程结束 if self.monitor_thread: self.monitor_thread.join() if self.visual_control_thread: self.visual_control_thread.join() if self.alignment_check_thread: self.alignment_check_thread.join() if self.lower_feeding_thread: self.lower_feeding_thread.join() # 释放摄像头资源 self.camera_controller.release() print("控制系统已停止")