Files
AutoControlSystem-G/COM/COM_Robot.py
2026-01-16 15:21:54 +08:00

142 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from enum import Enum
from turtle import Turtle
import Constant
from COM.COM_TCP import TCPClient
import queue
import json
from Model.RobotModel import DataAddress, DATARequest, DATAReply, CMDInstructRequest, Instruction
from Util.util_log import log
from Util.util_math import is_bit_set
#from numpy.array_api import trunc
class DetectType(Enum):
EyeOnHand = 0
EyeOutHand = 1
class RobotClient(TCPClient):
def __init__(self, ip, port, photo_locs,command_quene, status_model:DataAddress,con_ios, time_delay_take,time_delay_put,time_delay_shake,origin_position):
super().__init__(ip, port)
self.command_quene = command_quene
self.status_model = status_model
self.errorCommands = {}
self.photo_locs = photo_locs
self.con_ios = con_ios
self.time_delay_take= time_delay_take
self.time_delay_put = time_delay_put
self.time_delay_shake = time_delay_shake
self.type_detection = DetectType.EyeOutHand
self.origin_position = origin_position
self.debug_speed = 10
self.feed_speed = 10
self.reset_speed = 10
self.max_angle_interval = 0
self.smooth = 0
self.dynamic_height = 0
def add_sendQuene(self,command): #后面 命令分等级,紧急命令直接执行
self.command_quene.put(command)
log.log_message(logging.INFO, f'{Constant.str_sys_command}{command}')
return
def send_Command(self):
try:
if self.command_quene.qsize()!=0:
# log.log_message(logging.INFO, f'robot-command:从队列获取命令')
command = self.command_quene.get()
#log.log_message(logging.INFO, f'robot-command:{command}')
self.client_socket.send(command.encode())
if True:
response = self.client_socket.recv(1024).decode('utf-8')
# response_message = json.loads(response)
if response:
log.log_message(logging.INFO, f'robot-command:{response},剩余:{self.command_quene.qsize()}条命令')
else:
log.log_message(logging.INFO, f'robot-command:无返回值,剩余:{self.command_quene.qsize()}条命令')
return True
else:
return True
except Exception as e:
log.log_message(logging.ERROR, Constant.str_tcp_robot_connect_fail)
raise
def send_Status(self):
request = DATARequest()
dataAddr = DataAddress()
request.queryAddr.append(dataAddr)
# 移除特殊属性和方法
request_status_json = request.toString()
# 转字符串
try:
self.client_socket.send(request_status_json.encode())
if True:
response = self.client_socket.recv(1024).decode('utf-8')
response_message = json.loads(response)
if True:
try:
data_status = DATAReply()
# if response_message.keys().contains('cmdReply'):
# return
data_status.__dict__ = response_message # {'cmdReply': ['AddRCC', 'ok'], 'dsID': 'www.hc-system.com.HCRemoteCommand', 'reqType': 'AddRCC'}
data_address_array = data_status.queryData
self.status_model.curMode = int(data_address_array[0])
self.status_model.setPosition(*data_address_array[1:13])
self.status_model.curAlarm = int(data_address_array[13])
self.status_model.isMoving = int(data_address_array[14])
self.status_model.RemoteCmdLen = int(data_address_array[15])
self.status_model.toolCoord=int(data_address_array[16])
self.status_model.input_n = int(data_address_array[19])
self.status_model.output_n = int(data_address_array[20])
self.status_model.curSpeed_n = int(data_address_array[21])
except:
if response_message.keys().__contains__('cmdReply'):
return
log.log_message(logging.ERROR,Constant.str_tcp_robot_data_error)
return True
except json.JSONDecodeError as e:
log.log_message(logging.WARNING,Constant.str_sys_json_error+request_status_json)
return True
except Exception as e:
log.log_message(logging.ERROR,f'{e}{request_status_json}')
raise
def send_emergency_sound(self):
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
def send_emergency_stop(self):
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
def sendIOControl(self, IO_bit, IO_Status: int, delay=0,emptyList='0'):#
IO_command = CMDInstructRequest()
IO_command.emptyList = emptyList
io_instruction = Instruction()
io_instruction.IO = True
io_instruction.io_status = IO_Status
io_instruction.delay = delay
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
IO_command.dsID = 'HCRemoteCommand'
IO_command.instructions.append(io_instruction)
self.add_sendQuene(IO_command.toString())
log.log_message(logging.INFO, f'{Constant.str_feed_io_control}{IO_bit}{IO_Status}')
pass
def check_outputQ(self,IO_bit):
if is_bit_set(self.status_model.output_n, IO_bit):
return True
else:
return False
def get_origin_position(self):
return self.origin_position
pass