Files
AutoControlSystem-G/CU/drop.py
2025-09-10 09:16:57 +08:00

230 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from ast import mod
import configparser
import os
from typing import Optional
from Model.Position import Real_Position
from Model.FeedModel import PositionModel
class DropPositionManager:
    """Serve the waypoints of a drop path, one per call, from an INI file.

    The path for a given ``point`` is assembled from three kinds of sections:
    every ``DropMidPoint{point}-{level}`` (ascending level), then
    ``DropPoints{point}``, then every ``ResetPoint{point}-{level}``
    (ascending level). Only sections whose ``lineid`` option equals the
    requested line id are used.
    """

    def __init__(self, config_path="CU/drop.ini"):
        """Read the configuration file and start with an empty path cache.

        :param config_path: path of the INI file to read.
        :raises FileNotFoundError: if ``config_path`` does not exist.
        """
        self.config_path = config_path
        self.config = configparser.ConfigParser()
        self._load_config()
        # --- local cache ---
        self._current_lineid: Optional[int] = None
        self._current_point_id: Optional[int] = None
        self._current_path: list = []  # waypoints of the currently loaded path
        self._current_index: int = 0  # index of the next waypoint to hand out

    def _load_config(self):
        """Parse ``self.config_path`` (UTF-8) into ``self.config``.

        :raises FileNotFoundError: if the file does not exist.
        """
        if not os.path.exists(self.config_path):
            raise FileNotFoundError(f"配置文件不存在: {self.config_path}")
        self.config.read(self.config_path, encoding='utf-8')

    def get_next_drop_position(self, lineid: int, point: int) -> Optional["PositionModel"]:
        """Return the next waypoint for ``lineid`` / ``point``.

        The path is (re)loaded and the cursor reset whenever ``lineid`` or
        ``point`` differs from the previous call. Once the path is exhausted
        every further call with the same arguments returns ``None``.

        :param lineid: path group id the sections must belong to.
        :param point: drop point id, i.e. the ``DropPoints{point}`` section.
        :return: the next waypoint, or ``None`` when none is left.
        """
        # A different lineid or point means a different path: reload it.
        if self._current_lineid != lineid or self._current_point_id != point:
            self._load_point_path(lineid, point)
            self._current_lineid = lineid
            self._current_point_id = point
            self._current_index = 0

        if self._current_index < len(self._current_path):
            next_point = self._current_path[self._current_index]
            self._current_index += 1
            pos = next_point.get_position()
            print(f"🎯 返回点: status={next_point.status}, linetype={next_point.lineType}, "
                  f"position=({pos.X:.3f}, {pos.Y:.3f}, {pos.Z:.3f})")
            return next_point

        # Path exhausted.
        print("✅ 当前点集合路径已结束")
        return None

    def _load_point_path(self, lineid: int, point_id: int):
        """Rebuild ``self._current_path`` for ``lineid`` / ``point_id``.

        The resulting order is: mid points, the drop point, reset points.
        Any configuration problem with the ``DropPoints{point_id}`` section
        (absent, missing/invalid ``lineid``, wrong line, unreadable position)
        is reported on stdout and leaves the path empty.

        :param lineid: path group id the sections must belong to.
        :param point_id: drop point id.
        """
        self._current_path = []

        drop_point_sec = f"DropPoints{point_id}"
        if not self.config.has_section(drop_point_sec):
            print(f"❌ 配置错误:不存在 {drop_point_sec}")
            return

        # A missing or non-integer lineid is a configuration error to report,
        # not a reason to crash the caller.
        try:
            owner_lineid = self.config.getint(drop_point_sec, "lineid")
        except (configparser.Error, ValueError) as e:
            print(f"❌ 读取 {drop_point_sec} 失败: {e}")
            return
        if owner_lineid != lineid:
            print(f"{drop_point_sec} 不属于 lineid={lineid}")
            return

        try:
            drop_pos = self._read_position_from_section(drop_point_sec)
        except Exception as e:
            print(f"❌ 读取 {drop_point_sec} 失败: {e}")
            return

        mid_points = self._collect_leveled_points(f"DropMidPoint{point_id}-", lineid)
        reset_points = self._collect_leveled_points(f"ResetPoint{point_id}-", lineid)

        # status / linetype of every waypoint come from its own section.
        self._current_path = [*mid_points, drop_pos, *reset_points]
        print(f"✅ 已加载 DropLine{lineid} 中 DropPoints{point_id} 的路径,共 {len(self._current_path)} 个点")

    def _collect_leveled_points(self, prefix: str, lineid: int) -> list:
        """Return the positions of all ``{prefix}{level}`` sections on ``lineid``.

        Sections that cannot be parsed (missing/invalid ``lineid``, non-integer
        level, unreadable position) are reported on stdout and skipped.

        :param prefix: section-name prefix including the trailing ``-``.
        :param lineid: path group id the sections must belong to.
        :return: positions ordered by ascending level.
        """
        leveled = []
        for sec in self.config.sections():
            if not sec.startswith(prefix):
                continue
            try:
                if self.config.getint(sec, "lineid") != lineid:
                    continue
                level = int(sec.split('-')[1])
                leveled.append((level, self._read_position_from_section(sec)))
            except Exception as e:
                print(f"❌ 解析 {sec} 失败: {e}")
        # Sort on the level only; the position objects are never compared.
        leveled.sort(key=lambda item: item[0])
        return [pos for _, pos in leveled]

    def _read_position_from_section(self, section: str) -> "PositionModel":
        """Build a position model from the options of ``section``.

        Reads the float options ``x``, ``y``, ``z``, ``u``, ``v``, ``w`` and the
        integer options ``linetype`` and ``status``.

        :param section: name of the configuration section to read.
        :return: the populated model.
        :raises configparser.Error: if an option is missing.
        :raises ValueError: if an option cannot be converted.
        """
        model = PositionModel()
        pos = Real_Position()
        pos.X = self.config.getfloat(section, "x")
        pos.Y = self.config.getfloat(section, "y")
        pos.Z = self.config.getfloat(section, "z")
        pos.U = self.config.getfloat(section, "u")
        pos.V = self.config.getfloat(section, "v")
        pos.W = self.config.getfloat(section, "w")
        model.init_position(pos)
        model.lineType = self.config.getint(section, "linetype")
        model.status = self.config.getint(section, "status")
        return model
