看机械臂代码,添加备注代码
This commit is contained in:
194
robot/COM_Robot.py
Normal file
194
robot/COM_Robot.py
Normal file
@ -0,0 +1,194 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
"""
|
||||||
|
# @Time : 2025/12/12 11:05
|
||||||
|
# @Author : reenrr
|
||||||
|
# @File : COM_Robot.py
|
||||||
|
# @Desc : 机械臂TCP通信、指令发送、状态查询、IO控制等核心功能
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
from enum import Enum
|
||||||
|
from turtle import Turtle
|
||||||
|
import Constant
|
||||||
|
from COM_TCP import TCPClient
|
||||||
|
import queue
|
||||||
|
import json
|
||||||
|
from RobotModel import DataAddress, DATARequest, DATAReply, CMDInstructRequest, Instruction
|
||||||
|
|
||||||
|
# 视觉检测类型枚举:定义机器人视觉定位的两种模式,
|
||||||
|
class DetectType(Enum):
|
||||||
|
EyeOnHand = 0 # 手眼一体(相机安装在机器人末端执行器上,随机械臂移动)
|
||||||
|
EyeOutHand = 1 # 手眼分离(相机固定安装,不随机械臂移动)
|
||||||
|
|
||||||
|
"""
|
||||||
|
机械臂客户端核心类:继承自TCPClient基础TCP通信类,封装机械臂专属的业务逻辑与交互接口
|
||||||
|
核心职责:建立与机器人的TCP通信、发送控制指令、查询实时状态、控制IO口、处理紧急情况
|
||||||
|
继承TCPClient父类,继承父类的方法
|
||||||
|
"""
|
||||||
|
class RobotClient(TCPClient):
|
||||||
|
|
||||||
|
def __init__(self, ip, port, photo_locs,command_quene, status_model:DataAddress,con_ios, time_delay_take,time_delay_put,time_delay_shake,origin_position):
|
||||||
|
"""
|
||||||
|
:param ip: 机器人IP地址
|
||||||
|
:param port: 机器人端口号
|
||||||
|
:param photo_locs: 预设拍照点位列表
|
||||||
|
:param command_quene: 指令队列
|
||||||
|
:param status_model: 状态模型,用于存储和更新机器人的运行模型、位置、报警等状态
|
||||||
|
:param con_ios: 控制IO口列表
|
||||||
|
:param time_delay_take: 抓料延时参数(抓料动作执行后的等待时间,确保抓料机构稳定抓料物料,放置脱落)
|
||||||
|
:param time_delay_put: 放料延时参数(放料动作执行后的等待时间,确保放料机构稳定放置物料,放置脱落)
|
||||||
|
:param time_delay_shake: 抖动延时参数(抖料动作执行后的等待时间,确保摇摆机构稳定摇摆,放置脱落)
|
||||||
|
:param origin_position: 机器人原点坐标
|
||||||
|
"""
|
||||||
|
super().__init__(ip, port)
|
||||||
|
self.command_quene = command_quene
|
||||||
|
self.status_model = status_model
|
||||||
|
self.errorCommands = {}
|
||||||
|
|
||||||
|
self.photo_locs = photo_locs
|
||||||
|
self.con_ios = con_ios
|
||||||
|
|
||||||
|
self.time_delay_take= time_delay_take
|
||||||
|
self.time_delay_put = time_delay_put
|
||||||
|
self.time_delay_shake = time_delay_shake
|
||||||
|
|
||||||
|
self.type_detection = DetectType.EyeOutHand
|
||||||
|
self.origin_position = origin_position
|
||||||
|
|
||||||
|
# 运动速度配置属性
|
||||||
|
self.debug_speed = 10
|
||||||
|
self.feed_speed = 10
|
||||||
|
self.reset_speed = 10
|
||||||
|
|
||||||
|
self.max_angle_interval = 0 # 最大关节角度间隔
|
||||||
|
self.smooth = 0 # 运动光滑度参数
|
||||||
|
self.dynamic_height = 0 # 动态高度参数
|
||||||
|
|
||||||
|
def add_sendQuene(self,command): #后面 命令分等级,紧急命令直接执行
|
||||||
|
"""
|
||||||
|
指令入队方法:将封装好的机械臂指令添加到指令队列,等待后续发送执行
|
||||||
|
:param command: 待发送的机器人指令
|
||||||
|
"""
|
||||||
|
self.command_quene.put(command)
|
||||||
|
logging.INFO(f'{Constant.str_sys_command}{command}')
|
||||||
|
return
|
||||||
|
|
||||||
|
def send_Command(self):
|
||||||
|
"""
|
||||||
|
从指令队列中取出指令,通过TCP连接发送到机械臂,并接收响应结果
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if self.command_quene.qsize()!=0:
|
||||||
|
logging.INFO(f'robot-command:从队列获取命令')
|
||||||
|
command = self.command_quene.get()
|
||||||
|
|
||||||
|
logging.INFO(f'robot-command:{command}')
|
||||||
|
self.client_socket.send(command.encode())
|
||||||
|
|
||||||
|
response = self.client_socket.recv(1024).decode('utf-8')
|
||||||
|
|
||||||
|
if response:
|
||||||
|
logging.INFO(f'robot-command:{response},剩余:{self.command_quene.qsize()}条命令')
|
||||||
|
else:
|
||||||
|
logging.INFO(f'robot-command:无返回值,剩余:{self.command_quene.qsize()}条命令')
|
||||||
|
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logging.ERROR(Constant.str_tcp_robot_connect_fail)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def send_Status(self):
|
||||||
|
"""
|
||||||
|
状态查询:向机械臂发送状态查询请求,接收并解析实时状态数据,更新到status_model中
|
||||||
|
"""
|
||||||
|
request = DATARequest()
|
||||||
|
dataAddr = DataAddress()
|
||||||
|
request.queryAddr.append(dataAddr) # 将数据地址添加到查询请求中,明确查询的状态数据范围
|
||||||
|
|
||||||
|
request_status_json = request.toString()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.client_socket.send(request_status_json.encode())
|
||||||
|
|
||||||
|
response = self.client_socket.recv(1024).decode('utf-8')
|
||||||
|
response_message = json.loads(response)
|
||||||
|
|
||||||
|
try:
|
||||||
|
data_status = DATAReply() # 用于封装机器人返回的状态数据
|
||||||
|
# 将JSON解析后的字典数据赋值给响应对象的属性,完成数据封装
|
||||||
|
data_status.__dict__ = response_message # {'cmdReply': ['AddRCC', 'ok'], 'dsID': 'www.hc-system.com.HCRemoteCommand', 'reqType': 'AddRCC'}
|
||||||
|
data_address_array = data_status.queryData # 提取全量状态数据数组
|
||||||
|
|
||||||
|
# 解析状态数据,更新到机械臂状态模型
|
||||||
|
self.status_model.curMode = int(data_address_array[0])
|
||||||
|
self.status_model.setPosition(*data_address_array[1:13])
|
||||||
|
self.status_model.curAlarm = int(data_address_array[13])
|
||||||
|
self.status_model.isMoving = int(data_address_array[14])
|
||||||
|
self.status_model.RemoteCmdLen = int(data_address_array[15])
|
||||||
|
self.status_model.toolCoord=int(data_address_array[16])
|
||||||
|
self.status_model.input_n = int(data_address_array[19])
|
||||||
|
self.status_model.output_n = int(data_address_array[20])
|
||||||
|
self.status_model.curSpeed_n = int(data_address_array[21])
|
||||||
|
|
||||||
|
except:
|
||||||
|
if response_message.keys().__contains__('cmdReply'): # 捕获状态解析异常,判断是否为命令回复响应,若是则直接返回,不记录错误
|
||||||
|
return
|
||||||
|
logging.ERROR(Constant.str_tcp_robot_data_error)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
logging.WARNING(Constant.str_sys_json_error+request_status_json)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.ERROR(f'{e}{request_status_json}')
|
||||||
|
raise
|
||||||
|
|
||||||
|
def send_emergency_sound(self):
|
||||||
|
"""
|
||||||
|
紧急报警发声
|
||||||
|
"""
|
||||||
|
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
|
||||||
|
|
||||||
|
def send_emergency_stop(self):
|
||||||
|
"""
|
||||||
|
紧急停止
|
||||||
|
"""
|
||||||
|
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
|
||||||
|
|
||||||
|
def sendIOControl(self, IO_bit, IO_Status: int, delay=0, emptyList='0'):#
|
||||||
|
"""
|
||||||
|
发送IO口控制
|
||||||
|
:param IO_bit: IO口编号(指定需要控制的物理/虚拟IO口,对应机械臂的IO配置)
|
||||||
|
:param IO_Status: IO口目标状态(0/1)
|
||||||
|
:param delay: 延时时间
|
||||||
|
:param emptyList: 空指令列表
|
||||||
|
"""
|
||||||
|
|
||||||
|
IO_command = CMDInstructRequest()
|
||||||
|
IO_command.emptyList = emptyList
|
||||||
|
io_instruction = Instruction()
|
||||||
|
io_instruction.IO = True
|
||||||
|
io_instruction.io_status = IO_Status
|
||||||
|
io_instruction.delay = delay
|
||||||
|
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
|
||||||
|
IO_command.dsID = 'HCRemoteCommand'
|
||||||
|
IO_command.instructions.append(io_instruction)
|
||||||
|
self.add_sendQuene(IO_command.toString())
|
||||||
|
logging.INFO(f'{Constant.str_feed_io_control}{IO_bit},{IO_Status}')
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_origin_position(self):
|
||||||
|
"""
|
||||||
|
获取机器人原点坐标
|
||||||
|
"""
|
||||||
|
return self.origin_position
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
95
robot/COM_TCP.py
Normal file
95
robot/COM_TCP.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
"""
|
||||||
|
# @Time : 2025/12/12 11:05
|
||||||
|
# @Author : reenrr
|
||||||
|
# @File : COM_TCP.py
|
||||||
|
# @Desc : TCP客户端
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import Constant
|
||||||
|
|
||||||
|
class TCPClient:
|
||||||
|
def __init__(self, ip, port):
|
||||||
|
# -------连接状态与错误统计属性-------
|
||||||
|
self.error_count=0 # 通信错误计数器(记录连续通信失败次数,用于触发重连逻辑)
|
||||||
|
self.connected = False # TCP连接状态
|
||||||
|
|
||||||
|
self.IPAddress = ip
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
self.thread_signal = True # 通信循环运行信号
|
||||||
|
self.client_socket = None # TCP套接字对象
|
||||||
|
|
||||||
|
def CreatConnect(self):
|
||||||
|
"""
|
||||||
|
建立TCP连接方法:创建并初始化TCP套接字,与目标服务端建立网络连接
|
||||||
|
若已有连接,先关闭旧连接再创建新连接,避免套接字资源泄露
|
||||||
|
"""
|
||||||
|
if self.client_socket:
|
||||||
|
self.client_socket.close()
|
||||||
|
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
self.client_socket.settimeout(5)
|
||||||
|
self.client_socket.connect((self.IPAddress, self.port))
|
||||||
|
|
||||||
|
|
||||||
|
def is_Connect(self):
|
||||||
|
"""
|
||||||
|
检测当前TCP连接是否有效
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 发送空字节流,仅用于检测连接可用性
|
||||||
|
self.client_socket.send(b'')
|
||||||
|
return True
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""
|
||||||
|
维持TCP长连接,循环执行状态查询和执行发送,实现自动重连
|
||||||
|
"""
|
||||||
|
while self.thread_signal:
|
||||||
|
time.sleep(0.4) # 控制通讯循环频率,避免过于频繁
|
||||||
|
self.connected = (self.error_count <= 3)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if self.send_Status() and self.send_Command():
|
||||||
|
self.error_count = 0
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.ERROR(f'COM_TCP: {str(e)}')
|
||||||
|
self.error_count += 1
|
||||||
|
|
||||||
|
# 触发自动重连
|
||||||
|
if self.error_count> 5:
|
||||||
|
print("Error: 机械臂控制程序中TCPClient is not connected")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.CreatConnect()
|
||||||
|
logging.INFO(Constant.str_tcp_reconnect)
|
||||||
|
|
||||||
|
except OSError as e1:
|
||||||
|
# 捕获套接字特定异常(错误码10056:已存在连接,无法再次连接)
|
||||||
|
if e1.errno == 10056:
|
||||||
|
self.client_socket.close()
|
||||||
|
print("Error: 机械臂控制程序中TCPClient is not connected_1")
|
||||||
|
logging.ERROR(Constant.str_tcp_connect_error)
|
||||||
|
except Exception as e2:
|
||||||
|
print(e2)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.thread_signal = False
|
||||||
|
self.client_socket.close()
|
||||||
|
|
||||||
|
def send_Command(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def send_Status(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
115
robot/Constant.py
Normal file
115
robot/Constant.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
# 调试变量
|
||||||
|
Debug = False # 控制不加图像的Fphoto False是不加
|
||||||
|
Debug1 = False # 打印很多日志节点
|
||||||
|
DebugPosition = False # 调试位置,关闭机器人和传感器及判断
|
||||||
|
# Debug2 = False
|
||||||
|
feedStatus = True #feedStatus的状态打印
|
||||||
|
|
||||||
|
IO_EmergencyPoint = 2
|
||||||
|
max_log_len = 100
|
||||||
|
bag_height = 10 # 一袋的高度
|
||||||
|
position_accuracy_action = 0.1 #动作时的位置精度6 这个精度要高 必须到位置才做动作
|
||||||
|
position_accuracy_command = 500 #命令时的位置精度
|
||||||
|
manual_adjust_accuracy = 1
|
||||||
|
# speed = 10
|
||||||
|
# shake_speed = 20
|
||||||
|
# debug_speed=10
|
||||||
|
# return_speed = 10
|
||||||
|
feedLine_set_section = 'FeedLine'
|
||||||
|
dropLine_set_section = 'DropLine'
|
||||||
|
position_set_section = 'Position'
|
||||||
|
reset_line_set_section = 'ResetLine'
|
||||||
|
feedLine_set_file = f'.{os.sep}Config{os.sep}FeedLine.ini'
|
||||||
|
dropLine_set_file = f'.{os.sep}Config{os.sep}drop.ini'
|
||||||
|
resetLine_set_file = f'.{os.sep}Config{os.sep}reset_line.ini'
|
||||||
|
MAX_Position_num = 40
|
||||||
|
MAX_Line_num = 10
|
||||||
|
set_ini = 'Seting.ini'
|
||||||
|
movie_moving = './Image/transport.gif'
|
||||||
|
mode_array = ["⽆", "⼿动模式", "⾃动模式", "停⽌模式", "", "", "","⾃动运⾏中", "单步", "单循环"]
|
||||||
|
|
||||||
|
log_file_path = './log/log.log'
|
||||||
|
feed_sign_path = './Image/wait.png'
|
||||||
|
|
||||||
|
|
||||||
|
str_feed_tiaoshi = '调试状态'
|
||||||
|
|
||||||
|
str_feed_start = '投料开始'
|
||||||
|
str_feed_check = '检测是否安全投料'
|
||||||
|
str_feed_photo = '移动到拍照位置'
|
||||||
|
str_feed_take = 'str_feed_take移动到抓料位置'
|
||||||
|
str_feed_pause = '投料暂停'
|
||||||
|
str_feed_continue = '投料继续'
|
||||||
|
str_feed_stop = '投料停止'
|
||||||
|
str_feed_feed = '移动到投料位置'
|
||||||
|
str_feed_mid = '移动到中位位置'
|
||||||
|
str_feed_safe = '移动到安全位置'
|
||||||
|
str_feed_safe_middle = '移动到安全中位位置'
|
||||||
|
str_feed_takePhoto = '拍照'
|
||||||
|
str_feed_broken = '移动到破袋位置'
|
||||||
|
str_feed_broken_bag = '划袋'
|
||||||
|
str_feed_drop = '移动到扔袋位置'
|
||||||
|
str_feed_drop_mid = '移动到码垛中间点位置'
|
||||||
|
str_feed_drop_reset = '移动到码垛复位位置'
|
||||||
|
str_feed_broken = '移动到破袋位置'
|
||||||
|
str_feed_takePhoto_fail = '识别图像失败'
|
||||||
|
str_image_model_fail = '摄像头识别料带失败'
|
||||||
|
str_feed_takePhoto_success = '识别图像成功'
|
||||||
|
str_feed_takePhoto_new_line = '新的一排袋识别'
|
||||||
|
str_feed_takePhoto_line = '一排袋最高的识别'
|
||||||
|
str_feed_takePhoto_front_finish = '零星袋完成'
|
||||||
|
str_feed_takePhoto_front = '零星袋识别'
|
||||||
|
str_feed_takePhoto_move = '移动到抓料位置'
|
||||||
|
str_feed_covert_success = '转换坐标成功'
|
||||||
|
str_feed_covert_fail = '转换坐标失败'
|
||||||
|
str_feed_error = '未知错误'
|
||||||
|
str_feed_none = '无'
|
||||||
|
str_feed_finish = '投料结束'
|
||||||
|
str_feed_take_success = '抓料成功'
|
||||||
|
str_feed_take_fail = '抓料失败'
|
||||||
|
str_feed_feed_num = '已码垛数量:'
|
||||||
|
str_feed_zip_bag = '移动到压缩袋位置'
|
||||||
|
str_feed_photo_error_msgbox = '请重新摆放料带后再关闭此窗口'
|
||||||
|
str_feed_photo_confirm = '确认摆好'
|
||||||
|
str_feed_io_control = '发送IO控制: '
|
||||||
|
str_feed_safe_error_msgbox = '未在安全位置,请先复位!'
|
||||||
|
str_feed_shake = '摇摆'
|
||||||
|
str_feed_start_error = '请先复位到原点'
|
||||||
|
str_feed_return_original_position_fail = '机械臂未按照实际路线点移动,未能寻找到适配路径点位'
|
||||||
|
str_feed_reset_no_line_error = '未定义的线段'
|
||||||
|
str_feed_angle_error = '角度差距过大,停止运行'
|
||||||
|
str_feed_reset_start = '开始复位'
|
||||||
|
str_feed_reverse = '复位成功'
|
||||||
|
str_sys_start = '进入系统'
|
||||||
|
str_sys_exit = '退出系统'
|
||||||
|
str_sys_switch_tool = '切换到工具坐标'
|
||||||
|
str_sys_start_tool = '切换到自动运行状态'
|
||||||
|
str_sys_clearAlarm = '清除报警'
|
||||||
|
str_sys_setSpeed = '设置速度'
|
||||||
|
str_sys_manualPosition = '手动移动'
|
||||||
|
str_sys_setFeedNum = '设置码垛开始包数'
|
||||||
|
str_sys_feedNum_sub = '减少投料次数'
|
||||||
|
str_sys_feedNum_add = '增加投料次数'
|
||||||
|
str_sys_log_feedNum ='记录袋数失败'
|
||||||
|
str_sys_emergencyStop = '按下急停'
|
||||||
|
str_sys_set_error = '保存设置失败'
|
||||||
|
str_sys_log_IO_error = 'IO更新失败'
|
||||||
|
str_sys_log_alarm_error = '发生报警:'
|
||||||
|
str_sys_log_move_error = '请填写全部坐标'
|
||||||
|
str_sys_set_position_error = '未选中行'
|
||||||
|
str_sys_command = '发送命令'
|
||||||
|
str_tcp_robot_connect_fail = '连接失败'
|
||||||
|
str_tcp_robot_connect_success = '连接成功'
|
||||||
|
str_tcp_robot_data_error = '数据解析错误'
|
||||||
|
str_tcp_connect_no_reply = '无回复'
|
||||||
|
str_tcp_connect_error = 'tcp连接错误'
|
||||||
|
str_tcp_reconnect = '重连中'
|
||||||
|
str_sys_json_error = 'json解析错误'
|
||||||
|
|
||||||
|
|
||||||
|
str_sys_runing = '运行到这里了!'
|
||||||
|
str_sys_runing1 = '运行到这里2! '
|
||||||
|
str_sys_runing2 = '执行完成FPhoto'
|
||||||
|
str_sys_runing3 = '运行到这里3'
|
||||||
133
robot/Position.py
Normal file
133
robot/Position.py
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
import math
|
||||||
|
|
||||||
|
from Constant import position_accuracy_command
|
||||||
|
from Constant import position_accuracy_action
|
||||||
|
from Constant import DebugPosition
|
||||||
|
class Position:
|
||||||
|
def __init__(self):
|
||||||
|
self.X = 0.0
|
||||||
|
self.Y = 0.0
|
||||||
|
self.Z = 0.0
|
||||||
|
self.U = 0.0
|
||||||
|
self.V = 0.0
|
||||||
|
self.W = 0.0
|
||||||
|
self.Axis_0 = 0.0
|
||||||
|
self.Axis_1 = 0.0
|
||||||
|
self.Axis_2 = 0.0
|
||||||
|
self.Axis_3 = 0.0
|
||||||
|
self.Axis_4 = 0.0
|
||||||
|
self.Axis_5 = 0.0
|
||||||
|
#点位类型 1世界坐标(默认)4关节坐标
|
||||||
|
self.position_type =1
|
||||||
|
|
||||||
|
self.a = 0.0
|
||||||
|
self.b = 0.0
|
||||||
|
self.c = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def compare(self,position,is_action=False):
|
||||||
|
if DebugPosition:
|
||||||
|
return True
|
||||||
|
#点位类型 1世界坐标(默认)4关节坐标
|
||||||
|
if(position.position_type==4):
|
||||||
|
return self._compare_joint(position,is_action)
|
||||||
|
else:
|
||||||
|
return self._compare_world(position,is_action)
|
||||||
|
|
||||||
|
def _compare_world(self,position,is_action=False):
|
||||||
|
"""
|
||||||
|
世界坐标比较
|
||||||
|
:param position:
|
||||||
|
:return:精度内TRUE,否则为FALSE
|
||||||
|
"""
|
||||||
|
|
||||||
|
distance = math.sqrt((self.X - position.X) ** 2 +
|
||||||
|
(self.Y - position.Y) ** 2 +
|
||||||
|
(self.Z - position.Z) ** 2 )
|
||||||
|
if distance<=(position_accuracy_action if is_action else position_accuracy_command):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _compare_joint(self,position,is_action=False):
|
||||||
|
"""
|
||||||
|
关节坐标比较
|
||||||
|
:param position:
|
||||||
|
:return:精度内TRUE,否则为FALSE
|
||||||
|
"""
|
||||||
|
distance = math.sqrt((self.Axis_0 - position.X) ** 2 +
|
||||||
|
(self.Axis_1 - position.Y) ** 2 +
|
||||||
|
(self.Axis_2 - position.Z) ** 2 +
|
||||||
|
(self.Axis_3 - position.U) ** 2 )
|
||||||
|
if distance<=(position_accuracy_action if is_action else position_accuracy_command):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# def compare(self,position):
|
||||||
|
# if self.X-position.X<position_accuracy and \
|
||||||
|
# self.Y-position.Y<position_accuracy and \
|
||||||
|
# self.Z - position.Z < position_accuracy and \
|
||||||
|
# self.U - position.U < position_accuracy and \
|
||||||
|
# self.V - position.V < position_accuracy and \
|
||||||
|
# self.W - position.W < position_accuracy:
|
||||||
|
# return True
|
||||||
|
# else:
|
||||||
|
# return False
|
||||||
|
# pass
|
||||||
|
|
||||||
|
def is_error_angel_move(self,position,interval):
|
||||||
|
if self.X - position.X >= interval or \
|
||||||
|
self.Y - position.Y >= interval or \
|
||||||
|
self.Z - position.Z >= interval or \
|
||||||
|
self.U - position.U >= interval or \
|
||||||
|
self.V - position.V >= interval or \
|
||||||
|
self.W - position.W >= interval:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
pass
|
||||||
|
"""
|
||||||
|
摄像头识别位置和角度
|
||||||
|
"""
|
||||||
|
class Detection_Position(Position):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
def init_position(self,X,Y,Z,U,V,W):
|
||||||
|
self.X = X
|
||||||
|
self.Y = Y
|
||||||
|
self.Z = Z
|
||||||
|
self.U = U
|
||||||
|
self.V = V
|
||||||
|
self.W = W
|
||||||
|
|
||||||
|
|
||||||
|
class Real_Position(Position):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
def init_position(self, X, Y, Z, U, V, W):
|
||||||
|
self.X = X
|
||||||
|
self.Y = Y
|
||||||
|
self.Z = Z
|
||||||
|
self.U = U
|
||||||
|
self.V = V
|
||||||
|
self.W = W
|
||||||
|
return self
|
||||||
|
|
||||||
|
def init_position_joint_and_world(self, X, Y, Z, U, V, W,Axis_0,Axis_1,Axis_2,Axis_3,Axis_4,Axis_5):
|
||||||
|
self.init_position(X,Y,Z,U,V,W)
|
||||||
|
self.Axis_0 = Axis_0
|
||||||
|
self.Axis_1 = Axis_1
|
||||||
|
self.Axis_2 = Axis_2
|
||||||
|
self.Axis_3 = Axis_3
|
||||||
|
self.Axis_4 = Axis_4
|
||||||
|
self.Axis_5 = Axis_5
|
||||||
|
return self
|
||||||
|
|
||||||
|
# def init_position(self, position):
|
||||||
|
# return self.init_position(position.X,position.Y,position.Z,position.U,position.V,position.W)
|
||||||
|
|
||||||
|
def to_string(self):
|
||||||
|
return "X:{:.3f},Y:{:.3f},Z:{:.3f},U:{:.3f},V:{:.3f},W:{:.3f}".format(self.X,self.Y,self.Z,self.U,self.V,self.W)
|
||||||
212
robot/RobotModel.py
Normal file
212
robot/RobotModel.py
Normal file
@ -0,0 +1,212 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
"""
|
||||||
|
# @Time : 2025/12/12 11:05
|
||||||
|
# @Author : reenrr
|
||||||
|
# @File : RobotModel.py
|
||||||
|
# @Desc :
|
||||||
|
"""
|
||||||
|
from enum import Enum
|
||||||
|
from Position import Real_Position
|
||||||
|
|
||||||
|
|
||||||
|
class MoveType(Enum):
|
||||||
|
AXIS = 4 # 关节轴运动模式
|
||||||
|
WORLD = 10 # 世界坐标系运动模式
|
||||||
|
Cure = 17 # 曲线运动模式
|
||||||
|
|
||||||
|
class DATARequest:
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
初始化查询请求的核心协议参数,符合机械臂通信协议要求
|
||||||
|
"""
|
||||||
|
self.dsID = 'www.hc-system.com.RemoteMonitor' # 设备服务ID
|
||||||
|
self.reqType = 'query' # 请求类型
|
||||||
|
self.queryAddr = [] # 查询地址列表
|
||||||
|
|
||||||
|
def toString(self):
|
||||||
|
"""
|
||||||
|
将DATARequest对象转换为符合JSON格式的字符串(符合机械臂通信协议)
|
||||||
|
"""
|
||||||
|
model_str = '{'+f'"dsID":"{self.dsID}","reqType":"{self.reqType}","queryAddr":[' \
|
||||||
|
f'{self.queryAddr[0].toString()}]'+'}'
|
||||||
|
return model_str
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class DataAddress:
|
||||||
|
def __init__(self):
|
||||||
|
# 基础配置属性
|
||||||
|
self.version = ''
|
||||||
|
self.curMold = ''
|
||||||
|
self.counterList = ''
|
||||||
|
self.counter_n = ''
|
||||||
|
self.curMode = 0
|
||||||
|
self.boardIONum = ''
|
||||||
|
|
||||||
|
# IO口状态属性
|
||||||
|
self.input_n = ''
|
||||||
|
self.output_n = 0
|
||||||
|
|
||||||
|
# 运动轴配置属性
|
||||||
|
self.axisNum = '6'
|
||||||
|
|
||||||
|
self.axis_n = ''
|
||||||
|
self.world_0 = 0 # 世界坐标系X轴坐标
|
||||||
|
self.world_1 = 0
|
||||||
|
self.world_2 = 0
|
||||||
|
self.world_3 = 0
|
||||||
|
self.world_4 = 0
|
||||||
|
self.world_5 = 0
|
||||||
|
self.axis_0 = 0
|
||||||
|
self.axis_1 = 0 # 1轴关节角度
|
||||||
|
self.axis_2 = 0
|
||||||
|
self.axis_3 = 0
|
||||||
|
self.axis_4 = 0
|
||||||
|
self.axis_5 = 0
|
||||||
|
|
||||||
|
self.curAlarm = 0
|
||||||
|
self.curCycle = ''
|
||||||
|
self.lastCycle = ''
|
||||||
|
self.machineName = ''
|
||||||
|
self.curTorque_n = ''
|
||||||
|
self.curSpeed_n = ''
|
||||||
|
self.curAccount = ''
|
||||||
|
self.origin = ''
|
||||||
|
self.moldList = ''
|
||||||
|
self.isMoving = False
|
||||||
|
self.M_n = ''
|
||||||
|
self.toolCoord=0
|
||||||
|
self.RemoteCmdLen = 0
|
||||||
|
|
||||||
|
def setPosition(self,w0,w1,w2,w3,w4,w5,a0,a1,a2,a3,a4,a5):
|
||||||
|
"""
|
||||||
|
位置坐标设置方法:批量更新世界坐标系和关节轴坐标系的位置数据,封装位置更新逻辑
|
||||||
|
"""
|
||||||
|
self.world_0 = float(w0)
|
||||||
|
self.world_1 = float(w1)
|
||||||
|
self.world_2 = float(w2)
|
||||||
|
self.world_3 = float(w3)
|
||||||
|
self.world_4 = float(w4)
|
||||||
|
self.world_5 = float(w5)
|
||||||
|
|
||||||
|
self.axis_0 = float(a0)
|
||||||
|
self.axis_1 = float(a1)
|
||||||
|
self.axis_2 = float(a2)
|
||||||
|
self.axis_3 = float(a3)
|
||||||
|
self.axis_4 = float(a4)
|
||||||
|
self.axis_5 = float(a5)
|
||||||
|
|
||||||
|
def getRealPosition(self):
|
||||||
|
"""
|
||||||
|
提取世界坐标系位置数据,封装成Real_Position对象
|
||||||
|
:return: Real_Position对象
|
||||||
|
"""
|
||||||
|
real_position = Real_Position().init_position(self.world_0,self.world_1,self.world_2,self.world_3,self.world_4,self.world_5)
|
||||||
|
return real_position
|
||||||
|
|
||||||
|
def getAnglePosition(self):
|
||||||
|
"""
|
||||||
|
提取关节轴位置数据,封装成Real_Position对象
|
||||||
|
:return: Real_Position对象
|
||||||
|
"""
|
||||||
|
real_position = Real_Position().init_position(self.axis_0,self.axis_1,self.axis_2,self.axis_3,self.axis_4,self.axis_5)
|
||||||
|
return real_position
|
||||||
|
|
||||||
|
def get_IO_bits(self):
|
||||||
|
"""
|
||||||
|
提取IO口状态数据,封装成IO_bits数组
|
||||||
|
"""
|
||||||
|
io_bits_str = format(self.output_n, '032b')[::-1] # 将output_n转换为32位二进制字符串(不足32位补0),然后反转字符串(低位对应低编号IO口)
|
||||||
|
io_bits_arry = [bit == '1' for bit in io_bits_str] # 转换为布尔数组,标识每个IO口的通断状态
|
||||||
|
return io_bits_arry
|
||||||
|
|
||||||
|
def setAngle(self,a0,a1,a2,a3,a4,a5):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def toString(self):
|
||||||
|
model_str = f'"curMode",' \
|
||||||
|
f'"world-0","world-1","world-2","world-3","world-4","world-5","axis-0","axis-1","axis-2","axis-3","axis-4","axis-5",' \
|
||||||
|
f'"curAlarm","isMoving","RemoteCmdLen","toolCoord","input-n","output-n","curSpeed-n"'
|
||||||
|
return model_str
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class DATAReply:
|
||||||
|
def __init__(self):
|
||||||
|
self.dsID = ''
|
||||||
|
self.reqType = ''
|
||||||
|
self.queryData = []
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def JsonToObject(self):
|
||||||
|
return
|
||||||
|
|
||||||
|
class CMDReply:
|
||||||
|
def __init__(self):
|
||||||
|
self.dsID = 'www.hc-system.com.RemoteMonitor'
|
||||||
|
self.reqType = 'command'
|
||||||
|
self.cmdData = []
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class Instruction:
|
||||||
|
def __init__(self):
|
||||||
|
self.oneshot = 1
|
||||||
|
self.action = 4 #4 自由路径 10 姿势直线 17 姿势曲线
|
||||||
|
self.m0 = 0.0
|
||||||
|
self.m1 = 0.0
|
||||||
|
self.m2 = 0.0
|
||||||
|
self.m3 = 0.0
|
||||||
|
self.m0_p = 0.0
|
||||||
|
self.m1_p = 0.0
|
||||||
|
self.m2_p = 0.0
|
||||||
|
self.m3_p = 0.0
|
||||||
|
self.m4_p = 0.0
|
||||||
|
self.m5_p = 0.0
|
||||||
|
self.ckStatus = '0x3F'
|
||||||
|
self.speed = 10
|
||||||
|
self.smooth = 0
|
||||||
|
self.tool=2
|
||||||
|
self.IO = False
|
||||||
|
self.type = 0
|
||||||
|
self.io_status=1
|
||||||
|
self.point = 0
|
||||||
|
self.delay = 0
|
||||||
|
|
||||||
|
def toString(self):
|
||||||
|
if not self.IO :
|
||||||
|
model_str = f'"oneshot":"{self.oneshot}","action":"{self.action}","m0":"{self.m0}","m1":"{self.m1}","m2":"{self.m2}",' \
|
||||||
|
f'"m3":"{self.m3}","ckStatus":"{self.ckStatus}","speed":"{self.speed}",' \
|
||||||
|
f'"delay":"{self.delay}","smooth":"{self.smooth}","tool":"{self.tool}"'
|
||||||
|
if self.action == 17:
|
||||||
|
model_str = f'"oneshot":"{self.oneshot}","action":"{self.action}","m0":"{self.m0}","m1":"{self.m1}","m2":"{self.m2}",' \
|
||||||
|
f'"m3":"{self.m3}","m0_p":"{self.m0_p}","m1_p":"{self.m1_p}","m2_p":"{self.m2_p}",' \
|
||||||
|
f'"m3_p":"{self.m3_p}","ckStatus":"{self.ckStatus}","speed":"{self.speed}",' \
|
||||||
|
f'"delay":"{self.delay}","smooth":"{self.smooth}","tool":"{self.tool}"'
|
||||||
|
else:
|
||||||
|
model_str = f'"oneshot":"{self.oneshot}","action":"{200}","type":"{self.type}","io_status":"{self.io_status}"' \
|
||||||
|
f',"point":"{self.point}","delay":"{self.delay}"'
|
||||||
|
return model_str
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CMDInstructRequest:
|
||||||
|
def __init__(self):
|
||||||
|
self.dsID = 'www.hc-system.com.HCRemoteCommand'
|
||||||
|
self.reqType = "AddRCC"
|
||||||
|
self.emptyList = '0' #清空机械臂的远程命令数据
|
||||||
|
self.instructions = []
|
||||||
|
|
||||||
|
def toString(self):
|
||||||
|
model_str = '{'+f'"dsID":"{self.dsID}","reqType":"{self.reqType}","emptyList":"{self.emptyList}"'
|
||||||
|
if len(self.instructions) != 0:
|
||||||
|
model_str = model_str+',"instructions":'+"[{"+self.instructions[0].toString()+"}]"+"}"
|
||||||
|
else:
|
||||||
|
model_str = model_str+',"instructions":'+"[]"+"}" #model_str+"}"
|
||||||
|
return model_str
|
||||||
|
|
||||||
|
|
||||||
157
robot/util_time.py
Normal file
157
robot/util_time.py
Normal file
@ -0,0 +1,157 @@
|
|||||||
|
# !/usr/bin/python3
|
||||||
|
"""
|
||||||
|
# @Time : 2025/12/12 11:05
|
||||||
|
# @Author : reenrr
|
||||||
|
# @File : util_time.py
|
||||||
|
# @Desc :
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
|
||||||
|
class MyTimer:
|
||||||
|
@staticmethod
|
||||||
|
def gMyGetTickCount():
|
||||||
|
"""
|
||||||
|
获取当前系统时间的毫秒级时间戳
|
||||||
|
"""
|
||||||
|
ts = time.time()
|
||||||
|
return int(ts * 1000) # Convert to milliseconds
|
||||||
|
|
||||||
|
# CTon class equivalent in Python
|
||||||
|
class CTon:
|
||||||
|
def __init__(self):
|
||||||
|
self.m_unET = 0
|
||||||
|
self.m_bLastI = False
|
||||||
|
self.m_bIn = False
|
||||||
|
self.m_bPause = False
|
||||||
|
self.m_bOver = True
|
||||||
|
self.m_unPT = 0
|
||||||
|
self.m_unStartTime = 0
|
||||||
|
self.m_unPauseET = 0
|
||||||
|
|
||||||
|
def GetET(self):
|
||||||
|
"""
|
||||||
|
获取当前已流逝的延时时间
|
||||||
|
"""
|
||||||
|
if self.m_bIn:
|
||||||
|
nET = self.m_unPT + (self.m_unStartTime - MyTimer.gMyGetTickCount())
|
||||||
|
return max(nET, 0)
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def SetReset(self):
|
||||||
|
"""
|
||||||
|
复位TON所有状态属性,终止当前延时流程,恢复到初始状态,为下一次延时流程做准备
|
||||||
|
"""
|
||||||
|
self.m_bIn = False
|
||||||
|
self.m_bLastI = False
|
||||||
|
self.m_bPause = False
|
||||||
|
|
||||||
|
def SetPause(self, value):
|
||||||
|
if self.m_bIn:
|
||||||
|
self.m_bPause = value
|
||||||
|
if self.m_bPause:
|
||||||
|
self.m_unPauseET = MyTimer.gMyGetTickCount() - self.m_unStartTime
|
||||||
|
|
||||||
|
def SetOver(self, value):
|
||||||
|
self.m_bOver = value
|
||||||
|
|
||||||
|
def GetStartTime(self):
|
||||||
|
return self.m_unStartTime
|
||||||
|
|
||||||
|
def Q(self, value_i, value_pt):
|
||||||
|
self.m_bIn = value_i
|
||||||
|
self.m_unPT = value_pt
|
||||||
|
un_tick = MyTimer.gMyGetTickCount()
|
||||||
|
|
||||||
|
if self.m_bOver and self.m_bIn:
|
||||||
|
self.m_unStartTime = un_tick - self.m_unPT
|
||||||
|
self.m_bOver = False
|
||||||
|
|
||||||
|
if self.m_bPause and self.m_bIn:
|
||||||
|
self.m_unStartTime = un_tick - self.m_unPauseET
|
||||||
|
|
||||||
|
if self.m_bIn != self.m_bLastI:
|
||||||
|
self.m_bLastI = self.m_bIn
|
||||||
|
if self.m_bIn:
|
||||||
|
self.m_unStartTime = un_tick
|
||||||
|
self.m_bPause = False
|
||||||
|
|
||||||
|
return self.m_bIn and (un_tick >= (self.m_unStartTime + self.m_unPT))
|
||||||
|
|
||||||
|
# CClockPulse class equivalent in Python
|
||||||
|
class CClockPulse:
|
||||||
|
def __init__(self):
|
||||||
|
self.m_bFirstOut = True
|
||||||
|
self.m_bTonAOut = False
|
||||||
|
self.m_bTonBOut = False
|
||||||
|
self.m_cTonA = CTon()
|
||||||
|
self.m_cTonB = CTon()
|
||||||
|
|
||||||
|
def Q(self, value_i, run_time, stop_time):
|
||||||
|
if self.m_bFirstOut:
|
||||||
|
self.m_bTonAOut = self.m_cTonA.Q(not self.m_bTonBOut and value_i, run_time)
|
||||||
|
self.m_bTonBOut = self.m_cTonB.Q(self.m_bTonAOut and value_i, stop_time)
|
||||||
|
return not self.m_bTonAOut and value_i
|
||||||
|
else:
|
||||||
|
self.m_bTonAOut = self.m_cTonA.Q(not self.m_bTonBOut and value_i, stop_time)
|
||||||
|
self.m_bTonBOut = self.m_cTonB.Q(self.m_bTonAOut and value_i, run_time)
|
||||||
|
return self.m_bTonAOut and value_i
|
||||||
|
|
||||||
|
# CDelayOut class equivalent in Python
|
||||||
|
class CDelayOut:
|
||||||
|
def __init__(self):
|
||||||
|
self.m_cOutTon = CTon()
|
||||||
|
self.m_cmWaitTon = CTon()
|
||||||
|
|
||||||
|
def Reset(self):
|
||||||
|
self.m_cOutTon.SetReset()
|
||||||
|
self.m_cmWaitTon.SetReset()
|
||||||
|
|
||||||
|
def Q(self, value_i, wait_time, out_time):
|
||||||
|
if self.m_cmWaitTon.Q(value_i, wait_time):
|
||||||
|
if self.m_cOutTon.Q(True, out_time):
|
||||||
|
self.m_cOutTon.SetReset()
|
||||||
|
self.m_cmWaitTon.SetReset()
|
||||||
|
value_i = False
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
# CRisOrFall class equivalent in Python
|
||||||
|
class CRisOrFall:
|
||||||
|
def __init__(self):
|
||||||
|
self.m_bTemp = False
|
||||||
|
|
||||||
|
def Q(self, value_i, ris_or_fall):
|
||||||
|
result = False
|
||||||
|
if value_i != self.m_bTemp:
|
||||||
|
if ris_or_fall and value_i: # Rising edge
|
||||||
|
result = True
|
||||||
|
if not ris_or_fall and not value_i: # Falling edge
|
||||||
|
result = True
|
||||||
|
self.m_bTemp = value_i
|
||||||
|
return result
|
||||||
|
|
||||||
|
# CTof class equivalent in Python
|
||||||
|
class CTof:
|
||||||
|
def __init__(self):
|
||||||
|
self.m_cDelayTon = CTon()
|
||||||
|
self.m_bValue = False
|
||||||
|
self.m_cRis = CRisOrFall()
|
||||||
|
|
||||||
|
def SetReset(self):
|
||||||
|
self.m_bValue = False
|
||||||
|
self.m_cDelayTon.SetReset()
|
||||||
|
|
||||||
|
def Q(self, value_i, delay_time):
|
||||||
|
if self.m_cRis.Q(value_i, False):
|
||||||
|
self.m_cDelayTon.SetReset()
|
||||||
|
self.m_bValue = True
|
||||||
|
if self.m_cDelayTon.Q(self.m_bValue, delay_time):
|
||||||
|
self.m_bValue = False
|
||||||
|
self.m_cDelayTon.SetReset()
|
||||||
|
return value_i or self.m_bValue
|
||||||
|
|
||||||
|
# Utility function
|
||||||
|
def gGetNowTime():
|
||||||
|
return int(time.time())
|
||||||
Reference in New Issue
Block a user