Files
Feeding_control_system/feeding/process.py

346 lines
15 KiB
Python
Raw Normal View History

2025-11-01 17:33:26 +08:00
from enum import IntEnum
from core.state import FeedStatus
from service.mould_service import MouldService
from busisness.blls import ArtifactBll
from busisness.models import ArtifactInfoModel
class FeedingProcess:
def __init__(self, relay_controller, inverter_controller,
transmitter_controller, vision_detector,
camera_controller, state, settings):
self.relay_controller = relay_controller
2025-11-01 17:33:26 +08:00
self.artifact_bll = ArtifactBll()
self.inverter_controller = inverter_controller
self.transmitter_controller = transmitter_controller
self.vision_detector = vision_detector
self.camera_controller = camera_controller
self.state = state
2025-11-01 17:33:26 +08:00
self.state.feed_status = FeedStatus.FNone
2025-11-12 09:22:21 +08:00
self.settings = settings
def start_feeding(self):
2025-11-12 09:22:21 +08:00
loc_state=self.state
2025-11-01 17:33:26 +08:00
"""开始生产管片"""
2025-11-12 09:22:21 +08:00
if loc_state.feed_status == FeedStatus.FNone:
loc_state.feed_status = FeedStatus.FCheckM
2025-11-01 17:33:26 +08:00
return
2025-11-12 09:22:21 +08:00
elif loc_state.feed_status == FeedStatus.FCheckM:
loc_state._lower_feeding_stage = 4
2025-11-01 17:33:26 +08:00
self.wait_for_vehicle_alignment()
2025-11-12 09:22:21 +08:00
loc_state.feed_status = FeedStatus.FStart
2025-11-01 17:33:26 +08:00
print("生产已检查模车")
return
2025-11-12 09:22:21 +08:00
elif loc_state.feed_status == FeedStatus.FApiCheck:
2025-11-01 17:33:26 +08:00
print("生产已开始")
time.sleep(2)
loc_modules = MouldService.get_not_pour_artifacts()
if loc_modules:
# 取第一个未浇筑的管片
loc_module = loc_modules[0]
self.state.current_artifact = loc_module
else:
#未读取到AIP接口数据.
self.state.current_artifact = None
return
2025-11-12 09:22:21 +08:00
elif loc_state.feed_status == FeedStatus.FRFID:
2025-11-01 17:33:26 +08:00
print("生产已检查RFID")
#RFID格式模具编号,分块号,尺寸规格,方量
rfid_info =''
loc_MouldCode='SHZB1-4'
loc_SizeSpecification='6600*1200'
loc_BlockNumber='B1'
loc.BetonVolume=1.56
if self.state.current_artifact:
#检测是否和RFID识别的管理一致
self.state.feed_status = FeedStatus.FCheckGB
else:
#以RFID为准
loc_module= ArtifactInfoModel()
loc_module.MouldCode=loc_MouldCode
loc_module.SizeSpecification=loc_SizeSpecification
loc_module.BlockNumber=loc_BlockNumber
loc_module.BetonVolume=loc.BetonVolume
self.state.current_artifact = loc_module
#确认是否保存到数据库
self.state.feed_status = FeedStatus.FCheckGB
return
2025-11-12 09:22:21 +08:00
elif loc_state.feed_status == FeedStatus.FCheckGB:
print("检查盖板对齐,")
2025-11-01 17:33:26 +08:00
time.sleep(10)
2025-11-12 09:22:21 +08:00
loc_state.feed_status = FeedStatus.FUpperToLower
#计算本次生产需要的总重量
print(f"本次生产需要的总重量:{self.state.need_total_weight}")
return
elif loc_state.feed_status == FeedStatus.FUpperToLower:
print("上料斗向下料斗转移")
#上料斗重量
loc_state.initial_upper_weight=self.transmitter_controller.read_data(1)
#下料斗重量
loc_state.initial_lower_weight=self.transmitter_controller.read_data(2)
#需要的总重量
loc_state.need_total_weight=loc_state.current_artifact.BetonVolume*loc_state.density
if loc_state.need_total_weight > loc_state.initial_upper_weight + loc_state.initial_lower_weight:
# 等待上料斗重量增加(多久不够报警可能出现F块不足的情况)
print('重量不够,需要增加')
return
if loc_state.need_total_weight>loc_state.initial_lower_weight:
if self.state._upper_door_position != 'over_lower':
#是否需要等待上料斗下料,如果下料斗够重量,则不需要等待
return
else:
# 需要等待上料斗下料
# 最后一块进行尾数控制
# 最后一块F块前面多要0.25,0.3F块直接下料先多下0.3后续)
loc_FWeight=0.3*loc_state.density
loc_feed_weight=loc_state.need_total_weight-loc_state.initial_lower_weight-loc_FWeight
self.transfer_material_from_upper_to_lower(loc_state.initial_upper_weight,loc_state.initial_lower_weight,loc_feed_weight)
self.state.feed_status = FeedStatus.FFeed1
# time.sleep(10)
return
elif self.state.feed_status == FeedStatus.FFeed1:
#下料
# self._start_feeding_stage()
print("下料1")
return
elif self.state.feed_status == FeedStatus.FFeed2:
#上料
# self._start_feeding_stage()
print("下料2")
2025-11-01 17:33:26 +08:00
return
2025-11-12 09:22:21 +08:00
elif self.state.feed_status == FeedStatus.FFeed3:
#下料
# self._start_feeding_stage()
print("下料3")
2025-11-01 17:33:26 +08:00
return
elif self.state.feed_status == FeedStatus.FFinished:
print("生产已完成")
self.state.feed_status = FeedStatus.FCheckM
return
def _start_feeding_stage(self):
"""启动指定下料阶段"""
"""开始分步下料"""
print("开始分步下料过程")
self.transfer_material_from_upper_to_lower()
2025-11-12 09:22:21 +08:00
def transfer_material_from_upper_to_lower(self,initial_upper_weight,initial_lower_weight,feed_weight):
2025-11-01 17:33:26 +08:00
2025-11-12 09:22:21 +08:00
"""target_upper_weight转移后剩下的上料斗重量"""
# 如果低于单次,全部卸掉
_max_lower_weight=self.settings.max_lower_volume*self.state.density
if (initial_lower_weight+feed_weight>_max_lower_weight):
feed_weight=_max_lower_weight-initial_lower_weight
target_upper_weight=initial_upper_weight-feed_weight
target_upper_weight = max(target_upper_weight, 0)
# 确保下料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
# 打开上料斗出砼门
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'open')
# 等待物料流入下料斗,基于上料斗重量变化控制
import time
start_time = time.time()
2025-11-12 09:22:21 +08:00
# timeout = 30 # 30秒超时
2025-11-12 09:22:21 +08:00
while self.state.running:
current_upper_weight = self.transmitter_controller.read_data(1)
# 如果无法读取重量,继续尝试
if current_upper_weight is None:
print("无法读取上料斗重量,继续尝试...")
time.sleep(1)
continue
print(f"上料斗当前重量: {current_upper_weight:.2f}kg")
# 如果达到目标重量,则关闭上料斗出砼门
if current_upper_weight <= target_upper_weight + 50: # 允许50kg的误差范围
print(f"达到目标重量,当前重量: {current_upper_weight:.2f}kg")
2025-11-12 09:22:21 +08:00
print(f"花费时间 {time.time() - start_time:.2f}")
break
elif time.time() - start_time > 25: # 如果25秒后重量变化过小
weight_change = initial_upper_weight - current_upper_weight
if weight_change < 100: # 如果重量变化小于100kg
2025-11-12 09:22:21 +08:00
#需要增加报警处理
print("重量变化过小,可能存在堵塞")
time.sleep(1)
# 关闭上料斗出砼门
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
print("上料斗下料完成")
def wait_for_vehicle_alignment(self):
"""等待模具车对齐"""
print("等待模具车对齐...")
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 4
import time
2025-11-01 17:33:26 +08:00
while self.state._lower_feeding_stage == 4 and self.state.running:
if self.state.vehicle_aligned:
print("模具车已对齐,开始下料")
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 1
# self.feeding_stage_one()
break
time.sleep(self.settings.alignment_check_interval)
def feeding_stage_one(self):
"""第一阶段下料:下料斗向模具车下料(低速)"""
print("开始第一阶段下料:下料斗低速下料")
self.inverter_controller.set_frequency(self.settings.frequencies[0])
self.inverter_controller.control('start')
# 确保上料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
# 打开下料斗出砼门
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
import time
start_time = time.time()
initial_weight = self.transmitter_controller.read_data(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process()
return
target_weight = initial_weight + self.settings.single_batch_weight
2025-11-01 17:33:26 +08:00
while self.state._lower_feeding_stage == 1:
current_weight = self.transmitter_controller.read_data(2)
if current_weight is None:
self.state.lower_weight_error_count += 1
if self.state.lower_weight_error_count >= self.settings.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process()
return
else:
self.state.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 2
self.feeding_stage_two()
break
time.sleep(2)
def feeding_stage_two(self):
"""第二阶段下料:下料斗向模具车下料(中速)"""
print("开始第二阶段下料:下料斗中速下料")
self.inverter_controller.set_frequency(self.settings.frequencies[1])
# 保持下料斗出砼门打开
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
# 确保上料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
import time
start_time = time.time()
initial_weight = self.transmitter_controller.read_data(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process()
return
target_weight = initial_weight + self.settings.single_batch_weight
2025-11-01 17:33:26 +08:00
while self.state._lower_feeding_stage == 2:
current_weight = self.transmitter_controller.read_data(2)
if current_weight is None:
self.state.lower_weight_error_count += 1
if self.state.lower_weight_error_count >= self.settings.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process()
return
else:
self.state.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 3
self.feeding_stage_three()
break
time.sleep(2)
def feeding_stage_three(self):
"""第三阶段下料:下料斗向模具车下料(高速)"""
print("开始第三阶段下料:下料斗高速下料")
self.inverter_controller.set_frequency(self.settings.frequencies[2])
# 保持下料斗出砼门打开
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'open')
# 确保上料斗出砼门关闭
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
import time
start_time = time.time()
initial_weight = self.transmitter_controller.read_data(2)
if initial_weight is None:
print("无法获取初始重量,取消下料")
self.finish_feeding_process()
return
target_weight = initial_weight + self.settings.single_batch_weight
2025-11-01 17:33:26 +08:00
while self.state._lower_feeding_stage == 3:
current_weight = self.transmitter_controller.read_data(2)
if current_weight is None:
self.state.lower_weight_error_count += 1
if self.state.lower_weight_error_count >= self.settings.max_error_count:
print("下料斗传感器连续读取失败,停止下料")
self.finish_feeding_process()
return
else:
self.state.lower_weight_error_count = 0
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 4
self.finish_current_batch()
break
time.sleep(2)
def finish_current_batch(self):
"""完成当前批次下料"""
print("当前批次下料完成,关闭出砼门")
self.inverter_controller.control('stop')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_1, 'close')
self.relay_controller.control(self.relay_controller.DOOR_LOWER_2, 'close')
# 增加三阶段下料轮次计数
self.state.lower_feeding_cycle += 1
# 检查是否完成两轮三阶段下料
if self.state.lower_feeding_cycle >= self.state.upper_feeding_max:
print("完成两轮三阶段下料5吨下料任务完成")
self.finish_feeding_process()
return
# 如果只完成一轮三阶段下料,进行第二次上料
print("第一轮三阶段下料完成,准备第二次上料")
# 上料斗第二次向下料斗下料
try:
self.transfer_material_from_upper_to_lower()
except Exception as e:
print(f"第二次上料失败: {e}")
print("停止下料流程")
self.finish_feeding_process() # 出现严重错误时结束整个流程
return
# 继续等待当前模具车对齐(不需要重新等待对齐,因为是同一辆模具车)
print("第二次上料完成,继续三阶段下料")
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 1 # 直接进入第一阶段下料
self.feeding_stage_one() # 开始第二轮第一阶段下料
def finish_feeding_process(self):
"""完成整个下料流程"""
print("整个下料流程完成")
2025-11-01 17:33:26 +08:00
self.state._lower_feeding_stage = 0
self.state.lower_feeding_cycle = 0
self.state.upper_feeding_count = 0
2025-11-01 17:33:26 +08:00
# self.return_upper_door_to_default()
def return_upper_door_to_default(self):
"""上料斗回到默认位置(搅拌楼下接料位置)"""
print("上料斗回到默认位置")
self.relay_controller.control(self.relay_controller.DOOR_UPPER, 'close')
2025-11-01 17:33:26 +08:00
self.state._upper_door_position = 'default'