forked from huangxin/ailai
942 lines
44 KiB
Python
942 lines
44 KiB
Python
|
|
import copy
|
|||
|
|
import logging
|
|||
|
|
import random
|
|||
|
|
import threading
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
from PyQt5.QtWidgets import QMessageBox
|
|||
|
|
from PySide6.QtCore import Signal, QObject
|
|||
|
|
from PySide6.QtGui import QPixmap
|
|||
|
|
|
|||
|
|
import Constant
|
|||
|
|
import Expection
|
|||
|
|
from CU.Catch import Catch, CatchStatus
|
|||
|
|
from CU.Detect import Detect, DetectStatus
|
|||
|
|
from Model.FeedModel import PositionModel
|
|||
|
|
from Model.Position import Real_Position, Detection_Position
|
|||
|
|
from enum import Enum, IntEnum
|
|||
|
|
from COM.COM_Robot import RobotClient, DetectType
|
|||
|
|
from Model.RobotModel import CMDInstructRequest, MoveType
|
|||
|
|
from Trace.handeye_calibration import getPosition
|
|||
|
|
from Trace.handeye_calibration import getxyz1,getxyz
|
|||
|
|
from Util.util_math import get_distance
|
|||
|
|
from Util.util_time import CRisOrFall
|
|||
|
|
#from Vision.camera_coordinate_dete import Detection
|
|||
|
|
from Util.util_log import log
|
|||
|
|
from Model.RobotModel import Instruction
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ResetStatus(Enum):
|
|||
|
|
RNone = 0
|
|||
|
|
RStart = 1
|
|||
|
|
RRunging = 2
|
|||
|
|
ROk = 3
|
|||
|
|
|
|||
|
|
|
|||
|
|
class FeedStatus(IntEnum):
|
|||
|
|
FNone = 0
|
|||
|
|
FStart = 1
|
|||
|
|
FCheck = 2
|
|||
|
|
FMid = 3
|
|||
|
|
FPhoto = 4
|
|||
|
|
FTake = 5
|
|||
|
|
FBroken1 = 6
|
|||
|
|
FBroken2 = 7
|
|||
|
|
FShake = 8
|
|||
|
|
FDropBag = 9
|
|||
|
|
FFinished = 10
|
|||
|
|
FReverse = 11
|
|||
|
|
FStartReverse = 12
|
|||
|
|
|
|||
|
|
|
|||
|
|
class LineType(Enum):
|
|||
|
|
Straight = 0
|
|||
|
|
CureMid = 2
|
|||
|
|
CureEnd = 3
|
|||
|
|
WORLD = 4
|
|||
|
|
|
|||
|
|
|
|||
|
|
class FeedMidStatus(Enum):
|
|||
|
|
FMid_Start = 1
|
|||
|
|
FMid_Take = 2
|
|||
|
|
FMid_Feed = 3
|
|||
|
|
|
|||
|
|
|
|||
|
|
class FeedPosition:
|
|||
|
|
def __init__(self, status: FeedStatus, position: Real_Position):
|
|||
|
|
self.status = status
|
|||
|
|
self.position = position
|
|||
|
|
|
|||
|
|
|
|||
|
|
class FeedLine:
|
|||
|
|
"""
|
|||
|
|
FeedLine 类负责管理一个投料路径的所有位置点。
|
|||
|
|
它将路径分割成不同的阶段,并提供获取当前/下一个位置的方法。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, id, name, feed_positions: list):
|
|||
|
|
"""
|
|||
|
|
初始化 FeedLine。
|
|||
|
|
:param id: 路径ID
|
|||
|
|
:param name: 路径名称
|
|||
|
|
:param feed_positions: 包含所有 PositionModel 对象的列表
|
|||
|
|
"""
|
|||
|
|
# 深拷贝位置列表,避免外部修改影响
|
|||
|
|
self.feed_positions = copy.deepcopy(feed_positions)
|
|||
|
|
# 初始化各个阶段的位置索引
|
|||
|
|
self.feeding2end_pos_index = 0 # 投料到结束的索引 (用于 feeding_to_end 列表)
|
|||
|
|
self.origin2start_pos_index = 0 # 原点到起始点的索引 (用于 origin_to_start 列表)
|
|||
|
|
self.start2take_pos_index = 0 # 起始点到抓取点的索引 (用于 start_to_take 列表)
|
|||
|
|
self.name = name
|
|||
|
|
self.id = id
|
|||
|
|
|
|||
|
|
# 初始化分割后的子列表
|
|||
|
|
self.origin_to_start = []
|
|||
|
|
self.start_to_take = []
|
|||
|
|
self.feeding_to_end = []
|
|||
|
|
|
|||
|
|
def get_current_feed_position(self, is_reverse):
|
|||
|
|
"""
|
|||
|
|
获取当前投料阶段的目标位置。
|
|||
|
|
:param is_reverse: 是否反向运行 (此参数当前未用于索引计算)
|
|||
|
|
:return: PositionModel 对象或 None
|
|||
|
|
"""
|
|||
|
|
if not self.feeding_to_end or self.feeding2end_pos_index < 0 or self.feeding2end_pos_index >= len(
|
|||
|
|
self.feeding_to_end):
|
|||
|
|
return None # 或抛出异常
|
|||
|
|
# 直接返回当前索引指向的位置
|
|||
|
|
return self.feeding_to_end[self.feeding2end_pos_index]
|
|||
|
|
|
|||
|
|
def get_current_take_position(self, is_reverse):
|
|||
|
|
"""获取当前抓取阶段的目标位置 """
|
|||
|
|
if not self.start_to_take or self.start2take_pos_index < 0 or self.start2take_pos_index >= len(
|
|||
|
|
self.start_to_take):
|
|||
|
|
return None # 或抛出异常
|
|||
|
|
return self.start_to_take[self.start2take_pos_index]
|
|||
|
|
|
|||
|
|
def get_current_start_position(self, is_reverse):
|
|||
|
|
"""获取当前起始阶段的目标位置 """
|
|||
|
|
if not self.origin_to_start or self.origin2start_pos_index < 0 or self.origin2start_pos_index >= len(
|
|||
|
|
self.origin_to_start):
|
|||
|
|
return None
|
|||
|
|
return self.origin_to_start[self.origin2start_pos_index]
|
|||
|
|
|
|||
|
|
def _update_index(self, index_attr, list_attr, reverse):
|
|||
|
|
"""辅助函数:更新索引"""
|
|||
|
|
current_index = getattr(self, index_attr)
|
|||
|
|
current_list = getattr(self, list_attr)
|
|||
|
|
if reverse:
|
|||
|
|
current_index -= 1
|
|||
|
|
if current_index < 0:
|
|||
|
|
current_index = len(current_list) - 1
|
|||
|
|
else:
|
|||
|
|
current_index += 1
|
|||
|
|
if current_index >= len(current_list):
|
|||
|
|
current_index = 0
|
|||
|
|
setattr(self, index_attr, current_index)
|
|||
|
|
return current_index
|
|||
|
|
|
|||
|
|
def get_next_feed_position(self, reverse: bool = False):
|
|||
|
|
"""
|
|||
|
|
获取下一个投料位置,并更新索引。
|
|||
|
|
这个方法负责处理 feeding_to_end 列表中的所有点,包括 FTake 前后动态点和 FDropBag 点。
|
|||
|
|
:param reverse: 是否反向获取
|
|||
|
|
:return: PositionModel 对象
|
|||
|
|
"""
|
|||
|
|
if not self.feeding_to_end or self.feeding2end_pos_index < 0 or self.feeding2end_pos_index >= len(
|
|||
|
|
self.feeding_to_end):
|
|||
|
|
return None # 或抛出异常
|
|||
|
|
# 1. 获取当前索引指向的位置
|
|||
|
|
pos = self.feeding_to_end[self.feeding2end_pos_index]
|
|||
|
|
# 2. 更新索引 (关键:实现顺序访问)
|
|||
|
|
self._update_index('feeding2end_pos_index', 'feeding_to_end', reverse)
|
|||
|
|
# 3. 返回之前获取的位置
|
|||
|
|
return pos
|
|||
|
|
|
|||
|
|
def get_next_start_position(self, reverse: bool = False):
|
|||
|
|
"""获取下一个起始位置,并更新索引。"""
|
|||
|
|
pos = self.origin_to_start[self.origin2start_pos_index]
|
|||
|
|
self._update_index('origin2start_pos_index', 'origin_to_start', reverse)
|
|||
|
|
return pos
|
|||
|
|
|
|||
|
|
def get_next_take_position(self, reverse: bool = False):
|
|||
|
|
"""获取下一个抓取位置,并更新索引。"""
|
|||
|
|
pos = self.start_to_take[self.start2take_pos_index]
|
|||
|
|
self._update_index('start2take_pos_index', 'start_to_take', reverse)
|
|||
|
|
return pos
|
|||
|
|
|
|||
|
|
def get_take_position(self):
|
|||
|
|
"""
|
|||
|
|
查找并返回 FTake 状态的位置。
|
|||
|
|
:return: PositionModel 对象或 None
|
|||
|
|
"""
|
|||
|
|
for i in range(len(self.feeding_to_end)):
|
|||
|
|
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
|
|||
|
|
return self.feeding_to_end[i]
|
|||
|
|
|
|||
|
|
def set_take_position(self, position: Real_Position, dynamic_height=0):
|
|||
|
|
"""
|
|||
|
|
设置 FTake 位置,并更新其前后动态点的位置。
|
|||
|
|
:param position: 新的抓取位置
|
|||
|
|
:param dynamic_height: 动态高度调整 (如果需要)
|
|||
|
|
"""
|
|||
|
|
for i in range(len(self.feeding_to_end)):
|
|||
|
|
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
|
|||
|
|
# 计算 XYZ 坐标
|
|||
|
|
#xyz = getxyz(position.X, position.Y, position.Z, position.a, position.b, position.c)
|
|||
|
|
xyz = getxyz1(position.X, position.Y, position.Z, position.a, position.b, position.c)
|
|||
|
|
# 创建 before 和 after 位置
|
|||
|
|
befor_take_position = Real_Position().init_position(xyz[0],
|
|||
|
|
xyz[1],
|
|||
|
|
xyz[2],
|
|||
|
|
position.U,
|
|||
|
|
position.V,
|
|||
|
|
position.W)
|
|||
|
|
after_take_position = Real_Position().init_position(xyz[0],
|
|||
|
|
xyz[1],
|
|||
|
|
xyz[2],
|
|||
|
|
position.U,
|
|||
|
|
position.V,
|
|||
|
|
position.W)
|
|||
|
|
# 安全检查索引
|
|||
|
|
if i > 0:
|
|||
|
|
self.feeding_to_end[i - 1].set_position(befor_take_position)
|
|||
|
|
else:
|
|||
|
|
print("Warning: No position before FTake to update.")
|
|||
|
|
self.feeding_to_end[i].set_position(position)
|
|||
|
|
if i + 1 < len(self.feeding_to_end):
|
|||
|
|
self.feeding_to_end[i + 1].set_position(after_take_position)
|
|||
|
|
else:
|
|||
|
|
print("Warning: No position after FTake to update.")
|
|||
|
|
break # 抓料点暂时就一个
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_position_list(self):
|
|||
|
|
"""
|
|||
|
|
根据状态将 feed_positions 分割成 origin_to_start, start_to_take, feeding_to_end 三个列表。
|
|||
|
|
并在 FTake 前后插入动态点。
|
|||
|
|
"""
|
|||
|
|
index_start = -1
|
|||
|
|
index_take = -1
|
|||
|
|
for i in range(len(self.feed_positions)):
|
|||
|
|
if self.feed_positions[i].status == FeedStatus.FCheck.value:
|
|||
|
|
index_start = i
|
|||
|
|
break
|
|||
|
|
for i in range(len(self.feed_positions)):
|
|||
|
|
if self.feed_positions[i].status == FeedStatus.FPhoto.value:
|
|||
|
|
index_take = i
|
|||
|
|
|
|||
|
|
if index_start == -1 or index_take == -1:
|
|||
|
|
print("Error: FCheck or FPhoto position not found in feed_positions.")
|
|||
|
|
# 可能需要抛出异常或设置默认值
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
self.origin_to_start = self.feed_positions[: index_start + 1]
|
|||
|
|
self.start_to_take = self.feed_positions[index_start:index_take + 1]
|
|||
|
|
self.feeding_to_end = self.feed_positions[index_take:]
|
|||
|
|
|
|||
|
|
# 查找 FTake 并插入动态点
|
|||
|
|
take_found = False
|
|||
|
|
i = 0
|
|||
|
|
while i < len(self.feeding_to_end):
|
|||
|
|
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
|
|||
|
|
if take_found:
|
|||
|
|
print("Warning: Multiple FTake positions found. Processing first one.")
|
|||
|
|
break # 或处理多个,或抛出异常
|
|||
|
|
|
|||
|
|
# 创建动态点
|
|||
|
|
befor_position_model = PositionModel()
|
|||
|
|
befor_position_model.init_position(None)
|
|||
|
|
after_position_model = PositionModel()
|
|||
|
|
after_position_model.init_position(None)
|
|||
|
|
|
|||
|
|
# 插入动态点 (注意列表长度和索引的变化)
|
|||
|
|
self.feeding_to_end.insert(i, befor_position_model) # 在 FTake 前插入
|
|||
|
|
i += 1 # 列表变长,索引后移
|
|||
|
|
self.feeding_to_end.insert(i + 1, after_position_model) # 在 FTake 后插入 (原 FTake 现在是 i)
|
|||
|
|
# i += 1 # 如果需要继续遍历,可以增加索引
|
|||
|
|
take_found = True
|
|||
|
|
break # 处理完第一个 FTake 就退出
|
|||
|
|
i += 1
|
|||
|
|
|
|||
|
|
|
|||
|
|
class FeedingConfig:
|
|||
|
|
"""
|
|||
|
|
FeedingConfig 类存储当前投料任务的配置信息,
|
|||
|
|
包括剩余数量、路径信息和拍照位置。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, num: int, feedLine: FeedLine, photo_locs):
|
|||
|
|
"""
|
|||
|
|
初始化投料配置。
|
|||
|
|
:param num: 剩余袋数
|
|||
|
|
:param feedLine: FeedLine 对象,包含路径信息
|
|||
|
|
:param photo_locs: 拍照位置列表
|
|||
|
|
"""
|
|||
|
|
self.num = num
|
|||
|
|
self.feedLine = feedLine
|
|||
|
|
self.photo_locs = [self.deal_photo_locs(p) for p in photo_locs]
|
|||
|
|
|
|||
|
|
def deal_photo_locs(self, photo_loc):
|
|||
|
|
"""
|
|||
|
|
将元组或列表形式的坐标转换为 Real_Position 对象。
|
|||
|
|
:param photo_loc: [X, Y, Z, U, V, W] 的列表或元组
|
|||
|
|
:return: Real_Position 对象
|
|||
|
|
"""
|
|||
|
|
position_photo = Real_Position()
|
|||
|
|
position_photo.init_position(photo_loc[0], photo_loc[1], photo_loc[2], photo_loc[3], photo_loc[4], photo_loc[5])
|
|||
|
|
return position_photo
|
|||
|
|
|
|||
|
|
def get_line_info(self):
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
class Feeding(QObject):
|
|||
|
|
need_origin_signal = Signal(str)
|
|||
|
|
take_no_photo_sigal = Signal()
|
|||
|
|
update_detect_image = Signal(np.ndarray)
|
|||
|
|
log_signal = Signal(int, str)
|
|||
|
|
|
|||
|
|
def __init__(self, robotClient: RobotClient):
|
|||
|
|
super().__init__()
|
|||
|
|
self.feedConfig = None
|
|||
|
|
self.feedStatus = FeedStatus.FNone
|
|||
|
|
self.robotClient = robotClient
|
|||
|
|
|
|||
|
|
self.detection_image = None
|
|||
|
|
self.init_detection_image()
|
|||
|
|
self.pause = False
|
|||
|
|
self.cRis_photo = CRisOrFall()
|
|||
|
|
self.cRis_shake = CRisOrFall()
|
|||
|
|
self.feed_Mid_Status = FeedMidStatus.FMid_Start
|
|||
|
|
self.is_reverse = False
|
|||
|
|
# 复位集合
|
|||
|
|
self.run_reverse = False
|
|||
|
|
self.take_no_photo = False
|
|||
|
|
self.reset_status = ResetStatus.RNone
|
|||
|
|
self.reversed_positions = []
|
|||
|
|
self.current_position = None
|
|||
|
|
self.index = 1
|
|||
|
|
self.pos_index = -1
|
|||
|
|
self.pos_near_index = -1
|
|||
|
|
self.catch = Catch(self.robotClient)
|
|||
|
|
self.detect = Detect()
|
|||
|
|
self.is_detected = True
|
|||
|
|
self.detect_thread = threading.Thread(target=self.run_detect)
|
|||
|
|
self.detect_thread.start()
|
|||
|
|
self.onekey = False
|
|||
|
|
self.mid_take_count = 0
|
|||
|
|
self.debug_run_count = 0
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def close_feed(self):
|
|||
|
|
self.is_detected = False
|
|||
|
|
self.detect_thread.join()
|
|||
|
|
self.detect.detection.release()
|
|||
|
|
|
|||
|
|
def init_detection_image(self):
|
|||
|
|
detection_image = cv2.imread(Constant.feed_sign_path)
|
|||
|
|
self.update_detect_image.emit(detection_image)
|
|||
|
|
|
|||
|
|
def run_detect(self):
|
|||
|
|
"""检测线程函数。"""
|
|||
|
|
while self.is_detected:
|
|||
|
|
self.detect.run()
|
|||
|
|
time.sleep(0.02) # 控制检测频率,米厂可以设置较大值
|
|||
|
|
|
|||
|
|
def run(self):
|
|||
|
|
"""
|
|||
|
|
主运行逻辑,由外部循环调用。
|
|||
|
|
这是一个状态机,根据 self.feedStatus 的值执行不同的操作。
|
|||
|
|
"""
|
|||
|
|
self.catch.run()
|
|||
|
|
# 获取机械臂位置
|
|||
|
|
real_position = Real_Position()
|
|||
|
|
real_position.init_position(self.robotClient.status_model.world_0,
|
|||
|
|
self.robotClient.status_model.world_1,
|
|||
|
|
self.robotClient.status_model.world_2,
|
|||
|
|
self.robotClient.status_model.world_3,
|
|||
|
|
self.robotClient.status_model.world_4,
|
|||
|
|
self.robotClient.status_model.world_5)
|
|||
|
|
|
|||
|
|
# --- 状态机主逻辑 ---
|
|||
|
|
if self.feedConfig == None:
|
|||
|
|
self.feedStatus = FeedStatus.FNone
|
|||
|
|
|
|||
|
|
# 检查是否完成反向运行并回到原点
|
|||
|
|
if self.feedConfig != None and self.feedConfig.num == 0 and self.is_reverse and self.robotClient.origin_position.compare(
|
|||
|
|
real_position, is_action=True):
|
|||
|
|
self.feedStatus = FeedStatus.FNone
|
|||
|
|
self.is_reverse = False
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_reverse)
|
|||
|
|
|
|||
|
|
if self.feedStatus == FeedStatus.FNone or self.pause:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FCheck:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_check)
|
|||
|
|
# 1, 检查是否是三列
|
|||
|
|
# 2, 检查是否有人
|
|||
|
|
# if self.safe_check_columns() and self.safe_check_person():
|
|||
|
|
# pass
|
|||
|
|
# else:
|
|||
|
|
# if self.feedConfig.num != 0:
|
|||
|
|
# self.next_target()
|
|||
|
|
# if == 原点 继续判断
|
|||
|
|
# else:
|
|||
|
|
# QMessageBox.information(None, "提示", Constant.str_feed_safe_error_msgbox)
|
|||
|
|
|
|||
|
|
# 根据是否反转,进行状态设置
|
|||
|
|
if self.is_reverse:
|
|||
|
|
self.feed_Mid_Status = FeedMidStatus.FMid_Start
|
|||
|
|
else:
|
|||
|
|
self.feed_Mid_Status = FeedMidStatus.FMid_Take
|
|||
|
|
|
|||
|
|
# 如果在起始点,则移动到下一个位置
|
|||
|
|
if self.feedConfig.feedLine.start_to_take[0].get_position().compare(real_position):
|
|||
|
|
self.next_position(self.is_reverse)
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FStart:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_start)
|
|||
|
|
# 检查是否在原点 (正向启动时)
|
|||
|
|
if not self.robotClient.origin_position.compare(real_position, is_action=True) and not self.is_reverse:
|
|||
|
|
# QMessageBox.information(None, "提示", Constant.str_feed_start_error) # Fuck 引起异常
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_start_error)
|
|||
|
|
self.need_origin_signal.emit(Constant.str_feed_start_error)
|
|||
|
|
self.feedStatus = FeedStatus.FNone
|
|||
|
|
return
|
|||
|
|
# 反向运行结束
|
|||
|
|
if self.is_reverse:
|
|||
|
|
self.feedStatus = FeedStatus.FNone
|
|||
|
|
self.is_reverse = False
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 初始化位置列表
|
|||
|
|
self.feedConfig.feedLine.get_position_list()
|
|||
|
|
self.detect.detect_status = DetectStatus.DNone
|
|||
|
|
self.feed_Mid_Status = FeedMidStatus.FMid_Start
|
|||
|
|
self.next_position(self.is_reverse)
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FMid:
|
|||
|
|
feed_pos = self.get_current_position(self.is_reverse)
|
|||
|
|
if feed_pos.get_position().compare(real_position):
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_mid)
|
|||
|
|
self.next_position(self.is_reverse)
|
|||
|
|
|
|||
|
|
# 增加计数器逻辑
|
|||
|
|
self.mid_take_count += 1
|
|||
|
|
# 可选:在 Debug1 模式下输出日志
|
|||
|
|
if Constant.Debug1:
|
|||
|
|
self.log_signal.emit(
|
|||
|
|
logging.INFO,
|
|||
|
|
f"[调试计数] 已进入 FMid 分支 {self.mid_take_count} 次"
|
|||
|
|
)
|
|||
|
|
if self.feedStatus == FeedStatus.FTake:
|
|||
|
|
self.catch.catch_status = CatchStatus.CTake
|
|||
|
|
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FPhoto:
|
|||
|
|
if self.feedConfig.num == 0:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_finish)
|
|||
|
|
self.is_reverse = True
|
|||
|
|
self.feed_Mid_Status = FeedMidStatus.FMid_Take
|
|||
|
|
self.feedConfig.feedLine.start2take_pos_index = len(self.feedConfig.feedLine.start_to_take) - 2
|
|||
|
|
self.feedConfig.feedLine.origin2start_pos_index = len(self.feedConfig.feedLine.origin_to_start) - 2
|
|||
|
|
self.next_position(self.is_reverse)
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_photo)
|
|||
|
|
self.init_detection_image()
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
|
|||
|
|
if not Constant.Debug:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto)
|
|||
|
|
self.feed_Mid_Status = FeedMidStatus.FMid_Feed
|
|||
|
|
self.detect.detect_status = DetectStatus.DOk
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_success)
|
|||
|
|
# ✅ 输出当前抓取的索引
|
|||
|
|
print(f"[调试] 即将抓取点位索引: {self.detect.position_index}")
|
|||
|
|
self.log_signal.emit(logging.INFO, f"[调试] 当前抓取点位索引: {self.detect.position_index}")
|
|||
|
|
# 设置当前抓取位置(使用当前 index)
|
|||
|
|
self.feedConfig.feedLine.set_take_position(self.detect.detect_position, self.detect.position_index)
|
|||
|
|
# ✅ 递增索引
|
|||
|
|
self.detect.position_index += 1
|
|||
|
|
# ✅ 输出下一个要加载的索引
|
|||
|
|
print(f"[调试] 下一个要加载的索引: {self.detect.position_index}")
|
|||
|
|
# ✅ 重新加载下一个点位(基于更新后的 index)
|
|||
|
|
self.detect.run()
|
|||
|
|
# 增加计数器(用于调试显示执行次数)
|
|||
|
|
self.debug_run_count += 1
|
|||
|
|
if Constant.Debug1:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_sys_runing)
|
|||
|
|
self.log_signal.emit(logging.INFO, f"[调试计数] 当前已执行 {self.debug_run_count} 次")
|
|||
|
|
self.next_position()
|
|||
|
|
self.detect.detect_status = DetectStatus.DNone
|
|||
|
|
if Constant.Debug1:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_sys_runing1)
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_sys_runing2)
|
|||
|
|
#self.feedStatus = FeedStatus.FTake
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FTake:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_take)
|
|||
|
|
if self.feedConfig.feedLine.get_take_position().get_position() != None:
|
|||
|
|
# 检查是否到达抓取点
|
|||
|
|
# self.take_no_photo = False
|
|||
|
|
if self.feedConfig.feedLine.get_take_position().get_position().compare(real_position, is_action=True):
|
|||
|
|
# 打开吸嘴并返回
|
|||
|
|
# self.sendIOControl(self.robotClient.con_ios[0], 1)
|
|||
|
|
# self.sendIOControl(self.robotClient.con_ios[1], 1)
|
|||
|
|
# self.sendIOControl(self.robotClient.con_ios[2], 1)
|
|||
|
|
|
|||
|
|
self.log_signal.emit(logging.INFO, "到达抓料点位")
|
|||
|
|
# 触发抓取动作
|
|||
|
|
# self.feedConfig.feedLine.set_take_position(None)
|
|||
|
|
# time.sleep(self.robotClient.time_delay_take)
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_take_success)
|
|||
|
|
if self.catch.catch_status == CatchStatus.COk:
|
|||
|
|
# 抓取完成,移动到下一步
|
|||
|
|
self.next_position()
|
|||
|
|
self.catch.catch_status = CatchStatus.CNone
|
|||
|
|
if Constant.Debug1:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_sys_runing3) # 运行到这里了
|
|||
|
|
self.feedConfig.feedLine.get_take_position()
|
|||
|
|
return
|
|||
|
|
if self.catch.catch_status == CatchStatus.CTake:
|
|||
|
|
# 抓取动作已发出,等待完成 (COk)
|
|||
|
|
# self.catch.catch_status 应由 Catch 类在完成后设置为 COk
|
|||
|
|
self.catch.catch_status = CatchStatus.COk
|
|||
|
|
if Constant.Debug1:
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_sys_runing1) # 运行到这里了2
|
|||
|
|
else:
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail)
|
|||
|
|
# self.take_no_photo = True
|
|||
|
|
|
|||
|
|
# 继续获取图像
|
|||
|
|
# TODO
|
|||
|
|
# self.feedConfig.feedLine.set_take_position(self.get_take_position())
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FBroken1:
|
|||
|
|
if self.get_current_position().get_position().compare(real_position):
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
|
|||
|
|
self.next_position()
|
|||
|
|
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FBroken2:
|
|||
|
|
if self.get_current_position().get_position().compare(real_position):
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
|
|||
|
|
self.next_position()
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FShake:
|
|||
|
|
|
|||
|
|
if self.get_current_position().get_position().compare(real_position, is_action=True):
|
|||
|
|
# TODO 震动方案
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_shake)
|
|||
|
|
if self.catch.catch_status == CatchStatus.CNone:
|
|||
|
|
self.catch.catch_status = CatchStatus.CShake
|
|||
|
|
return
|
|||
|
|
if self.catch.catch_status == CatchStatus.CShake:
|
|||
|
|
# if self.feedConfig.feedLine.feeding_to_end[
|
|||
|
|
# self.feedConfig.feedLine.feeding2end_pos_index + 1].status != FeedStatus.FShake:
|
|||
|
|
# self.catch.catch_status = CatchStatus.COk
|
|||
|
|
# else:
|
|||
|
|
self.catch.shake_continue.SetReset()
|
|||
|
|
self.next_position()
|
|||
|
|
if self.feedStatus != FeedStatus.FShake:
|
|||
|
|
self.catch.catch_status = CatchStatus.CNone
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
|
|||
|
|
elif self.feedStatus == FeedStatus.FDropBag:
|
|||
|
|
# *** 处理投料点 (FDropBag) 的核心逻辑 ***
|
|||
|
|
# 1. 确认机械臂是否已精确到达当前目标投料点
|
|||
|
|
# get_current_position() 会根据 self.feed_Mid_Status (应为 FMid_Feed)
|
|||
|
|
# 调用 feedLine.get_current_feed_position(),从 feeding_to_end 列表获取
|
|||
|
|
# 由 feeding2end_pos_index 指向的点。
|
|||
|
|
if self.get_current_position().get_position().compare(real_position, is_action=True):
|
|||
|
|
# 2. 记录日志:已到达投料点
|
|||
|
|
self.log_signal.emit(logging.INFO, Constant.str_feed_drop)
|
|||
|
|
# 3. 与 Catch 模块进行状态交互来驱动投料动作
|
|||
|
|
# a. 初始状态 (CNone): 触发投料动作
|
|||
|
|
if self.catch.catch_status == CatchStatus.CNone:
|
|||
|
|
# 将 Catch 状态设置为 CDrop,通知 Catch 模块开始执行物理投料动作
|
|||
|
|
self.catch.catch_status = CatchStatus.CDrop
|
|||
|
|
# 立即返回,等待 Catch 模块处理
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# b. 投料进行中 (CDrop): 等待完成
|
|||
|
|
if self.catch.catch_status == CatchStatus.CDrop:
|
|||
|
|
# 什么都不做,等待 Catch 模块完成动作并更新状态
|
|||
|
|
return
|
|||
|
|
# c. 投料完成 (COk): 处理后续逻辑并移动到下一个点
|
|||
|
|
if self.catch.catch_status == CatchStatus.COk:
|
|||
|
|
# 重置 Catch 状态,为下一次操作做准备
|
|||
|
|
self.catch.catch_status = CatchStatus.CNone
|
|||
|
|
|
|||
|
|
# (后续增加) 延时: 让物料稳定
|
|||
|
|
# time.sleep(self.robotClient.time_delay_put)
|
|||
|
|
|
|||
|
|
# (后续增加) 视觉确认: 拍照确认袋子已放置
|
|||
|
|
# self.detection.get_position(...)
|
|||
|
|
|
|||
|
|
# (后续增加) 更新抓取点: 如果需要根据放置情况调整下次抓取
|
|||
|
|
# self.feedConfig.feedLine.set_take_position(...)
|
|||
|
|
|
|||
|
|
# 触发放置后检测 (如果需要,米厂可以不用)
|
|||
|
|
if self.detect.detect_status == DetectStatus.DNone:
|
|||
|
|
self.detect.detect_status = DetectStatus.DDetect
|
|||
|
|
|
|||
|
|
# 4. 更新业务逻辑:减少剩余袋数
|
|||
|
|
self.feedConfig.num = self.feedConfig.num - 1
|
|||
|
|
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_feed_num}{self.feedConfig.num}')
|
|||
|
|
|
|||
|
|
# 5. *** 关键步骤 ***: 移动到路径中的下一个点
|
|||
|
|
# next_position() 会根据当前的 feed_Mid_Status (FMid_Feed)
|
|||
|
|
# 调用 next_Feed()。
|
|||
|
|
self.next_position()
|
|||
|
|
|
|||
|
|
def run_reset(self):
|
|||
|
|
"""执行复位操作 (状态机驱动)。"""
|
|||
|
|
# 获取当前机械臂位置
|
|||
|
|
real_position = Real_Position()
|
|||
|
|
real_position.init_position(self.robotClient.status_model.world_0,
|
|||
|
|
self.robotClient.status_model.world_1,
|
|||
|
|
self.robotClient.status_model.world_2,
|
|||
|
|
self.robotClient.status_model.world_3,
|
|||
|
|
self.robotClient.status_model.world_4,
|
|||
|
|
self.robotClient.status_model.world_5)
|
|||
|
|
if self.reset_status == ResetStatus.RNone:
|
|||
|
|
# 无复位请求
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
if self.reset_status == ResetStatus.RStart:
|
|||
|
|
# 无配置无法复位
|
|||
|
|
if self.feedConfig == None: return
|
|||
|
|
# for index in range(len(self.feedConfig.feedLine.positions)):
|
|||
|
|
# if self.feedConfig.feedLine.positions[index].status == 2:
|
|||
|
|
# start_index = index
|
|||
|
|
self.pos_index = -1
|
|||
|
|
self.pos_near_index = -1
|
|||
|
|
self.reversed_positions = []
|
|||
|
|
|
|||
|
|
# 查找当前位置在路径中的索引
|
|||
|
|
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
|
|||
|
|
if pos_model.get_position().compare(real_position, is_action=True):
|
|||
|
|
self.pos_index = index
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
if self.pos_index == -1:
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
|
|||
|
|
# 如果未精确找到,查找最近的位置
|
|||
|
|
min_distance = 99999999
|
|||
|
|
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
|
|||
|
|
if get_distance(pos_model.get_position(), real_position) < min_distance:
|
|||
|
|
min_distance = get_distance(pos_model.get_position(), real_position)
|
|||
|
|
self.pos_near_index = index
|
|||
|
|
# 确定复位路径 (从开始到最近/当前点)
|
|||
|
|
if self.pos_near_index != -1:
|
|||
|
|
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_near_index + 1]
|
|||
|
|
else:
|
|||
|
|
self.log_signal.emit(logging.ERROR, "Cannot determine reset path.")
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_index + 1]
|
|||
|
|
# 反转路径
|
|||
|
|
self.reversed_positions = list(reversed(self.reversed_positions))
|
|||
|
|
self.reverse_index = 0 # 复位路径索引
|
|||
|
|
self.send_emergency_sound() # 发出急停声音
|
|||
|
|
self.current_position = PositionModel()
|
|||
|
|
self.current_position.init_position(real_position)
|
|||
|
|
self.reset_status = ResetStatus.RRunging # 进入运行状态
|
|||
|
|
|
|||
|
|
if self.reset_status == ResetStatus.RRunging:
|
|||
|
|
if not real_position.compare(self.current_position.get_position(), is_action=True):
|
|||
|
|
return
|
|||
|
|
pos_model = self.reversed_positions[self.reverse_index]
|
|||
|
|
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
|
|||
|
|
pos_model = self.reversed_positions[self.reverse_index + 1]
|
|||
|
|
self.reverse_index = self.reverse_index + 1
|
|||
|
|
|
|||
|
|
if pos_model.lineType == LineType.CureMid.value:
|
|||
|
|
pos_model1 = self.reversed_positions[self.reverse_index + 1]
|
|||
|
|
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
|
|||
|
|
real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed)
|
|||
|
|
self.current_position = pos_model1
|
|||
|
|
self.reverse_index = self.reverse_index + 2
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed)
|
|||
|
|
self.current_position = pos_model
|
|||
|
|
self.reverse_index = self.reverse_index + 1
|
|||
|
|
|
|||
|
|
if self.reverse_index == len(self.reversed_positions):
|
|||
|
|
self.reset_status = ResetStatus.ROk
|
|||
|
|
if self.reset_status == ResetStatus.ROk:
|
|||
|
|
self.reset_status = ResetStatus.RNone
|
|||
|
|
self.send_emergency_stop()
|
|||
|
|
|
|||
|
|
# 废弃代码
|
|||
|
|
def return_original_position(self):
|
|||
|
|
|
|||
|
|
self.run_reverse = True
|
|||
|
|
real_position = Real_Position()
|
|||
|
|
real_position.init_position(self.robotClient.status_model.world_0,
|
|||
|
|
self.robotClient.status_model.world_1,
|
|||
|
|
self.robotClient.status_model.world_2,
|
|||
|
|
self.robotClient.status_model.world_3,
|
|||
|
|
self.robotClient.status_model.world_4,
|
|||
|
|
self.robotClient.status_model.world_5)
|
|||
|
|
if self.feedConfig == None: return
|
|||
|
|
start_index = -1
|
|||
|
|
# for index in range(len(self.feedConfig.feedLine.positions)):
|
|||
|
|
# if self.feedConfig.feedLine.positions[index].status == 2:
|
|||
|
|
# start_index = index
|
|||
|
|
pos_index = -1
|
|||
|
|
pos_near_index = -1
|
|||
|
|
reversed_positions = []
|
|||
|
|
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
|
|||
|
|
if pos_model.get_position().compare(real_position):
|
|||
|
|
pos_index = index
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
if pos_index == -1:
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
|
|||
|
|
min_distance = 99999999
|
|||
|
|
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
|
|||
|
|
if get_distance(pos_model.get_position(), real_position) < min_distance:
|
|||
|
|
min_distance = get_distance(pos_model.get_position(), real_position)
|
|||
|
|
pos_near_index = index
|
|||
|
|
if pos_near_index != -1:
|
|||
|
|
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_near_index + 1]
|
|||
|
|
else:
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_index]
|
|||
|
|
reversed_positions = list(reversed(reversed_positions))
|
|||
|
|
self.reverse_index = 0
|
|||
|
|
self.send_emergency_sound()
|
|||
|
|
current_position = PositionModel()
|
|||
|
|
current_position.init_position(real_position)
|
|||
|
|
while self.run_reverse and self.reverse_index != len(reversed_positions):
|
|||
|
|
if self.reverse_index != 0 and not real_position.compare(current_position.get_position()):
|
|||
|
|
continue
|
|||
|
|
# todo 缺少比对
|
|||
|
|
pos_model = reversed_positions[self.reverse_index]
|
|||
|
|
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
|
|||
|
|
|
|||
|
|
pos_model = reversed_positions[self.reverse_index + 1]
|
|||
|
|
# TODO take节点判断
|
|||
|
|
|
|||
|
|
if pos_model.lineType == LineType.CureMid.value:
|
|||
|
|
pos_model1 = reversed_positions[self.reverse_index + 1]
|
|||
|
|
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
|
|||
|
|
real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed)
|
|||
|
|
current_position = pos_model1
|
|||
|
|
self.reverse_index = self.reverse_index + 2
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed)
|
|||
|
|
current_position = pos_model
|
|||
|
|
self.reverse_index = self.reverse_index + 1
|
|||
|
|
self.send_emergency_stop()
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
def send_emergency_sound(self):
|
|||
|
|
"""发送急停声音 IO 控制。"""
|
|||
|
|
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
|
|||
|
|
|
|||
|
|
def send_emergency_stop(self):
|
|||
|
|
"""停止急停声音 IO 控制。"""
|
|||
|
|
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
|
|||
|
|
|
|||
|
|
def sendIOControl(self, IO_bit, IO_Status: int):
|
|||
|
|
"""发送 IO 控制指令。"""
|
|||
|
|
IO_command = CMDInstructRequest()
|
|||
|
|
io_instruction = Instruction()
|
|||
|
|
io_instruction.IO = True
|
|||
|
|
io_instruction.io_status = IO_Status
|
|||
|
|
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
|
|||
|
|
IO_command.dsID = 'HCRemoteCommand'
|
|||
|
|
IO_command.instructions.append(io_instruction)
|
|||
|
|
self.robotClient.add_sendQuene(IO_command.toString())
|
|||
|
|
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_io_control}{IO_bit},{IO_Status}')
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def sendTargPosition(self, real_position, move_type: MoveType = MoveType.WORLD, speed=5, real_position1=None):
|
|||
|
|
"""发送目标位置移动指令。"""
|
|||
|
|
position_instruction = Instruction()
|
|||
|
|
position_instruction.speed = speed
|
|||
|
|
position_instruction.m0 = real_position.X
|
|||
|
|
position_instruction.m1 = real_position.Y
|
|||
|
|
position_instruction.m2 = real_position.Z
|
|||
|
|
position_instruction.m3 = real_position.U
|
|||
|
|
position_instruction.m4 = real_position.V
|
|||
|
|
position_instruction.m5 = real_position.W
|
|||
|
|
position_instruction.smooth = self.robotClient.smooth
|
|||
|
|
|
|||
|
|
position_instruction.action = move_type.value
|
|||
|
|
if position_instruction.action == 17:
|
|||
|
|
position_instruction.m0_p = real_position1.X
|
|||
|
|
position_instruction.m1_p = real_position1.Y
|
|||
|
|
position_instruction.m2_p = real_position1.Z
|
|||
|
|
position_instruction.m3_p = real_position1.U
|
|||
|
|
position_instruction.m4_p = real_position1.V
|
|||
|
|
position_instruction.m5_p = real_position1.W
|
|||
|
|
instruction_command = CMDInstructRequest()
|
|||
|
|
instruction_command.instructions.append(position_instruction)
|
|||
|
|
request_command = instruction_command.toString()
|
|||
|
|
|
|||
|
|
log_str = f'移动到位置:{"姿势直线" if move_type == MoveType.WORLD else "姿势曲线"}:' \
|
|||
|
|
f'X:{position_instruction.m0}-' \
|
|||
|
|
f'Y:{position_instruction.m1}-' \
|
|||
|
|
f'Z:{position_instruction.m2}-' \
|
|||
|
|
f'U:{position_instruction.m3}-' \
|
|||
|
|
f'V:{position_instruction.m4}-' \
|
|||
|
|
f'W:{position_instruction.m5}'
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
self.log_signal.emit(logging.INFO, log_str)
|
|||
|
|
except Exception as e:
|
|||
|
|
self.log_signal.emit(logging.ERROR, f"Logging error in sendTargPosition: {e}")
|
|||
|
|
return # 如果日志发送失败,至少不要中断主流程
|
|||
|
|
print(request_command)
|
|||
|
|
self.robotClient.add_sendQuene(request_command)
|
|||
|
|
|
|||
|
|
# def get_take_position(self):
|
|||
|
|
# if Constant.Debug:
|
|||
|
|
# return self.robotClient.status_model.getRealPosition()
|
|||
|
|
# _, img, xyz, uvw, points = self.detection.get_position(Point_isVision=False, Box_isPoint=True, First_Depth=True, Iter_Max_Pixel=30,save_img_point=0, Height_reduce=30, width_reduce=30)
|
|||
|
|
# self.detection_image = img.copy()
|
|||
|
|
# if xyz ==None or uvw==None or points==None:
|
|||
|
|
# return None
|
|||
|
|
# target_position,noraml_base = getPosition(*xyz,*uvw,None,points)
|
|||
|
|
#
|
|||
|
|
# position = Real_Position().init_position(*target_position[:3],*noraml_base[:3])
|
|||
|
|
# position.Z = position.Z+200
|
|||
|
|
# return position
|
|||
|
|
|
|||
|
|
def next_start(self, reverse=False):
|
|||
|
|
"""移动到下一个起始位置。"""
|
|||
|
|
start_pos = self.feedConfig.feedLine.get_next_start_position(reverse)
|
|||
|
|
self.feedStatus = FeedStatus(start_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
|
|||
|
|
if start_pos.lineType == LineType.CureMid.value:
|
|||
|
|
start_pos1 = self.feedConfig.feedLine.get_next_start_position(reverse)
|
|||
|
|
self.feedStatus = FeedStatus(start_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
|
|||
|
|
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed,
|
|||
|
|
move_type=MoveType.Cure, real_position1=start_pos1.get_position())
|
|||
|
|
elif start_pos.lineType == LineType.WORLD.value:
|
|||
|
|
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(start_pos.get_position(),
|
|||
|
|
self.robotClient.max_angle_interval):
|
|||
|
|
self.feedStatus = None
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed,
|
|||
|
|
move_type=MoveType.AXIS)
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed)
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def next_take(self, reverse=False):
|
|||
|
|
"""移动到下一个抓取位置。"""
|
|||
|
|
take_pos = self.feedConfig.feedLine.get_next_take_position(reverse)
|
|||
|
|
self.feedStatus = FeedStatus(take_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
|
|||
|
|
if take_pos.lineType == LineType.CureMid.value:
|
|||
|
|
take_pos1 = self.feedConfig.feedLine.get_next_take_position(reverse)
|
|||
|
|
self.feedStatus = FeedStatus(take_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
|
|||
|
|
self.sendTargPosition(real_position=take_pos.get_position(), move_type=MoveType.Cure,
|
|||
|
|
real_position1=take_pos1.get_position(), speed=self.robotClient.feed_speed)
|
|||
|
|
elif take_pos.lineType == LineType.WORLD.value:
|
|||
|
|
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(take_pos.get_position(),
|
|||
|
|
self.robotClient.max_angle_interval):
|
|||
|
|
self.feedStatus = None
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed,
|
|||
|
|
move_type=MoveType.AXIS)
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed)
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def next_Feed(self, reverse=False):
|
|||
|
|
"""
|
|||
|
|
移动到 feeding_to_end 列表中的下一个位置。
|
|||
|
|
这个方法负责处理所有在 feeding_to_end 列表中的点,包括 FTake 前后的动态点、FDropBag 点等。
|
|||
|
|
"""
|
|||
|
|
# 1. 调用 FeedLine 的方法获取下一个位置对象 (并更新内部索引)
|
|||
|
|
# get_next_feed_position 会返回当前索引指向的点,
|
|||
|
|
# 然后根据 reverse 参数更新 feeding2end_pos_index。
|
|||
|
|
feed_pos = self.feedConfig.feedLine.get_next_feed_position(reverse)
|
|||
|
|
|
|||
|
|
# 2. 更新主状态机,使其与即将移动到的位置状态一致
|
|||
|
|
self.feedStatus = FeedStatus(feed_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
|
|||
|
|
# 3. 根据位置的 lineType 发送相应的移动指令
|
|||
|
|
if feed_pos.lineType == LineType.CureMid.value:
|
|||
|
|
# 处理曲线运动 (需要两个点)
|
|||
|
|
feed_pos1 = self.feedConfig.feedLine.get_next_feed_position(reverse)
|
|||
|
|
self.feedStatus = FeedStatus(feed_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
|
|||
|
|
self.sendTargPosition(real_position=feed_pos.get_position(), move_type=MoveType.Cure,
|
|||
|
|
real_position1=feed_pos1.get_position(), speed=self.robotClient.feed_speed)
|
|||
|
|
elif feed_pos.lineType == LineType.WORLD.value:
|
|||
|
|
# 处理直线/关节运动
|
|||
|
|
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(feed_pos.get_position(),
|
|||
|
|
self.robotClient.max_angle_interval):
|
|||
|
|
self.feedStatus = None
|
|||
|
|
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
|
|||
|
|
else:
|
|||
|
|
self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed,
|
|||
|
|
move_type=MoveType.AXIS)
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
# 默认处理 (通常是直线 WORLD 运动)
|
|||
|
|
# *** 处理 FDropBag 点 ***
|
|||
|
|
# 当 feeding_to_end 列表中的点是 FDropBag (status=9) 时,
|
|||
|
|
# 这段默认代码会被执行,发送移动到该点的指令。
|
|||
|
|
# 一旦机械臂到达该点,run() 方法的 FDropBag 分支就会被触发,
|
|||
|
|
# 完成投料后,next_position() 又会被调用,继续移动到列表中的下一个点
|
|||
|
|
|
|||
|
|
self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed)
|
|||
|
|
|
|||
|
|
def get_current_position(self, is_reverse=False):
|
|||
|
|
"""
|
|||
|
|
根据当前子阶段状态获取对应的目标位置。
|
|||
|
|
:param is_reverse: 是否反向 (传递给 FeedLine 方法)
|
|||
|
|
:return: PositionModel 对象或 None
|
|||
|
|
"""
|
|||
|
|
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
|
|||
|
|
return self.feedConfig.feedLine.get_current_start_position(self.is_reverse)
|
|||
|
|
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
|
|||
|
|
return self.feedConfig.feedLine.get_current_take_position(self.is_reverse)
|
|||
|
|
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
|
|||
|
|
return self.feedConfig.feedLine.get_current_feed_position(self.is_reverse)
|
|||
|
|
|
|||
|
|
def next_position(self, reverse=False):
|
|||
|
|
"""
|
|||
|
|
根据当前子阶段状态移动到下一个位置。
|
|||
|
|
这是驱动整个喂料流程前进的核心方法。
|
|||
|
|
:param reverse: 是否反向移动
|
|||
|
|
"""
|
|||
|
|
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
|
|||
|
|
self.next_start(reverse)
|
|||
|
|
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
|
|||
|
|
self.next_take(reverse)
|
|||
|
|
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
|
|||
|
|
# *** 关键调用 ***:
|
|||
|
|
# 当 feed_Mid_Status 是 FMid_Feed 时,next_position() 会调用 next_Feed()。
|
|||
|
|
# next_Feed() 内部调用 feedLine.get_next_feed_position(),该方法会获取
|
|||
|
|
# feeding_to_end 列表中当前索引指向的点,并将索引递增,从而实现按顺序访问。
|
|||
|
|
# 这使得 FTake 前后动态点和 FDropBag 点都能被顺序处理。
|
|||
|
|
self.next_Feed(reverse)
|
|||
|
|
|
|||
|
|
def safe_check_columns(self):
|
|||
|
|
return True
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def safe_check_person(self):
|
|||
|
|
return True
|
|||
|
|
pass
|