import copy import logging import random import threading import time import cv2 import numpy as np from PyQt5.QtWidgets import QMessageBox from PySide6.QtCore import Signal, QObject from PySide6.QtGui import QPixmap import Constant import Expection from CU.Catch import Catch, CatchStatus from CU.Detect import Detect, DetectStatus from Model.FeedModel import PositionModel from Model.Position import Real_Position, Detection_Position from enum import Enum, IntEnum from COM.COM_Robot import RobotClient, DetectType from Model.RobotModel import CMDInstructRequest, MoveType from Trace.handeye_calibration import getPosition from Trace.handeye_calibration import getxyz,getxyz1 from Util.util_math import get_distance from Util.util_time import CRisOrFall #from Vision.camera_coordinate_dete import Detection from Util.util_log import log from Model.RobotModel import Instruction from EMV.EMV import RelayController class ResetStatus(Enum): RNone = 0 RStart = 1 RRunging = 2 ROk =3 class FeedStatus(IntEnum): FNone = 0 FStart = 1 FCheck = 2 FMid = 3 FPhoto = 4 FTake = 5 FBroken1 = 6 FBroken2 =7 FShake = 8 FDropBag = 9 FFinished = 10 FReverse = 11 FStartReverse = 12 class LineType(Enum): Straight = 0 CureMid = 2 CureEnd = 3 WORLD = 4 class FeedMidStatus(Enum): FMid_Start = 1 FMid_Take = 2 FMid_Feed= 3 class FeedPosition: def __init__(self,status:FeedStatus,position:Real_Position): self.status = status self.position = position class FeedLine: def __init__(self, id, name, feed_positions:list,drop_positons:list): self.feed_positions = copy.deepcopy(feed_positions) self.drop_positions = copy.deepcopy(drop_positons) self.feeding2end_pos_index = 0 self.origin2start_pos_index = 0 self.start2take_pos_index = 0 self.name = name self.id = id # --- 新增:用于存储从 ini 文件读取的多个投料点坐标 --- # 这个列表将在加载 ini 时填充 [[x1,y1,z1,u1,v1,w1], [x2,y2,z2,u2,v2,w2], ...] self.drop_point_list = [] def get_current_feed_position(self,is_reverse): pos = self.feeding_to_end[ (self.feeding2end_pos_index+1)%len(self.feeding_to_end) if is_reverse else self.feeding2end_pos_index-1] return pos def get_current_take_position(self,is_reverse): pos = self.start_to_take[ (self.start2take_pos_index+1)%len(self.start_to_take) if is_reverse else self.start2take_pos_index-1] return pos def get_current_start_position(self,is_reverse): pos = self.origin_to_start[(self.origin2start_pos_index+1)%len(self.origin_to_start) if is_reverse else self.origin2start_pos_index-1] return pos def get_next_feed_position(self,reverse:bool=False): pos = self.feeding_to_end[self.feeding2end_pos_index] if reverse: self.feeding2end_pos_index -= 1 if self.feeding2end_pos_index < 0: self.feeding2end_pos_index = len(self.feeding_to_end) - 1 else: self.feeding2end_pos_index += 1 if self.feeding2end_pos_index >= len(self.feeding_to_end): self.feeding2end_pos_index = 0 return pos def get_next_start_position(self,reverse:bool=False): pos = self.origin_to_start[self.origin2start_pos_index] if reverse: self.origin2start_pos_index -= 1 if self.origin2start_pos_index < 0: self.origin2start_pos_index = len(self.origin_to_start) - 1 else: self.origin2start_pos_index += 1 if self.origin2start_pos_index >= len(self.origin_to_start): self.origin2start_pos_index = 0 return pos def get_next_take_position(self,reverse:bool=False): pos = self.start_to_take[self.start2take_pos_index] if reverse: self.start2take_pos_index -= 1 if self.start2take_pos_index < 0: self.start2take_pos_index = len(self.start_to_take) - 1 else: self.start2take_pos_index += 1 if self.start2take_pos_index >= len(self.start_to_take): self.start2take_pos_index = 0 return pos def get_take_position(self): for i in range(len(self.feeding_to_end)): if self.feeding_to_end[i].status == FeedStatus.FTake.value: return self.feeding_to_end[i] def get_drop_position(self): for i in range(len(self.feeding_to_end)): if self.feeding_to_end[i].status == FeedStatus.FTake.value: return self.feeding_to_end[i] def set_take_position(self, position: Real_Position, dynamic_height=0): print("[调试] 开始设置抓取位置") print(f"传入的 position 坐标: X={position.X}, Y={position.Y}, Z={position.Z}, " f"a={position.a}, b={position.b}, c={position.c}, " f"U={position.U}, V={position.V}, W={position.W}") for i in range(len(self.feeding_to_end)): if self.feeding_to_end[i].status == FeedStatus.FTake.value: xyz = getxyz1(position.X, position.Y, position.Z, position.a, position.b, position.c) print(f"[调试] getxyz1 返回值: X={xyz[0]}, Y={xyz[1]}, Z={xyz[2]}") befor_take_position = Real_Position().init_position(xyz[0], xyz[1], xyz[2], position.U, position.V, position.W) print( f"[调试] 抓取前位置: X={befor_take_position.X}, Y={befor_take_position.Y}, Z={befor_take_position.Z}, " f"U={befor_take_position.U}, V={befor_take_position.V}, W={befor_take_position.W}") after_take_position = Real_Position().init_position(xyz[0], xyz[1], xyz[2], position.U, position.V, position.W) print( f"[调试] 抓取后位置: X={after_take_position.X}, Y={after_take_position.Y}, Z={after_take_position.Z}, " f"U={after_take_position.U}, V={after_take_position.V}, W={after_take_position.W}") self.feeding_to_end[i - 1].set_position(befor_take_position) self.feeding_to_end[i + 1].set_position(after_take_position) self.feeding_to_end[i].set_position(position) print(f"[调试] 当前抓取点已设置: X={position.X}, Y={position.Y}, Z={position.Z}, " f"U={position.U}, V={position.V}, W={position.W}") print("[调试] 抓取前后位置已设置完成") def set_drop_position(self, position: Real_Position): """ 设置 FDropBag 位置,只设置当前点,不处理前后点。 :param position: 新的丢包位置 """ for i in range(len(self.feeding_to_end)): if self.feeding_to_end[i].status == FeedStatus.FDropBag.value: # 直接设置当前点的位置 self.feeding_to_end[i].set_position(position) print( f"✅ FDropBag 位置已更新: ({position.X}, {position.Y}, {position.Z}, {position.U}, {position.V}, {position.W})") break # 假设只有一个丢包点 def get_position_list(self): index_start = -1 for i in range(len(self.feed_positions)): if self.feed_positions[i].status == FeedStatus.FCheck.value: index_start = i break for i in range(len(self.feed_positions)): if self.feed_positions[i].status == FeedStatus.FPhoto.value: index_take = i self.origin_to_start = self.feed_positions[: index_start+1] self.start_to_take = self.feed_positions[index_start:index_take+1] self.feeding_to_end = self.feed_positions[index_take:] for i in range(len(self.feeding_to_end)): #插入动态中间点 if self.feeding_to_end[i].status == FeedStatus.FTake.value: befor_position_model = PositionModel() befor_position_model.init_position(None) after_position_model = PositionModel() after_position_model.init_position(None) self.feeding_to_end.insert(i, befor_position_model) self.feeding_to_end.insert(i+2, after_position_model) break class FeedingConfig: def __init__(self, num: int, feedLine: FeedLine, photo_locs, drop_positions=None): self.num = num self.feedLine = feedLine self.photo_locs = [self.deal_photo_locs(p) for p in photo_locs] # --- 新增: 存储投料点列表 --- # 从UI界面读取 or 从txt文本读取 self.drop_positions = [self.deal_photo_locs(p) for p in drop_positions] if drop_positions else [] def deal_photo_locs(self, photo_loc): position_photo = Real_Position() position_photo.init_position(photo_loc[0], photo_loc[1], photo_loc[2], photo_loc[3], photo_loc[4], photo_loc[5]) return position_photo def get_line_info(self): pass class Feeding(QObject): need_origin_signal = Signal(str) take_no_photo_sigal = Signal() update_detect_image = Signal(np.ndarray) log_signal = Signal(int,str) def __init__(self, robotClient: RobotClient): super().__init__() self.feedConfig = None self.feedStatus = FeedStatus.FNone self.robotClient = robotClient # 添加 RelayController 实例 self.relay_controller = RelayController() self.sensor_thread = None self.detection_image = None # self.init_detection_image() self.pause = False self.cRis_photo = CRisOrFall() self.cRis_shake = CRisOrFall() self.feed_Mid_Status = FeedMidStatus.FMid_Start self.is_reverse = False # 复位集合 self.run_reverse = False self.take_no_photo = False self.reset_status = ResetStatus.RNone self.reversed_positions = [] self.current_position = None self.index=1 self.pos_index = -1 self.pos_near_index = -1 self.catch = Catch(self.robotClient) # self.detect = Detect() # self.is_detected = True # self.detect_thread = threading.Thread(target=self.run_detect) # self.detect_thread.start() self.onekey = False # self.debug_run_count = 0 # 初始化计数器 self.mid_take_count = 0 #传感器判断抓包参数 self.sensor2_ready = False # 传感器2是否检测到料包 self.motor_stopped_by_sensor2 = False # 是否由传感器2触发停止电机 self.sensor_thread = None self.relay_controller = RelayController() # 启动传感器2线程 self.relay_controller._running = True self.sensor2_thread = None # --- 新增: 用于码垛模式的投料点索引 --- self.current_drop_index = 0 def close_feed(self): pass # self.is_detected = False # self.detect_thread.join() # self.detect.detection.release() # def init_detection_image(self): # detection_image = cv2.imread(Constant.feed_sign_path) # self.update_detect_image.emit(detection_image) # def run_detect(self): # while self.is_detected: # self.detect.run() # time.sleep(0.02) def run(self): self.catch.run() # 获取事件坐标 real_position = Real_Position() # self.detect.position_index = 0 real_position.init_position(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) # real_position.init_position(0, # 0, # 0, # 0, # 0, # 0); # img_path = f'Image/{self.index}.png' # img = cv2.imread(img_path) # self.index += 1 # self.index = self.index % 4 # self.detection_image = img if self.feedConfig == None: self.feedStatus = FeedStatus.FNone if self.feedConfig != None and self.feedConfig.num == 0 and self.is_reverse and self.robotClient.origin_position.compare( real_position, is_action=True): self.feedStatus = FeedStatus.FNone self.is_reverse = False self.log_signal.emit(logging.INFO, Constant.str_feed_reverse) if self.feedStatus == FeedStatus.FNone or self.pause: return elif self.feedStatus == FeedStatus.FCheck: self.log_signal.emit(logging.INFO, Constant.str_feed_check) if self.is_reverse: self.feed_Mid_Status = FeedMidStatus.FMid_Start else: self.feed_Mid_Status = FeedMidStatus.FMid_Take if self.feedConfig.feedLine.start_to_take[0].get_position().compare(real_position): self.next_position(self.is_reverse) elif self.feedStatus == FeedStatus.FStart: self.log_signal.emit(logging.INFO, Constant.str_feed_start) self.relay_controller.open(conveyor2=True)#开电机 #self.sensor2_thread = threading.Thread(target=self.relay_controller.handle_sensor2, daemon=True)#线程2的开始,但是在那里设置结束呢 #self.sensor2_thread.start() if not self.robotClient.origin_position.compare(real_position,is_action=True) and not self.is_reverse: # QMessageBox.information(None, "提示", Constant.str_feed_start_error) # Fuck 引起异常 self.log_signal.emit(logging.ERROR, Constant.str_feed_start_error) self.need_origin_signal.emit(Constant.str_feed_start_error) self.feedStatus = FeedStatus.FNone return if self.is_reverse: self.feedStatus = FeedStatus.FNone self.is_reverse = False return self.feedConfig.feedLine.get_position_list() # self.detect.detect_status = DetectStatus.DNone self.feed_Mid_Status = FeedMidStatus.FMid_Start # --- 修改: 初始化投料点索引 --- self.current_drop_index = 0 self.next_position(self.is_reverse) elif self.feedStatus == FeedStatus.FMid: feed_pos = self.get_current_position(self.is_reverse) if feed_pos.get_position().compare(real_position): self.log_signal.emit(logging.INFO, Constant.str_feed_mid) self.next_position(self.is_reverse) # 增加计数器逻辑 self.mid_take_count += 1 # 可选:在 Debug1 模式下输出日志 if Constant.Debug1: self.log_signal.emit( logging.INFO, f"[调试计数] 已进入 FMid 分支 {self.mid_take_count} 次" ) if self.feedStatus == FeedStatus.FTake: self.catch.catch_status = CatchStatus.CTake elif self.feedStatus == FeedStatus.FPhoto: self.log_signal.emit(logging.INFO, "🟡 [码垛模式] 跳过拍照定位步骤") # 直接进入抓取状态或准备投料 # 所以在 FPhoto 后,应该移动到抓取点 take_position_model = self.feedConfig.feedLine.get_take_position() if take_position_model: self.log_signal.emit(logging.INFO, "[码垛模式] 准备移动到抓取点") self.sendTargPosition(take_position_model.get_position(), speed=self.robotClient.feed_speed) self.feedStatus = FeedStatus.FTake else: self.log_signal.emit(logging.ERROR, "[码垛模式] 无法获取抓取点位置!") self.feedStatus = FeedStatus.FNone return # 退出当前循环 elif self.feedStatus == FeedStatus.FTake: self.log_signal.emit(logging.INFO, Constant.str_feed_take) take_position = self.feedConfig.feedLine.get_take_position() if not take_position or not take_position.get_position(): self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail) return if not take_position.get_position().compare(real_position, is_action=True): self.log_signal.emit(logging.INFO, "🟡 机器人尚未到达抓料点位") return self.log_signal.emit(logging.INFO, "🟢 机器人已到达抓料点位") '''real_position''' # 一直等待传感器2信号,永不超时 wait_start = time.time() timeout = 30 # 30秒超时,可根据需要调整 sensor2_detected = False while not sensor2_detected: if time.time() - wait_start > timeout: self.log_signal.emit(logging.ERROR, "⏰ FTake: 等待传感器2超时!") self.feedStatus = FeedStatus.FNone return sensors = self.relay_controller.get_all_device_status('sensors') sensor2_value = sensors.get(self.relay_controller.SENSOR2, False) if sensor2_value: self.log_signal.emit(logging.INFO, "✅ 传感器2检测到料包到位,开始执行抓取") sensor2_detected = True else: self.log_signal.emit(logging.INFO, "⏳ FTake: 等待传感器2料包信号...") time.sleep(0.2) # 每0.5秒检查一次 # 执行抓取动作 if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CTake if self.catch.catch_status == CatchStatus.CTake: self.log_signal.emit(logging.INFO, "正在执行抓料动作...") self.catch.catch_status = CatchStatus.COk if self.catch.catch_status == CatchStatus.COk: self.log_signal.emit(logging.INFO, Constant.str_feed_take_success) if not self.sensor2_ready: self.log_signal.emit(logging.INFO, "抓取完成,重新启动 conveyor2") self.relay_controller.open(conveyor2=True) self.catch.catch_status = CatchStatus.CNone self.log_signal.emit(logging.INFO, "[码垛模式] 跳过视觉检测,准备投料") # 移动到第一个(或当前)投料点 if self.feedConfig.drop_positions and len(self.feedConfig.drop_positions) > self.current_drop_index: first_drop_pos = self.feedConfig.drop_positions[self.current_drop_index] self.log_signal.emit(logging.INFO, f"[码垛模式] 移动到投料点 {self.current_drop_index}") self.sendTargPosition(first_drop_pos, speed=self.robotClient.feed_speed) self.feedStatus = FeedStatus.FDropBag else: self.log_signal.emit(logging.ERROR, "[码垛模式] 投料点列表为空或索引错误!") self.feedStatus = FeedStatus.FNone return # 退出当前循环 else: self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail) elif self.feedStatus == FeedStatus.FBroken1: if self.get_current_position().get_position().compare(real_position): self.log_signal.emit(logging.INFO, Constant.str_feed_broken) self.next_position() elif self.feedStatus == FeedStatus.FBroken2: if self.get_current_position().get_position().compare(real_position): self.log_signal.emit(logging.INFO, Constant.str_feed_broken) self.next_position() elif self.feedStatus == FeedStatus.FShake: if self.get_current_position().get_position().compare(real_position,is_action=True): # TODO 震动方案 self.log_signal.emit(logging.INFO, Constant.str_feed_shake) if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CShake return if self.catch.catch_status == CatchStatus.CShake: # if self.feedConfig.feedLine.feeding_to_end[ # self.feedConfig.feedLine.feeding2end_pos_index + 1].status != FeedStatus.FShake: # self.catch.catch_status = CatchStatus.COk # else: self.catch.shake_continue.SetReset() self.next_position() if self.feedStatus!=FeedStatus.FShake: self.catch.catch_status = CatchStatus.CNone return elif self.feedStatus == FeedStatus.FDropBag: # 1. 确保配置了投料点 if not self.feedConfig or not self.feedConfig.drop_positions: self.log_signal.emit(logging.ERROR, "[码垛模式] 错误:未配置投料点列表!") self.feedStatus = FeedStatus.FNone return # 2. 获取当前目标投料点 (基于索引) if self.current_drop_index >= len(self.feedConfig.drop_positions): self.log_signal.emit(logging.ERROR, f"[码垛模式] 错误:投料点索引 {self.current_drop_index} 超出范围!") self.feedStatus = FeedStatus.FNone return target_drop_position = self.feedConfig.drop_positions[self.current_drop_index] self.log_signal.emit(logging.INFO, f"[码垛模式] 当前目标投料点索引: {self.current_drop_index}") # 3. 检查是否到达目标投料点 if target_drop_position.compare(real_position, is_action=True): self.log_signal.emit(logging.INFO, f"[码垛模式] 到达投料点 {self.current_drop_index}") # 4. 执行投料动作 if self.catch.catch_status == CatchStatus.CNone: self.catch.catch_status = CatchStatus.CDrop self.log_signal.emit(logging.INFO, "[码垛模式] 开始执行投料动作...") return # 等待下一周期检查 COk if self.catch.catch_status == CatchStatus.CDrop: self.log_signal.emit(logging.INFO, "[码垛模式] 投料动作进行中...") return # 等待抓取模块完成 if self.catch.catch_status == CatchStatus.COk: self.catch.catch_status = CatchStatus.CNone self.log_signal.emit(logging.INFO, f"[码垛模式] 在投料点 {self.current_drop_index} 投料完成") # 5. 更新投料计数 self.feedConfig.num -= 1 self.log_signal.emit(logging.INFO, f'{Constant.str_feed_feed_num}{self.feedConfig.num}') # 6. 检查是否所有投料任务完成 if self.feedConfig.num > 0: # 7. 更新投料点索引 (循环) self.current_drop_index = (self.current_drop_index + 1) % len(self.feedConfig.drop_positions) self.log_signal.emit(logging.INFO, f"[码垛模式] 更新投料点索引为: {self.current_drop_index}") # 8. 移动到固定的抓取点 take_position_model = self.feedConfig.feedLine.get_take_position() if take_position_model: self.log_signal.emit(logging.INFO, f"[码垛模式] 移动到固定抓取点...") self.sendTargPosition(take_position_model.get_position(), speed=self.robotClient.feed_speed) self.feedStatus = FeedStatus.FTake # 设置状态为 FTake else: self.log_signal.emit(logging.ERROR, "[码垛模式] 无法获取抓取点位置!") self.feedStatus = FeedStatus.FNone else: # 9. 所有投料完成 self.log_signal.emit(logging.INFO, "[码垛模式] 所有投料任务完成") # 可以选择返回原点或进入空闲状态 # 例如,移动到原点 # self.sendTargPosition(self.robotClient.origin_position, speed=self.robotClient.reset_speed) # self.feedStatus = FeedStatus.FStartReverse # 或者自定义一个结束状态 self.feedStatus = FeedStatus.FNone # 简单地结束 else: # 如果还没到达目标投料点,可以打印日志或等待 # (通常机器人移动指令发出后,会自动移动到位,下次循环再检查) self.log_signal.emit(logging.INFO, f"[码垛模式] 正在移动到投料点 {self.current_drop_index}...") def run_reset(self): real_position = Real_Position() real_position.init_position(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) if self.reset_status == ResetStatus.RNone: return if self.reset_status == ResetStatus.RStart: if self.feedConfig == None: return # for index in range(len(self.feedConfig.feedLine.positions)): # if self.feedConfig.feedLine.positions[index].status == 2: # start_index = index self.pos_index = -1 self.pos_near_index = -1 self.reversed_positions = [] for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if pos_model.get_position().compare(real_position,is_action=True): self.pos_index = index break if self.pos_index == -1: self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail) min_distance = 99999999 for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if get_distance(pos_model.get_position(), real_position) < min_distance: min_distance = get_distance(pos_model.get_position(), real_position) self.pos_near_index = index if self.pos_near_index != -1: self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_near_index + 1] else: return False else: self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_index+1] self.reversed_positions = list(reversed(self.reversed_positions)) self.reverse_index = 0 self.send_emergency_sound() self.current_position = PositionModel() self.current_position.init_position(real_position) self.reset_status = ResetStatus.RRunging if self.reset_status == ResetStatus.RRunging: if not real_position.compare(self.current_position.get_position(),is_action=True): return pos_model = self.reversed_positions[self.reverse_index] if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点 pos_model = self.reversed_positions[self.reverse_index + 1] self.reverse_index = self.reverse_index+1 if pos_model.lineType == LineType.CureMid.value: pos_model1 = self.reversed_positions[self.reverse_index + 1] self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure, real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed) self.current_position = pos_model1 self.reverse_index = self.reverse_index + 2 else: self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed) self.current_position = pos_model self.reverse_index = self.reverse_index + 1 if self.reverse_index == len(self.reversed_positions): self.reset_status = ResetStatus.ROk if self.reset_status == ResetStatus.ROk: self.reset_status = ResetStatus.RNone self.send_emergency_stop() def return_original_position(self): self.run_reverse = True real_position = Real_Position() real_position.init_position(self.robotClient.status_model.world_0, self.robotClient.status_model.world_1, self.robotClient.status_model.world_2, self.robotClient.status_model.world_3, self.robotClient.status_model.world_4, self.robotClient.status_model.world_5) if self.feedConfig == None: return start_index = -1 # for index in range(len(self.feedConfig.feedLine.positions)): # if self.feedConfig.feedLine.positions[index].status == 2: # start_index = index pos_index = -1 pos_near_index = -1 reversed_positions = [] for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if pos_model.get_position().compare(real_position): pos_index = index break if pos_index == -1: self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail) min_distance = 99999999 for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions): if get_distance(pos_model.get_position(), real_position)