import threading from typing import List # # 法奥机器人 from fairino import Robot # 夹具和吸盘控制 from .catch import CatchTool # 震动函数 from .vibrate import start_vibrate # 称重函数 from .weight import start_weight # 倒料函数 from .drop import start_drop, start_drop_rotate, start_drop_reset # 高周波 from .high_machine import start_high # 冲压机 from .press_machine import start_press import time class RobotService: def __init__(self, robot_client: Robot.RPC): self.robot_client = robot_client # 机器臂实例 self.move_thread = None # 移动任务线程对象 self.moving_running = False # 移动线程运行状态标记 # 跟踪线程相关 # 跟踪线程的机器臂调用 self.robot_tracking_client = Robot.RPC(robot_client.ip_address) self.point_sequence = [] # 点位序列元数据 (保存点位状态status和索引index) self.current_target_index = -1 # 当前正在朝向(移动)的点位索引(初始为-1) self.completed_indices = set() # 已运动过的点位索引集合 self.tracking_lock = threading.Lock() # 末端工具对象 self.catch_tool = CatchTool() def start_moves( self, pos_list: List[list], name_list: List[str], tool_id: int = 1, vel: float = 5.0, ): """启动移动任务(对外接口)""" # 校验参数:坐标列表和名称列表长度必须一致 if len(pos_list) != len(name_list): raise ValueError("pos_list与name_list长度不一致") # 如果已有移动任务线程在运行,先停止 if self.move_thread and self.move_thread.is_alive(): self.stop_moves() # 保存任务参数 self.pos_list = pos_list self.name_list = name_list self.tool_id = tool_id self.vel = vel self.moving_running = True # 初始化点位序列 self.init_point_sequence() # 启动移动点位跟踪 self.start_tracking() # 启动移动任务线程, 执行移动任务 self.move_thread = threading.Thread(target=self._process_moves) self.move_thread.daemon = True self.move_thread.start() def stop_moves(self): """停止移动任务""" self.moving_running = False if self.stop_motion() == 0: print("stop_moves: 机器臂停止移动成功...") # 打印 停止时,正在朝哪个点移动 point_dict = self.get_current_target_point() if point_dict: print("停止前, 正在朝向移动的目标点位为:") print( f"所选中的所有点中的第{point_dict['index']+1} 个点", "pos:", point_dict["pos"], ) else: print("所有的点位已经移动完成, 无朝向移动的目标点!!!!") # 停止跟踪 self.stop_tracking() def stop_motion(self): error = self.robot_client.send_message("/f/bIII4III102III4IIISTOPIII/b/f") return error def _process_moves(self): """线程内执行的移动任务逻辑""" for i in range(len(self.pos_list)): if not self.moving_running: break # 若已触发停止,则退出循环 current_pos = self.pos_list[i] current_name = self.name_list[i] print(f"坐标名: {current_name},坐标:{current_pos}") if current_name == "normal": # 中间的普通点加入平滑时间 error_code = self.robot_client.MoveCart( desc_pos=current_pos, tool=self.tool_id, user=0, vel=self.vel, blendT=200, # 平滑, 200毫秒 ) # 加了平滑会出现一个问题,那就是该函数不会阻塞,就直接返回了 else: # 执行机器臂移动 error_code = self.robot_client.MoveCart( desc_pos=current_pos, tool=self.tool_id, user=0, vel=self.vel ) # 根据移动结果处理 # 必须要根据函数返回来决定 if self.moving_running and error_code == 0: print(f" {current_name} 移动成功 或者 移动指令发送成功!!! ") # 根据点位名称调用对应函数(核心判断逻辑) # 功能点会调用相应的函数来进行相应的操作 # 注意: 功能点不能设置为 平滑点!!!!! 否则不能根据函数返回判断是否已经运动到该点位 self._call_action_by_name(current_name) # 可以优化 else: # 1、该点位移动失败.... # 2、机器臂停止运动,还会进入到这里.... print( f"移动到 {current_name} 失败,错误码:{error_code}, 运行状态: {self.moving_running}" ) # 可选:是否继续执行下一个点? self.moving_running = False # 所有移动任务完成 self.moving_running = False print("================所有点位任务已完成===============") def _call_action_by_name(self, point_name: str): """根据点位名称调用对应动作(核心判断逻辑)""" # 1. 夹取相关点位 if point_name in ["抓取点", "夹取点"]: self.catch_tool.gripper_grasp() # 2. 放开相关点位 elif point_name in ["放开点"]: self.catch_tool.gripper_release() # 3. 称重相关点位 elif point_name in ["称重点"]: start_weight() # 4. 震频点相关点位 elif point_name in ["震频点", "震动点"]: start_vibrate() # 5. 倒料相关点位 elif point_name in ["倒料点"]: start_drop(self.robot_client) # start_high elif point_name in ["高周波点"]: start_high() # start_press elif point_name in ["冲压点", "冲压机点"]: start_press() elif point_name in ["冲压机等待点"]: # 冲压机花费60秒,这里等待一段时间 print("等待冲压机冲压完成, 等待时间为40秒......") # 因为高周波点等待了80秒,所以这里只需要等待40秒,一共120秒 # 这里会一起计算高周波等待点的80秒 time.sleep(40) print("冲压已经完成......") # start_drop_rotate # 旋转180°倒料 elif point_name in ["倒料旋转点", "旋转点"]: start_drop_rotate(self.robot_client) # 旋转复原点 elif point_name in ["倒料复原点", "旋转复原点"]: start_drop_reset(self.robot_client) # 吸取点 # 吸盘吸取 elif point_name in ["吸取点"]: self.catch_tool.suction_pick() # 释放点 # 吸盘释放 elif point_name in ["释放点"]: self.catch_tool.suction_release() elif point_name in ["冲压机等待点2"]: # 冲压机花费90秒,这里等待一段时间 # 这里是专门等待冲压机的....,不会一起计算高周波等待点的时间... print("等待冲压机冲压完成, 等待时间为90秒......") time.sleep(90) print("冲压已经完成......") # 4. 可扩展其他功能点位类型 # elif point_name in ["XXX点"]: # self._xxx_action() # else: # print(f"点位 {point_name} 无对应动作,不执行额外操作") def init_point_sequence(self): """初始化点位序列,添加元数据""" self.point_sequence.clear() # 清空 for idx, point in enumerate(self.pos_list): self.point_sequence.append( { "index": idx, "pos": point, # 坐标 [x,y,z,rx,ry,rz] "status": "pending", # 状态:pending(未发送)/sent(已发送)/completed(已经过)/current(当前目标) } ) self.current_target_index = ( 0 if self.point_sequence else -1 ) # 初始目标为第一个点 def get_robot_status(self): """获取机器人实时状态:是否在运动 + 当前位置""" try: with self.tracking_lock: # 1. 判断是否正在运动 is_moving = self.is_moving() # 2. 获取当前TCP位置 # [x,y,z,rx,ry,rz] result = self.robot_tracking_client.GetActualTCPPose() # 解析返回值 if isinstance(result, tuple) and len(result) >= 2 and result[0] == 0: # 成功:提取坐标列表(第二个元素) current_pos = result[1] # 校验坐标格式(6个数字) if not ( isinstance(current_pos, list) and len(current_pos) == 6 and all(isinstance(p, (int, float)) for p in current_pos) ): print(f"坐标格式错误:{current_pos}, 需为6个数字的列表") return None else: # 失败:打印错误码 error_code = result[0] if isinstance(result, tuple) else result print(f"get_robot_status: 获取TCP位置失败, 错误码:{error_code}") return None return { "is_moving": is_moving, "current_pos": current_pos, "timestamp": time.time(), } except Exception as e: print(f"get_robot_status: 获取状态失败:{e}") return None def is_moving(self) -> bool: """ 判断机器臂是否正在运动 返回: bool: True表示正在运动(robot_state=2),False表示停止/暂停/拖动/异常 """ # 1. 检查状态包是否已初始化 if self.robot_client.robot_state_pkg is None: print("is_moving: 警告:机器人状态包未初始化,无法判断运动状态") return False try: # 2. 从状态包中获取robot_state(取值1-4) time.sleep(0.05) # 等待状态刷新 robot_state = self.robot_client.robot_state_pkg.robot_state # 3. 校验robot_state是否在合法范围内 if robot_state not in (1, 2, 3, 4): print( f"is_moving: 警告: 无效的robot_state值 {robot_state},按'停止'处理" ) return False # 4. 仅robot_state=2表示正在运动 return robot_state == 2 except Exception as e: print(f"is_moving: 获取运动状态失败:{str(e)}") return False def update_point_status(self): """更新点位状态:哪些已完成,当前移动的目标点位是哪个""" status = self.get_robot_status() if not status: return current_pos = status["current_pos"] is_moving = status["is_moving"] threshold = 6.0 # 判定“经过该点”的距离阈值(mm),根据精度调整 # 遍历所有点位,判断是否已经过 for point in self.point_sequence: idx = point["index"] # 跳过已标记为“已完成”的点 if point["status"] == "completed": continue # 计算当前位置与点位的三维距离 dx = current_pos[0] - point["pos"][0] dy = current_pos[1] - point["pos"][1] dz = current_pos[2] - point["pos"][2] distance = (dx**2 + dy**2 + dz**2) ** 0.5 # 如果距离小于阈值,标记为“已完成” if distance < threshold: self.completed_indices.add(idx) point["status"] = "completed" max_completed = ( max(self.completed_indices) if self.completed_indices else -1 ) self.current_target_index = max_completed + 1 if 0 <= self.current_target_index < len(self.point_sequence): self.point_sequence[self.current_target_index]["status"] = "current" # 停止移动时判断当前目标点 if not is_moving and 0 <= self.current_target_index < len(self.point_sequence): target_point = self.point_sequence[self.current_target_index] if target_point["status"] != "completed": dx = current_pos[0] - target_point["pos"][0] dy = current_pos[1] - target_point["pos"][1] dz = current_pos[2] - target_point["pos"][2] distance = (dx**2 + dy**2 + dz**2) ** 0.5 if distance < threshold: self.completed_indices.add(self.current_target_index) target_point["status"] = "completed" if self.current_target_index + 1 < len(self.point_sequence): # 将下一个点设置为目前正在朝向移动的点 self.current_target_index += 1 self.point_sequence[self.current_target_index][ "status" ] = "current" def start_tracking(self): """清空已经移动的点位的集合, 开启下一轮跟踪""" self.completed_indices.clear() """启动实时跟踪线程""" self.tracking_running = True self.tracking_thread = threading.Thread(target=self._tracking_loop, daemon=True) self.tracking_thread.start() def _tracking_loop(self): """跟踪循环:持续更新点位状态""" while self.tracking_running: self.update_point_status() time.sleep(0.2) # 5Hz更新频率 def stop_tracking(self): """停止跟踪线程""" self.tracking_running = False if self.tracking_thread.is_alive(): self.tracking_thread.join() def get_completed_points(self): """返回已运动过的点(索引和坐标)""" return [ {"index": point["index"], "pos": point["pos"]} for point in self.point_sequence if point["status"] == "completed" ] def get_current_target_point(self): """返回当前正在朝向移动的点(索引和坐标)""" if 0 <= self.current_target_index < len(self.point_sequence): point = self.point_sequence[self.current_target_index] return {"index": point["index"], "pos": point["pos"]} return None # 无当前朝向移动的目标点位(已完成所有点位的移动)