import copy import logging import random import threading import time import cv2 import numpy as np from PyQt5.QtWidgets import QMessageBox from PySide6.QtCore import Signal, QObject from PySide6.QtGui import QPixmap import Constant import Expection from CU.Catch import Catch, CatchStatus from CU.Detect import Detect, DetectStatus from Model.FeedModel import PositionModel from Model.Position import Real_Position, Detection_Position from enum import Enum, IntEnum from COM.COM_Robot import RobotClient, DetectType from Model.RobotModel import CMDInstructRequest, MoveType from Trace.handeye_calibration import getPosition from Util.util_math import get_distance from Util.util_time import CRisOrFall from Vision.camera_coordinate_dete import Detection from Util.util_log import log from Model.RobotModel import Instruction class ResetStatus(Enum): RNone = 0 RStart = 1 RRunging = 2 ROk =3 class FeedStatus(IntEnum): FNone = 0 FStart = 1 FCheck = 2 FMid = 3 FPhoto = 4 FTake = 5 FBroken1 = 6 FBroken2 =7 FShake = 8 FDropBag = 9 FFinished = 10 FReverse = 11 FStartReverse = 12 class LineType(Enum): Straight = 0 CureMid = 2 CureEnd = 3 WORLD = 4 class FeedMidStatus(Enum): FMid_Start = 1 FMid_Take = 2 FMid_Feed= 3 class FeedPosition: def __init__(self,status:FeedStatus,position:Real_Position): self.status = status self.position = position class FeedLine: def __init__(self, id, name, feed_positions:list): self.feed_positions = copy.deepcopy(feed_positions) self.feeding2end_pos_index = 0 self.origin2start_pos_index = 0 self.start2take_pos_index = 0 self.name = name self.id = id def get_current_feed_position(self,is_reverse): pos = self.feeding_to_end[ (self.feeding2end_pos_index+1)%len(self.feeding_to_end) if is_reverse else self.feeding2end_pos_index-1] return pos def get_current_take_position(self,is_reverse): pos = self.start_to_take[ (self.start2take_pos_index+1)%len(self.start_to_take) if is_reverse else self.start2take_pos_index-1] return pos def get_current_start_position(self,is_reverse): pos = self.origin_to_start[(self.origin2start_pos_index+1)%len(self.origin_to_start) if is_reverse else self.origin2start_pos_index-1] return pos def get_next_feed_position(self,reverse:bool=False): pos = self.feeding_to_end[self.feeding2end_pos_index] if reverse: self.feeding2end_pos_index -= 1 if self.feeding2end_pos_index < 0: self.feeding2end_pos_index = len(self.feeding_to_end) - 1 else: self.feeding2end_pos_index += 1 if self.feeding2end_pos_index >= len(self.feeding_to_end): self.feeding2end_pos_index = 0 return pos def get_next_start_position(self,reverse:bool=False): pos = self.origin_to_start[self.origin2start_pos_index] if reverse: self.origin2start_pos_index -= 1 if self.origin2start_pos_index < 0: self.origin2start_pos_index = len(self.origin_to_start) - 1 else: self.origin2start_pos_index += 1 if self.origin2start_pos_index >= len(self.origin_to_start): self.origin2start_pos_index = 0 return pos def get_next_take_position(self,reverse:bool=False): pos = self.start_to_take[self.start2take_pos_index] if reverse: self.start2take_pos_index -= 1 if self.start2take_pos_index < 0: self.start2take_pos_index = len(self.start_to_take) - 1 else: self.start2take_pos_index += 1 if self.start2take_pos_index >= len(self.start_to_take): self.start2take_pos_index = 0 return pos def get_take_position(self): for i in range(len(self.feeding_to_end)): if self.feeding_to_end[i].status == FeedStatus.FTake.value: return self.feeding_to_end[i] def set_take_position(self,position:Real_Position,dynamic_height=0): for i in range(len(self.feeding_to_end)): if self.feeding_to_end[i].status == FeedStatus.FTake.value: if position != None: befor_take_position = Real_Position().init_position(position.X, position.Y, position.Z+dynamic_height, position.U, position.V, position.W) after_take_position = Real_Position().init_position(position.X, position.Y, position.Z+dynamic_height, position.U, position.V, position.W) self.feeding_to_end[i - 1].set_position(befor_take_position) self.feeding_to_end[i + 1].set_position(after_take_position) self.feeding_to_end[i].set_position(position) def get_position_list(self): index_start = -1 for i in range(len(self.feed_positions)): if self.feed_positions[i].status == FeedStatus.FCheck.value: index_start = i break for i in range(len(self.feed_positions)): if self.feed_positions[i].status == FeedStatus.FPhoto.value: index_take = i self.origin_to_start = self.feed_positions[: index_start+1] self.start_to_take = self.feed_positions[index_start:index_take+1] self.feeding_to_end = self.feed_positions[index_take:] for i in range(len(self.feeding_to_end)): #插入动态中间点 if self.feeding_to_end[i].status == FeedStatus.FTake.value: befor_position_model = PositionModel() befor_position_model.init_position(None) after_position_model = PositionModel() after_position_model.init_position(None) self.feeding_to_end.insert(i, befor_position_model) self.feeding_to_end.insert(i+2, after_position_model) break class FeedingConfig: def __init__(self, num: int, feedLine: FeedLine, photo_locs): self.num = num self.feedLine = feedLine self.photo_locs = [self.deal_photo_locs(p) for p in photo_locs] def deal_photo_locs(self, photo_loc): position_photo = Real_Position() position_photo.init_position(photo_loc[0], photo_loc[1], photo_loc[2], photo_loc[3], photo_loc[4], photo_loc[5]) return position_photo def get_line_info(self): pass class Feeding(QObject): need_origin_signal = Signal(str) take_no_photo_sigal = Signal() update_detect_image = Signal(np.ndarray) log_signal = Signal(int,str) def __init__(self, robotClient: RobotClient): super().__init__() self.feedConfig = None self.feedStatus = FeedStatus.FNone self.robotClient = robotClient self.detection_image = None self.init_detection_image() self.pause = False self.cRis_photo = CRisOrFall() self.cRis_shake = CRisOrFall() self.feed_Mid_Status = FeedMidStatus.FMid_Start self.is_reverse = False # 复位集合 self.run_reverse = False self.take_no_photo = False self.reset_status = ResetStatus.RNone self.reversed_positions = [] self.current_position = None self.index=1 self.pos_index = -1 self.pos_near_index = -1 self.catch = Catch(self.robotClient) self.detect = Detect() self.is_detected = True self.detect_thread = threading.Thread(target=self.run_detect) self.detect_thread.start() self.onekey = False pass def close_feed(self): self.is_detected = False self.detect_thread.join() self.detect.detection.release() def init_detection_image(self): detection_image = cv2.imread(Constant.feed_sign_path) self.update_detect_image.emit(detection_image) def run_detect(self): while self.is_detected: self.detect.run() time.sleep(0.02) def run(self): self.catch.run() # 获取事件坐标 real_position = Real_Position() real_position.init_position(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) # real_position.init_position(0, # 0, # 0, # 0, # 0, # 0); # img_path = f'Image/{self.index}.png' # img = cv2.imread(img_path) # self.index += 1 # self.index = self.index % 4 # self.detection_image = img if self.feedConfig == None: self.feedStatus = FeedStatus.FNone if self.feedConfig !=None and self.feedConfig.num == 0 and self.is_reverse and self.robotClient.origin_position.compare(real_position,is_action=True): self.feedStatus = FeedStatus.FNone self.is_reverse = False self.log_signal.emit(logging.INFO, Constant.str_feed_reverse) if self.feedStatus == FeedStatus.FNone or self.pause: return elif self.feedStatus == FeedStatus.FCheck: self.log_signal.emit(logging.INFO, Constant.str_feed_check) # 1, 检查是否是三列 # 2, 检查是否有人 # if self.safe_check_columns() and self.safe_check_person(): # pass # else: # if self.feedConfig.num != 0: # self.next_target() # if == 原点 继续判断 # else: # QMessageBox.information(None, "提示", Constant.str_feed_safe_error_msgbox) if self.is_reverse: self.feed_Mid_Status = FeedMidStatus.FMid_Start else: self.feed_Mid_Status = FeedMidStatus.FMid_Take if self.feedConfig.feedLine.start_to_take[0].get_position().compare(real_position): self.next_position(self.is_reverse) elif self.feedStatus == FeedStatus.FStart: self.log_signal.emit(logging.INFO, Constant.str_feed_start) if not self.robotClient.origin_position.compare(real_position,is_action=True) and not self.is_reverse: # QMessageBox.information(None, "提示", Constant.str_feed_start_error) # Fuck 引起异常 self.log_signal.emit(logging.ERROR, Constant.str_feed_start_error) self.need_origin_signal.emit(Constant.str_feed_start_error) self.feedStatus = FeedStatus.FNone return if self.is_reverse: self.feedStatus = FeedStatus.FNone self.is_reverse = False return self.feedConfig.feedLine.get_position_list() self.detect.detect_status = DetectStatus.DNone self.feed_Mid_Status = FeedMidStatus.FMid_Start self.next_position(self.is_reverse) elif self.feedStatus == FeedStatus.FMid: feed_pos = self.get_current_position(self.is_reverse) if feed_pos.get_position().compare(real_position): self.log_signal.emit(logging.INFO, Constant.str_feed_mid) self.next_position(self.is_reverse) elif self.feedStatus == FeedStatus.FPhoto: self.log_signal.emit(logging.INFO, Constant.str_feed_photo) if self.feedConfig.num == 0: self.log_signal.emit(logging.INFO, Constant.str_feed_finish) self.is_reverse = True self.feed_Mid_Status = FeedMidStatus.FMid_Take self.feedConfig.feedLine.start2take_pos_index = len(self.feedConfig.feedLine.start_to_take) -2 self.feedConfig.feedLine.origin2start_pos_index = len(self.feedConfig.feedLine.origin_to_start) - 2 self.next_position(self.is_reverse) self.init_detection_image() return if self.robotClient.type_detection == DetectType.EyeOutHand: self.feed_Mid_Status = FeedMidStatus.FMid_Feed if self.detect.detect_status == DetectStatus.DNone: self.detect.detect_status = DetectStatus.DDetect elif self.detect.detect_status == DetectStatus.DOk: if Constant.Debug: if self.feedConfig.num == 990 and self.onekey: self.feedConfig.num = 0 self.log_signal.emit(logging.ERROR, Constant.str_feed_finish) return self.detect.detect_status = DetectStatus.DNone self.feedConfig.feedLine.set_take_position(real_position) self.next_position() return if self.detect.detect_position != None: self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_success) self.feedConfig.feedLine.set_take_position(self.detect.detect_position,dynamic_height=self.robotClient.dynamic_height) self.update_detect_image.emit(self.detect.detection_image) self.next_position() self.detect.detect_status = DetectStatus.DNone else: if self.onekey: self.feedConfig.num = 0 self.log_signal.emit(logging.ERROR, Constant.str_feed_finish) else: self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail) self.take_no_photo_sigal.emit() return detect_pos_list = [] if not Constant.Debug: try: from Util.util_time import CRisOrFall if self.cRis_photo.Q(self.error_photo_count >= 5, True): QMessageBox.information(None, "提示", Constant.str_feed_photo_error_msgbox) self.error_photo_count = 0 self.log_signal.emit(logging.INFO, Constant.str_feed_photo_confirm) # TODO 返回初始状态 for pos in self.feedConfig.photo_locs: self.sendTargPosition(pos) while not pos.compare(real_position): # 可以优化 TODO if self.feedStatus == FeedStatus.FNone or not self.pause: return time.sleep(0.1) code, img, xyz, uvw, mng = self.detection.get_position() # 检测结果 self.detection_image = img if xyz != None: self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_success) # dp = Detection_Position().init_position(*xyz, *uvw) from Trace.handeye_calibration import R_matrix, getPosition rotation = R_matrix(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) # 黄老师给我的xyz和法向量 target_position, noraml_base = getPosition(*xyz, *uvw, rotation, *mng) detect_pos_list.append(Real_Position().init_position(*target_position[:3], *noraml_base)) self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_success) else: self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail + real_position.to_string()) z_diff, max_z_index = (lambda pts: ( max(pts, key=lambda p: p.Z).Z - min(pts, key=lambda p: p.Z).Z, pts.index(max(pts, key=lambda p: p.Z)) ))(detect_pos_list) if len(self.feedConfig.photo_locs) == 5: if z_diff < Constant.bag_height and len( detect_pos_list) == 3: # 第一次检测到没有高度差距开始三列拍照 TODO 保留全部的开关 # 拍照位置从五个变为三个 self.feedConfig.photo_locs = [detect_pos_list[0], detect_pos_list[2], detect_pos_list[4]] take_position = detect_pos_list[0] self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_front_finish) else: take_position = detect_pos_list[max_z_index] self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_front) else: if z_diff < Constant.bag_height: take_position = detect_pos_list[0] self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_new_line) else: take_position = detect_pos_list[max_z_index] self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_line) self.feedConfig.feedLine.set_take_position(take_position) ##TODO 检查有没有异常 self.next_position() except: self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail) self.error_photo_count += 1 else: self.feedConfig.feedLine.set_take_position(real_position) ##TODO 检查有没有异常 self.next_position() self.log_signal.emit(logging.INFO, Constant.str_feed_takePhoto_move) elif self.feedStatus == FeedStatus.FTake: self.log_signal.emit(logging.INFO, Constant.str_feed_take) if self.feedConfig.feedLine.get_take_position().get_position() != None: # self.take_no_photo = False if self.feedConfig.feedLine.get_take_position().get_position().compare(real_position, is_action=True): # 打开吸嘴并返回 # self.sendIOControl(self.robotClient.con_ios[0], 1) # self.sendIOControl(self.robotClient.con_ios[1], 1) # self.sendIOControl(self.robotClient.con_ios[2], 1) self.log_signal.emit(logging.INFO, "到达抓料点位") if self.catch.catch_status == CatchStatus.CNone : self.catch.catch_status = CatchStatus.CTake return if self.catch.catch_status == CatchStatus.CTake: return if self.catch.catch_status == CatchStatus.COk : self.catch.catch_status = CatchStatus.CNone #self.feedConfig.feedLine.set_take_position(None) # time.sleep(self.robotClient.time_delay_take) self.log_signal.emit(logging.INFO, Constant.str_feed_take_success) self.next_position() else: self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail) # self.take_no_photo = True # 继续获取图像 # TODO # self.feedConfig.feedLine.set_take_position(self.get_take_position()) elif self.feedStatus == FeedStatus.FBroken1: if self.get_current_position().get_position().compare(real_position): self.log_signal.emit(logging.INFO, Constant.str_feed_broken) self.next_position() elif self.feedStatus == FeedStatus.FBroken2: if self.get_current_position().get_position().compare(real_position): self.log_signal.emit(logging.INFO, Constant.str_feed_broken) self.next_position() elif self.feedStatus == FeedStatus.FShake: if self.get_current_position().get_position().compare(real_position,is_action=True): # TODO 震动方案 self.log_signal.emit(logging.INFO, Constant.str_feed_shake) if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CShake return if self.catch.catch_status == CatchStatus.CShake: return if self.catch.catch_status == CatchStatus.COk: self.catch.catch_status = CatchStatus.CNone self.next_position() elif self.feedStatus == FeedStatus.FDropBag: if self.get_current_position().get_position().compare(real_position,is_action=True): # self.sendIOControl(self.robotClient.con_ios[0], 0) # self.sendIOControl(self.robotClient.con_ios[1], 0) # self.sendIOControl(self.robotClient.con_ios[2], 0) self.log_signal.emit(logging.INFO, Constant.str_feed_drop) if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CDrop return if self.catch.catch_status == CatchStatus.CDrop: return if self.catch.catch_status == CatchStatus.COk: self.catch.catch_status = CatchStatus.CNone # time.sleep(self.robotClient.time_delay_put) # self.detection.get_position(Point_isVision=False, Box_isPoint=True, First_Depth=True, Iter_Max_Pixel=30, save_img_point=0, Height_reduce=30, width_reduce=30) # self.feedConfig.feedLine.set_take_position(self.get_take_position()) if self.detect.detect_status == DetectStatus.DNone: self.detect.detect_status = DetectStatus.DDetect self.feedConfig.num = self.feedConfig.num - 1 self.log_signal.emit(logging.INFO, f'{Constant.str_feed_feed_num}{self.feedConfig.num}') self.next_position() def run_reset(self): real_position = Real_Position() real_position.init_position(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) if self.reset_status == ResetStatus.RNone: return if self.reset_status == ResetStatus.RStart: if self.feedConfig == None: return # for index in range(len(self.feedConfig.feedLine.positions)): # if self.feedConfig.feedLine.positions[index].status == 2: # start_index = index self.pos_index = -1 self.pos_near_index = -1 self.reversed_positions = [] for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if pos_model.get_position().compare(real_position,is_action=True): self.pos_index = index break if self.pos_index == -1: self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail) min_distance = 99999999 for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if get_distance(pos_model.get_position(), real_position) < min_distance: min_distance = get_distance(pos_model.get_position(), real_position) self.pos_near_index = index if self.pos_near_index != -1: self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_near_index + 1] else: return False else: self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_index+1] self.reversed_positions = list(reversed(self.reversed_positions)) self.reverse_index = 0 self.send_emergency_sound() self.current_position = PositionModel() self.current_position.init_position(real_position) self.reset_status = ResetStatus.RRunging if self.reset_status == ResetStatus.RRunging: if not real_position.compare(self.current_position.get_position(),is_action=True): return pos_model = self.reversed_positions[self.reverse_index] if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点 pos_model = self.reversed_positions[self.reverse_index + 1] self.reverse_index = self.reverse_index+1 if pos_model.lineType == LineType.CureMid.value: pos_model1 = self.reversed_positions[self.reverse_index + 1] self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure, real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed) self.current_position = pos_model1 self.reverse_index = self.reverse_index + 2 else: self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed) self.current_position = pos_model self.reverse_index = self.reverse_index + 1 if self.reverse_index == len(self.reversed_positions): self.reset_status = ResetStatus.ROk if self.reset_status == ResetStatus.ROk: self.reset_status = ResetStatus.RNone self.send_emergency_stop() def return_original_position(self): self.run_reverse = True real_position = Real_Position() real_position.init_position(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) if self.feedConfig == None: return start_index = -1 # for index in range(len(self.feedConfig.feedLine.positions)): # if self.feedConfig.feedLine.positions[index].status == 2: # start_index = index pos_index = -1 pos_near_index = -1 reversed_positions = [] for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if pos_model.get_position().compare(real_position): pos_index = index break if pos_index == -1: self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail) min_distance = 99999999 for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if get_distance(pos_model.get_position(), real_position)