# core/system.py import threading import time import cv2 from core.state import SystemState from hardware.relay import RelayController from hardware.inverter import InverterController from hardware.transmitter import TransmitterController from hardware.RFID.rfid_service import rfid_service from vision.camera import DualCameraController from vision.detector import VisionDetector from feeding.controller import FeedingController from service.mould_service import app_web_service from config.settings import app_set_config class FeedingControlSystem: def __init__(self): self.state = SystemState() # 初始化硬件控制器 self.relay_controller = RelayController( host=app_set_config.relay_host, port=app_set_config.relay_port ) self.inverter_controller = InverterController(self.relay_controller) self.transmitter_controller = TransmitterController(self.relay_controller) # 初始化视觉系统 self.camera_controller = DualCameraController(app_set_config.camera_configs) self.vision_detector = VisionDetector() # 初始化RFID控制器 self.rfid_controller = rfid_service( host=app_set_config.rfid_host, port=app_set_config.rfid_port ) # 初始化下料控制器 self.feeding_controller = FeedingController( self.relay_controller, self.inverter_controller, self.transmitter_controller, self.vision_detector, self.camera_controller, self.rfid_controller, self.state ) # 线程管理 self.monitor_thread = None self.visual_control_thread = None self.alignment_check_thread = None self.lower_feeding_thread = None self.led_thread = None def initialize(self): """初始化系统""" print("初始化控制系统...") # 设置摄像头配置 # self.camera_controller.set_config( # camera_type=app_set_config.camera_type, # ip=app_set_config.camera_ip, # port=app_set_config.camera_port, # username=app_set_config.camera_username, # password=app_set_config.camera_password, # channel=app_set_config.camera_channel # ) # # 初始化摄像头 # if not self.camera_controller.setup_capture(): # raise Exception("摄像头初始化失败") # 加载视觉模型 # if not self.vision_detector.load_models(): # raise Exception("视觉模型加载失败") self.check_device_connectivity() # self.camera_controller.start_cameras() # if not app_set_config.debug_feeding: # 启动系统监控(要料,破拱)线程 self.start_monitoring() # 启动视觉控制(角度、溢出)线程 # self.start_visual_control() # 启动对齐检查线程 # self.start_alignment_check() # 启动下料线程 self.start_lower_feeding() #LED屏 # self.start_led() print("控制系统初始化完成") def start_monitoring(self): """启动系统监控""" print('振动和要料监控线程启动') self.monitor_thread = threading.Thread( target=self._monitor_loop, daemon=True, name='monitor' ) self.monitor_thread.start() def _monitor_loop(self): """监控循环""" while self.state.running: try: # self.feeding_controller.check_upper_material_request() self.feeding_controller.check_arch_blocking() time.sleep(1) except Exception as e: print(f"监控线程错误: {e}") def start_visual_control(self): """启动视觉控制""" print('视觉控制线程启动') self.visual_control_thread = threading.Thread( target=self._visual_control_loop, daemon=True, name='visual_control' ) self.visual_control_thread.start() def _visual_control_loop(self): """视觉控制循环""" while self.state.running: try: print('visual_control') current_frame = self.camera_controller.get_single_latest_frame() if current_frame is not None: # 执行视觉控制逻辑 self.feeding_controller.visual_control(current_frame) time.sleep(app_set_config.visual_check_interval) except Exception as e: print(f"视觉控制循环错误: {e}") time.sleep(app_set_config.visual_check_interval) def start_alignment_check(self): """启动对齐检查""" print('对齐检查线程启动') self.alignment_check_thread = threading.Thread( target=self._alignment_check_loop, daemon=True, name='align_check' ) self.alignment_check_thread.start() def _alignment_check_loop(self): """对齐检查循环""" loc_align_status=False loc_before_status=None while self.state.running: try: if self.state.lower_feeding_stage == 4: # 等待对齐阶段 current_frame = self.camera_controller.get_single_latest_frame() if current_frame is not None: self.state.vehicle_aligned = self.alignment_check_status() if self.state.vehicle_aligned: # loc_count+=1 print("检测到模具车对齐") else: print("模具车未对齐") # time.sleep(app_set_config.alignment_check_interval) # loc_align_status=self.alignment_check_status() # if loc_align_status and not loc_before_status: # print("模具车由未对齐到对齐") # self.state.vehicle_aligned=True # elif not loc_align_status and loc_before_status: # print("模具车由对齐到未对齐") # self.state.vehicle_aligned=False # if loc_before_status!=loc_align_status: # loc_before_status=loc_align_status except Exception as e: print(f"对齐检查循环错误: {e}") finally: time.sleep(app_set_config.alignment_check_interval) def alignment_check_status(self)->bool: """对齐检查循环""" loc_aligned=False loc_count=0 for i in range(4): try: current_frame = self.camera_controller.get_single_latest_frame() if current_frame is not None: loc_aligned = self.vision_detector.detect_vehicle_alignment(current_frame) if loc_aligned: loc_count+=1 print("检测到模具车对齐") else: loc_count=0 print("模具车未对齐") time.sleep(app_set_config.alignment_check_interval) except Exception as e: print(f"对齐检查循环错误: {e}") time.sleep(app_set_config.alignment_check_interval) if loc_count>=3: loc_aligned=True else: loc_aligned=False return loc_aligned def start_lower_feeding(self): """启动下料流程""" self.lower_feeding_thread = threading.Thread( target=self._start_lower_feeding, name="Feeding", daemon=True ) self.lower_feeding_thread.start() def _start_lower_feeding(self): """启动下料流程""" while self.state.running: self.feeding_controller.start_feeding() time.sleep(app_set_config.lower_feeding_interval) def start_led(self): """启动LED流程""" self.led_thread = threading.Thread( target=self._start_led, name="LED", daemon=True ) self.led_thread.start() def _start_led(self): """启动LED流程""" while self.state.running: led_info = app_web_service.get_pouring_led() if led_info and self.state.current_artifact: if self.state.current_artifact.MouldCode==led_info.MouldCode: led_info.RingTypeCode=self.state.current_artifact.RingTypeCode led_info.UpperWeight=self.state._upper_weight led_info.LowerWeight=self.state._lower_weight led_info.VibrationFrequency=self.state._mould_frequency #发送到LED屏 time.sleep(app_set_config.led_interval) def check_device_connectivity(self) -> bool: """检查关键设备连接状态""" try: # 检查网络继电器连接 test_response = self.relay_controller.send_command(self.relay_controller.read_status_command) if not test_response: print("网络继电器连接失败") return False # 检查变频器连接 if not self.relay_controller.modbus_client.connect(): print("无法连接到网络继电器Modbus服务") return False # 尝试读取变频器一个寄存器(测试连接) # test_result = self.relay_controller.modbus_client.read_holding_registers( # address=0x00, # count=1, # slave=self.inverter_controller.config['slave_id'] # ) # if isinstance(test_result, Exception): # print("变频器连接测试失败") # return False # 检查下料斗变送器连接 test_weight = self.transmitter_controller.read_data(2) if test_weight is None: print("下料斗变送器连接失败") return False self.relay_controller.modbus_client.close() return True except Exception as e: print(f"设备连接检查失败: {e}") return False def stop(self): """停止系统""" print("停止控制系统...") self.state.running = False # 等待线程结束 if self.monitor_thread: self.monitor_thread.join() if self.visual_control_thread: self.visual_control_thread.join() if self.alignment_check_thread: self.alignment_check_thread.join() if self.lower_feeding_thread: self.lower_feeding_thread.join() # 释放摄像头资源 self.camera_controller.release() print("控制系统已停止")