Files
Feeding_control_system/feeding/process.py
2025-11-12 09:22:21 +08:00

346 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from enum import IntEnum
from core.state import FeedStatus
from service.mould_service import MouldService
from busisness.blls import ArtifactBll
from busisness.models import ArtifactInfoModel
class FeedingProcess:
def __init__(self, relay_controller, inverter_controller,
transmitter_controller, vision_detector,
camera_controller, state, settings):
self.relay_controller = relay_controller
self.artifact_bll = ArtifactBll()
self.inverter_controller = inverter_controller
self.transmitter_controller = transmitter_controller
self.vision_detector = vision_detector
self.camera_controller = camera_controller
self.state = state
self.state.feed_status = FeedStatus.FNone
self.settings = settings
def start_feeding(self):
loc_state=self.state
"""开始生产管片"""
if loc_state.feed_status == FeedStatus.FNone:
loc_state.feed_status = FeedStatus.FCheckM
return
elif loc_state.feed_status == FeedStatus.FCheckM:
loc_state._lower_feeding_stage = 4
self.wait_for_vehicle_alignment()
loc_state.feed_status = FeedStatus.FStart
print("生产已检查模车")
return
elif loc_state.feed_status == FeedStatus.FApiCheck:
print("生产已开始")
time.sleep(2)
loc_modules = MouldService.get_not_pour_artifacts()
if loc_modules:
# 取第一个未浇筑的管片
loc_module = loc_modules[0]
self.state.current_artifact = loc_module
else:
#未读取到AIP接口数据.
self.state.current_artifact = None
return
elif loc_state.feed_status == FeedStatus.FRFID:
print("生产已检查RFID")
#RFID格式模具编号,分块号,尺寸规格,方量
rfid_info =''
loc_MouldCode='SHZB1-4'
loc_SizeSpecification='6600*1200'
loc_BlockNumber='B1'
loc.BetonVolume=1.56
if self.state.current_artifact:
#检测是否和RFID识别的管理一致
self.state.feed_status = FeedStatus.FCheckGB
else:
#以RFID为准
loc_module= ArtifactInfoModel()
loc_module.MouldCode=loc_MouldCode
loc_module.SizeSpecification=loc_SizeSpecification
loc_module.BlockNumber=loc_BlockNumber
loc_module.BetonVolume=loc.BetonVolume
self.state.current_artifact = loc_module
#确认是否保存到数据库
self.state.feed_status = FeedStatus.FCheckGB
return
elif loc_state.feed_status == FeedStatus.FCheckGB:
print("检查盖板对齐,")
time.sleep(10)
loc_state.feed_status = FeedStatus.FUpperToLower
#计算本次生产需要的总重量
print(f"本次生产需要的总重量:{self.state.need_total_weight}")
return
elif loc_state.feed_status == FeedStatus.FUpperToLower:
print("上料斗向下料斗转移")
#上料斗重量
loc_state.initial_upper_weight=self.transmitter_controller.read_data(1)
#下料斗重量
loc_state.initial_lower_weight=self.transmitter_controller.read_data(2)
#需要的总重量
loc_state.need_total_weight=loc_state.current_artifact.BetonVolume*loc_state.density
if loc_state.need_total_weight > loc_state.initial_upper_weight + loc_state.initial_lower_weight:
# 等待上料斗重量增加(多久不够报警可能出现F块不足的情况)
print('重量不够,需要增加')
return
if loc_state.need_total_weight>loc_state.initial_lower_weight:
if self.state._upper_door_position != 'over_lower':
#是否需要等待上料斗下料,如果下料斗够重量,则不需要等待
return
else:
# 需要等待上料斗下料
# 最后一块进行尾数控制
# 最后一块F块前面多要0.25,0.3F块直接下料先多下0.3后续)
loc_FWeight=0.3*loc_state.density
loc_feed_weight=loc_state.need_total_weight-loc_state.initial_lower_weight-loc_FWeight
self.transfer_material_from_upper_to_lower(loc_state.initial_upper_weight,loc_state.initial_lower_weight,loc_feed_weight)
self.state.feed_status = FeedStatus.FFeed1
# time.sleep(10)
return
elif self.state.feed_status == FeedStatus.FFeed1:
#下料
# self._start_feeding_stage()
print("下料1")
return
elif self.state.feed_status == FeedStatus.FFeed2:
#上料
# self._start_feeding_stage()
print("下料2")
return
elif self.state.feed_status == FeedStatus.FFeed3:
#下料
# self._start_feeding_stage()
print("下料3")
return
elif self.state.feed_status == FeedStatus.FFinished:
print("生产已完成")
self.state.feed_status = FeedStatus.FCheckM
return
def _start_feeding_stage(self):
"""启动指定下料阶段"""
"""开始分步下料"""
print("开始分步下料过程")
self.transfer_material_from_upper_to_lower()
def transfer_material_from_upper_to_lower(self,initial_upper_weight,initial_lower_weight,feed_weight):
"""target_upper_weight转移后剩下的上料斗重量"""
# 如果低于单次,全部卸掉
_max_lower_weight=self.settings.max_lower_volume*self.state.density
if (initial_lower_weight+feed_weight>_max_lower_weight):
feed_weight=_max_lower_weight-initial_lower_weight
target_upper_weight=initial_upper_weight-feed_weight
target_upper_weight = max(target_upper_weight, 0)
# 确保下料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
# 打开上料斗出砼门
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'open')
# 等待物料流入下料斗,基于上料斗重量变化控制
import time
start_time = time.time()
# timeout = 30 # 30秒超时
while self.state.running:
current_upper_weight = self.transmitter_controller.read_data(1)
# 如果无法读取重量,继续尝试
if current_upper_weight is None:
print("无法读取上料斗重量,继续尝试...")
time.sleep(1)
continue
print(f"上料斗当前重量: {current_upper_weight:.2f}kg")
# 如果达到目标重量,则关闭上料斗出砼门
if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围
print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg")
print(f"花费时间 {time.time() - start_time:.2f}")
break
elif time.time() - start_time > 25: # 如果25秒后重量变化过小
weight_change = initial_upper_weight - current_upper_weight
if weight_change < 100: # 如果重量变化小于100kg
#需要增加报警处理
print("重量变化过小,可能存在堵塞")
time.sleep(1)
# 关闭上料斗出砼门
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
print("上料斗下料完成")
def wait_for_vehicle_alignment(self):
"""等待模具车对齐"""
print("等待模具车对齐...")
self.state._lower_feeding_stage = 4
import time
while self.state._lower_feeding_stage == 4 and self.state.running:
if self.state.vehicle_aligned:
print("模具车已对齐,开始下料")
self.state._lower_feeding_stage = 1
# self.feeding_stage_one()
break
time.sleep(self.settings.alignment_check_interval)
def feeding_stage_one(self):
"""第一阶段下料:下料斗向模具车下料(低速)"""
print("开始第一阶段下料:下料斗低速下料")
self.inverter_controller.set_frequency(self.settings.frequencies[0])
self.inverter_controller.control('start')
# 确保上料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
# 打开下料斗出砼门
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
import time
start_time = time.time()
initial_weight = self.transmitter_controller.read_data(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process()
return
target_weight = initial_weight + self.settings.single_batch_weight
while self.state._lower_feeding_stage == 1:
current_weight = self.transmitter_controller.read_data(2)
if current_weight is None:
self.state.lower_weight_error_count += 1
if self.state.lower_weight_error_count >= self.settings.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process()
return
else:
self.state.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.state._lower_feeding_stage = 2
self.feeding_stage_two()
break
time.sleep(2)
def feeding_stage_two(self):
"""第二阶段下料:下料斗向模具车下料(中速)"""
print("开始第二阶段下料:下料斗中速下料")
self.inverter_controller.set_frequency(self.settings.frequencies[1])
# 保持下料斗出砼门打开
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
# 确保上料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
import time
start_time = time.time()
initial_weight = self.transmitter_controller.read_data(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process()
return
target_weight = initial_weight + self.settings.single_batch_weight
while self.state._lower_feeding_stage == 2:
current_weight = self.transmitter_controller.read_data(2)
if current_weight is None:
self.state.lower_weight_error_count += 1
if self.state.lower_weight_error_count >= self.settings.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process()
return
else:
self.state.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.state._lower_feeding_stage = 3
self.feeding_stage_three()
break
time.sleep(2)
def feeding_stage_three(self):
"""第三阶段下料:下料斗向模具车下料(高速)"""
print("开始第三阶段下料:下料斗高速下料")
self.inverter_controller.set_frequency(self.settings.frequencies[2])
# 保持下料斗出砼门打开
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
# 确保上料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
import time
start_time = time.time()
initial_weight = self.transmitter_controller.read_data(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process()
return
target_weight = initial_weight + self.settings.single_batch_weight
while self.state._lower_feeding_stage == 3:
current_weight = self.transmitter_controller.read_data(2)
if current_weight is None:
self.state.lower_weight_error_count += 1
if self.state.lower_weight_error_count >= self.settings.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process()
return
else:
self.state.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.state._lower_feeding_stage = 4
self.finish_current_batch()
break
time.sleep(2)
def finish_current_batch(self):
"""完成当前批次下料"""
print("当前批次下料完成,关闭出砼门")
self.inverter_controller.control('stop')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
# 增加三阶段下料轮次计数
self.state.lower_feeding_cycle += 1
# 检查是否完成两轮三阶段下料
if self.state.lower_feeding_cycle >= self.state.upper_feeding_max:
print("完成两轮三阶段下料5吨下料任务完成")
self.finish_feeding_process()
return
# 如果只完成一轮三阶段下料,进行第二次上料
print("第一轮三阶段下料完成,准备第二次上料")
# 上料斗第二次向下料斗下料
try:
self.transfer_material_from_upper_to_lower()
except Exception as e:
print(f"第二次上料失败: {e}")
print("停止下料流程")
self.finish_feeding_process() # 出现严重错误时结束整个流程
return
# 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车)
print("第二次上料完成,继续三阶段下料")
self.state._lower_feeding_stage = 1 # 直接进入第一阶段下料
self.feeding_stage_one() # 开始第二轮第一阶段下料
def finish_feeding_process(self):
"""完成整个下料流程"""
print("整个下料流程完成")
self.state._lower_feeding_stage = 0
self.state.lower_feeding_cycle = 0
self.state.upper_feeding_count = 0
# self.return_upper_door_to_default()
def return_upper_door_to_default(self):
"""上料斗回到默认位置(搅拌楼下接料位置)"""
print("上料斗回到默认位置")
self.relay_controller.control(self.relay_controller.DOOR_UPPER, 'close')
self.state._upper_door_position = 'default'