def _get_point_debug_info(manager, pos, model):
    """Return a human-readable label for the configured point located at ``pos``.

    Walks ``manager.config`` in section order and returns the label of the
    first ``DropPoints*``, ``DropMidPoint*-*`` or ``ResetPoint*-*`` section
    whose ``x``/``y``/``z`` options each differ from ``pos`` by less than
    0.001. Sections that are malformed (missing option, non-numeric value,
    unexpected name) are skipped.

    :param manager: object exposing the parsed configuration as ``config``.
    :param pos: object with ``X``, ``Y`` and ``Z`` attributes.
    :param model: unused; kept so existing callers keep working.
    :return: the label of the matching section, or ``"❓ 未知点位"``.
    """
    config = manager.config
    tolerance = 0.001
    # Section-name prefix -> icon for the two "<name><id>-<level>" families.
    leveled_icons = (("DropMidPoint", "📍"), ("ResetPoint", "🔙"))

    def is_at_pos(sec):
        # Read all three coordinates first so a malformed one skips the section.
        x = config.getfloat(sec, "x")
        y = config.getfloat(sec, "y")
        z = config.getfloat(sec, "z")
        return (abs(x - pos.X) < tolerance
                and abs(y - pos.Y) < tolerance
                and abs(z - pos.Z) < tolerance)

    for sec in config.sections():
        # Only configuration/parsing problems are tolerated here; anything
        # else (including KeyboardInterrupt) must reach the caller.
        try:
            if sec.startswith("DropPoints"):
                if is_at_pos(sec):
                    point_id = config.getint(sec, "id")
                    return f"📌 DropPoints{point_id} | id={point_id}"
                continue

            for prefix, icon in leveled_icons:
                if not sec.startswith(prefix):
                    continue
                parts = sec.split('-')
                if len(parts) != 2:
                    break
                point_id = int(''.join(filter(str.isdigit, parts[0])))
                level = int(parts[1])
                if is_at_pos(sec):
                    return f"{icon} {prefix}{point_id}-{level} | id={point_id}, level={level}"
                break
        except (configparser.Error, ValueError):
            continue
    return "❓ 未知点位"
# Manual smoke test: walk the path of point 1, then point 2, then point 1 again
# to show that switching the point restarts the path from its first waypoint.
if __name__ == "__main__":
    # Alternative: DropPositionManager("drop.ini") reads drop.ini from the
    # current working directory instead of the default CU/drop.ini.
    manager = DropPositionManager()
    lineid = 1
    print(f"\n🔁 测试:通过 point 参数切换路径集合\n")

    rule = "=" * 60
    # (banner lead-in, banner title, point, max steps, end-of-path message,
    #  whether to announce the drop action when status == 9)
    scenarios = (
        ("", "📦 开始执行 point=1 的路径", 1, 10, "🔚 point=1 路径结束", True),
        ("\n", "📦 开始执行 point=2 的路径", 2, 10, "🔚 point=2 路径结束", True),
        ("\n", "🔄 切换回 point=1从头开始", 1, 3, None, False),
    )

    for lead_in, title, point, max_steps, end_message, announce_drop in scenarios:
        print(lead_in + rule)
        print(title)
        print(rule)
        for _ in range(max_steps):
            step = manager.get_next_drop_position(lineid=lineid, point=point)
            if step is None:
                if end_message is not None:
                    print(end_message)
                break
            where = step.get_position()
            label = _get_point_debug_info(manager, where, step)
            print(f" 🚀 {label}")
            if announce_drop and step.status == 9:
                print(" 💥 执行【扔包】操作!")