Files
AutoControlSystem-G/COM/COM_Robot.py

138 lines
5.3 KiB
Python
Raw Normal View History

2025-07-29 13:16:30 +08:00
import logging
from enum import Enum
from turtle import Turtle
from numpy.array_api import trunc
import Constant
from COM.COM_TCP import TCPClient
import queue
import json
from Model.RobotModel import DataAddress, DATARequest, DATAReply, CMDInstructRequest, Instruction
from Util.util_log import log
from Util.util_math import is_bit_set
class DetectType(Enum):
    """Detection arrangement stored on ``RobotClient.type_detection``.

    NOTE(review): the member names suggest camera-on-hand versus
    camera-off-hand mounting -- confirm against the vision code.
    """

    EyeOnHand = 0
    EyeOutHand = 1
class RobotClient(TCPClient):
    """TCP client that queues JSON commands for a robot controller and polls its status.

    Commands are strings placed on ``command_quene`` and written to the socket
    one at a time by :meth:`send_Command`. :meth:`send_Status` sends a status
    query and copies the numeric fields of the reply into ``status_model``.
    """

    def __init__(self, ip, port, photo_locs, command_quene, status_model: DataAddress, con_ios,
                 time_delay_take, time_delay_put, time_delay_shake, origin_position):
        """Store the collaborators and configuration values on the instance.

        Args:
            ip: Forwarded to ``TCPClient.__init__``.
            port: Forwarded to ``TCPClient.__init__``.
            photo_locs: Stored unchanged (presumably photo positions -- confirm with callers).
            command_quene: Queue-like object providing ``put``, ``get`` and ``qsize``.
            status_model: Object updated in place by :meth:`send_Status`.
            con_ios: Stored unchanged.
            time_delay_take: Stored unchanged.
            time_delay_put: Stored unchanged.
            time_delay_shake: Stored unchanged.
            origin_position: Value returned by :meth:`get_origin_position`.
        """
        super().__init__(ip, port)
        self.command_quene = command_quene
        self.status_model = status_model
        self.errorCommands = {}
        self.photo_locs = photo_locs
        self.con_ios = con_ios
        self.time_delay_take = time_delay_take
        self.time_delay_put = time_delay_put
        self.time_delay_shake = time_delay_shake
        self.type_detection = DetectType.EyeOutHand
        self.origin_position = origin_position
        # Defaults; nothing in this class changes them afterwards.
        self.debug_speed = 10
        self.feed_speed = 10
        self.reset_speed = 10
        self.max_angle_interval = 0
        self.smooth = 0
        self.dynamic_height = 0

    def add_sendQuene(self, command):
        """Put ``command`` on the send queue and log it.

        Args:
            command: Command string; :meth:`send_Command` later calls ``.encode()`` on it.
        """
        # TODO: grade commands by priority later so urgent ones run immediately.
        self.command_quene.put(command)
        log.log_message(logging.INFO, f'{Constant.str_sys_command}{command}')

    def send_Command(self):
        """Send the next queued command, if there is one, and consume the reply.

        Returns:
            True, both when a command was sent and when the queue was empty.

        Raises:
            Exception: Any error raised while dequeuing, sending or receiving
                is logged as a connection failure and re-raised unchanged.
        """
        try:
            if self.command_quene.qsize() != 0:
                command = self.command_quene.get()
                self.client_socket.send(command.encode())
                # The reply is read only to drain it from the socket; its
                # content is not inspected, so it is not decoded either.
                self.client_socket.recv(1024)
            return True
        except Exception:
            log.log_message(logging.ERROR, Constant.str_tcp_robot_connect_fail)
            raise

    def send_Status(self):
        """Send a status query and copy the reply into ``self.status_model``.

        The reply is expected to be a JSON object whose ``queryData`` sequence
        is read at these indices: 0 mode, 1-12 position values, 13 alarm,
        14 moving flag, 15 remote command length, 16 tool coordinate,
        19 inputs, 20 outputs, 21 speed.

        Returns:
            True after a reply was handled, including a reply that was not
            valid JSON or not valid status data (both are only logged).
            None when the reply could not be read as status data and carries
            a ``cmdReply`` key, i.e. it answers a command, not the query.

        Raises:
            Exception: Any other error (for example a socket failure) is
                logged together with whatever was received and re-raised.
        """
        request = DATARequest()
        request.queryAddr.append(DataAddress())
        request_status_json = request.toString()

        # Bound before the try block so both handlers below can always log it,
        # even when send/recv fails before any data has arrived.
        response = ''
        try:
            self.client_socket.send(request_status_json.encode())
            response = self.client_socket.recv(1024).decode('utf-8')
            response_message = json.loads(response)
            try:
                data_status = DATAReply()
                # Example of a command reply that can arrive here instead of status data:
                # {'cmdReply': ['AddRCC', 'ok'], 'dsID': 'www.hc-system.com.HCRemoteCommand', 'reqType': 'AddRCC'}
                data_status.__dict__ = response_message
                data_address_array = data_status.queryData
                self.status_model.curMode = int(data_address_array[0])
                self.status_model.setPosition(*data_address_array[1:13])
                self.status_model.curAlarm = int(data_address_array[13])
                self.status_model.isMoving = int(data_address_array[14])
                self.status_model.RemoteCmdLen = int(data_address_array[15])
                self.status_model.toolCoord = int(data_address_array[16])
                self.status_model.input_n = int(data_address_array[19])
                self.status_model.output_n = int(data_address_array[20])
                self.status_model.curSpeed_n = int(data_address_array[21])
            except Exception:
                # The payload may be valid JSON without being an object, so
                # check the type before looking for the key.
                if isinstance(response_message, dict) and 'cmdReply' in response_message:
                    return None
                log.log_message(logging.ERROR, Constant.str_tcp_robot_data_error)
            return True
        except json.JSONDecodeError:
            log.log_message(logging.WARNING, Constant.str_sys_json_error + response)
            return True
        except Exception as e:
            log.log_message(logging.ERROR, f'{e}{response}')
            raise

    def send_emergency_sound(self):
        """Queue an IO command that sets ``Constant.IO_EmergencyPoint`` to 1."""
        self.sendIOControl(Constant.IO_EmergencyPoint, 1)

    def send_emergency_stop(self):
        """Queue an IO command that sets ``Constant.IO_EmergencyPoint`` to 0."""
        self.sendIOControl(Constant.IO_EmergencyPoint, 0)

    def sendIOControl(self, IO_bit, IO_Status: int, delay=0, emptyList='0'):
        """Build an IO instruction, queue it for sending and log it.

        Args:
            IO_bit: Stored as the instruction's ``point``.
            IO_Status: Stored as the instruction's ``io_status``.
            delay: Stored as the instruction's ``delay``.
            emptyList: Stored as the request's ``emptyList`` field.
        """
        IO_command = CMDInstructRequest()
        IO_command.emptyList = emptyList
        io_instruction = Instruction()
        io_instruction.IO = True
        io_instruction.io_status = IO_Status
        io_instruction.delay = delay
        io_instruction.point = IO_bit
        # Example of a serialized request:
        # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
        IO_command.dsID = 'HCRemoteCommand'
        IO_command.instructions.append(io_instruction)
        self.add_sendQuene(IO_command.toString())
        log.log_message(logging.INFO, f'{Constant.str_feed_io_control}{IO_bit}{IO_Status}')

    def check_outputQ(self, IO_bit):
        """Return True when ``is_bit_set`` reports ``IO_bit`` set in ``status_model.output_n``."""
        return bool(is_bit_set(self.status_model.output_n, IO_bit))

    def get_origin_position(self):
        """Return the ``origin_position`` value passed to the constructor."""
        return self.origin_position