#!/usr/bin/python3 """ # @Time : 2025/12/12 11:05 # @Author : reenrr # @File : COM_Robot.py # @Desc : 机械臂TCP通信、指令发送、状态查询、IO控制等核心功能 """ import logging from enum import Enum from turtle import Turtle import Constant from COM_TCP import TCPClient import queue import json from RobotModel import DataAddress, DATARequest, DATAReply, CMDInstructRequest, Instruction # 视觉检测类型枚举:定义机器人视觉定位的两种模式, class DetectType(Enum): EyeOnHand = 0 # 手眼一体(相机安装在机器人末端执行器上,随机械臂移动) EyeOutHand = 1 # 手眼分离(相机固定安装,不随机械臂移动) """ 机械臂客户端核心类:继承自TCPClient基础TCP通信类,封装机械臂专属的业务逻辑与交互接口 核心职责:建立与机器人的TCP通信、发送控制指令、查询实时状态、控制IO口、处理紧急情况 继承TCPClient父类,继承父类的方法 """ class RobotClient(TCPClient): def __init__(self, ip, port, photo_locs,command_quene, status_model:DataAddress,con_ios, time_delay_take,time_delay_put,time_delay_shake,origin_position): """ :param ip: 机器人IP地址 :param port: 机器人端口号 :param photo_locs: 预设拍照点位列表 :param command_quene: 指令队列 :param status_model: 状态模型,用于存储和更新机器人的运行模型、位置、报警等状态 :param con_ios: 控制IO口列表 :param time_delay_take: 抓料延时参数(抓料动作执行后的等待时间,确保抓料机构稳定抓料物料,放置脱落) :param time_delay_put: 放料延时参数(放料动作执行后的等待时间,确保放料机构稳定放置物料,放置脱落) :param time_delay_shake: 抖动延时参数(抖料动作执行后的等待时间,确保摇摆机构稳定摇摆,放置脱落) :param origin_position: 机器人原点坐标 """ super().__init__(ip, port) self.command_quene = command_quene self.status_model = status_model self.errorCommands = {} self.photo_locs = photo_locs self.con_ios = con_ios self.time_delay_take= time_delay_take self.time_delay_put = time_delay_put self.time_delay_shake = time_delay_shake self.type_detection = DetectType.EyeOutHand self.origin_position = origin_position # 运动速度配置属性 self.debug_speed = 10 self.feed_speed = 10 self.reset_speed = 10 self.max_angle_interval = 0 # 最大关节角度间隔 self.smooth = 0 # 运动光滑度参数 self.dynamic_height = 0 # 动态高度参数 def add_sendQuene(self,command): #后面 命令分等级,紧急命令直接执行 """ 指令入队方法:将封装好的机械臂指令添加到指令队列,等待后续发送执行 :param command: 待发送的机器人指令 """ self.command_quene.put(command) logging.INFO(f'{Constant.str_sys_command}{command}') return def send_Command(self): """ 从指令队列中取出指令,通过TCP连接发送到机械臂,并接收响应结果 """ try: if self.command_quene.qsize()!=0: logging.INFO(f'robot-command:从队列获取命令') command = self.command_quene.get() logging.INFO(f'robot-command:{command}') self.client_socket.send(command.encode()) response = self.client_socket.recv(1024).decode('utf-8') if response: logging.INFO(f'robot-command:{response},剩余:{self.command_quene.qsize()}条命令') else: logging.INFO(f'robot-command:无返回值,剩余:{self.command_quene.qsize()}条命令') return True else: return True except Exception as e: logging.ERROR(Constant.str_tcp_robot_connect_fail) raise def send_Status(self): """ 状态查询:向机械臂发送状态查询请求,接收并解析实时状态数据,更新到status_model中 """ request = DATARequest() dataAddr = DataAddress() request.queryAddr.append(dataAddr) # 将数据地址添加到查询请求中,明确查询的状态数据范围 request_status_json = request.toString() try: self.client_socket.send(request_status_json.encode()) response = self.client_socket.recv(1024).decode('utf-8') response_message = json.loads(response) try: data_status = DATAReply() # 用于封装机器人返回的状态数据 # 将JSON解析后的字典数据赋值给响应对象的属性,完成数据封装 data_status.__dict__ = response_message # {'cmdReply': ['AddRCC', 'ok'], 'dsID': 'www.hc-system.com.HCRemoteCommand', 'reqType': 'AddRCC'} data_address_array = data_status.queryData # 提取全量状态数据数组 # 解析状态数据,更新到机械臂状态模型 self.status_model.curMode = int(data_address_array[0]) self.status_model.setPosition(*data_address_array[1:13]) self.status_model.curAlarm = int(data_address_array[13]) self.status_model.isMoving = int(data_address_array[14]) self.status_model.RemoteCmdLen = int(data_address_array[15]) self.status_model.toolCoord=int(data_address_array[16]) self.status_model.input_n = int(data_address_array[19]) self.status_model.output_n = int(data_address_array[20]) self.status_model.curSpeed_n = int(data_address_array[21]) except: if response_message.keys().__contains__('cmdReply'): # 捕获状态解析异常,判断是否为命令回复响应,若是则直接返回,不记录错误 return logging.ERROR(Constant.str_tcp_robot_data_error) return True except json.JSONDecodeError as e: logging.WARNING(Constant.str_sys_json_error+request_status_json) return True except Exception as e: logging.ERROR(f'{e}{request_status_json}') raise def send_emergency_sound(self): """ 紧急报警发声 """ self.sendIOControl(Constant.IO_EmergencyPoint, 1) def send_emergency_stop(self): """ 紧急停止 """ self.sendIOControl(Constant.IO_EmergencyPoint, 0) def sendIOControl(self, IO_bit, IO_Status: int, delay=0, emptyList='0'):# """ 发送IO口控制 :param IO_bit: IO口编号(指定需要控制的物理/虚拟IO口,对应机械臂的IO配置) :param IO_Status: IO口目标状态(0/1) :param delay: 延时时间 :param emptyList: 空指令列表 """ IO_command = CMDInstructRequest() IO_command.emptyList = emptyList io_instruction = Instruction() io_instruction.IO = True io_instruction.io_status = IO_Status io_instruction.delay = delay io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]} IO_command.dsID = 'HCRemoteCommand' IO_command.instructions.append(io_instruction) self.add_sendQuene(IO_command.toString()) logging.INFO(f'{Constant.str_feed_io_control}{IO_bit},{IO_Status}') pass def get_origin_position(self): """ 获取机器人原点坐标 """ return self.origin_position pass