Files
AutoControlSystem-G/CU/Feeding.py
2025-09-30 14:44:12 +08:00

973 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
from dis import stack_effect
import logging
import random
import threading
import time
import cv2
import numpy as np
from PyQt5.QtWidgets import QMessageBox
from PySide6.QtCore import Signal, QObject
from PySide6.QtGui import QPixmap
import Constant
import Expection
from CU.Catch import Catch, CatchStatus
from CU.Detect import Detect, DetectStatus
from Model.FeedModel import PositionModel
from Model.Position import Real_Position, Detection_Position
from enum import Enum, IntEnum
from COM.COM_Robot import RobotClient, DetectType
from Model.RobotModel import CMDInstructRequest, MoveType
from Trace.handeye_calibration import getPosition
from Trace.handeye_calibration import getxyz,getxyz1
from Util.util_math import get_distance
from Util.util_time import CRisOrFall
#from Vision.camera_coordinate_dete import Detection
from Util.util_log import log
from Model.RobotModel import Instruction
from EMV.EMV import RelayController
from CU.drop import DropPositionManager
# from Mv3D.CameraImg import CameraImg
class ResetStatus(Enum):
RNone = 0
RStart = 1
RRunging = 2
ROk =3
class FeedStatus(IntEnum):
FNone = 0
FStart = 1
FCheck = 2
FMid = 3
FPhoto = 4
FTake = 5
FBroken1 = 6
FDropMid = 7 # 暂时替换为扔包中间点
FShake = 8
FDropBag = 9 #码垛点
FDropReset = 10 #码垛后点
FFinished = 11
FReverse = 12
FStartReverse = 13
class LineType(Enum):
#直线
Straight = 0
#曲线中间点
CureMid = 2
#曲线终点
CureEnd = 3
#关节(自由路径)
WORLD = 4
class FeedMidStatus(Enum):
FMid_Start = 1
FMid_Take = 2
FMid_Feed= 3
FMid_Reverse = 4
class FeedPosition:
def __init__(self,status:FeedStatus,position:Real_Position):
self.status = status
self.position = position
class FeedLine:
def __init__(self, id, name, feed_positions:list,remain_count:int,drop_manage:DropPositionManager):
self.feed_positions = copy.deepcopy(feed_positions)
self.feeding2end_pos_index = 0
self.origin2start_pos_index = 0
self.start2take_pos_index = 0
self.name = name
self.id = id
self.drop_manager = drop_manage
# 初始化各个阶段的位置列表
self.feeding_to_end = []
# --- 新增:用于存储从 ini 文件读取的多个投料点坐标 ---
# 这个列表将在加载 ini 时填充 [[x1,y1,z1,u1,v1,w1], [x2,y2,z2,u2,v2,w2], ...]
self.drop_point_list = []
#读取码垛点位置
self.current_index = remain_count+1
#记录feed_positions当前扔包点索引
self.current_dropbag_index=0
#记录当前抓包点索引
self.current_take_index = 0
def get_current_index(self):
return self.current_index
def set_current_index(self, index):
self.current_index = index
def get_current_feed_position(self,is_reverse):
pos = self.feeding_to_end[ (self.feeding2end_pos_index+1)%len(self.feeding_to_end) if is_reverse else self.feeding2end_pos_index-1]
return pos
def get_current_take_position(self,is_reverse):
pos = self.start_to_take[ (self.start2take_pos_index+1)%len(self.start_to_take) if is_reverse else self.start2take_pos_index-1]
return pos
def get_current_start_position(self,is_reverse):
pos = self.origin_to_start[(self.origin2start_pos_index+1)%len(self.origin_to_start) if is_reverse else self.origin2start_pos_index-1]
return pos
def get_next_feed_position(self,reverse:bool=False):
pos = self.feeding_to_end[self.feeding2end_pos_index]
if reverse:
self.feeding2end_pos_index -= 1
if self.feeding2end_pos_index < 0:
self.feeding2end_pos_index = len(self.feeding_to_end) - 1
else:
self.feeding2end_pos_index += 1
if self.feeding2end_pos_index >= len(self.feeding_to_end):
self.feeding2end_pos_index = 0
return pos
def get_next_start_position(self,reverse:bool=False):
pos = self.origin_to_start[self.origin2start_pos_index]
if reverse:
self.origin2start_pos_index -= 1
if self.origin2start_pos_index < 0:
self.origin2start_pos_index = len(self.origin_to_start) - 1
else:
self.origin2start_pos_index += 1
if self.origin2start_pos_index >= len(self.origin_to_start):
self.origin2start_pos_index = 0
return pos
def get_next_take_position(self,reverse:bool=False):
pos = self.start_to_take[self.start2take_pos_index]
if reverse:
self.start2take_pos_index -= 1
if self.start2take_pos_index < 0:
self.start2take_pos_index = len(self.start_to_take) - 1
else:
self.start2take_pos_index += 1
if self.start2take_pos_index >= len(self.start_to_take):
self.start2take_pos_index = 0
return pos
def get_take_position(self):
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
return self.feeding_to_end[i]
def set_take_position(self, position: Real_Position, dynamic_height=0):
"""
设置 FTake 位置(),并更新其前后动态点的位置。
:param position: 新的抓取位置
:param dynamic_height: 动态高度调整 (如果需要)
"""
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
# 计算 XYZ 坐标
# xyz = getxyz(position.X, position.Y, position.Z, position.a, position.b, position.c)
xyz = getxyz1(position.X, position.Y, position.Z, position.a, position.b, position.c)
# 创建 before 和 after 位置
befor_take_position = Real_Position().init_position(xyz[0],
xyz[1],
xyz[2],
position.U,
position.V,
position.W)
after_take_position = Real_Position().init_position(xyz[0],
xyz[1],
xyz[2],
position.U,
position.V,
position.W)
# 安全检查索引
if i > 0:
self.feeding_to_end[i - 1].set_position(befor_take_position)
else:
print("Warning: No position before FTake to update.")
self.feeding_to_end[i].set_position(position)
if i + 1 < len(self.feeding_to_end):
self.feeding_to_end[i + 1].set_position(after_take_position)
else:
print("Warning: No position after FTake to update.")
break # 抓料点暂时就一个
def set_drop_position(self, position: Real_Position):
"""
设置 FDropBag 位置,只设置当前点,不处理前后点。
:param position: 新的丢包位置
"""
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FDropBag.value:
# 直接设置当前点的位置
self.feeding_to_end[i].set_position(position)
print(
f"✅ FDropBag 位置已更新: ({position.X}, {position.Y}, {position.Z}, {position.U}, {position.V}, {position.W})")
break # 假设只有一个丢包点
def get_drop_path(self) -> list:
"""获取动态扔包路径"""
if self.drop_manager is None:
return []
path = []
while True:
#current_index,当前扔包点id
#drop.ini定义point1,point2,current_index对应后面的数字
pos_model = self.drop_manager.get_next_drop_position(
self.id,
self.current_index
)
if pos_model is None:
break
path.append(pos_model)
return path
def get_position_list(self):
index_start = -1
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FCheck.value:
index_start = i
break
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FPhoto.value:
index_take = i
#开始插入动态扔包点,按照 动态扔包中间点,扔包点,动态复位点的顺序
#记录feed_positions扔包点初始位置,注意feed_positions会动态变化
#如动态变化,重新获取
if self.current_dropbag_index==0:
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FDropBag.value:
self.current_dropbag_index = i
break
self.origin_to_start = self.feed_positions[: index_start+1]
self.start_to_take = self.feed_positions[index_start:index_take+1]
self.feeding_to_end = self.feed_positions[index_take:]
def set_feeding_to_end(self):
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FPhoto.value:
index_take = i
break
index_drop=self.current_dropbag_index
test_path = self.get_drop_path()
self.current_index+=1
# 将总list的drop部分替换为动态路径
self.feed_positions = self.feed_positions[:index_drop] + test_path
# self.feeding_to_end = self.feed_positions[index_take:index_drop]
self.feeding_to_end = self.feed_positions[index_take:index_drop]+test_path
class FeedingConfig:
def __init__(self, num: int, feedLine: FeedLine, photo_locs,remain_count:int):
#需码垛数量如50或30
self.num = num
#已经码垛数量
self.remain_count=remain_count
self.feedLine = feedLine
self.photo_locs = [self.deal_photo_locs(p) for p in photo_locs]
def deal_photo_locs(self, photo_loc):
position_photo = Real_Position()
position_photo.init_position(photo_loc[0], photo_loc[1], photo_loc[2], photo_loc[3], photo_loc[4], photo_loc[5])
return position_photo
def get_line_info(self):
pass
class Feeding(QObject):
need_origin_signal = Signal(str)
take_photo_sigal = Signal()
update_detect_image = Signal(np.ndarray)
log_signal = Signal(int,str)
#码垛完成通知
stack_finish_signal=Signal()
def __init__(self, robotClient: RobotClient,relay_controller:RelayController):
super().__init__()
self.feedConfig = None
self.feedStatus = FeedStatus.FNone
self.robotClient = robotClient
# self.sensor_thread = None
self.detection_image = None
self.pause = False
self.cRis_photo = CRisOrFall()
self.cRis_shake = CRisOrFall()
self.feed_Mid_Status = FeedMidStatus.FMid_Start
self.is_reverse = False
# 复位集合
self.run_reverse = False
self.take_no_photo = False
self.reset_status = ResetStatus.RNone
self.reversed_positions = []
self.current_position = None
self.index=1
self.pos_index = -1
self.pos_near_index = -1
self.catch = Catch(self.robotClient)
self.detect = Detect()
self.is_detected = True
self.detect_thread = threading.Thread(target=self.run_detect,name="run_detect")
# self.detect_thread.start()
self.onekey = False
self.debug_run_count = 0 # 初始化计数器
self.mid_take_count = 0
#记录抓取传感器状态通知
self.take_sensor_signal=False
#传感器判断抓包参数
# self.sensor2_ready = False # 传感器2是否检测到料包
# self.motor_stopped_by_sensor2 = False # 是否由传感器2触发停止电机
# self.sensor_thread = None
# self.relay_controller = RelayController()
#用于同步控制EMV相关
self.relay_controller = relay_controller
self.relay_controller.take_robot_signal.connect(self.take_feed_notice)
# self.camera_img=CameraImg()
# 启动传感器2线程
# self.relay_controller._running = True
# self.sensor2_thread = None
# --- 新增: 用于码垛模式的投料点索引 ---
self.current_drop_index = 1
pass
def close_feed(self):
self.is_detected = False
# self.detect_thread.join()
if self.detect.detection:
self.detect.detection.release()
# if self.camera_img:
# self.camera_img.close_camera()
def run_detect(self):
#图片相关线程
while self.is_detected:
self.detect.run()
time.sleep(0.02)
def take_feed_notice(self):
"""接收机器人信号通知"""
self.take_sensor_signal = True
def run(self):
self.catch.run()
# 获取事件坐标
real_position = Real_Position()
real_position.init_position_joint_and_world(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5,
self.robotClient.status_model.axis_0,
self.robotClient.status_model.axis_1,
self.robotClient.status_model.axis_2,
self.robotClient.status_model.axis_3,
self.robotClient.status_model.axis_4,
self.robotClient.status_model.axis_5
)
# real_position.init_position(0,
# 0,
# 0,
# 0,
# 0,
# 0);
# img_path = f'Image/{self.index}.png'
# img = cv2.imread(img_path)
# self.index += 1
# self.index = self.index % 4
# self.detection_image = img
if self.feedConfig == None:
self.feedStatus = FeedStatus.FNone
#
if self.feedConfig !=None and self.is_reverse and real_position.compare(self.robotClient.origin_position,is_action=True):
self.feedStatus = FeedStatus.FNone
self.is_reverse = False
self.log_signal.emit(logging.INFO, Constant.str_feed_reverse)
if self.feedStatus == FeedStatus.FNone or self.pause:
return
elif self.feedStatus == FeedStatus.FCheck:
self.log_signal.emit(logging.INFO, Constant.str_feed_check)
# 1, 检查是否是三列
# 2, 检查是否有人
# if self.safe_check_columns() and self.safe_check_person():
# pass
# else:
# if self.feedConfig.num != 0:
# self.next_target()
# if == 原点 继续判断
# else:
# QMessageBox.information(None, "提示", Constant.str_feed_sae_error_msgbox)
if self.is_reverse:
self.feed_Mid_Status = FeedMidStatus.FMid_Start
else:
self.feed_Mid_Status = FeedMidStatus.FMid_Take
if real_position.compare(self.feedConfig.feedLine.start_to_take[0].get_position()):
self.next_position(self.is_reverse)
elif self.feedStatus == FeedStatus.FStart:
self.log_signal.emit(logging.INFO, Constant.str_feed_start)
# self.relay_controller.open(conveyor2=True)#开电机
if not real_position.compare(self.robotClient.origin_position,is_action=True) and not self.is_reverse:
# QMessageBox.information(None, "提示", Constant.str_feed_start_error) # Fuck 引起异常
self.log_signal.emit(logging.ERROR, Constant.str_feed_start_error)
self.need_origin_signal.emit(Constant.str_feed_start_error)
self.feedStatus = FeedStatus.FNone
return
if self.is_reverse:
self.feedStatus = FeedStatus.FNone
self.is_reverse = False
return
self.feedConfig.feedLine.get_position_list()
self.detect.detect_status = DetectStatus.DNone
self.feed_Mid_Status = FeedMidStatus.FMid_Start
self.next_position(self.is_reverse)
elif self.feedStatus == FeedStatus.FMid:
feed_pos = self.get_current_position(self.is_reverse)
if real_position.compare(feed_pos.get_position()):
self.log_signal.emit(logging.INFO, Constant.str_feed_mid)
self.next_position(self.is_reverse)
# 增加计数器逻辑
self.mid_take_count += 1
# 可选:在 Debug1 模式下输出日志
if Constant.Debug1:
self.log_signal.emit(
logging.INFO,
f"[调试计数] 已进入 FMid 分支 {self.mid_take_count}"
)
#这里他写的中间点过完马上进行抓料和整个流程不符合我就注释掉了因为实际测试的时候还没到FTake的点他就进行了继电器关闭夹爪的抓取
#if self.feedStatus == FeedStatus.FTake:
#self.catch.catch_status = CatchStatus.CTake
elif self.feedStatus == FeedStatus.FPhoto:
#码垛的数量和配置的数量一致时
if self.feedConfig.remain_count >=self.feedConfig.num:
#关闭,暂停
self.log_signal.emit(logging.INFO, Constant.str_feed_finish)
self.is_reverse = True
self.feed_Mid_Status = FeedMidStatus.FMid_Take
self.feedConfig.feedLine.start2take_pos_index = len(self.feedConfig.feedLine.start_to_take) - 2
self.feedConfig.feedLine.origin2start_pos_index = len(self.feedConfig.feedLine.origin_to_start) - 2
#码垛数量重置
self.feedConfig.remain_count=0
self.next_position(self.is_reverse)
#码垛完成信号通知
self.stack_finish_signal.emit()
self.log_signal.emit(logging.INFO, Constant.str_feed_photo)
return
if not Constant.Debug:
self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto)
if(self.feed_Mid_Status != FeedMidStatus.FMid_Feed):
self.feed_Mid_Status = FeedMidStatus.FMid_Feed
self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_success)
self.next_position()
return
#初始点无论如何先打开夹爪
if self.relay_controller.close(clamp=True):
#重新抓去信号料带
self.take_sensor_signal=False
self.relay_controller.sensor2_ready=True
#去除list.ini读取抓取点20250915
#self.feedConfig.feedLine.set_take_position(self.detect.detect_position, 0)
#self.feedConfig.feedLine.set_take_position(real_position, 0)#必须设置
# 一直等待传感器2信号永不超时
# TODO:逻辑需改变不能用while循环
if Constant.DebugPosition:
self.take_sensor_signal=True
while True:
# sensors = self.relay_controller.get_all_device_status('sensors')
# sensor2_value = sensors.get(self.relay_controller.SENSOR2, False)
if self.take_sensor_signal:
self.log_signal.emit(logging.INFO, "传感器2检测到料包到位开始执行抓取")
break # ✅ 条件满足,跳出循环,继续执行下面的代码
else:
if self.feedStatus == FeedStatus.FNone:
return
time.sleep(1) # 每秒检查一次
#第二次执行FeedStatus.FPhoto时改变码垛点
# self.camera_img.save_frame_path()
self.feedConfig.feedLine.set_feeding_to_end()
# self.take_photo_sigal.emit()
self.next_position()
self.log_signal.emit(logging.INFO, Constant.str_sys_runing2)
else:
self.log_signal.emit(logging.ERROR, Constant.str_clamp_open_error)
time.sleep(2)
# self.feedStatus = FeedStatus.FTake
elif self.feedStatus == FeedStatus.FTake:
self.log_signal.emit(logging.INFO, Constant.str_feed_take)
take_position = self.feedConfig.feedLine.get_take_position()
if not take_position or not take_position.get_position():
self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail)
return
if not real_position.compare(take_position.get_position(), is_action=True):
self.log_signal.emit(logging.INFO, "机器人尚未到达抓料点位")
return
self.log_signal.emit(logging.INFO, "机器人已到达抓料点位")
# 执行抓取动作
#self.catch.catch_status = CatchStatus.CTake
#if self.catch.catch_status == CatchStatus.CNone:
#self.catch.catch_status = CatchStatus.CTake
#if self.catch.catch_status == CatchStatus.CTake:
#self.log_signal.emit(logging.INFO, "正在执行抓料动作...")
#self.catch.catch_status = CatchStatus.COk
#if self.catch.catch_status == CatchStatus.COk:
#self.log_signal.emit(logging.INFO, Constant.str_feed_take_success)
#self.catch.catch_status = CatchStatus.CNone
# 移动到下一个抓取点
# 更新丢包点: 如果需要根据放置情况调整下次抓取
if self.relay_controller.open(clamp=True):
self.next_position(self.is_reverse)
else:
self.log_signal.emit(logging.ERROR, Constant.str_clamp_close_error)
time.sleep(2)
#self.feedConfig.feedLine.set_drop_position(real_position)#我想在这里读取我的一个ini文件值里面有很多个drop点每一次索引递增的点
#return
#else:
#self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail)
elif self.feedStatus == FeedStatus.FBroken1:
if real_position.compare(self.get_current_position().get_position()):
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
self.next_position()
elif self.feedStatus == FeedStatus.FDropMid:
if real_position.compare(self.get_current_position().get_position(),is_action=True):
self.log_signal.emit(logging.INFO, Constant.str_feed_drop_mid)
self.next_position()
elif self.feedStatus == FeedStatus.FShake:
if real_position.compare(self.get_current_position().get_position(),is_action=True):
# TODO 震动方案
self.log_signal.emit(logging.INFO, Constant.str_feed_shake)
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CShake
return
if self.catch.catch_status == CatchStatus.CShake:
# if self.feedConfig.feedLine.feeding_to_end[
# self.feedConfig.feedLine.feeding2end_pos_index + 1].status != FeedStatus.FShake:
# self.catch.catch_status = CatchStatus.COk
# else:
self.catch.shake_continue.SetReset()
self.next_position()
if self.feedStatus!=FeedStatus.FShake:
self.catch.catch_status = CatchStatus.CNone
return
elif self.feedStatus == FeedStatus.FDropBag:
#"""*** 处理投料点 (FDropBag) 的核心逻辑 ***"""
# 1. 确认机械臂是否已精确到达当前目标投料点
# get_current_position() 会根据 self.feed_Mid_Status (应为 FMid_Feed)
# 调用 feedLine.get_current_feed_position(),从 feeding_to_end 列表获取
# 由 feeding2end_pos_index 指向的点。
if real_position.compare(self.get_current_position().get_position(),is_action=True):
# 2. 记录日志:已到达投料点
self.log_signal.emit(logging.INFO, Constant.str_feed_drop)
# 3. 与 Catch 模块进行状态交互来驱动投料动作
# a. 初始状态 (CNone): 触发投料动作
if self.catch.catch_status == CatchStatus.CNone:
# 将 Catch 状态设置为 CDrop通知 Catch 模块开始执行物理投料动作
self.catch.catch_status = CatchStatus.CDrop
# 立即返回,等待 Catch 模块处理
return
# b. 投料进行中 (CDrop): 等待完成
if self.catch.catch_status == CatchStatus.CDrop:
# 什么都不做,等待 Catch 模块完成动作并更新状态
return
# c. 投料完成 (COk): 处理后续逻辑并移动到下一个点
if self.catch.catch_status == CatchStatus.COk:
# 重置 Catch 状态,为下一次操作做准备
self.catch.catch_status = CatchStatus.CNone
# (后续增加) 延时: 让物料稳定
# time.sleep(self.robotClient.time_delay_put)
# (后续增加) 视觉确认: 拍照确认袋子已放置
# self.detection.get_position(...)
# self.feedConfig.feedLine.set_take_position(...)
# 码垛数量增加
self.feedConfig.remain_count = self.feedConfig.remain_count + 1
# self.feedConfig.num = self.feedConfig.num - 1
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_feed_num}{self.feedConfig.remain_count}')
# 5. *** 关键步骤 ***: 移动到路径中的下一个点
# next_position() 会根据当前的 feed_Mid_Status (FMid_Feed)
# 调用 next_Feed()。
self.next_position()
elif self.feedStatus == FeedStatus.FDropReset:
if real_position.compare(self.get_current_position().get_position()):
self.log_signal.emit(logging.INFO, Constant.str_feed_drop_reset)
self.next_position()
elif self.feedStatus == None:
print(self.feedStatus)
pass
def run_reset(self):
real_position = Real_Position()
real_position.init_position_joint_and_world(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5,
self.robotClient.status_model.axis_0,
self.robotClient.status_model.axis_1,
self.robotClient.status_model.axis_2,
self.robotClient.status_model.axis_3,
self.robotClient.status_model.axis_4,
self.robotClient.status_model.axis_5)
if self.reset_status == ResetStatus.RNone:
return
if self.reset_status == ResetStatus.RStart:
if self.feedConfig == None: return
# for index in range(len(self.feedConfig.feedLine.positions)):
# if self.feedConfig.feedLine.positions[index].status == 2:
# start_index = index
self.pos_index = -1
self.pos_near_index = -1
self.reversed_positions = []
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if real_position.compare(pos_model.get_position(),is_action=True):
self.pos_index = index
break
if self.pos_index == -1:
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
min_distance = 99999999
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if get_distance(pos_model.get_position(), real_position) < min_distance:
min_distance = get_distance(pos_model.get_position(), real_position)
self.pos_near_index = index
if self.pos_near_index != -1:
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_near_index + 1]
else:
return False
else:
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_index+1]
self.reversed_positions = list(reversed(self.reversed_positions))
self.reverse_index = 0
self.send_emergency_sound()
self.current_position = PositionModel()
self.current_position.init_position(real_position)
self.reset_status = ResetStatus.RRunging
if self.reset_status == ResetStatus.RRunging:
if not real_position.compare(self.current_position.get_position(),is_action=True):
return
pos_model = self.reversed_positions[self.reverse_index]
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
pos_model = self.reversed_positions[self.reverse_index + 1]
self.reverse_index = self.reverse_index+1
if pos_model.lineType == LineType.CureMid.value:
pos_model1 = self.reversed_positions[self.reverse_index + 1]
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed)
self.current_position = pos_model1
self.reverse_index = self.reverse_index + 2
elif pos_model.lineType==LineType.WORLD.value:
#关节移动
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed,move_type=MoveType.AXIS)
self.current_position = pos_model
self.reverse_index = self.reverse_index + 1
else:
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed)
self.current_position = pos_model
self.reverse_index = self.reverse_index + 1
if self.reverse_index == len(self.reversed_positions):
self.reset_status = ResetStatus.ROk
if self.reset_status == ResetStatus.ROk:
self.reset_status = ResetStatus.RNone
self.send_emergency_stop()
def return_original_position(self):
self.run_reverse = True
real_position = Real_Position()
real_position.init_position_joint_and_world(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5,
self.robotClient.status_model.axis_0,
self.robotClient.status_model.axis_1,
self.robotClient.status_model.axis_2,
self.robotClient.status_model.axis_3,
self.robotClient.status_model.axis_4,
self.robotClient.status_model.axis_5)
if self.feedConfig == None: return
start_index = -1
# for index in range(len(self.feedConfig.feedLine.positions)):
# if self.feedConfig.feedLine.positions[index].status == 2:
# start_index = index
pos_index = -1
pos_near_index = -1
reversed_positions = []
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if real_position.compare(pos_model.get_position()):
pos_index = index
break
if pos_index == -1:
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
min_distance = 99999999
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if get_distance(pos_model.get_position(), real_position)<min_distance:
min_distance = get_distance(pos_model.get_position(), real_position)
pos_near_index = index
if pos_near_index != -1:
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_near_index+1]
else:
return False
else:
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_index]
reversed_positions = list(reversed(reversed_positions))
self.reverse_index = 0
self.send_emergency_sound()
current_position = PositionModel()
current_position.init_position(real_position)
while self.run_reverse and self.reverse_index!=len(reversed_positions):
if self.reverse_index!=0 and not real_position.compare(current_position.get_position()):
continue
#todo 缺少比对
pos_model = reversed_positions[self.reverse_index]
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
pos_model = reversed_positions[self.reverse_index + 1]
# TODO take节点判断
if pos_model.lineType == LineType.CureMid.value:
pos_model1 = reversed_positions[self.reverse_index + 1]
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
real_position1=pos_model1.get_position(),speed= self.robotClient.reset_speed)
current_position = pos_model1
self.reverse_index = self.reverse_index+2
else:
self.sendTargPosition(real_position=pos_model.get_position(),speed=self.robotClient.reset_speed)
current_position = pos_model
self.reverse_index = self.reverse_index + 1
self.send_emergency_stop()
return True
def send_emergency_sound(self):
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
def send_emergency_stop(self):
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
def sendIOControl(self, IO_bit, IO_Status: int):
IO_command = CMDInstructRequest()
io_instruction = Instruction()
io_instruction.IO = True
io_instruction.io_status = IO_Status
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
IO_command.dsID = 'HCRemoteCommand'
IO_command.instructions.append(io_instruction)
self.robotClient.add_sendQuene(IO_command.toString())
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_io_control}{IO_bit}{IO_Status}')
pass
def sendTargPosition1(self, real_position, move_type: MoveType = MoveType.WORLD, speed=5,real_position1=None):
pass
def sendTargPosition(self, real_position, move_type: MoveType = MoveType.WORLD, speed=5,real_position1=None):
position_instruction = Instruction()
position_instruction.speed = speed
position_instruction.m0 = real_position.X
position_instruction.m1 = real_position.Y
position_instruction.m2 = real_position.Z
position_instruction.m3 = real_position.U
position_instruction.m4 = real_position.V
position_instruction.m5 = real_position.W
position_instruction.smooth = self.robotClient.smooth
position_instruction.action = move_type.value
if position_instruction.action == 17:
position_instruction.m0_p = real_position1.X
position_instruction.m1_p = real_position1.Y
position_instruction.m2_p = real_position1.Z
position_instruction.m3_p = real_position1.U
position_instruction.m4_p = real_position1.V
position_instruction.m5_p = real_position1.W
instruction_command = CMDInstructRequest()
instruction_command.instructions.append(position_instruction)
request_command = instruction_command.toString()
log_str = f'移动到位置:{ "姿势直线" if move_type==MoveType.WORLD else "姿势曲线"}' \
f'X:{position_instruction.m0}-' \
f'Y:{position_instruction.m1}-' \
f'Z:{position_instruction.m2}-' \
f'U:{position_instruction.m3}-' \
f'V:{position_instruction.m4}-' \
f'W:{position_instruction.m5}'
try:
self.log_signal.emit(logging.INFO, log_str)
print(log_str)
except:
return
print(request_command)
self.robotClient.add_sendQuene(request_command)
pass
# def get_take_position(self):
# if Constant.Debug:
# return self.robotClient.status_model.getRealPosition()
# _, img, xyz, uvw, points = self.detection.get_position(Point_isVision=False, Box_isPoint=True, First_Depth=True, Iter_Max_Pixel=30,save_img_point=0, Height_reduce=30, width_reduce=30)
# self.detection_image = img.copy()
# if xyz ==None or uvw==None or points==None:
# return None
# target_position,noraml_base = getPosition(*xyz,*uvw,None,points)
#
# position = Real_Position().init_position(*target_position[:3],*noraml_base[:3])
# position.Z = position.Z+200
# return position
def next_start(self,reverse=False):
start_pos = self.feedConfig.feedLine.get_next_start_position(reverse)
self.feedStatus = FeedStatus(start_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if start_pos.lineType == LineType.CureMid.value:
start_pos1 = self.feedConfig.feedLine.get_next_start_position(reverse)
self.feedStatus = FeedStatus(start_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=start_pos.get_position(),speed=self.robotClient.reset_speed, move_type=MoveType.Cure, real_position1=start_pos1.get_position())
elif start_pos.lineType == LineType.WORLD.value:
self.feedStatus = FeedStatus(start_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=start_pos.get_position(), speed=self.robotClient.reset_speed,
move_type=MoveType.AXIS)
# if self.robotClient.status_model.getAnglePosition().is_error_angel_move(start_pos.get_position(),self.robotClient.max_angle_interval):
# self.feedStatus = None
# self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
# else:
# self.sendTargPosition(real_position=start_pos.get_position(),speed=self.robotClient.reset_speed,move_type=MoveType.AXIS)
else:
self.sendTargPosition(real_position=start_pos.get_position(),speed=self.robotClient.reset_speed)
pass
def next_take(self,reverse=False):
take_pos = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if take_pos.lineType == LineType.CureMid.value:
take_pos1 = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=take_pos.get_position(), move_type=MoveType.Cure, real_position1=take_pos1.get_position(),speed=self.robotClient.feed_speed)
elif take_pos.lineType == LineType.WORLD.value:
take_pos1 = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed,
move_type=MoveType.AXIS)
# if self.robotClient.status_model.getAnglePosition().is_error_angel_move(take_pos.get_position(),
# self.robotClient.max_angle_interval):
# self.feedStatus = None
# self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
# else:
# self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed,
# move_type=MoveType.AXIS)
else:
self.sendTargPosition(real_position=take_pos.get_position(),speed=self.robotClient.feed_speed)
pass
def next_Feed(self,reverse=False):
feed_pos = self.feedConfig.feedLine.get_next_feed_position(reverse)
self.feedStatus = FeedStatus(feed_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if feed_pos.lineType == LineType.CureMid.value:
feed_pos1 = self.feedConfig.feedLine.get_next_feed_position(reverse)
self.feedStatus = FeedStatus(feed_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=feed_pos.get_position(), move_type=MoveType.Cure, real_position1=feed_pos1.get_position(),speed=self.robotClient.feed_speed)
elif feed_pos.lineType == LineType.WORLD.value:
# 暂时不考虑世界坐标到关节坐标的转换,强行不判断接近
# feed_pos1 = self.feedConfig.feedLine.get_next_feed_position(reverse)
# self.feedStatus = FeedStatus(feed_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed,
move_type=MoveType.AXIS)
# 注释掉 by chuyiwen
# if self.robotClient.status_model.getAnglePosition().is_error_angel_move(feed_pos.get_position(),
# self.robotClient.max_angle_interval):
# self.feedStatus = FeedStatus.FNone
# self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
# else:
# self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed,
# move_type=MoveType.AXIS)
else:
# if not reverse and self.feedStatus == FeedStatus.FShake:
# if not reverse:
# if self.cRis_shake.Q(self.feedStatus == FeedStatus.FShake, True):
# pass
# elif self.cRis_shake.Q(self.feedStatus == FeedStatus.FShake, False):
# self.feedStatus = FeedStatus.FShake
# self.sendTargPosition(real_position=feed_pos.get_position(),speed=self.robotClient.feed_speed)
# return
self.sendTargPosition(real_position=feed_pos.get_position(),speed=self.robotClient.feed_speed)
def get_current_position(self,is_reverse=False):
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
return self.feedConfig.feedLine.get_current_start_position(self.is_reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
return self.feedConfig.feedLine.get_current_take_position(self.is_reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
return self.feedConfig.feedLine.get_current_feed_position(self.is_reverse)
def next_position(self,reverse=False):
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
self.next_start(reverse)
if Constant.Debug1:
print('next_start')
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
self.next_take(reverse)
if Constant.Debug1:
print('next_take')
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
self.next_Feed(reverse)
if Constant.Debug1:
print('next_feed')
def safe_check_columns(self):
return True
pass
def safe_check_person(self):
return True
pass