Files
ailai_show/CU/save_feeding.py
2025-07-29 13:16:30 +08:00

942 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import logging
import random
import threading
import time
import cv2
import numpy as np
from PyQt5.QtWidgets import QMessageBox
from PySide6.QtCore import Signal, QObject
from PySide6.QtGui import QPixmap
import Constant
import Expection
from CU.Catch import Catch, CatchStatus
from CU.Detect import Detect, DetectStatus
from Model.FeedModel import PositionModel
from Model.Position import Real_Position, Detection_Position
from enum import Enum, IntEnum
from COM.COM_Robot import RobotClient, DetectType
from Model.RobotModel import CMDInstructRequest, MoveType
from Trace.handeye_calibration import getPosition
from Trace.handeye_calibration import getxyz1,getxyz
from Util.util_math import get_distance
from Util.util_time import CRisOrFall
#from Vision.camera_coordinate_dete import Detection
from Util.util_log import log
from Model.RobotModel import Instruction
class ResetStatus(Enum):
RNone = 0
RStart = 1
RRunging = 2
ROk = 3
class FeedStatus(IntEnum):
FNone = 0
FStart = 1
FCheck = 2
FMid = 3
FPhoto = 4
FTake = 5
FBroken1 = 6
FBroken2 = 7
FShake = 8
FDropBag = 9
FFinished = 10
FReverse = 11
FStartReverse = 12
class LineType(Enum):
Straight = 0
CureMid = 2
CureEnd = 3
WORLD = 4
class FeedMidStatus(Enum):
FMid_Start = 1
FMid_Take = 2
FMid_Feed = 3
class FeedPosition:
def __init__(self, status: FeedStatus, position: Real_Position):
self.status = status
self.position = position
class FeedLine:
"""
FeedLine 类负责管理一个投料路径的所有位置点。
它将路径分割成不同的阶段,并提供获取当前/下一个位置的方法。
"""
def __init__(self, id, name, feed_positions: list):
"""
初始化 FeedLine。
:param id: 路径ID
:param name: 路径名称
:param feed_positions: 包含所有 PositionModel 对象的列表
"""
# 深拷贝位置列表,避免外部修改影响
self.feed_positions = copy.deepcopy(feed_positions)
# 初始化各个阶段的位置索引
self.feeding2end_pos_index = 0 # 投料到结束的索引 (用于 feeding_to_end 列表)
self.origin2start_pos_index = 0 # 原点到起始点的索引 (用于 origin_to_start 列表)
self.start2take_pos_index = 0 # 起始点到抓取点的索引 (用于 start_to_take 列表)
self.name = name
self.id = id
# 初始化分割后的子列表
self.origin_to_start = []
self.start_to_take = []
self.feeding_to_end = []
def get_current_feed_position(self, is_reverse):
"""
获取当前投料阶段的目标位置。
:param is_reverse: 是否反向运行 (此参数当前未用于索引计算)
:return: PositionModel 对象或 None
"""
if not self.feeding_to_end or self.feeding2end_pos_index < 0 or self.feeding2end_pos_index >= len(
self.feeding_to_end):
return None # 或抛出异常
# 直接返回当前索引指向的位置
return self.feeding_to_end[self.feeding2end_pos_index]
def get_current_take_position(self, is_reverse):
"""获取当前抓取阶段的目标位置 """
if not self.start_to_take or self.start2take_pos_index < 0 or self.start2take_pos_index >= len(
self.start_to_take):
return None # 或抛出异常
return self.start_to_take[self.start2take_pos_index]
def get_current_start_position(self, is_reverse):
"""获取当前起始阶段的目标位置 """
if not self.origin_to_start or self.origin2start_pos_index < 0 or self.origin2start_pos_index >= len(
self.origin_to_start):
return None
return self.origin_to_start[self.origin2start_pos_index]
def _update_index(self, index_attr, list_attr, reverse):
"""辅助函数:更新索引"""
current_index = getattr(self, index_attr)
current_list = getattr(self, list_attr)
if reverse:
current_index -= 1
if current_index < 0:
current_index = len(current_list) - 1
else:
current_index += 1
if current_index >= len(current_list):
current_index = 0
setattr(self, index_attr, current_index)
return current_index
def get_next_feed_position(self, reverse: bool = False):
"""
获取下一个投料位置,并更新索引。
这个方法负责处理 feeding_to_end 列表中的所有点,包括 FTake 前后动态点和 FDropBag 点。
:param reverse: 是否反向获取
:return: PositionModel 对象
"""
if not self.feeding_to_end or self.feeding2end_pos_index < 0 or self.feeding2end_pos_index >= len(
self.feeding_to_end):
return None # 或抛出异常
# 1. 获取当前索引指向的位置
pos = self.feeding_to_end[self.feeding2end_pos_index]
# 2. 更新索引 (关键:实现顺序访问)
self._update_index('feeding2end_pos_index', 'feeding_to_end', reverse)
# 3. 返回之前获取的位置
return pos
def get_next_start_position(self, reverse: bool = False):
"""获取下一个起始位置,并更新索引。"""
pos = self.origin_to_start[self.origin2start_pos_index]
self._update_index('origin2start_pos_index', 'origin_to_start', reverse)
return pos
def get_next_take_position(self, reverse: bool = False):
"""获取下一个抓取位置,并更新索引。"""
pos = self.start_to_take[self.start2take_pos_index]
self._update_index('start2take_pos_index', 'start_to_take', reverse)
return pos
def get_take_position(self):
"""
查找并返回 FTake 状态的位置。
:return: PositionModel 对象或 None
"""
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
return self.feeding_to_end[i]
def set_take_position(self, position: Real_Position, dynamic_height=0):
"""
设置 FTake 位置,并更新其前后动态点的位置。
:param position: 新的抓取位置
:param dynamic_height: 动态高度调整 (如果需要)
"""
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
# 计算 XYZ 坐标
#xyz = getxyz(position.X, position.Y, position.Z, position.a, position.b, position.c)
xyz = getxyz1(position.X, position.Y, position.Z, position.a, position.b, position.c)
# 创建 before 和 after 位置
befor_take_position = Real_Position().init_position(xyz[0],
xyz[1],
xyz[2],
position.U,
position.V,
position.W)
after_take_position = Real_Position().init_position(xyz[0],
xyz[1],
xyz[2],
position.U,
position.V,
position.W)
# 安全检查索引
if i > 0:
self.feeding_to_end[i - 1].set_position(befor_take_position)
else:
print("Warning: No position before FTake to update.")
self.feeding_to_end[i].set_position(position)
if i + 1 < len(self.feeding_to_end):
self.feeding_to_end[i + 1].set_position(after_take_position)
else:
print("Warning: No position after FTake to update.")
break # 抓料点暂时就一个
def get_position_list(self):
"""
根据状态将 feed_positions 分割成 origin_to_start, start_to_take, feeding_to_end 三个列表。
并在 FTake 前后插入动态点。
"""
index_start = -1
index_take = -1
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FCheck.value:
index_start = i
break
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FPhoto.value:
index_take = i
if index_start == -1 or index_take == -1:
print("Error: FCheck or FPhoto position not found in feed_positions.")
# 可能需要抛出异常或设置默认值
return
self.origin_to_start = self.feed_positions[: index_start + 1]
self.start_to_take = self.feed_positions[index_start:index_take + 1]
self.feeding_to_end = self.feed_positions[index_take:]
# 查找 FTake 并插入动态点
take_found = False
i = 0
while i < len(self.feeding_to_end):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
if take_found:
print("Warning: Multiple FTake positions found. Processing first one.")
break # 或处理多个,或抛出异常
# 创建动态点
befor_position_model = PositionModel()
befor_position_model.init_position(None)
after_position_model = PositionModel()
after_position_model.init_position(None)
# 插入动态点 (注意列表长度和索引的变化)
self.feeding_to_end.insert(i, befor_position_model) # 在 FTake 前插入
i += 1 # 列表变长,索引后移
self.feeding_to_end.insert(i + 1, after_position_model) # 在 FTake 后插入 (原 FTake 现在是 i)
# i += 1 # 如果需要继续遍历,可以增加索引
take_found = True
break # 处理完第一个 FTake 就退出
i += 1
class FeedingConfig:
"""
FeedingConfig 类存储当前投料任务的配置信息,
包括剩余数量、路径信息和拍照位置。
"""
def __init__(self, num: int, feedLine: FeedLine, photo_locs):
"""
初始化投料配置。
:param num: 剩余袋数
:param feedLine: FeedLine 对象,包含路径信息
:param photo_locs: 拍照位置列表
"""
self.num = num
self.feedLine = feedLine
self.photo_locs = [self.deal_photo_locs(p) for p in photo_locs]
def deal_photo_locs(self, photo_loc):
"""
将元组或列表形式的坐标转换为 Real_Position 对象。
:param photo_loc: [X, Y, Z, U, V, W] 的列表或元组
:return: Real_Position 对象
"""
position_photo = Real_Position()
position_photo.init_position(photo_loc[0], photo_loc[1], photo_loc[2], photo_loc[3], photo_loc[4], photo_loc[5])
return position_photo
def get_line_info(self):
pass
class Feeding(QObject):
need_origin_signal = Signal(str)
take_no_photo_sigal = Signal()
update_detect_image = Signal(np.ndarray)
log_signal = Signal(int, str)
def __init__(self, robotClient: RobotClient):
super().__init__()
self.feedConfig = None
self.feedStatus = FeedStatus.FNone
self.robotClient = robotClient
self.detection_image = None
self.init_detection_image()
self.pause = False
self.cRis_photo = CRisOrFall()
self.cRis_shake = CRisOrFall()
self.feed_Mid_Status = FeedMidStatus.FMid_Start
self.is_reverse = False
# 复位集合
self.run_reverse = False
self.take_no_photo = False
self.reset_status = ResetStatus.RNone
self.reversed_positions = []
self.current_position = None
self.index = 1
self.pos_index = -1
self.pos_near_index = -1
self.catch = Catch(self.robotClient)
self.detect = Detect()
self.is_detected = True
self.detect_thread = threading.Thread(target=self.run_detect)
self.detect_thread.start()
self.onekey = False
self.mid_take_count = 0
self.debug_run_count = 0
pass
def close_feed(self):
self.is_detected = False
self.detect_thread.join()
self.detect.detection.release()
def init_detection_image(self):
detection_image = cv2.imread(Constant.feed_sign_path)
self.update_detect_image.emit(detection_image)
def run_detect(self):
"""检测线程函数。"""
while self.is_detected:
self.detect.run()
time.sleep(0.02) # 控制检测频率,米厂可以设置较大值
def run(self):
"""
主运行逻辑,由外部循环调用。
这是一个状态机,根据 self.feedStatus 的值执行不同的操作。
"""
self.catch.run()
# 获取机械臂位置
real_position = Real_Position()
real_position.init_position(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5)
# --- 状态机主逻辑 ---
if self.feedConfig == None:
self.feedStatus = FeedStatus.FNone
# 检查是否完成反向运行并回到原点
if self.feedConfig != None and self.feedConfig.num == 0 and self.is_reverse and self.robotClient.origin_position.compare(
real_position, is_action=True):
self.feedStatus = FeedStatus.FNone
self.is_reverse = False
self.log_signal.emit(logging.INFO, Constant.str_feed_reverse)
if self.feedStatus == FeedStatus.FNone or self.pause:
return
elif self.feedStatus == FeedStatus.FCheck:
self.log_signal.emit(logging.INFO, Constant.str_feed_check)
# 1, 检查是否是三列
# 2, 检查是否有人
# if self.safe_check_columns() and self.safe_check_person():
# pass
# else:
# if self.feedConfig.num != 0:
# self.next_target()
# if == 原点 继续判断
# else:
# QMessageBox.information(None, "提示", Constant.str_feed_safe_error_msgbox)
# 根据是否反转,进行状态设置
if self.is_reverse:
self.feed_Mid_Status = FeedMidStatus.FMid_Start
else:
self.feed_Mid_Status = FeedMidStatus.FMid_Take
# 如果在起始点,则移动到下一个位置
if self.feedConfig.feedLine.start_to_take[0].get_position().compare(real_position):
self.next_position(self.is_reverse)
elif self.feedStatus == FeedStatus.FStart:
self.log_signal.emit(logging.INFO, Constant.str_feed_start)
# 检查是否在原点 (正向启动时)
if not self.robotClient.origin_position.compare(real_position, is_action=True) and not self.is_reverse:
# QMessageBox.information(None, "提示", Constant.str_feed_start_error) # Fuck 引起异常
self.log_signal.emit(logging.ERROR, Constant.str_feed_start_error)
self.need_origin_signal.emit(Constant.str_feed_start_error)
self.feedStatus = FeedStatus.FNone
return
# 反向运行结束
if self.is_reverse:
self.feedStatus = FeedStatus.FNone
self.is_reverse = False
return
# 初始化位置列表
self.feedConfig.feedLine.get_position_list()
self.detect.detect_status = DetectStatus.DNone
self.feed_Mid_Status = FeedMidStatus.FMid_Start
self.next_position(self.is_reverse)
elif self.feedStatus == FeedStatus.FMid:
feed_pos = self.get_current_position(self.is_reverse)
if feed_pos.get_position().compare(real_position):
self.log_signal.emit(logging.INFO, Constant.str_feed_mid)
self.next_position(self.is_reverse)
# 增加计数器逻辑
self.mid_take_count += 1
# 可选:在 Debug1 模式下输出日志
if Constant.Debug1:
self.log_signal.emit(
logging.INFO,
f"[调试计数] 已进入 FMid 分支 {self.mid_take_count}"
)
if self.feedStatus == FeedStatus.FTake:
self.catch.catch_status = CatchStatus.CTake
elif self.feedStatus == FeedStatus.FPhoto:
if self.feedConfig.num == 0:
self.log_signal.emit(logging.INFO, Constant.str_feed_finish)
self.is_reverse = True
self.feed_Mid_Status = FeedMidStatus.FMid_Take
self.feedConfig.feedLine.start2take_pos_index = len(self.feedConfig.feedLine.start_to_take) - 2
self.feedConfig.feedLine.origin2start_pos_index = len(self.feedConfig.feedLine.origin_to_start) - 2
self.next_position(self.is_reverse)
self.log_signal.emit(logging.INFO, Constant.str_feed_photo)
self.init_detection_image()
return
if not Constant.Debug:
self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto)
self.feed_Mid_Status = FeedMidStatus.FMid_Feed
self.detect.detect_status = DetectStatus.DOk
self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_success)
# ✅ 输出当前抓取的索引
print(f"[调试] 即将抓取点位索引: {self.detect.position_index}")
self.log_signal.emit(logging.INFO, f"[调试] 当前抓取点位索引: {self.detect.position_index}")
# 设置当前抓取位置(使用当前 index
self.feedConfig.feedLine.set_take_position(self.detect.detect_position, self.detect.position_index)
# ✅ 递增索引
self.detect.position_index += 1
# ✅ 输出下一个要加载的索引
print(f"[调试] 下一个要加载的索引: {self.detect.position_index}")
# ✅ 重新加载下一个点位(基于更新后的 index
self.detect.run()
# 增加计数器(用于调试显示执行次数)
self.debug_run_count += 1
if Constant.Debug1:
self.log_signal.emit(logging.INFO, Constant.str_sys_runing)
self.log_signal.emit(logging.INFO, f"[调试计数] 当前已执行 {self.debug_run_count}")
self.next_position()
self.detect.detect_status = DetectStatus.DNone
if Constant.Debug1:
self.log_signal.emit(logging.INFO, Constant.str_sys_runing1)
self.log_signal.emit(logging.INFO, Constant.str_sys_runing2)
#self.feedStatus = FeedStatus.FTake
elif self.feedStatus == FeedStatus.FTake:
self.log_signal.emit(logging.INFO, Constant.str_feed_take)
if self.feedConfig.feedLine.get_take_position().get_position() != None:
# 检查是否到达抓取点
# self.take_no_photo = False
if self.feedConfig.feedLine.get_take_position().get_position().compare(real_position, is_action=True):
# 打开吸嘴并返回
# self.sendIOControl(self.robotClient.con_ios[0], 1)
# self.sendIOControl(self.robotClient.con_ios[1], 1)
# self.sendIOControl(self.robotClient.con_ios[2], 1)
self.log_signal.emit(logging.INFO, "到达抓料点位")
# 触发抓取动作
# self.feedConfig.feedLine.set_take_position(None)
# time.sleep(self.robotClient.time_delay_take)
self.log_signal.emit(logging.INFO, Constant.str_feed_take_success)
if self.catch.catch_status == CatchStatus.COk:
# 抓取完成,移动到下一步
self.next_position()
self.catch.catch_status = CatchStatus.CNone
if Constant.Debug1:
self.log_signal.emit(logging.INFO, Constant.str_sys_runing3) # 运行到这里了
self.feedConfig.feedLine.get_take_position()
return
if self.catch.catch_status == CatchStatus.CTake:
# 抓取动作已发出,等待完成 (COk)
# self.catch.catch_status 应由 Catch 类在完成后设置为 COk
self.catch.catch_status = CatchStatus.COk
if Constant.Debug1:
self.log_signal.emit(logging.INFO, Constant.str_sys_runing1) # 运行到这里了2
else:
self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail)
# self.take_no_photo = True
# 继续获取图像
# TODO
# self.feedConfig.feedLine.set_take_position(self.get_take_position())
elif self.feedStatus == FeedStatus.FBroken1:
if self.get_current_position().get_position().compare(real_position):
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
self.next_position()
elif self.feedStatus == FeedStatus.FBroken2:
if self.get_current_position().get_position().compare(real_position):
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
self.next_position()
elif self.feedStatus == FeedStatus.FShake:
if self.get_current_position().get_position().compare(real_position, is_action=True):
# TODO 震动方案
self.log_signal.emit(logging.INFO, Constant.str_feed_shake)
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CShake
return
if self.catch.catch_status == CatchStatus.CShake:
# if self.feedConfig.feedLine.feeding_to_end[
# self.feedConfig.feedLine.feeding2end_pos_index + 1].status != FeedStatus.FShake:
# self.catch.catch_status = CatchStatus.COk
# else:
self.catch.shake_continue.SetReset()
self.next_position()
if self.feedStatus != FeedStatus.FShake:
self.catch.catch_status = CatchStatus.CNone
return
elif self.feedStatus == FeedStatus.FDropBag:
# *** 处理投料点 (FDropBag) 的核心逻辑 ***
# 1. 确认机械臂是否已精确到达当前目标投料点
# get_current_position() 会根据 self.feed_Mid_Status (应为 FMid_Feed)
# 调用 feedLine.get_current_feed_position(),从 feeding_to_end 列表获取
# 由 feeding2end_pos_index 指向的点。
if self.get_current_position().get_position().compare(real_position, is_action=True):
# 2. 记录日志:已到达投料点
self.log_signal.emit(logging.INFO, Constant.str_feed_drop)
# 3. 与 Catch 模块进行状态交互来驱动投料动作
# a. 初始状态 (CNone): 触发投料动作
if self.catch.catch_status == CatchStatus.CNone:
# 将 Catch 状态设置为 CDrop通知 Catch 模块开始执行物理投料动作
self.catch.catch_status = CatchStatus.CDrop
# 立即返回,等待 Catch 模块处理
return
# b. 投料进行中 (CDrop): 等待完成
if self.catch.catch_status == CatchStatus.CDrop:
# 什么都不做,等待 Catch 模块完成动作并更新状态
return
# c. 投料完成 (COk): 处理后续逻辑并移动到下一个点
if self.catch.catch_status == CatchStatus.COk:
# 重置 Catch 状态,为下一次操作做准备
self.catch.catch_status = CatchStatus.CNone
# (后续增加) 延时: 让物料稳定
# time.sleep(self.robotClient.time_delay_put)
# (后续增加) 视觉确认: 拍照确认袋子已放置
# self.detection.get_position(...)
# (后续增加) 更新抓取点: 如果需要根据放置情况调整下次抓取
# self.feedConfig.feedLine.set_take_position(...)
# 触发放置后检测 (如果需要,米厂可以不用)
if self.detect.detect_status == DetectStatus.DNone:
self.detect.detect_status = DetectStatus.DDetect
# 4. 更新业务逻辑:减少剩余袋数
self.feedConfig.num = self.feedConfig.num - 1
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_feed_num}{self.feedConfig.num}')
# 5. *** 关键步骤 ***: 移动到路径中的下一个点
# next_position() 会根据当前的 feed_Mid_Status (FMid_Feed)
# 调用 next_Feed()。
self.next_position()
def run_reset(self):
"""执行复位操作 (状态机驱动)。"""
# 获取当前机械臂位置
real_position = Real_Position()
real_position.init_position(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5)
if self.reset_status == ResetStatus.RNone:
# 无复位请求
return
if self.reset_status == ResetStatus.RStart:
# 无配置无法复位
if self.feedConfig == None: return
# for index in range(len(self.feedConfig.feedLine.positions)):
# if self.feedConfig.feedLine.positions[index].status == 2:
# start_index = index
self.pos_index = -1
self.pos_near_index = -1
self.reversed_positions = []
# 查找当前位置在路径中的索引
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if pos_model.get_position().compare(real_position, is_action=True):
self.pos_index = index
break
if self.pos_index == -1:
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
# 如果未精确找到,查找最近的位置
min_distance = 99999999
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if get_distance(pos_model.get_position(), real_position) < min_distance:
min_distance = get_distance(pos_model.get_position(), real_position)
self.pos_near_index = index
# 确定复位路径 (从开始到最近/当前点)
if self.pos_near_index != -1:
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_near_index + 1]
else:
self.log_signal.emit(logging.ERROR, "Cannot determine reset path.")
return False
else:
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_index + 1]
# 反转路径
self.reversed_positions = list(reversed(self.reversed_positions))
self.reverse_index = 0 # 复位路径索引
self.send_emergency_sound() # 发出急停声音
self.current_position = PositionModel()
self.current_position.init_position(real_position)
self.reset_status = ResetStatus.RRunging # 进入运行状态
if self.reset_status == ResetStatus.RRunging:
if not real_position.compare(self.current_position.get_position(), is_action=True):
return
pos_model = self.reversed_positions[self.reverse_index]
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
pos_model = self.reversed_positions[self.reverse_index + 1]
self.reverse_index = self.reverse_index + 1
if pos_model.lineType == LineType.CureMid.value:
pos_model1 = self.reversed_positions[self.reverse_index + 1]
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed)
self.current_position = pos_model1
self.reverse_index = self.reverse_index + 2
else:
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed)
self.current_position = pos_model
self.reverse_index = self.reverse_index + 1
if self.reverse_index == len(self.reversed_positions):
self.reset_status = ResetStatus.ROk
if self.reset_status == ResetStatus.ROk:
self.reset_status = ResetStatus.RNone
self.send_emergency_stop()
# 废弃代码
def return_original_position(self):
self.run_reverse = True
real_position = Real_Position()
real_position.init_position(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5)
if self.feedConfig == None: return
start_index = -1
# for index in range(len(self.feedConfig.feedLine.positions)):
# if self.feedConfig.feedLine.positions[index].status == 2:
# start_index = index
pos_index = -1
pos_near_index = -1
reversed_positions = []
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if pos_model.get_position().compare(real_position):
pos_index = index
break
if pos_index == -1:
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
min_distance = 99999999
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if get_distance(pos_model.get_position(), real_position) < min_distance:
min_distance = get_distance(pos_model.get_position(), real_position)
pos_near_index = index
if pos_near_index != -1:
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_near_index + 1]
else:
return False
else:
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_index]
reversed_positions = list(reversed(reversed_positions))
self.reverse_index = 0
self.send_emergency_sound()
current_position = PositionModel()
current_position.init_position(real_position)
while self.run_reverse and self.reverse_index != len(reversed_positions):
if self.reverse_index != 0 and not real_position.compare(current_position.get_position()):
continue
# todo 缺少比对
pos_model = reversed_positions[self.reverse_index]
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
pos_model = reversed_positions[self.reverse_index + 1]
# TODO take节点判断
if pos_model.lineType == LineType.CureMid.value:
pos_model1 = reversed_positions[self.reverse_index + 1]
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed)
current_position = pos_model1
self.reverse_index = self.reverse_index + 2
else:
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed)
current_position = pos_model
self.reverse_index = self.reverse_index + 1
self.send_emergency_stop()
return True
def send_emergency_sound(self):
"""发送急停声音 IO 控制。"""
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
def send_emergency_stop(self):
"""停止急停声音 IO 控制。"""
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
def sendIOControl(self, IO_bit, IO_Status: int):
"""发送 IO 控制指令。"""
IO_command = CMDInstructRequest()
io_instruction = Instruction()
io_instruction.IO = True
io_instruction.io_status = IO_Status
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
IO_command.dsID = 'HCRemoteCommand'
IO_command.instructions.append(io_instruction)
self.robotClient.add_sendQuene(IO_command.toString())
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_io_control}{IO_bit}{IO_Status}')
pass
def sendTargPosition(self, real_position, move_type: MoveType = MoveType.WORLD, speed=5, real_position1=None):
"""发送目标位置移动指令。"""
position_instruction = Instruction()
position_instruction.speed = speed
position_instruction.m0 = real_position.X
position_instruction.m1 = real_position.Y
position_instruction.m2 = real_position.Z
position_instruction.m3 = real_position.U
position_instruction.m4 = real_position.V
position_instruction.m5 = real_position.W
position_instruction.smooth = self.robotClient.smooth
position_instruction.action = move_type.value
if position_instruction.action == 17:
position_instruction.m0_p = real_position1.X
position_instruction.m1_p = real_position1.Y
position_instruction.m2_p = real_position1.Z
position_instruction.m3_p = real_position1.U
position_instruction.m4_p = real_position1.V
position_instruction.m5_p = real_position1.W
instruction_command = CMDInstructRequest()
instruction_command.instructions.append(position_instruction)
request_command = instruction_command.toString()
log_str = f'移动到位置:{"姿势直线" if move_type == MoveType.WORLD else "姿势曲线"}' \
f'X:{position_instruction.m0}-' \
f'Y:{position_instruction.m1}-' \
f'Z:{position_instruction.m2}-' \
f'U:{position_instruction.m3}-' \
f'V:{position_instruction.m4}-' \
f'W:{position_instruction.m5}'
try:
self.log_signal.emit(logging.INFO, log_str)
except Exception as e:
self.log_signal.emit(logging.ERROR, f"Logging error in sendTargPosition: {e}")
return # 如果日志发送失败,至少不要中断主流程
print(request_command)
self.robotClient.add_sendQuene(request_command)
# def get_take_position(self):
# if Constant.Debug:
# return self.robotClient.status_model.getRealPosition()
# _, img, xyz, uvw, points = self.detection.get_position(Point_isVision=False, Box_isPoint=True, First_Depth=True, Iter_Max_Pixel=30,save_img_point=0, Height_reduce=30, width_reduce=30)
# self.detection_image = img.copy()
# if xyz ==None or uvw==None or points==None:
# return None
# target_position,noraml_base = getPosition(*xyz,*uvw,None,points)
#
# position = Real_Position().init_position(*target_position[:3],*noraml_base[:3])
# position.Z = position.Z+200
# return position
def next_start(self, reverse=False):
"""移动到下一个起始位置。"""
start_pos = self.feedConfig.feedLine.get_next_start_position(reverse)
self.feedStatus = FeedStatus(start_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if start_pos.lineType == LineType.CureMid.value:
start_pos1 = self.feedConfig.feedLine.get_next_start_position(reverse)
self.feedStatus = FeedStatus(start_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed,
move_type=MoveType.Cure, real_position1=start_pos1.get_position())
elif start_pos.lineType == LineType.WORLD.value:
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(start_pos.get_position(),
self.robotClient.max_angle_interval):
self.feedStatus = None
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
else:
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed,
move_type=MoveType.AXIS)
else:
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed)
pass
def next_take(self, reverse=False):
"""移动到下一个抓取位置。"""
take_pos = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if take_pos.lineType == LineType.CureMid.value:
take_pos1 = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=take_pos.get_position(), move_type=MoveType.Cure,
real_position1=take_pos1.get_position(), speed=self.robotClient.feed_speed)
elif take_pos.lineType == LineType.WORLD.value:
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(take_pos.get_position(),
self.robotClient.max_angle_interval):
self.feedStatus = None
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
else:
self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed,
move_type=MoveType.AXIS)
else:
self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed)
pass
def next_Feed(self, reverse=False):
"""
移动到 feeding_to_end 列表中的下一个位置。
这个方法负责处理所有在 feeding_to_end 列表中的点,包括 FTake 前后的动态点、FDropBag 点等。
"""
# 1. 调用 FeedLine 的方法获取下一个位置对象 (并更新内部索引)
# get_next_feed_position 会返回当前索引指向的点,
# 然后根据 reverse 参数更新 feeding2end_pos_index。
feed_pos = self.feedConfig.feedLine.get_next_feed_position(reverse)
# 2. 更新主状态机,使其与即将移动到的位置状态一致
self.feedStatus = FeedStatus(feed_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
# 3. 根据位置的 lineType 发送相应的移动指令
if feed_pos.lineType == LineType.CureMid.value:
# 处理曲线运动 (需要两个点)
feed_pos1 = self.feedConfig.feedLine.get_next_feed_position(reverse)
self.feedStatus = FeedStatus(feed_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=feed_pos.get_position(), move_type=MoveType.Cure,
real_position1=feed_pos1.get_position(), speed=self.robotClient.feed_speed)
elif feed_pos.lineType == LineType.WORLD.value:
# 处理直线/关节运动
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(feed_pos.get_position(),
self.robotClient.max_angle_interval):
self.feedStatus = None
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
else:
self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed,
move_type=MoveType.AXIS)
else:
# 默认处理 (通常是直线 WORLD 运动)
# *** 处理 FDropBag 点 ***
# 当 feeding_to_end 列表中的点是 FDropBag (status=9) 时,
# 这段默认代码会被执行,发送移动到该点的指令。
# 一旦机械臂到达该点run() 方法的 FDropBag 分支就会被触发,
# 完成投料后next_position() 又会被调用,继续移动到列表中的下一个点
self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed)
def get_current_position(self, is_reverse=False):
"""
根据当前子阶段状态获取对应的目标位置。
:param is_reverse: 是否反向 (传递给 FeedLine 方法)
:return: PositionModel 对象或 None
"""
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
return self.feedConfig.feedLine.get_current_start_position(self.is_reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
return self.feedConfig.feedLine.get_current_take_position(self.is_reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
return self.feedConfig.feedLine.get_current_feed_position(self.is_reverse)
def next_position(self, reverse=False):
"""
根据当前子阶段状态移动到下一个位置。
这是驱动整个喂料流程前进的核心方法。
:param reverse: 是否反向移动
"""
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
self.next_start(reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
self.next_take(reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
# *** 关键调用 ***:
# 当 feed_Mid_Status 是 FMid_Feed 时next_position() 会调用 next_Feed()。
# next_Feed() 内部调用 feedLine.get_next_feed_position(),该方法会获取
# feeding_to_end 列表中当前索引指向的点,并将索引递增,从而实现按顺序访问。
# 这使得 FTake 前后动态点和 FDropBag 点都能被顺序处理。
self.next_Feed(reverse)
def safe_check_columns(self):
return True
pass
def safe_check_person(self):
return True
pass