195 lines
7.9 KiB
Python
195 lines
7.9 KiB
Python
|
|
#!/usr/bin/python3
|
|||
|
|
"""
|
|||
|
|
# @Time : 2025/12/12 11:05
|
|||
|
|
# @Author : reenrr
|
|||
|
|
# @File : COM_Robot.py
|
|||
|
|
# @Desc : 机械臂TCP通信、指令发送、状态查询、IO控制等核心功能
|
|||
|
|
"""
|
|||
|
|
import logging
|
|||
|
|
from enum import Enum
|
|||
|
|
from turtle import Turtle
|
|||
|
|
import Constant
|
|||
|
|
from COM_TCP import TCPClient
|
|||
|
|
import queue
|
|||
|
|
import json
|
|||
|
|
from RobotModel import DataAddress, DATARequest, DATAReply, CMDInstructRequest, Instruction
|
|||
|
|
|
|||
|
|
# 视觉检测类型枚举:定义机器人视觉定位的两种模式,
|
|||
|
|
class DetectType(Enum):
|
|||
|
|
EyeOnHand = 0 # 手眼一体(相机安装在机器人末端执行器上,随机械臂移动)
|
|||
|
|
EyeOutHand = 1 # 手眼分离(相机固定安装,不随机械臂移动)
|
|||
|
|
|
|||
|
|
"""
|
|||
|
|
机械臂客户端核心类:继承自TCPClient基础TCP通信类,封装机械臂专属的业务逻辑与交互接口
|
|||
|
|
核心职责:建立与机器人的TCP通信、发送控制指令、查询实时状态、控制IO口、处理紧急情况
|
|||
|
|
继承TCPClient父类,继承父类的方法
|
|||
|
|
"""
|
|||
|
|
class RobotClient(TCPClient):
|
|||
|
|
|
|||
|
|
def __init__(self, ip, port, photo_locs,command_quene, status_model:DataAddress,con_ios, time_delay_take,time_delay_put,time_delay_shake,origin_position):
|
|||
|
|
"""
|
|||
|
|
:param ip: 机器人IP地址
|
|||
|
|
:param port: 机器人端口号
|
|||
|
|
:param photo_locs: 预设拍照点位列表
|
|||
|
|
:param command_quene: 指令队列
|
|||
|
|
:param status_model: 状态模型,用于存储和更新机器人的运行模型、位置、报警等状态
|
|||
|
|
:param con_ios: 控制IO口列表
|
|||
|
|
:param time_delay_take: 抓料延时参数(抓料动作执行后的等待时间,确保抓料机构稳定抓料物料,放置脱落)
|
|||
|
|
:param time_delay_put: 放料延时参数(放料动作执行后的等待时间,确保放料机构稳定放置物料,放置脱落)
|
|||
|
|
:param time_delay_shake: 抖动延时参数(抖料动作执行后的等待时间,确保摇摆机构稳定摇摆,放置脱落)
|
|||
|
|
:param origin_position: 机器人原点坐标
|
|||
|
|
"""
|
|||
|
|
super().__init__(ip, port)
|
|||
|
|
self.command_quene = command_quene
|
|||
|
|
self.status_model = status_model
|
|||
|
|
self.errorCommands = {}
|
|||
|
|
|
|||
|
|
self.photo_locs = photo_locs
|
|||
|
|
self.con_ios = con_ios
|
|||
|
|
|
|||
|
|
self.time_delay_take= time_delay_take
|
|||
|
|
self.time_delay_put = time_delay_put
|
|||
|
|
self.time_delay_shake = time_delay_shake
|
|||
|
|
|
|||
|
|
self.type_detection = DetectType.EyeOutHand
|
|||
|
|
self.origin_position = origin_position
|
|||
|
|
|
|||
|
|
# 运动速度配置属性
|
|||
|
|
self.debug_speed = 10
|
|||
|
|
self.feed_speed = 10
|
|||
|
|
self.reset_speed = 10
|
|||
|
|
|
|||
|
|
self.max_angle_interval = 0 # 最大关节角度间隔
|
|||
|
|
self.smooth = 0 # 运动光滑度参数
|
|||
|
|
self.dynamic_height = 0 # 动态高度参数
|
|||
|
|
|
|||
|
|
def add_sendQuene(self,command): #后面 命令分等级,紧急命令直接执行
|
|||
|
|
"""
|
|||
|
|
指令入队方法:将封装好的机械臂指令添加到指令队列,等待后续发送执行
|
|||
|
|
:param command: 待发送的机器人指令
|
|||
|
|
"""
|
|||
|
|
self.command_quene.put(command)
|
|||
|
|
logging.INFO(f'{Constant.str_sys_command}{command}')
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
def send_Command(self):
|
|||
|
|
"""
|
|||
|
|
从指令队列中取出指令,通过TCP连接发送到机械臂,并接收响应结果
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
if self.command_quene.qsize()!=0:
|
|||
|
|
logging.INFO(f'robot-command:从队列获取命令')
|
|||
|
|
command = self.command_quene.get()
|
|||
|
|
|
|||
|
|
logging.INFO(f'robot-command:{command}')
|
|||
|
|
self.client_socket.send(command.encode())
|
|||
|
|
|
|||
|
|
response = self.client_socket.recv(1024).decode('utf-8')
|
|||
|
|
|
|||
|
|
if response:
|
|||
|
|
logging.INFO(f'robot-command:{response},剩余:{self.command_quene.qsize()}条命令')
|
|||
|
|
else:
|
|||
|
|
logging.INFO(f'robot-command:无返回值,剩余:{self.command_quene.qsize()}条命令')
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
return True
|
|||
|
|
except Exception as e:
|
|||
|
|
logging.ERROR(Constant.str_tcp_robot_connect_fail)
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
|
|||
|
|
def send_Status(self):
|
|||
|
|
"""
|
|||
|
|
状态查询:向机械臂发送状态查询请求,接收并解析实时状态数据,更新到status_model中
|
|||
|
|
"""
|
|||
|
|
request = DATARequest()
|
|||
|
|
dataAddr = DataAddress()
|
|||
|
|
request.queryAddr.append(dataAddr) # 将数据地址添加到查询请求中,明确查询的状态数据范围
|
|||
|
|
|
|||
|
|
request_status_json = request.toString()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
self.client_socket.send(request_status_json.encode())
|
|||
|
|
|
|||
|
|
response = self.client_socket.recv(1024).decode('utf-8')
|
|||
|
|
response_message = json.loads(response)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
data_status = DATAReply() # 用于封装机器人返回的状态数据
|
|||
|
|
# 将JSON解析后的字典数据赋值给响应对象的属性,完成数据封装
|
|||
|
|
data_status.__dict__ = response_message # {'cmdReply': ['AddRCC', 'ok'], 'dsID': 'www.hc-system.com.HCRemoteCommand', 'reqType': 'AddRCC'}
|
|||
|
|
data_address_array = data_status.queryData # 提取全量状态数据数组
|
|||
|
|
|
|||
|
|
# 解析状态数据,更新到机械臂状态模型
|
|||
|
|
self.status_model.curMode = int(data_address_array[0])
|
|||
|
|
self.status_model.setPosition(*data_address_array[1:13])
|
|||
|
|
self.status_model.curAlarm = int(data_address_array[13])
|
|||
|
|
self.status_model.isMoving = int(data_address_array[14])
|
|||
|
|
self.status_model.RemoteCmdLen = int(data_address_array[15])
|
|||
|
|
self.status_model.toolCoord=int(data_address_array[16])
|
|||
|
|
self.status_model.input_n = int(data_address_array[19])
|
|||
|
|
self.status_model.output_n = int(data_address_array[20])
|
|||
|
|
self.status_model.curSpeed_n = int(data_address_array[21])
|
|||
|
|
|
|||
|
|
except:
|
|||
|
|
if response_message.keys().__contains__('cmdReply'): # 捕获状态解析异常,判断是否为命令回复响应,若是则直接返回,不记录错误
|
|||
|
|
return
|
|||
|
|
logging.ERROR(Constant.str_tcp_robot_data_error)
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except json.JSONDecodeError as e:
|
|||
|
|
logging.WARNING(Constant.str_sys_json_error+request_status_json)
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logging.ERROR(f'{e}{request_status_json}')
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
def send_emergency_sound(self):
|
|||
|
|
"""
|
|||
|
|
紧急报警发声
|
|||
|
|
"""
|
|||
|
|
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
|
|||
|
|
|
|||
|
|
def send_emergency_stop(self):
|
|||
|
|
"""
|
|||
|
|
紧急停止
|
|||
|
|
"""
|
|||
|
|
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
|
|||
|
|
|
|||
|
|
def sendIOControl(self, IO_bit, IO_Status: int, delay=0, emptyList='0'):#
|
|||
|
|
"""
|
|||
|
|
发送IO口控制
|
|||
|
|
:param IO_bit: IO口编号(指定需要控制的物理/虚拟IO口,对应机械臂的IO配置)
|
|||
|
|
:param IO_Status: IO口目标状态(0/1)
|
|||
|
|
:param delay: 延时时间
|
|||
|
|
:param emptyList: 空指令列表
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
IO_command = CMDInstructRequest()
|
|||
|
|
IO_command.emptyList = emptyList
|
|||
|
|
io_instruction = Instruction()
|
|||
|
|
io_instruction.IO = True
|
|||
|
|
io_instruction.io_status = IO_Status
|
|||
|
|
io_instruction.delay = delay
|
|||
|
|
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
|
|||
|
|
IO_command.dsID = 'HCRemoteCommand'
|
|||
|
|
IO_command.instructions.append(io_instruction)
|
|||
|
|
self.add_sendQuene(IO_command.toString())
|
|||
|
|
logging.INFO(f'{Constant.str_feed_io_control}{IO_bit},{IO_Status}')
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
def get_origin_position(self):
|
|||
|
|
"""
|
|||
|
|
获取机器人原点坐标
|
|||
|
|
"""
|
|||
|
|
return self.origin_position
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|