Files
fluent_widgets_pyside6/app/service/robot_service.py

395 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
from typing import List
# # 法奥机器人
from fairino import Robot
# 夹具和吸盘控制
from .catch import CatchTool
# 震动函数
from .vibrate import start_vibrate
# 称重函数
from .weight import start_weight
# 倒料函数
from .drop import start_drop, start_drop_rotate, start_drop_reset
# 高周波
from .high_machine import start_high
# 冲压机
from .press_machine import start_press
import time
class RobotService:
def __init__(self, robot_client: Robot.RPC):
self.robot_client = robot_client # 机器臂实例
self.move_thread = None # 移动任务线程对象
self.moving_running = False # 移动线程运行状态标记
# 跟踪线程相关
# 跟踪线程的机器臂调用
self.robot_tracking_client = Robot.RPC(robot_client.ip_address)
self.point_sequence = [] # 点位序列元数据 保存点位状态status和索引index
self.current_target_index = -1 # 当前正在朝向(移动)的点位索引(初始为-1
self.completed_indices = set() # 已运动过的点位索引集合
self.tracking_lock = threading.Lock()
# 末端工具对象
self.catch_tool = CatchTool()
def start_moves(
self,
pos_list: List[list],
name_list: List[str],
tool_id: int = 1,
vel: float = 5.0,
):
"""启动移动任务(对外接口)"""
# 校验参数:坐标列表和名称列表长度必须一致
if len(pos_list) != len(name_list):
raise ValueError("pos_list与name_list长度不一致")
# 如果已有移动任务线程在运行,先停止
if self.move_thread and self.move_thread.is_alive():
self.stop_moves()
# 保存任务参数
self.pos_list = pos_list
self.name_list = name_list
self.tool_id = tool_id
self.vel = vel
self.moving_running = True
# 初始化点位序列
self.init_point_sequence()
# 启动移动点位跟踪
self.start_tracking()
# 启动移动任务线程, 执行移动任务
self.move_thread = threading.Thread(target=self._process_moves)
self.move_thread.daemon = True
self.move_thread.start()
def stop_moves(self):
"""停止移动任务"""
self.moving_running = False
if self.stop_motion() == 0:
print("stop_moves: 机器臂停止移动成功...")
# 打印 停止时,正在朝哪个点移动
point_dict = self.get_current_target_point()
if point_dict:
print("停止前, 正在朝向移动的目标点位为:")
print(
f"所选中的所有点中的第{point_dict['index']+1} 个点",
"pos:",
point_dict["pos"],
)
else:
print("所有的点位已经移动完成, 无朝向移动的目标点!!!!")
# 停止跟踪
self.stop_tracking()
def stop_motion(self):
error = self.robot_client.send_message("/f/bIII4III102III4IIISTOPIII/b/f")
return error
def _process_moves(self):
"""线程内执行的移动任务逻辑"""
for i in range(len(self.pos_list)):
if not self.moving_running:
break # 若已触发停止,则退出循环
current_pos = self.pos_list[i]
current_name = self.name_list[i]
print(f"坐标名: {current_name},坐标:{current_pos}")
if current_name == "normal":
# 中间的普通点加入平滑时间
error_code = self.robot_client.MoveCart(
desc_pos=current_pos,
tool=self.tool_id,
user=0,
vel=self.vel,
blendT=200, # 平滑, 200毫秒
)
# 加了平滑会出现一个问题,那就是该函数不会阻塞,就直接返回了
else:
# 执行机器臂移动
error_code = self.robot_client.MoveCart(
desc_pos=current_pos, tool=self.tool_id, user=0, vel=self.vel
)
# 根据移动结果处理
# 必须要根据函数返回来决定
if self.moving_running and error_code == 0:
print(f" {current_name} 移动成功 或者 移动指令发送成功!!! ")
# 根据点位名称调用对应函数(核心判断逻辑)
# 功能点会调用相应的函数来进行相应的操作
# 注意: 功能点不能设置为 平滑点!!!!! 否则不能根据函数返回判断是否已经运动到该点位
self._call_action_by_name(current_name) # 可以优化
else:
# 1、该点位移动失败....
# 2、机器臂停止运动还会进入到这里....
print(
f"移动到 {current_name} 失败,错误码:{error_code}, 运行状态: {self.moving_running}"
)
# 可选:是否继续执行下一个点?
self.moving_running = False
# 所有移动任务完成
self.moving_running = False
print("================所有点位任务已完成===============")
def _call_action_by_name(self, point_name: str):
"""根据点位名称调用对应动作(核心判断逻辑)"""
# 1. 夹取相关点位
if point_name in ["抓取点", "夹取点"]:
self.catch_tool.gripper_grasp()
# 2. 放开相关点位
elif point_name in ["放开点"]:
self.catch_tool.gripper_release()
# 3. 称重相关点位
elif point_name in ["称重点"]:
start_weight()
# 4. 震频点相关点位
elif point_name in ["震频点", "震动点"]:
start_vibrate()
# 5. 倒料相关点位
elif point_name in ["倒料点"]:
start_drop(self.robot_client)
# start_high
elif point_name in ["高周波点"]:
start_high()
# start_press
elif point_name in ["冲压点", "冲压机点"]:
start_press()
elif point_name in ["冲压机等待点"]:
# 冲压机花费60秒这里等待一段时间
print("等待冲压机冲压完成, 等待时间为40秒......")
# 因为高周波点等待了80秒所以这里只需要等待40秒一共120秒
# 这里会一起计算高周波等待点的80秒
time.sleep(40)
print("冲压已经完成......")
# start_drop_rotate
# 旋转180°倒料
elif point_name in ["倒料旋转点", "旋转点"]:
start_drop_rotate(self.robot_client)
# 旋转复原点
elif point_name in ["倒料复原点", "旋转复原点"]:
start_drop_reset(self.robot_client)
# 吸取点
# 吸盘吸取
elif point_name in ["吸取点"]:
self.catch_tool.suction_pick()
# 释放点
# 吸盘释放
elif point_name in ["释放点"]:
self.catch_tool.suction_release()
elif point_name in ["冲压机等待点2"]:
# 冲压机花费90秒这里等待一段时间
# 这里是专门等待冲压机的....,不会一起计算高周波等待点的时间...
print("等待冲压机冲压完成, 等待时间为90秒......")
time.sleep(90)
print("冲压已经完成......")
# 4. 可扩展其他功能点位类型
# elif point_name in ["XXX点"]:
# self._xxx_action()
# else:
# print(f"点位 {point_name} 无对应动作,不执行额外操作")
def init_point_sequence(self):
"""初始化点位序列,添加元数据"""
self.point_sequence.clear() # 清空
for idx, point in enumerate(self.pos_list):
self.point_sequence.append(
{
"index": idx,
"pos": point, # 坐标 [x,y,z,rx,ry,rz]
"status": "pending", # 状态pending(未发送)/sent(已发送)/completed(已经过)/current(当前目标)
}
)
self.current_target_index = (
0 if self.point_sequence else -1
) # 初始目标为第一个点
def get_robot_status(self):
"""获取机器人实时状态:是否在运动 + 当前位置"""
try:
with self.tracking_lock:
# 1. 判断是否正在运动
is_moving = self.is_moving()
# 2. 获取当前TCP位置
# [x,y,z,rx,ry,rz]
result = self.robot_tracking_client.GetActualTCPPose()
# 解析返回值
if isinstance(result, tuple) and len(result) >= 2 and result[0] == 0:
# 成功:提取坐标列表(第二个元素)
current_pos = result[1]
# 校验坐标格式6个数字
if not (
isinstance(current_pos, list)
and len(current_pos) == 6
and all(isinstance(p, (int, float)) for p in current_pos)
):
print(f"坐标格式错误:{current_pos}, 需为6个数字的列表")
return None
else:
# 失败:打印错误码
error_code = result[0] if isinstance(result, tuple) else result
print(f"get_robot_status: 获取TCP位置失败, 错误码:{error_code}")
return None
return {
"is_moving": is_moving,
"current_pos": current_pos,
"timestamp": time.time(),
}
except Exception as e:
print(f"get_robot_status: 获取状态失败:{e}")
return None
def is_moving(self) -> bool:
"""
判断机器臂是否正在运动
返回:
bool: True表示正在运动robot_state=2False表示停止/暂停/拖动/异常
"""
# 1. 检查状态包是否已初始化
if self.robot_client.robot_state_pkg is None:
print("is_moving: 警告:机器人状态包未初始化,无法判断运动状态")
return False
try:
# 2. 从状态包中获取robot_state取值1-4
time.sleep(0.05) # 等待状态刷新
robot_state = self.robot_client.robot_state_pkg.robot_state
# 3. 校验robot_state是否在合法范围内
if robot_state not in (1, 2, 3, 4):
print(
f"is_moving: 警告: 无效的robot_state值 {robot_state},按'停止'处理"
)
return False
# 4. 仅robot_state=2表示正在运动
return robot_state == 2
except Exception as e:
print(f"is_moving: 获取运动状态失败:{str(e)}")
return False
def update_point_status(self):
"""更新点位状态:哪些已完成,当前移动的目标点位是哪个"""
status = self.get_robot_status()
if not status:
return
current_pos = status["current_pos"]
is_moving = status["is_moving"]
threshold = 6.0 # 判定“经过该点”的距离阈值mm根据精度调整
# 遍历所有点位,判断是否已经过
for point in self.point_sequence:
idx = point["index"]
# 跳过已标记为“已完成”的点
if point["status"] == "completed":
continue
# 计算当前位置与点位的三维距离
dx = current_pos[0] - point["pos"][0]
dy = current_pos[1] - point["pos"][1]
dz = current_pos[2] - point["pos"][2]
distance = (dx**2 + dy**2 + dz**2) ** 0.5
# 如果距离小于阈值,标记为“已完成”
if distance < threshold:
self.completed_indices.add(idx)
point["status"] = "completed"
max_completed = (
max(self.completed_indices) if self.completed_indices else -1
)
self.current_target_index = max_completed + 1
if 0 <= self.current_target_index < len(self.point_sequence):
self.point_sequence[self.current_target_index]["status"] = "current"
# 停止移动时判断当前目标点
if not is_moving and 0 <= self.current_target_index < len(self.point_sequence):
target_point = self.point_sequence[self.current_target_index]
if target_point["status"] != "completed":
dx = current_pos[0] - target_point["pos"][0]
dy = current_pos[1] - target_point["pos"][1]
dz = current_pos[2] - target_point["pos"][2]
distance = (dx**2 + dy**2 + dz**2) ** 0.5
if distance < threshold:
self.completed_indices.add(self.current_target_index)
target_point["status"] = "completed"
if self.current_target_index + 1 < len(self.point_sequence):
# 将下一个点设置为目前正在朝向移动的点
self.current_target_index += 1
self.point_sequence[self.current_target_index][
"status"
] = "current"
def start_tracking(self):
"""清空已经移动的点位的集合, 开启下一轮跟踪"""
self.completed_indices.clear()
"""启动实时跟踪线程"""
self.tracking_running = True
self.tracking_thread = threading.Thread(target=self._tracking_loop, daemon=True)
self.tracking_thread.start()
def _tracking_loop(self):
"""跟踪循环:持续更新点位状态"""
while self.tracking_running:
self.update_point_status()
time.sleep(0.2) # 5Hz更新频率
def stop_tracking(self):
"""停止跟踪线程"""
self.tracking_running = False
if self.tracking_thread.is_alive():
self.tracking_thread.join()
def get_completed_points(self):
"""返回已运动过的点(索引和坐标)"""
return [
{"index": point["index"], "pos": point["pos"]}
for point in self.point_sequence
if point["status"] == "completed"
]
def get_current_target_point(self):
"""返回当前正在朝向移动的点(索引和坐标)"""
if 0 <= self.current_target_index < len(self.point_sequence):
point = self.point_sequence[self.current_target_index]
return {"index": point["index"], "pos": point["pos"]}
return None # 无当前朝向移动的目标点位(已完成所有点位的移动)