Files
AutoControlSystem-G/CU/drop.py
2025-09-30 14:44:12 +08:00

334 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from ast import mod
import configparser
import os
from typing import Optional
from Model.Position import Real_Position
from Model.FeedModel import LineModel, PositionModel
import Constant
class DropPositionManager:
    """Read, iterate and edit palletizing drop paths stored in an INI file.

    The configuration file is parsed with ``configparser`` and uses these
    section naming conventions (all visible in this class):

    * ``<Constant.dropLine_set_section><lineid>`` - one section per line,
      holding its ``name`` and ``id``.
    * ``DropPoints<N>`` - the drop point number ``N``.
    * ``DropMidPoint<N>-<level>`` - intermediate points for point ``N``.
    * ``ResetPoint<N>-<level>`` - reset points for point ``N``.

    Every point section carries a ``lineid`` option tying it to a line.
    Status codes used when saving: 7 = drop mid point, 9 = drop point,
    10 = reset point.
    """

    # Prefixes used by _load_point_path.  They are fixed to point 1 instead
    # of being derived from the requested point id.
    # NOTE(review): the per-point variant (f"DropMidPoint{point_id}-") was
    # disabled in the original code; this looks deliberate (all drop points
    # sharing one mid/reset point) - confirm before changing.
    _SHARED_MID_PREFIX = "DropMidPoint1-"
    _SHARED_RESET_PREFIX = "ResetPoint1-"

    def __init__(self, config_path):
        """Load the configuration at *config_path* and reset the path cache.

        :param config_path: path of the INI file describing the drop paths
        :raises FileNotFoundError: if *config_path* does not exist
        """
        self.config_path = config_path
        self.config = configparser.ConfigParser()
        self._load_config()
        # --- local cache ---
        self._current_lineid: Optional[int] = None
        self._current_point_id: Optional[int] = None
        self._current_path: list = []  # points of the currently loaded path
        self._current_index: int = 0  # index of the next point to hand out
        # Highest DropPoints number found per main section name.
        self._current_drop_section: dict = {}

    def _load_config(self):
        """Parse the configuration file into ``self.config``.

        :raises FileNotFoundError: if the configured path does not exist
        """
        if not os.path.exists(self.config_path):
            raise FileNotFoundError(f"配置文件不存在: {self.config_path}")
        self.config.read(self.config_path, encoding='utf-8')

    def get_next_drop_position(self, lineid: int, point: int) -> Optional["PositionModel"]:
        """Return the next point of the path for *lineid* / *point*.

        The path is (re)loaded and the cursor reset whenever *lineid* or
        *point* differs from the previous call; otherwise the cursor simply
        advances.

        :param lineid: line id the drop point must belong to
        :param point: drop point number, i.e. the ``N`` of ``DropPoints<N>``
        :return: the next point model, or ``None`` once the path is exhausted
                 (or could not be loaded)
        """
        if self._current_lineid != lineid or self._current_point_id != point:
            self._load_point_path(lineid, point)
            self._current_lineid = lineid
            self._current_point_id = point
            self._current_index = 0

        if self._current_index < len(self._current_path):
            next_point = self._current_path[self._current_index]
            self._current_index += 1
            pos = next_point.get_position()
            print(f"🎯 返回点: status={next_point.status}, linetype={next_point.lineType}, "
                  f"position=({pos.X:.3f}, {pos.Y:.3f}, {pos.Z:.3f})")
            return next_point

        # Path exhausted.
        print("✅ 当前点集合路径已结束")
        return None

    def _load_point_path(self, lineid: int, point_id: int):
        """Build ``self._current_path`` for one drop point.

        The resulting order is: mid point, drop point, reset point.  On any
        configuration problem a message is printed and the path is left
        empty (or partially filled) instead of raising.

        :param lineid: line id the sections must belong to
        :param point_id: drop point number, i.e. the ``N`` of ``DropPoints<N>``
        """
        self._current_path = []

        drop_point_sec = f"DropPoints{point_id}"
        if not self.config.has_section(drop_point_sec):
            print(f"❌ 配置错误:不存在 {drop_point_sec}")
            return

        # Reading "lineid" is kept inside the try block so that a section
        # without that option is reported like any other broken section
        # rather than raising out of get_next_drop_position().
        try:
            if self.config.getint(drop_point_sec, "lineid") != lineid:
                print(f"{drop_point_sec} 不属于 lineid={lineid}")
                return
            drop_pos = self._read_position_from_section(drop_point_sec)
        except Exception as e:  # opaque model code may raise anything
            print(f"❌ 读取 {drop_point_sec} 失败: {e}")
            return

        mid_points = self._first_level_point(self._SHARED_MID_PREFIX, lineid)
        reset_points = self._first_level_point(self._SHARED_RESET_PREFIX, lineid)

        # Status and line type are taken from the configuration as read by
        # _read_position_from_section; they are not overridden here.
        self._current_path.extend(pos for _, pos in mid_points)
        self._current_path.append(drop_pos)
        self._current_path.extend(pos for _, pos in reset_points)
        print(f"✅ 已加载 DropLine{lineid} 中 DropPoints{point_id} 的路径,共 {len(self._current_path)} 个点")

    def _first_level_point(self, prefix: str, lineid: int) -> list:
        """Return ``[(level, model)]`` for the first readable matching section.

        Sections are scanned in file order; scanning stops at the first
        section that starts with *prefix*, belongs to *lineid* and can be
        read.  Sections that fail to parse are reported and skipped.

        :return: a list with zero or one ``(level, PositionModel)`` tuple
        """
        found = []
        for sec in self.config.sections():
            if not sec.startswith(prefix):
                continue
            try:
                if self.config.getint(sec, "lineid") != lineid:
                    continue
                level = int(sec.split('-')[1])
                pos = self._read_position_from_section(sec)
            except Exception as e:  # opaque model code may raise anything
                print(f"❌ 解析 {sec} 失败: {e}")
                continue
            found.append((level, pos))
            break  # only the first match is used
        return found

    def _read_position_from_section(self, section: str) -> "PositionModel":
        """Create a ``PositionModel`` from the options of *section*.

        Reads ``x, y, z, u, v, w`` as floats and ``linetype, status, id,
        order, lineid`` as ints.  The section name is stored on the model as
        ``model.section``.

        :raises configparser.Error: if the section or an option is missing
        :raises ValueError: if an option cannot be converted
        """
        model = PositionModel()
        pos = Real_Position()
        pos.X = self.config.getfloat(section, "x")
        pos.Y = self.config.getfloat(section, "y")
        pos.Z = self.config.getfloat(section, "z")
        pos.U = self.config.getfloat(section, "u")
        pos.V = self.config.getfloat(section, "v")
        pos.W = self.config.getfloat(section, "w")
        model.init_position(pos)
        model.lineType = self.config.getint(section, "linetype")
        model.status = self.config.getint(section, "status")
        model.id = self.config.getint(section, "id")
        model.order = self.config.getint(section, "order")
        model.lineId = self.config.getint(section, "lineid")
        # Remember where the model came from; used for ordering and saving.
        model.section = section
        return model

    def _read_position_return_leveL(self, config_reader, section, lineid) -> Optional[tuple]:
        """Return ``(level, model)`` for *section* if it belongs to *lineid*.

        ``level`` is the integer after the first ``-`` in the section name.
        Note that only the ``lineid`` check uses *config_reader*; the model
        itself is read from ``self.config``.

        :return: ``(level, PositionModel)``, or ``None`` when the section
                 belongs to another line or cannot be read (the error is
                 printed)
        """
        try:
            if config_reader.getint(section, "lineid") == lineid:
                level = int(section.split('-')[1])
                position_model = self._read_position_from_section(section)
                return (level, position_model)
        except Exception as e:  # opaque model code may raise anything
            print(f"{__name__}:_read_position_return_leveL:{e}")
        return None

    def _collect_level_points(self, prefix: str, lineid: int) -> list:
        """Return all models whose section starts with *prefix*, by level.

        Only sections belonging to *lineid* are included; unreadable
        sections are skipped (see ``_read_position_return_leveL``).
        """
        found = []
        for sec in self.config.sections():
            if sec.startswith(prefix):
                entry = self._read_position_return_leveL(self.config, sec, lineid)
                if entry:
                    found.append(entry)
        found.sort(key=lambda entry: entry[0])
        return [model for _, model in found]

    #region Methods called by the front-end UI when editing palletizing points
    def load_path_points(self, lineid: int) -> Optional["LineModel"]:
        """Load every point of line *lineid* into a ``LineModel``.

        The model id is ``lineid + 10`` (``lineid + 11`` when the manager
        was created for ``Constant.dropLine_set_file_35``).  For each drop
        point, in ascending point number, the positions are appended as:
        mid points, reset points, then the drop point itself.  Mid points
        numbered one past the last drop point (mid points that have no drop
        point yet) are appended at the very end.

        NOTE(review): the per-point order here (mid, reset, drop) differs
        from _load_point_path (mid, drop, reset); kept as in the original.

        :return: the populated model, or ``None`` if the line's main section
                 does not exist
        """
        # Palletizing line ids are offset so they start after 10.
        if self.config_path == Constant.dropLine_set_file_35:
            _lineid = lineid + 11
        else:
            _lineid = lineid + 10
        line_model = LineModel(_lineid)
        line_model.line_category = 2
        line_model.id = _lineid

        main_section = f"{Constant.dropLine_set_section}{lineid}"
        if not self.config.has_section(main_section):
            return None
        line_model.name = self.config.get(main_section, "name", fallback=f"路径{lineid}")

        # Collect (section, number) for every DropPoints<N> of this line.
        drop_points_sections = []
        for sec in self.config.sections():
            if not sec.startswith("DropPoints"):
                continue
            try:
                if self.config.getint(sec, "lineid") == lineid:
                    num_part = sec.replace("DropPoints", "")
                    if num_part.isdigit():
                        drop_points_sections.append((sec, int(num_part)))
            except Exception as e:
                print(f"{__name__}:_load_path_points异常1:{e}")
                continue
        drop_points_sections.sort(key=lambda item: item[1])

        # Remember the highest drop point number seen for this line.
        last_point_num = drop_points_sections[-1][1] if drop_points_sections else 1
        self._current_drop_section[main_section] = last_point_num

        for sec, point_num in drop_points_sections:
            try:
                line_model.positions.extend(
                    self._collect_level_points(f"DropMidPoint{point_num}-", lineid))
                line_model.positions.extend(
                    self._collect_level_points(f"ResetPoint{point_num}-", lineid))
                try:
                    line_model.positions.append(self._read_position_from_section(sec))
                except Exception as e:
                    print(f"{__name__}:_load_path_points异常3:{e}")
            except Exception as e:
                print(f"{__name__}:_load_path_points异常4:{e}")

        # Trailing mid points: numbered one past the *actual* last drop
        # point.  Comparing against the number of drop points (as was done
        # before) only works when numbering is exactly 1..N, which breaks
        # after a deletion or when several lines share the numbering.
        if drop_points_sections:
            try:
                line_model.positions.extend(
                    self._collect_level_points(f"DropMidPoint{last_point_num + 1}-", lineid))
            except Exception as e:
                print(f"{__name__}:_load_path_points异常4:{e}")
        return line_model

    def save_path_points(self, line_model: "LineModel"):
        """Write the points of *line_model* back to the configuration file.

        The line id is recovered by undoing the offset applied in
        ``load_path_points``; nothing is saved when the result is <= 0.
        Positions whose section name starts with ``"Position"`` are treated
        as newly created and are assigned a section name from their status
        (7 -> ``DropMidPoint``, 9 -> ``DropPoints``, 10 -> ``ResetPoint``);
        new positions with status ``None`` are skipped.

        :param line_model: model as produced by ``load_path_points`` and
                           edited by the UI
        """
        if self.config_path == Constant.dropLine_set_file_35:
            _lineid = line_model.id - 11
        else:
            _lineid = line_model.id - 10
        if _lineid <= 0:
            return

        self.config.read(self.config_path, encoding='utf-8')

        main_section = f"{Constant.dropLine_set_section}{_lineid}"
        if not self.config.has_section(main_section):
            self.config.add_section(main_section)
        self.config.set(main_section, "name", line_model.name)
        self.config.set(main_section, "id", str(_lineid))

        _current_reset_index = 1
        _current_mid_index = 1
        _current_drop_section_val = self._get_max_drop_section()

        for pos in line_model.positions:
            if pos.lineId != _lineid and pos.lineId != line_model.id:
                continue

            # Existing points after the last drop point advance the running
            # indices so new points are numbered after them.  The trailing
            # "-" keeps e.g. "ResetPoint1" from matching "ResetPoint10-1".
            if pos.section.startswith(f"DropMidPoint{_current_drop_section_val + 1}-"):
                _current_mid_index = int(pos.section.split('-')[-1]) + 1
            if pos.section.startswith(f"ResetPoint{_current_drop_section_val}-"):
                _current_reset_index = int(pos.section.split('-')[-1]) + 1

            # Newly added point: derive its section name from its status.
            if pos.section.startswith("Position"):
                pos.lineId = _lineid
                if pos.status is None:
                    continue
                elif pos.status == 7:  # drop mid point
                    pos.section = f"DropMidPoint{_current_drop_section_val + 1}-{_current_mid_index}"
                    _current_mid_index += 1
                elif pos.status == 9:  # drop point
                    pos.section = f"DropPoints{_current_drop_section_val + 1}"
                    _current_drop_section_val += 1
                    _current_mid_index = 1
                    _current_reset_index = 1
                elif pos.status == 10:  # reset point
                    pos.section = f"ResetPoint{_current_drop_section_val}-{_current_reset_index}"
                    _current_reset_index += 1

            pos.save_position_model(self.config)

        with open(self.config_path, 'w', encoding='utf-8') as f:
            self.config.write(f)

    def del_drop_point(self, section):
        """Remove *section* from the configuration and rewrite the file.

        The file is re-read first; removing a section that does not exist
        is not an error.
        """
        self.config.read(self.config_path, encoding='utf-8')
        self.config.remove_section(section)
        with open(self.config_path, 'w', encoding='utf-8') as f:
            self.config.write(f)

    def _get_max_drop_section(self):
        """Return the highest ``N`` among all ``DropPoints<N>`` sections.

        Sections whose suffix is not an integer are ignored.  The result is
        never lower than 1, also when no such section exists.
        """
        prefix = "DropPoints"
        max_section = 1
        for section in self.config.sections():
            if not section.startswith(prefix):
                continue
            try:
                number = int(section[len(prefix):])
            except ValueError:
                # e.g. "DropPointsBackup" - not a numbered drop point
                continue
            max_section = max(max_section, number)
        return max_section
    #endregion
# Manual smoke test: load line 1 from a local "drop.ini".
if __name__ == "__main__":
    # DropPositionManager requires the configuration path; calling it
    # without arguments raises TypeError.
    manager = DropPositionManager("drop.ini")
    lineid = 1
    manager.load_path_points(lineid)