添加了5个电磁阀+修改了传送带传感器的逻辑
This commit is contained in:
365
robot/drop.py
Normal file
365
robot/drop.py
Normal file
@ -0,0 +1,365 @@
|
||||
"""
|
||||
# @Time : 2025/12/12 11:05
|
||||
# @Author : reenrr
|
||||
# @File : drop.py
|
||||
# @Desc :
|
||||
"""
|
||||
|
||||
from ast import mod
|
||||
import configparser
|
||||
import os
|
||||
from typing import Optional
|
||||
from Position import Real_Position
|
||||
from FeedModel import LineModel, PositionModel
|
||||
import Constant
|
||||
|
||||
|
||||
class DropPositionManager:
|
||||
"""
|
||||
负责读取/写入ini格式的码垛点位配置文件
|
||||
"""
|
||||
def __init__(self, config_path=Constant.dropLine_set_file):
|
||||
"""
|
||||
:param config_path: 配置文件路径
|
||||
"""
|
||||
self.config_path = config_path
|
||||
self.config = configparser.ConfigParser() # 配置文件解析器实例,用于读取/写入ini文件
|
||||
self._load_config()
|
||||
|
||||
# --- 本地缓存 ---
|
||||
self._current_lineid: Optional[int] = None # 当前缓存的路径组ID(对应DropLineX)
|
||||
self._current_point_id: Optional[int] = None # 当前缓存的投料点ID(对应DropPoints{point})
|
||||
self._current_path: list = [] # 当前缓存的完整路径点列表(PositionModel对象列表)
|
||||
self._current_index: int = 0 # 当前路径中的索引(用于遍历路径点,返回下一个点位)
|
||||
self._current_drop_section: dict = {} # 存储每个主配置节(DropLineX)对应的最大码垛点编号,用于新增点位时的编号自增
|
||||
|
||||
def _load_config(self):
|
||||
"""加载配置文件"""
|
||||
if not os.path.exists(self.config_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {self.config_path}")
|
||||
self.config.read(self.config_path, encoding='utf-8')
|
||||
|
||||
def get_next_drop_position(self, lineid: int, point: int) -> Optional[PositionModel]:
|
||||
"""
|
||||
获取指定 lineid 和 point 的下一个路径点。
|
||||
|
||||
:param lineid: 路径组 ID(对应 DropLineX)
|
||||
:param point: 投料点 ID(对应 DropPoints{point})
|
||||
:return: 下一个路径点,若无则返回 None
|
||||
"""
|
||||
# 如果 lineid 或 point 改变,重新加载路径
|
||||
if self._current_lineid != lineid or self._current_point_id != point:
|
||||
self._load_point_path(lineid, point)
|
||||
self._current_lineid = lineid
|
||||
self._current_point_id = point
|
||||
self._current_index = 0
|
||||
|
||||
# 如果索引未超过路径长度,返回当前索引的点
|
||||
if self._current_index < len(self._current_path):
|
||||
next_point = self._current_path[self._current_index]
|
||||
self._current_index += 1
|
||||
pos = next_point.get_position()
|
||||
print(f"🎯 返回点: status={next_point.status}, linetype={next_point.lineType}, "
|
||||
f"position=({pos.X:.3f}, {pos.Y:.3f}, {pos.Z:.3f})")
|
||||
return next_point
|
||||
|
||||
# 路径结束
|
||||
print("✅ 当前点集合路径已结束")
|
||||
return None
|
||||
|
||||
def _load_point_path(self, lineid: int, point_id: int):
|
||||
"""加载指定 lineid 和 point_id 的完整路径
|
||||
(点集合dropmidpoint、droppoint、resetpoint)
|
||||
:param lineid: 路径组 ID(对应 DropLineX)
|
||||
param point_id: 投料点 ID(对应 DropPoints{point})
|
||||
"""
|
||||
self._current_path = []
|
||||
|
||||
# 检查是否存在 DropPoints{point_id}
|
||||
drop_point_sec = f"DropPoints{point_id}"
|
||||
if not self.config.has_section(drop_point_sec):
|
||||
print(f"❌ 配置错误:不存在 {drop_point_sec}")
|
||||
return
|
||||
|
||||
if self.config.getint(drop_point_sec, "lineid") != lineid:
|
||||
print(f"❌ {drop_point_sec} 不属于 lineid={lineid}")
|
||||
return
|
||||
|
||||
try:
|
||||
drop_pos = self._read_position_from_section(drop_point_sec)
|
||||
except Exception as e:
|
||||
print(f"❌ 读取 {drop_point_sec} 失败: {e}")
|
||||
return
|
||||
|
||||
# 1. 加载 DropMidPoint{point_id}-*(按 level 升序)
|
||||
mid_points = []
|
||||
# _mid_section_start=f"DropMidPoint{point_id}-"
|
||||
_mid_section_start="DropMidPoint1-"
|
||||
for sec in self.config.sections():
|
||||
if sec.startswith(_mid_section_start) and self.config.getint(sec, "lineid") == lineid:
|
||||
try:
|
||||
level = int(sec.split('-')[1])
|
||||
pos = self._read_position_from_section(sec)
|
||||
mid_points.append((level, pos))
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"❌ 解析 {sec} 失败: {e}")
|
||||
|
||||
mid_points.sort(key=lambda x: x[0])
|
||||
|
||||
# 2. 加载 ResetPoint{point_id}-*(按 level 升序)
|
||||
reset_points = []
|
||||
# _reset_section_start=f"ResetPoint{point_id}-"
|
||||
_reset_section_start="ResetPoint1-"
|
||||
for sec in self.config.sections():
|
||||
if sec.startswith(_reset_section_start) and self.config.getint(sec, "lineid") == lineid:
|
||||
try:
|
||||
level = int(sec.split('-')[1])
|
||||
pos = self._read_position_from_section(sec)
|
||||
reset_points.append((level, pos))
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"❌ 解析 {sec} 失败: {e}")
|
||||
reset_points.sort(key=lambda x: x[0])
|
||||
|
||||
# 3. 组装路径
|
||||
# a. DropMidPoint
|
||||
for _, pos in mid_points:
|
||||
self._current_path.append(pos)
|
||||
|
||||
# b. DropPoints
|
||||
self._current_path.append(drop_pos)
|
||||
|
||||
# c. ResetPoint
|
||||
for _, pos in reset_points:
|
||||
# model = PositionModel()
|
||||
# model.init_position(pos)
|
||||
# model.status = 10 # FReverse
|
||||
# model.lineType = 4
|
||||
self._current_path.append(pos)
|
||||
|
||||
print(f"✅ 已加载 DropLine{lineid} 中 DropPoints{point_id} 的路径,共 {len(self._current_path)} 个点")
|
||||
|
||||
def _read_position_from_section(self, section: str) -> PositionModel:
|
||||
"""从配置文件的 section 中读取位置信息"""
|
||||
model = PositionModel()
|
||||
|
||||
pos = Real_Position()
|
||||
pos.X = self.config.getfloat(section, "x")
|
||||
pos.Y = self.config.getfloat(section, "y")
|
||||
pos.Z = self.config.getfloat(section, "z")
|
||||
pos.U = self.config.getfloat(section, "u")
|
||||
pos.V = self.config.getfloat(section, "v")
|
||||
pos.W = self.config.getfloat(section, "w")
|
||||
model.init_position(pos)
|
||||
|
||||
model.lineType=self.config.getint(section, "linetype")
|
||||
model.status=self.config.getint(section, "status")
|
||||
model.id=self.config.getint(section, "id")
|
||||
model.order=self.config.getint(section, "order")
|
||||
model.lineId=self.config.getint(section, "lineid")
|
||||
# 保存section名称,用于排序
|
||||
model.section = section
|
||||
|
||||
return model
|
||||
|
||||
def _read_position_return_leveL(self,config_reader,section,lineid)->Optional[tuple]:
|
||||
"""
|
||||
读取指定配置节的点位信息,并返回元组
|
||||
:param config_reader: 配置文件解析器
|
||||
:param section: 配置节名称
|
||||
:param lineid: 路径组ID
|
||||
"""
|
||||
try:
|
||||
if config_reader.getint(section, "lineid") == lineid:
|
||||
# 提取 - 后的数字
|
||||
level = int(section.split('-')[1])
|
||||
position_model = self._read_position_from_section(section)
|
||||
return (level, position_model)
|
||||
except Exception as e:
|
||||
print(f"{__name__}:_read_position_return_leveL:{e}")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
def load_path_points(self,lineid: int) ->Optional[LineModel]:
|
||||
"""根据lineid加载所有码垛的路径信息"""
|
||||
#默认码垛的lineid从10开始
|
||||
_lineid=lineid+10
|
||||
line_model = LineModel(_lineid)
|
||||
line_model.line_category = 2
|
||||
line_model.id = _lineid
|
||||
|
||||
# 查找主表 DropLineX
|
||||
main_section = f"{Constant.dropLine_set_section}{lineid}"
|
||||
if self.config.has_section(main_section):
|
||||
line_model.name = self.config.get(main_section, "name", fallback=f"路径{lineid}")
|
||||
else:
|
||||
return None
|
||||
|
||||
# 先收集所有 DropPoints 开头的 section
|
||||
drop_points_sections = []
|
||||
for sec in self.config.sections():
|
||||
if sec.startswith("DropPoints"):
|
||||
try:
|
||||
if self.config.getint(sec, "lineid") == lineid:
|
||||
# 提取数字部分
|
||||
num_part = sec.replace("DropPoints", "")
|
||||
if num_part.isdigit():
|
||||
drop_points_sections.append((sec, int(num_part)))
|
||||
except Exception as e:
|
||||
print(f"{__name__}:_load_path_points异常1:{e}")
|
||||
continue
|
||||
|
||||
# 按数字排序 DropPoints sections
|
||||
drop_points_sections.sort(key=lambda x: x[1])
|
||||
# 使用main_section作为键存储当前最大的drop-section编号
|
||||
self._current_drop_section[main_section] = drop_points_sections[-1][1] if drop_points_sections else 1
|
||||
mid_points_other=[] #最后点没有匹配的droppoints
|
||||
# 遍历每个 DropPoints,按照_load_point_path的逻辑加载对应的中间点和复位点
|
||||
for sec, point_num in drop_points_sections:
|
||||
try:
|
||||
# 1. 加载 DropMidPoint{point_num}-*(按 level 升序)
|
||||
mid_points = []
|
||||
for s in self.config.sections():
|
||||
if s.startswith(f"DropMidPoint{point_num}-"):
|
||||
_mid=self._read_position_return_leveL(self.config,s,lineid)
|
||||
if _mid:
|
||||
mid_points.append(_mid)
|
||||
if point_num==(len(drop_points_sections)):
|
||||
if s.startswith(f"DropMidPoint{point_num+1}-"):
|
||||
_mid=self._read_position_return_leveL(self.config,s,lineid)
|
||||
if _mid:
|
||||
mid_points_other.append(_mid)
|
||||
|
||||
|
||||
# 按level升序排序
|
||||
mid_points.sort(key=lambda x: x[0])
|
||||
# 添加中间点到路径
|
||||
for _, pos in mid_points:
|
||||
line_model.positions.append(pos)
|
||||
|
||||
# 加载 ResetPoint{point_num}-*(按 level 升序)
|
||||
reset_points = []
|
||||
for s in self.config.sections():
|
||||
if s.startswith(f"ResetPoint{point_num}-"):
|
||||
_reset=self._read_position_return_leveL(self.config,s,lineid)
|
||||
if _reset:
|
||||
reset_points.append(_reset)
|
||||
# 按level升序排序
|
||||
reset_points.sort(key=lambda x: x[0])
|
||||
# 添加复位点到路径
|
||||
for _, pos in reset_points:
|
||||
line_model.positions.append(pos)
|
||||
#添加当前 DropPoints
|
||||
try:
|
||||
position_model = self._read_position_from_section(sec)
|
||||
line_model.positions.append(position_model)
|
||||
except Exception as e:
|
||||
print(f"{__name__}:_load_path_points异常3:{e}")
|
||||
except Exception as e:
|
||||
print(f"{__name__}:_load_path_points异常4:{e}")
|
||||
|
||||
if mid_points_other:
|
||||
mid_points_other.sort(key=lambda x: x[0])
|
||||
for _, pos in mid_points_other:
|
||||
line_model.positions.append(pos)
|
||||
return line_model
|
||||
|
||||
def save_path_points(self, line_model: LineModel):
|
||||
"""根据lineid保存所有码垛的路径信息"""
|
||||
#默认码垛的lineid从10开始,保存的时候减一
|
||||
_lineid=line_model.id-10
|
||||
if _lineid<=0:
|
||||
return
|
||||
self.config.read(Constant.dropLine_set_file, encoding='utf-8')
|
||||
# 查找主表 DropLineX
|
||||
main_section = f"{Constant.dropLine_set_section}{_lineid}"
|
||||
if not self.config.has_section(main_section):
|
||||
self.config.add_section(main_section)
|
||||
self.config.set(main_section, "name", line_model.name)
|
||||
self.config.set(main_section, "id", str(_lineid))
|
||||
_current_reset_index=1
|
||||
_current_mid_index=1
|
||||
_current_drop_section_val=self._current_drop_section[main_section]
|
||||
# 保存每个DropPoints
|
||||
for i, pos in enumerate(line_model.positions):
|
||||
if pos.lineId == _lineid or pos.lineId == line_model.id:
|
||||
#最后一个扔包点
|
||||
if pos.section.startswith(f"DropMidPoint{_current_drop_section_val+1}"):
|
||||
_current_mid_index=int(pos.section.split('-')[-1])+1
|
||||
if pos.section.startswith(f"ResetPoint{_current_drop_section_val}"):
|
||||
_current_reset_index=int(pos.section.split('-')[-1])+1
|
||||
#新增的数据,如果是前点,需要获取后点的数据
|
||||
if pos.section.startswith("Position"):
|
||||
pos.lineId = _lineid
|
||||
|
||||
if pos.status is None:
|
||||
continue
|
||||
# FDropMid = 7
|
||||
elif pos.status==7:
|
||||
#只有一个
|
||||
pos.section = f"DropMidPoint{_current_drop_section_val+1}-{_current_mid_index}"
|
||||
_current_mid_index+=1
|
||||
|
||||
elif pos.status==9:
|
||||
pos.section = f"DropPoints{_current_drop_section_val+1}"
|
||||
_current_drop_section_val+=1
|
||||
_current_mid_index=1
|
||||
_current_reset_index=1
|
||||
|
||||
elif pos.status==10:
|
||||
pos.section = f"ResetPoint{_current_drop_section_val}-{_current_reset_index}"
|
||||
_current_reset_index+=1
|
||||
#保存数据
|
||||
pos.save_position_model(self.config)
|
||||
|
||||
with open(Constant.dropLine_set_file, 'w', encoding='utf-8') as f:
|
||||
self.config.write(f)
|
||||
|
||||
def del_drop_point(self,section):
|
||||
self.config.read(Constant.dropLine_set_file, encoding = 'utf-8')
|
||||
self.config.remove_section(section)
|
||||
with open(Constant.dropLine_set_file, 'w', encoding='utf-8') as f:
|
||||
self.config.write(f)
|
||||
|
||||
def _get_point_debug_info(manager, pos, model):
|
||||
config = manager.config
|
||||
for sec in config.sections():
|
||||
if sec.startswith("DropPoints"):
|
||||
try:
|
||||
x, y, z = config.getfloat(sec, "x"), config.getfloat(sec, "y"), config.getfloat(sec, "z")
|
||||
if abs(x - pos.X) < 0.001 and abs(y - pos.Y) < 0.001 and abs(z - pos.Z) < 0.001:
|
||||
point_id = config.getint(sec, "id")
|
||||
return f"📌 DropPoints{point_id} | id={point_id}"
|
||||
except: pass
|
||||
|
||||
elif sec.startswith("DropMidPoint"):
|
||||
try:
|
||||
parts = sec.split('-')
|
||||
if len(parts) != 2: continue
|
||||
point_id = int(''.join(filter(str.isdigit, parts[0])))
|
||||
level = int(parts[1])
|
||||
x, y, z = config.getfloat(sec, "x"), config.getfloat(sec, "y"), config.getfloat(sec, "z")
|
||||
if abs(x - pos.X) < 0.001 and abs(y - pos.Y) < 0.001 and abs(z - pos.Z) < 0.001:
|
||||
return f"📍 DropMidPoint{point_id}-{level} | id={point_id}, level={level}"
|
||||
except: pass
|
||||
|
||||
elif sec.startswith("ResetPoint"):
|
||||
try:
|
||||
parts = sec.split('-')
|
||||
if len(parts) != 2: continue
|
||||
point_id = int(''.join(filter(str.isdigit, parts[0])))
|
||||
level = int(parts[1])
|
||||
x, y, z = config.getfloat(sec, "x"), config.getfloat(sec, "y"), config.getfloat(sec, "z")
|
||||
if abs(x - pos.X) < 0.001 and abs(y - pos.Y) < 0.001 and abs(z - pos.Z) < 0.001:
|
||||
return f"🔙 ResetPoint{point_id}-{level} | id={point_id}, level={level}"
|
||||
except: pass
|
||||
return "❓ 未知点位"
|
||||
|
||||
# 测试
|
||||
if __name__ == "__main__":
|
||||
# manager = DropPositionManager("drop.ini")
|
||||
manager = DropPositionManager()
|
||||
lineid = 1
|
||||
manager.load_path_points(1)
|
||||
@ -16,7 +16,7 @@ class MyTimer:
|
||||
ts = time.time()
|
||||
return int(ts * 1000) # Convert to milliseconds
|
||||
|
||||
# CTon class equivalent in Python
|
||||
# 通断延时器类
|
||||
class CTon:
|
||||
def __init__(self):
|
||||
self.m_unET = 0
|
||||
@ -47,18 +47,34 @@ class CTon:
|
||||
self.m_bPause = False
|
||||
|
||||
def SetPause(self, value):
|
||||
if self.m_bIn:
|
||||
"""
|
||||
设置延时暂停/恢复状态,实现延时过程的暂停与继续
|
||||
:param value: 暂停标志 True表示暂停,False表示恢复
|
||||
"""
|
||||
if self.m_bIn: # 仅当输入有效时,运行设置暂停
|
||||
self.m_bPause = value
|
||||
if self.m_bPause:
|
||||
if self.m_bPause: # 暂停时,记录当前已流逝的延时时间,用于恢复记时补偿
|
||||
self.m_unPauseET = MyTimer.gMyGetTickCount() - self.m_unStartTime
|
||||
|
||||
def SetOver(self, value):
|
||||
"""
|
||||
手动设置延时完成标记
|
||||
"""
|
||||
self.m_bOver = value
|
||||
|
||||
def GetStartTime(self):
|
||||
"""
|
||||
获取延时启动时的时间戳
|
||||
"""
|
||||
return self.m_unStartTime
|
||||
|
||||
def Q(self, value_i, value_pt):
|
||||
"""
|
||||
触发延时逻辑,判断延时是否完成
|
||||
:param value_i: 当前输入信号,True表示输入有效,False表示无效
|
||||
:param value_pt: 延时时间,单位毫秒
|
||||
:return: True表示延时完成,False表示延时未完成
|
||||
"""
|
||||
self.m_bIn = value_i
|
||||
self.m_unPT = value_pt
|
||||
un_tick = MyTimer.gMyGetTickCount()
|
||||
@ -78,36 +94,52 @@ class CTon:
|
||||
|
||||
return self.m_bIn and (un_tick >= (self.m_unStartTime + self.m_unPT))
|
||||
|
||||
# CClockPulse class equivalent in Python
|
||||
# 时钟脉冲类
|
||||
class CClockPulse:
|
||||
def __init__(self):
|
||||
self.m_bFirstOut = True
|
||||
self.m_bTonAOut = False
|
||||
self.m_bTonBOut = False
|
||||
self.m_cTonA = CTon()
|
||||
self.m_cTonB = CTon()
|
||||
self.m_bFirstOut = True # 首次输出标记
|
||||
self.m_bTonAOut = False # 延时器A的输出状态
|
||||
self.m_bTonBOut = False # 延时器B的输出状态
|
||||
self.m_cTonA = CTon() # 延时器A
|
||||
self.m_cTonB = CTon() # 延时器B
|
||||
|
||||
def Q(self, value_i, run_time, stop_time):
|
||||
if self.m_bFirstOut:
|
||||
"""
|
||||
生成周期性脉冲信号,返回当前脉冲输出状态
|
||||
:param value_i: 输入信号,True表示输入有效,False表示无效
|
||||
:param run_time: 运行时间,单位毫秒
|
||||
:param stop_time: 停止时间,单位毫秒
|
||||
:return: True表示脉冲输出有效,False表示脉冲输出无效
|
||||
"""
|
||||
if self.m_bFirstOut: # 首次输出:按照[运行时间->停止时间]的顺序生成脉冲
|
||||
self.m_bTonAOut = self.m_cTonA.Q(not self.m_bTonBOut and value_i, run_time)
|
||||
self.m_bTonBOut = self.m_cTonB.Q(self.m_bTonAOut and value_i, stop_time)
|
||||
return not self.m_bTonAOut and value_i
|
||||
else:
|
||||
else: # 非首次输出:循环交替[停止时间->运行时间],生成周期性脉冲
|
||||
self.m_bTonAOut = self.m_cTonA.Q(not self.m_bTonBOut and value_i, stop_time)
|
||||
self.m_bTonBOut = self.m_cTonB.Q(self.m_bTonAOut and value_i, run_time)
|
||||
return self.m_bTonAOut and value_i
|
||||
|
||||
# CDelayOut class equivalent in Python
|
||||
# 延时输出类,显示先等待,后输出,输出完成后自动复位的逻辑
|
||||
class CDelayOut:
|
||||
def __init__(self):
|
||||
self.m_cOutTon = CTon()
|
||||
self.m_cmWaitTon = CTon()
|
||||
|
||||
def Reset(self):
|
||||
"""
|
||||
复位两个延时器的所有状态
|
||||
"""
|
||||
self.m_cOutTon.SetReset()
|
||||
self.m_cmWaitTon.SetReset()
|
||||
|
||||
def Q(self, value_i, wait_time, out_time):
|
||||
"""
|
||||
实现等待->输出->复位的闭环逻辑
|
||||
:param value_i: 输入信号,True表示输入有效,False表示无效
|
||||
:param wait_time: 等待时间,单位毫秒
|
||||
:param out_time: 输出时间,单位毫秒
|
||||
"""
|
||||
if self.m_cmWaitTon.Q(value_i, wait_time):
|
||||
if self.m_cOutTon.Q(True, out_time):
|
||||
self.m_cOutTon.SetReset()
|
||||
@ -117,12 +149,15 @@ class CDelayOut:
|
||||
return True
|
||||
return False
|
||||
|
||||
# CRisOrFall class equivalent in Python
|
||||
# 边沿检测类,上升沿或下降沿触发
|
||||
class CRisOrFall:
|
||||
def __init__(self):
|
||||
self.m_bTemp = False
|
||||
|
||||
def Q(self, value_i, ris_or_fall):
|
||||
"""
|
||||
边沿检测方法:检测输入信号的上升沿或下降沿,返回检测结果
|
||||
"""
|
||||
result = False
|
||||
if value_i != self.m_bTemp:
|
||||
if ris_or_fall and value_i: # Rising edge
|
||||
@ -132,7 +167,7 @@ class CRisOrFall:
|
||||
self.m_bTemp = value_i
|
||||
return result
|
||||
|
||||
# CTof class equivalent in Python
|
||||
# 断通电延时器类
|
||||
class CTof:
|
||||
def __init__(self):
|
||||
self.m_cDelayTon = CTon()
|
||||
|
||||
Reference in New Issue
Block a user