from enum import IntEnum from core.state import FeedStatus from service.mould_service import MouldService from busisness.blls import ArtifactBll from busisness.models import ArtifactInfoModel class FeedingProcess: def __init__(self, relay_controller, inverter_controller, transmitter_controller, vision_detector, camera_controller, state, settings): self.relay_controller = relay_controller self.artifact_bll = ArtifactBll() self.inverter_controller = inverter_controller self.transmitter_controller = transmitter_controller self.vision_detector = vision_detector self.camera_controller = camera_controller self.state = state self.state.feed_status = FeedStatus.FNone self.settings = settings def start_feeding(self): """开始生产管片""" if self.state.feed_status == FeedStatus.FNone: self.state.feed_status = FeedStatus.FCheckM return elif self.state.feed_status == FeedStatus.FCheckM: self.state._lower_feeding_stage = 4 self.wait_for_vehicle_alignment() self.state.feed_status = FeedStatus.FStart print("生产已检查模车") return elif self.state.feed_status == FeedStatus.FStart: print("生产已开始") time.sleep(2) loc_modules = MouldService.get_not_pour_artifacts() if loc_modules: # 取第一个未浇筑的管片 loc_module = loc_modules[0] self.state.current_artifact = loc_module else: #未读取到AIP接口数据. self.state.current_artifact = None return elif self.state.feed_status == FeedStatus.FRFID: print("生产已检查RFID") #RFID格式:模具编号,分块号,尺寸规格,方量 rfid_info ='' loc_MouldCode='SHZB1-4' loc_SizeSpecification='6600*1200' loc_BlockNumber='B1' loc.BetonVolume=1.56 if self.state.current_artifact: #检测是否和RFID识别的管理一致 self.state.feed_status = FeedStatus.FCheckGB else: #以RFID为准 loc_module= ArtifactInfoModel() loc_module.MouldCode=loc_MouldCode loc_module.SizeSpecification=loc_SizeSpecification loc_module.BlockNumber=loc_BlockNumber loc_module.BetonVolume=loc.BetonVolume self.state.current_artifact = loc_module #确认是否保存到数据库 self.state.feed_status = FeedStatus.FCheckGB return elif self.state.feed_status == FeedStatus.FCheckGB: print("生产已检查盖板") time.sleep(10) self.state.feed_status = FeedStatus.FFeed return elif self.state.feed_status == FeedStatus.FFeed: self._start_feeding_stage() print("生产已下料") return elif self.state.feed_status == FeedStatus.FFinished: print("生产已完成") self.state.feed_status = FeedStatus.FCheckM return def _start_feeding_stage(self): """启动指定下料阶段""" """开始分步下料""" if self.state._lower_feeding_stage != 0: print("下料已在进行中") return print("开始分步下料过程") # 重置计数器 #下料斗下料循环次数 self.state.lower_feeding_cycle = 0 #上料斗已下料次数 self.state.upper_feeding_count = 0 #上料斗最大下料次数 self.state.upper_feeding_max = 2 # 第一次上料 self.transfer_material_from_upper_to_lower() # 等待模具车对齐并开始第一轮下料 # self.state._lower_feeding_stage = 4 # self.wait_for_vehicle_alignment() def transfer_material_from_upper_to_lower(self): """上料斗向下料斗下料""" print(f"上料斗向下料斗下料 (第 {self.state.upper_feeding_count + 1} 次)") # 记录下料前的重量 initial_upper_weight = self.transmitter_controller.read_data(1) # 如果无法读取重量,直接报错 if initial_upper_weight is None: raise Exception("无法读取上料斗重量传感器数据,下料操作终止") # 单次下料重量(kg),self.settings.single_batch_weight target_upper_weight = initial_upper_weight - self.settings.single_batch_weight target_upper_weight = max(target_upper_weight, 0) # 确保不低于0 print(f"上料斗初始重量: {initial_upper_weight:.2f}kg, 目标重量: {target_upper_weight:.2f}kg") # 确保下料斗出砼门关闭 self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close') # 打开上料斗出砼门 self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'open') # 等待物料流入下料斗,基于上料斗重量变化控制 import time start_time = time.time() timeout = 30 # 30秒超时 while time.time() - start_time < timeout: current_upper_weight = self.transmitter_controller.read_data(1) # 如果无法读取重量,继续尝试 if current_upper_weight is None: print("无法读取上料斗重量,继续尝试...") time.sleep(1) continue print(f"上料斗当前重量: {current_upper_weight:.2f}kg") # 如果达到目标重量,则关闭上料斗出砼门 if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围 print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg") break elif time.time() - start_time > 25: # 如果25秒后重量变化过小 weight_change = initial_upper_weight - current_upper_weight if weight_change < 100: # 如果重量变化小于100kg print("重量变化过小,可能存在堵塞,交由监控系统处理...") break time.sleep(1) # 关闭上料斗出砼门 self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close') # 验证下料结果 final_upper_weight = self.transmitter_controller.read_data(1) if final_upper_weight is not None: actual_transferred = initial_upper_weight - final_upper_weight print(f"实际下料重量: {actual_transferred:.2f}kg") # 增加上料计数 self.state.upper_feeding_count += 1 print("上料斗下料完成") def wait_for_vehicle_alignment(self): """等待模具车对齐""" print("等待模具车对齐...") self.state._lower_feeding_stage = 4 import time while self.state._lower_feeding_stage == 4 and self.state.running: if self.state.vehicle_aligned: print("模具车已对齐,开始下料") self.state._lower_feeding_stage = 1 # self.feeding_stage_one() break time.sleep(self.settings.alignment_check_interval) def feeding_stage_one(self): """第一阶段下料:下料斗向模具车下料(低速)""" print("开始第一阶段下料:下料斗低速下料") self.inverter_controller.set_frequency(self.settings.frequencies[0]) self.inverter_controller.control('start') # 确保上料斗出砼门关闭 self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close') # 打开下料斗出砼门 self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open') import time start_time = time.time() initial_weight = self.transmitter_controller.read_data(2) if initial_weight is None: print("无法获取初始重量,取消下料") self.finish_feeding_process() return target_weight = initial_weight + self.settings.single_batch_weight while self.state._lower_feeding_stage == 1: current_weight = self.transmitter_controller.read_data(2) if current_weight is None: self.state.lower_weight_error_count += 1 if self.state.lower_weight_error_count >= self.settings.max_error_count: print("下料斗传感器连续读取失败,停止下料") self.finish_feeding_process() return else: self.state.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: self.state._lower_feeding_stage = 2 self.feeding_stage_two() break time.sleep(2) def feeding_stage_two(self): """第二阶段下料:下料斗向模具车下料(中速)""" print("开始第二阶段下料:下料斗中速下料") self.inverter_controller.set_frequency(self.settings.frequencies[1]) # 保持下料斗出砼门打开 self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open') # 确保上料斗出砼门关闭 self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close') import time start_time = time.time() initial_weight = self.transmitter_controller.read_data(2) if initial_weight is None: print("无法获取初始重量,取消下料") self.finish_feeding_process() return target_weight = initial_weight + self.settings.single_batch_weight while self.state._lower_feeding_stage == 2: current_weight = self.transmitter_controller.read_data(2) if current_weight is None: self.state.lower_weight_error_count += 1 if self.state.lower_weight_error_count >= self.settings.max_error_count: print("下料斗传感器连续读取失败,停止下料") self.finish_feeding_process() return else: self.state.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: self.state._lower_feeding_stage = 3 self.feeding_stage_three() break time.sleep(2) def feeding_stage_three(self): """第三阶段下料:下料斗向模具车下料(高速)""" print("开始第三阶段下料:下料斗高速下料") self.inverter_controller.set_frequency(self.settings.frequencies[2]) # 保持下料斗出砼门打开 self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open') # 确保上料斗出砼门关闭 self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close') import time start_time = time.time() initial_weight = self.transmitter_controller.read_data(2) if initial_weight is None: print("无法获取初始重量,取消下料") self.finish_feeding_process() return target_weight = initial_weight + self.settings.single_batch_weight while self.state._lower_feeding_stage == 3: current_weight = self.transmitter_controller.read_data(2) if current_weight is None: self.state.lower_weight_error_count += 1 if self.state.lower_weight_error_count >= self.settings.max_error_count: print("下料斗传感器连续读取失败,停止下料") self.finish_feeding_process() return else: self.state.lower_weight_error_count = 0 if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: self.state._lower_feeding_stage = 4 self.finish_current_batch() break time.sleep(2) def finish_current_batch(self): """完成当前批次下料""" print("当前批次下料完成,关闭出砼门") self.inverter_controller.control('stop') self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close') self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close') # 增加三阶段下料轮次计数 self.state.lower_feeding_cycle += 1 # 检查是否完成两轮三阶段下料 if self.state.lower_feeding_cycle >= self.state.upper_feeding_max: print("完成两轮三阶段下料,5吨下料任务完成") self.finish_feeding_process() return # 如果只完成一轮三阶段下料,进行第二次上料 print("第一轮三阶段下料完成,准备第二次上料") # 上料斗第二次向下料斗下料 try: self.transfer_material_from_upper_to_lower() except Exception as e: print(f"第二次上料失败: {e}") print("停止下料流程") self.finish_feeding_process() # 出现严重错误时结束整个流程 return # 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车) print("第二次上料完成,继续三阶段下料") self.state._lower_feeding_stage = 1 # 直接进入第一阶段下料 self.feeding_stage_one() # 开始第二轮第一阶段下料 def finish_feeding_process(self): """完成整个下料流程""" print("整个下料流程完成") self.state._lower_feeding_stage = 0 self.state.lower_feeding_cycle = 0 self.state.upper_feeding_count = 0 # self.return_upper_door_to_default() def return_upper_door_to_default(self): """上料斗回到默认位置(搅拌楼下接料位置)""" print("上料斗回到默认位置") self.relay_controller.control(self.relay_controller.DOOR_UPPER, 'close') self.state._upper_door_position = 'default'