Files
AutoControlSystem-G/CU/sae.py

844 lines
41 KiB
Python
Raw Normal View History

2025-08-15 12:08:30 +08:00
import copy
2025-07-29 13:16:30 +08:00
import logging
2025-08-15 12:08:30 +08:00
import random
import threading
import time
import cv2
import numpy as np
from PyQt5.QtWidgets import QMessageBox
from PySide6.QtCore import Signal, QObject
from PySide6.QtGui import QPixmap
import Constant
import Expection
from CU.Catch import Catch, CatchStatus
from CU.Detect import Detect, DetectStatus
from Model.FeedModel import PositionModel
from Model.Position import Real_Position, Detection_Position
from enum import Enum, IntEnum
from COM.COM_Robot import RobotClient, DetectType
from Model.RobotModel import CMDInstructRequest, MoveType
from Trace.handeye_calibration import getPosition
from Trace.handeye_calibration import getxyz,getxyz1
from Util.util_math import get_distance
from Util.util_time import CRisOrFall
#from Vision.camera_coordinate_dete import Detection
from Util.util_log import log
from Model.RobotModel import Instruction
from EMV.EMV import RelayController
class ResetStatus(Enum):
RNone = 0
RStart = 1
RRunging = 2
ROk =3
class FeedStatus(IntEnum):
FNone = 0
FStart = 1
FCheck = 2
FMid = 3
FPhoto = 4
FTake = 5
FBroken1 = 6
FBroken2 =7
FShake = 8
FDropBag = 9
FFinished = 10
FReverse = 11
FStartReverse = 12
class LineType(Enum):
Straight = 0
CureMid = 2
CureEnd = 3
WORLD = 4
class FeedMidStatus(Enum):
FMid_Start = 1
FMid_Take = 2
FMid_Feed= 3
class FeedPosition:
def __init__(self,status:FeedStatus,position:Real_Position):
self.status = status
self.position = position
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
class FeedLine:
def __init__(self, id, name, feed_positions:list,drop_positons:list):
self.feed_positions = copy.deepcopy(feed_positions)
self.drop_positions = copy.deepcopy(drop_positons)
self.feeding2end_pos_index = 0
self.origin2start_pos_index = 0
self.start2take_pos_index = 0
self.name = name
self.id = id
# --- 新增:用于存储从 ini 文件读取的多个投料点坐标 ---
# 这个列表将在加载 ini 时填充 [[x1,y1,z1,u1,v1,w1], [x2,y2,z2,u2,v2,w2], ...]
self.drop_point_list = []
def get_current_feed_position(self,is_reverse):
pos = self.feeding_to_end[ (self.feeding2end_pos_index+1)%len(self.feeding_to_end) if is_reverse else self.feeding2end_pos_index-1]
return pos
def get_current_take_position(self,is_reverse):
pos = self.start_to_take[ (self.start2take_pos_index+1)%len(self.start_to_take) if is_reverse else self.start2take_pos_index-1]
return pos
def get_current_start_position(self,is_reverse):
pos = self.origin_to_start[(self.origin2start_pos_index+1)%len(self.origin_to_start) if is_reverse else self.origin2start_pos_index-1]
return pos
def get_next_feed_position(self,reverse:bool=False):
pos = self.feeding_to_end[self.feeding2end_pos_index]
if reverse:
self.feeding2end_pos_index -= 1
if self.feeding2end_pos_index < 0:
self.feeding2end_pos_index = len(self.feeding_to_end) - 1
else:
self.feeding2end_pos_index += 1
if self.feeding2end_pos_index >= len(self.feeding_to_end):
self.feeding2end_pos_index = 0
return pos
def get_next_start_position(self,reverse:bool=False):
pos = self.origin_to_start[self.origin2start_pos_index]
if reverse:
self.origin2start_pos_index -= 1
if self.origin2start_pos_index < 0:
self.origin2start_pos_index = len(self.origin_to_start) - 1
2025-07-29 13:16:30 +08:00
else:
2025-08-15 12:08:30 +08:00
self.origin2start_pos_index += 1
if self.origin2start_pos_index >= len(self.origin_to_start):
self.origin2start_pos_index = 0
return pos
def get_next_take_position(self,reverse:bool=False):
pos = self.start_to_take[self.start2take_pos_index]
if reverse:
self.start2take_pos_index -= 1
if self.start2take_pos_index < 0:
self.start2take_pos_index = len(self.start_to_take) - 1
else:
self.start2take_pos_index += 1
if self.start2take_pos_index >= len(self.start_to_take):
self.start2take_pos_index = 0
return pos
def get_take_position(self):
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
return self.feeding_to_end[i]
def get_drop_position(self):
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
return self.feeding_to_end[i]
def set_take_position(self, position: Real_Position, dynamic_height=0):
print("[调试] 开始设置抓取位置")
print(f"传入的 position 坐标: X={position.X}, Y={position.Y}, Z={position.Z}, "
f"a={position.a}, b={position.b}, c={position.c}, "
f"U={position.U}, V={position.V}, W={position.W}")
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
xyz = getxyz1(position.X, position.Y, position.Z, position.a, position.b, position.c)
print(f"[调试] getxyz1 返回值: X={xyz[0]}, Y={xyz[1]}, Z={xyz[2]}")
befor_take_position = Real_Position().init_position(xyz[0],
xyz[1],
xyz[2],
position.U,
position.V,
position.W)
print(
f"[调试] 抓取前位置: X={befor_take_position.X}, Y={befor_take_position.Y}, Z={befor_take_position.Z}, "
f"U={befor_take_position.U}, V={befor_take_position.V}, W={befor_take_position.W}")
after_take_position = Real_Position().init_position(xyz[0],
xyz[1],
xyz[2],
position.U,
position.V,
position.W)
print(
f"[调试] 抓取后位置: X={after_take_position.X}, Y={after_take_position.Y}, Z={after_take_position.Z}, "
f"U={after_take_position.U}, V={after_take_position.V}, W={after_take_position.W}")
self.feeding_to_end[i - 1].set_position(befor_take_position)
self.feeding_to_end[i + 1].set_position(after_take_position)
self.feeding_to_end[i].set_position(position)
print(f"[调试] 当前抓取点已设置: X={position.X}, Y={position.Y}, Z={position.Z}, "
f"U={position.U}, V={position.V}, W={position.W}")
print("[调试] 抓取前后位置已设置完成")
def set_drop_position(self, position: Real_Position):
"""
设置 FDropBag 位置只设置当前点不处理前后点
:param position: 新的丢包位置
"""
for i in range(len(self.feeding_to_end)):
if self.feeding_to_end[i].status == FeedStatus.FDropBag.value:
# 直接设置当前点的位置
self.feeding_to_end[i].set_position(position)
print(
f"✅ FDropBag 位置已更新: ({position.X}, {position.Y}, {position.Z}, {position.U}, {position.V}, {position.W})")
break # 假设只有一个丢包点
def get_position_list(self):
index_start = -1
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FCheck.value:
index_start = i
break
for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FPhoto.value:
index_take = i
self.origin_to_start = self.feed_positions[: index_start+1]
self.start_to_take = self.feed_positions[index_start:index_take+1]
self.feeding_to_end = self.feed_positions[index_take:]
for i in range(len(self.feeding_to_end)): #插入动态中间点
if self.feeding_to_end[i].status == FeedStatus.FTake.value:
befor_position_model = PositionModel()
befor_position_model.init_position(None)
after_position_model = PositionModel()
after_position_model.init_position(None)
self.feeding_to_end.insert(i, befor_position_model)
self.feeding_to_end.insert(i+2, after_position_model)
break
class FeedingConfig:
def __init__(self, num: int, feedLine: FeedLine, photo_locs, drop_positions=None):
self.num = num
self.feedLine = feedLine
self.photo_locs = [self.deal_photo_locs(p) for p in photo_locs]
# --- 新增: 存储投料点列表 ---
# 从UI界面读取 or 从txt文本读取
self.drop_positions = [self.deal_photo_locs(p) for p in drop_positions] if drop_positions else []
def deal_photo_locs(self, photo_loc):
position_photo = Real_Position()
position_photo.init_position(photo_loc[0], photo_loc[1], photo_loc[2], photo_loc[3], photo_loc[4], photo_loc[5])
return position_photo
def get_line_info(self):
pass
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
class Feeding(QObject):
need_origin_signal = Signal(str)
take_no_photo_sigal = Signal()
update_detect_image = Signal(np.ndarray)
log_signal = Signal(int,str)
def __init__(self, robotClient: RobotClient):
super().__init__()
self.feedConfig = None
self.feedStatus = FeedStatus.FNone
self.robotClient = robotClient
# 添加 RelayController 实例
self.relay_controller = RelayController()
self.sensor_thread = None
self.detection_image = None
# self.init_detection_image()
self.pause = False
self.cRis_photo = CRisOrFall()
self.cRis_shake = CRisOrFall()
self.feed_Mid_Status = FeedMidStatus.FMid_Start
self.is_reverse = False
# 复位集合
self.run_reverse = False
self.take_no_photo = False
self.reset_status = ResetStatus.RNone
self.reversed_positions = []
self.current_position = None
self.index=1
self.pos_index = -1
self.pos_near_index = -1
self.catch = Catch(self.robotClient)
# self.detect = Detect()
# self.is_detected = True
# self.detect_thread = threading.Thread(target=self.run_detect)
# self.detect_thread.start()
self.onekey = False
# self.debug_run_count = 0 # 初始化计数器
self.mid_take_count = 0
#传感器判断抓包参数
self.sensor2_ready = False # 传感器2是否检测到料包
self.motor_stopped_by_sensor2 = False # 是否由传感器2触发停止电机
self.sensor_thread = None
self.relay_controller = RelayController()
# 启动传感器2线程
self.relay_controller._running = True
self.sensor2_thread = None
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
# --- 新增: 用于码垛模式的投料点索引 ---
self.current_drop_index = 0
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
def close_feed(self):
2025-07-29 13:16:30 +08:00
pass
2025-08-15 12:08:30 +08:00
# self.is_detected = False
# self.detect_thread.join()
# self.detect.detection.release()
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
# def init_detection_image(self):
# detection_image = cv2.imread(Constant.feed_sign_path)
# self.update_detect_image.emit(detection_image)
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
# def run_detect(self):
# while self.is_detected:
# self.detect.run()
# time.sleep(0.02)
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
def run(self):
self.catch.run()
# 获取事件坐标
real_position = Real_Position()
# self.detect.position_index = 0
real_position.init_position(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5)
# real_position.init_position(0,
# 0,
# 0,
# 0,
# 0,
# 0);
# img_path = f'Image/{self.index}.png'
# img = cv2.imread(img_path)
# self.index += 1
# self.index = self.index % 4
# self.detection_image = img
if self.feedConfig == None:
self.feedStatus = FeedStatus.FNone
if self.feedConfig != None and self.feedConfig.num == 0 and self.is_reverse and self.robotClient.origin_position.compare(
real_position, is_action=True):
self.feedStatus = FeedStatus.FNone
self.is_reverse = False
self.log_signal.emit(logging.INFO, Constant.str_feed_reverse)
if self.feedStatus == FeedStatus.FNone or self.pause:
return
elif self.feedStatus == FeedStatus.FCheck:
self.log_signal.emit(logging.INFO, Constant.str_feed_check)
if self.is_reverse:
self.feed_Mid_Status = FeedMidStatus.FMid_Start
else:
self.feed_Mid_Status = FeedMidStatus.FMid_Take
if self.feedConfig.feedLine.start_to_take[0].get_position().compare(real_position):
self.next_position(self.is_reverse)
elif self.feedStatus == FeedStatus.FStart:
self.log_signal.emit(logging.INFO, Constant.str_feed_start)
self.relay_controller.open(conveyor2=True)#开电机
#self.sensor2_thread = threading.Thread(target=self.relay_controller.handle_sensor2, daemon=True)#线程2的开始但是在那里设置结束呢
#self.sensor2_thread.start()
if not self.robotClient.origin_position.compare(real_position,is_action=True) and not self.is_reverse:
# QMessageBox.information(None, "提示", Constant.str_feed_start_error) # Fuck 引起异常
self.log_signal.emit(logging.ERROR, Constant.str_feed_start_error)
self.need_origin_signal.emit(Constant.str_feed_start_error)
self.feedStatus = FeedStatus.FNone
return
if self.is_reverse:
self.feedStatus = FeedStatus.FNone
self.is_reverse = False
return
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
self.feedConfig.feedLine.get_position_list()
# self.detect.detect_status = DetectStatus.DNone
self.feed_Mid_Status = FeedMidStatus.FMid_Start
# --- 修改: 初始化投料点索引 ---
self.current_drop_index = 0
self.next_position(self.is_reverse)
elif self.feedStatus == FeedStatus.FMid:
feed_pos = self.get_current_position(self.is_reverse)
if feed_pos.get_position().compare(real_position):
self.log_signal.emit(logging.INFO, Constant.str_feed_mid)
self.next_position(self.is_reverse)
# 增加计数器逻辑
self.mid_take_count += 1
# 可选:在 Debug1 模式下输出日志
if Constant.Debug1:
self.log_signal.emit(
logging.INFO,
f"[调试计数] 已进入 FMid 分支 {self.mid_take_count}"
)
if self.feedStatus == FeedStatus.FTake:
self.catch.catch_status = CatchStatus.CTake
elif self.feedStatus == FeedStatus.FPhoto:
self.log_signal.emit(logging.INFO, "🟡 [码垛模式] 跳过拍照定位步骤")
# 直接进入抓取状态或准备投料
# 所以在 FPhoto 后,应该移动到抓取点
take_position_model = self.feedConfig.feedLine.get_take_position()
if take_position_model:
self.log_signal.emit(logging.INFO, "[码垛模式] 准备移动到抓取点")
self.sendTargPosition(take_position_model.get_position(), speed=self.robotClient.feed_speed)
self.feedStatus = FeedStatus.FTake
else:
self.log_signal.emit(logging.ERROR, "[码垛模式] 无法获取抓取点位置!")
self.feedStatus = FeedStatus.FNone
return # 退出当前循环
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
elif self.feedStatus == FeedStatus.FTake:
self.log_signal.emit(logging.INFO, Constant.str_feed_take)
2025-07-29 13:16:30 +08:00
take_position = self.feedConfig.feedLine.get_take_position()
if not take_position or not take_position.get_position():
2025-08-15 12:08:30 +08:00
self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail)
2025-07-29 13:16:30 +08:00
return
2025-08-15 12:08:30 +08:00
if not take_position.get_position().compare(real_position, is_action=True):
self.log_signal.emit(logging.INFO, "🟡 机器人尚未到达抓料点位")
2025-07-29 13:16:30 +08:00
return
2025-08-15 12:08:30 +08:00
self.log_signal.emit(logging.INFO, "🟢 机器人已到达抓料点位")
'''real_position'''
# 一直等待传感器2信号永不超时
wait_start = time.time()
timeout = 30 # 30秒超时可根据需要调整
sensor2_detected = False
while not sensor2_detected:
if time.time() - wait_start > timeout:
self.log_signal.emit(logging.ERROR, "⏰ FTake: 等待传感器2超时!")
self.feedStatus = FeedStatus.FNone
return
sensors = self.relay_controller.get_all_device_status('sensors')
sensor2_value = sensors.get(self.relay_controller.SENSOR2, False)
if sensor2_value:
self.log_signal.emit(logging.INFO, "✅ 传感器2检测到料包到位开始执行抓取")
sensor2_detected = True
else:
self.log_signal.emit(logging.INFO, "⏳ FTake: 等待传感器2料包信号...")
time.sleep(0.2) # 每0.5秒检查一次
2025-07-29 13:16:30 +08:00
# 执行抓取动作
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CTake
if self.catch.catch_status == CatchStatus.CTake:
2025-08-15 12:08:30 +08:00
self.log_signal.emit(logging.INFO, "正在执行抓料动作...")
2025-07-29 13:16:30 +08:00
self.catch.catch_status = CatchStatus.COk
if self.catch.catch_status == CatchStatus.COk:
2025-08-15 12:08:30 +08:00
self.log_signal.emit(logging.INFO, Constant.str_feed_take_success)
2025-07-29 13:16:30 +08:00
if not self.sensor2_ready:
2025-08-15 12:08:30 +08:00
self.log_signal.emit(logging.INFO, "抓取完成,重新启动 conveyor2")
2025-07-29 13:16:30 +08:00
self.relay_controller.open(conveyor2=True)
self.catch.catch_status = CatchStatus.CNone
2025-08-15 12:08:30 +08:00
self.log_signal.emit(logging.INFO, "[码垛模式] 跳过视觉检测,准备投料")
# 移动到第一个(或当前)投料点
if self.feedConfig.drop_positions and len(self.feedConfig.drop_positions) > self.current_drop_index:
first_drop_pos = self.feedConfig.drop_positions[self.current_drop_index]
self.log_signal.emit(logging.INFO, f"[码垛模式] 移动到投料点 {self.current_drop_index}")
self.sendTargPosition(first_drop_pos, speed=self.robotClient.feed_speed)
self.feedStatus = FeedStatus.FDropBag
else:
self.log_signal.emit(logging.ERROR, "[码垛模式] 投料点列表为空或索引错误!")
self.feedStatus = FeedStatus.FNone
return # 退出当前循环
else:
self.log_signal.emit(logging.ERROR, Constant.str_feed_takePhoto_fail)
elif self.feedStatus == FeedStatus.FBroken1:
if self.get_current_position().get_position().compare(real_position):
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
self.next_position()
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
elif self.feedStatus == FeedStatus.FBroken2:
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
if self.get_current_position().get_position().compare(real_position):
self.log_signal.emit(logging.INFO, Constant.str_feed_broken)
2025-07-29 13:16:30 +08:00
self.next_position()
2025-08-15 12:08:30 +08:00
elif self.feedStatus == FeedStatus.FShake:
if self.get_current_position().get_position().compare(real_position,is_action=True):
# TODO 震动方案
self.log_signal.emit(logging.INFO, Constant.str_feed_shake)
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CShake
return
if self.catch.catch_status == CatchStatus.CShake:
# if self.feedConfig.feedLine.feeding_to_end[
# self.feedConfig.feedLine.feeding2end_pos_index + 1].status != FeedStatus.FShake:
# self.catch.catch_status = CatchStatus.COk
# else:
self.catch.shake_continue.SetReset()
self.next_position()
if self.feedStatus!=FeedStatus.FShake:
self.catch.catch_status = CatchStatus.CNone
return
elif self.feedStatus == FeedStatus.FDropBag:
# 1. 确保配置了投料点
if not self.feedConfig or not self.feedConfig.drop_positions:
self.log_signal.emit(logging.ERROR, "[码垛模式] 错误:未配置投料点列表!")
self.feedStatus = FeedStatus.FNone
return
# 2. 获取当前目标投料点 (基于索引)
if self.current_drop_index >= len(self.feedConfig.drop_positions):
self.log_signal.emit(logging.ERROR, f"[码垛模式] 错误:投料点索引 {self.current_drop_index} 超出范围!")
self.feedStatus = FeedStatus.FNone
return
target_drop_position = self.feedConfig.drop_positions[self.current_drop_index]
self.log_signal.emit(logging.INFO, f"[码垛模式] 当前目标投料点索引: {self.current_drop_index}")
# 3. 检查是否到达目标投料点
if target_drop_position.compare(real_position, is_action=True):
self.log_signal.emit(logging.INFO, f"[码垛模式] 到达投料点 {self.current_drop_index}")
# 4. 执行投料动作
if self.catch.catch_status == CatchStatus.CNone:
self.catch.catch_status = CatchStatus.CDrop
self.log_signal.emit(logging.INFO, "[码垛模式] 开始执行投料动作...")
return # 等待下一周期检查 COk
if self.catch.catch_status == CatchStatus.CDrop:
self.log_signal.emit(logging.INFO, "[码垛模式] 投料动作进行中...")
return # 等待抓取模块完成
if self.catch.catch_status == CatchStatus.COk:
self.catch.catch_status = CatchStatus.CNone
self.log_signal.emit(logging.INFO, f"[码垛模式] 在投料点 {self.current_drop_index} 投料完成")
# 5. 更新投料计数
self.feedConfig.num -= 1
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_feed_num}{self.feedConfig.num}')
# 6. 检查是否所有投料任务完成
if self.feedConfig.num > 0:
# 7. 更新投料点索引 (循环)
self.current_drop_index = (self.current_drop_index + 1) % len(self.feedConfig.drop_positions)
self.log_signal.emit(logging.INFO, f"[码垛模式] 更新投料点索引为: {self.current_drop_index}")
# 8. 移动到固定的抓取点
take_position_model = self.feedConfig.feedLine.get_take_position()
if take_position_model:
self.log_signal.emit(logging.INFO, f"[码垛模式] 移动到固定抓取点...")
self.sendTargPosition(take_position_model.get_position(), speed=self.robotClient.feed_speed)
self.feedStatus = FeedStatus.FTake # 设置状态为 FTake
else:
self.log_signal.emit(logging.ERROR, "[码垛模式] 无法获取抓取点位置!")
self.feedStatus = FeedStatus.FNone
else:
# 9. 所有投料完成
self.log_signal.emit(logging.INFO, "[码垛模式] 所有投料任务完成")
# 可以选择返回原点或进入空闲状态
# 例如,移动到原点
# self.sendTargPosition(self.robotClient.origin_position, speed=self.robotClient.reset_speed)
# self.feedStatus = FeedStatus.FStartReverse # 或者自定义一个结束状态
self.feedStatus = FeedStatus.FNone # 简单地结束
else:
# 如果还没到达目标投料点,可以打印日志或等待
# (通常机器人移动指令发出后,会自动移动到位,下次循环再检查)
self.log_signal.emit(logging.INFO, f"[码垛模式] 正在移动到投料点 {self.current_drop_index}...")
def run_reset(self):
real_position = Real_Position()
real_position.init_position(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5)
if self.reset_status == ResetStatus.RNone:
return
if self.reset_status == ResetStatus.RStart:
if self.feedConfig == None: return
# for index in range(len(self.feedConfig.feedLine.positions)):
# if self.feedConfig.feedLine.positions[index].status == 2:
# start_index = index
self.pos_index = -1
self.pos_near_index = -1
self.reversed_positions = []
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if pos_model.get_position().compare(real_position,is_action=True):
self.pos_index = index
break
if self.pos_index == -1:
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
min_distance = 99999999
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if get_distance(pos_model.get_position(), real_position) < min_distance:
min_distance = get_distance(pos_model.get_position(), real_position)
self.pos_near_index = index
if self.pos_near_index != -1:
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_near_index + 1]
else:
return False
else:
self.reversed_positions = self.feedConfig.feedLine.feed_positions[:self.pos_index+1]
self.reversed_positions = list(reversed(self.reversed_positions))
self.reverse_index = 0
self.send_emergency_sound()
self.current_position = PositionModel()
self.current_position.init_position(real_position)
self.reset_status = ResetStatus.RRunging
if self.reset_status == ResetStatus.RRunging:
if not real_position.compare(self.current_position.get_position(),is_action=True):
2025-07-29 13:16:30 +08:00
return
2025-08-15 12:08:30 +08:00
pos_model = self.reversed_positions[self.reverse_index]
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
pos_model = self.reversed_positions[self.reverse_index + 1]
self.reverse_index = self.reverse_index+1
if pos_model.lineType == LineType.CureMid.value:
pos_model1 = self.reversed_positions[self.reverse_index + 1]
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
real_position1=pos_model1.get_position(), speed=self.robotClient.reset_speed)
self.current_position = pos_model1
self.reverse_index = self.reverse_index + 2
else:
self.sendTargPosition(real_position=pos_model.get_position(), speed=self.robotClient.reset_speed)
self.current_position = pos_model
self.reverse_index = self.reverse_index + 1
if self.reverse_index == len(self.reversed_positions):
self.reset_status = ResetStatus.ROk
if self.reset_status == ResetStatus.ROk:
self.reset_status = ResetStatus.RNone
self.send_emergency_stop()
def return_original_position(self):
self.run_reverse = True
real_position = Real_Position()
real_position.init_position(self.robotClient.status_model.world_0,
self.robotClient.status_model.world_1,
self.robotClient.status_model.world_2,
self.robotClient.status_model.world_3,
self.robotClient.status_model.world_4,
self.robotClient.status_model.world_5)
if self.feedConfig == None: return
start_index = -1
# for index in range(len(self.feedConfig.feedLine.positions)):
# if self.feedConfig.feedLine.positions[index].status == 2:
# start_index = index
pos_index = -1
pos_near_index = -1
reversed_positions = []
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if pos_model.get_position().compare(real_position):
pos_index = index
break
if pos_index == -1:
self.log_signal.emit(logging.ERROR, Constant.str_feed_return_original_position_fail)
min_distance = 99999999
for index, pos_model in enumerate(self.feedConfig.feedLine.feed_positions):
if get_distance(pos_model.get_position(), real_position)<min_distance:
min_distance = get_distance(pos_model.get_position(), real_position)
pos_near_index = index
if pos_near_index != -1:
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_near_index+1]
else:
return False
else:
reversed_positions = self.feedConfig.feedLine.feed_positions[:pos_index]
reversed_positions = list(reversed(reversed_positions))
self.reverse_index = 0
self.send_emergency_sound()
current_position = PositionModel()
current_position.init_position(real_position)
while self.run_reverse and self.reverse_index!=len(reversed_positions):
if self.reverse_index!=0 and not real_position.compare(current_position.get_position()):
continue
#todo 缺少比对
pos_model = reversed_positions[self.reverse_index]
if pos_model.status == FeedStatus.FTake.value: # 跳过取袋节点
pos_model = reversed_positions[self.reverse_index + 1]
# TODO take节点判断
if pos_model.lineType == LineType.CureMid.value:
pos_model1 = reversed_positions[self.reverse_index + 1]
self.sendTargPosition(real_position=pos_model.get_position(), move_type=MoveType.Cure,
real_position1=pos_model1.get_position(),speed= self.robotClient.reset_speed)
current_position = pos_model1
self.reverse_index = self.reverse_index+2
else:
self.sendTargPosition(real_position=pos_model.get_position(),speed=self.robotClient.reset_speed)
current_position = pos_model
self.reverse_index = self.reverse_index + 1
self.send_emergency_stop()
return True
def send_emergency_sound(self):
self.sendIOControl(Constant.IO_EmergencyPoint, 1)
def send_emergency_stop(self):
self.sendIOControl(Constant.IO_EmergencyPoint, 0)
def sendIOControl(self, IO_bit, IO_Status: int):
IO_command = CMDInstructRequest()
io_instruction = Instruction()
io_instruction.IO = True
io_instruction.io_status = IO_Status
io_instruction.point = IO_bit # {"dsID":"HCRemoteCommand","reqType":"AddRCC","emptyList":"1","instructions":[{"oneshot":"1","action":"200","type":"0","io_status":"1","point":"15","delay":"0"}]}
IO_command.dsID = 'HCRemoteCommand'
IO_command.instructions.append(io_instruction)
self.robotClient.add_sendQuene(IO_command.toString())
self.log_signal.emit(logging.INFO, f'{Constant.str_feed_io_control}{IO_bit}{IO_Status}')
pass
def sendTargPosition1(self, real_position, move_type: MoveType = MoveType.WORLD, speed=5,real_position1=None):
pass
def sendTargPosition(self, real_position, move_type: MoveType = MoveType.WORLD, speed=5,real_position1=None):
position_instruction = Instruction()
position_instruction.speed = speed
position_instruction.m0 = real_position.X
position_instruction.m1 = real_position.Y
position_instruction.m2 = real_position.Z
position_instruction.m3 = real_position.U
position_instruction.m4 = real_position.V
position_instruction.m5 = real_position.W
position_instruction.smooth = self.robotClient.smooth
position_instruction.action = move_type.value
if position_instruction.action == 17:
position_instruction.m0_p = real_position1.X
position_instruction.m1_p = real_position1.Y
position_instruction.m2_p = real_position1.Z
position_instruction.m3_p = real_position1.U
position_instruction.m4_p = real_position1.V
position_instruction.m5_p = real_position1.W
instruction_command = CMDInstructRequest()
instruction_command.instructions.append(position_instruction)
request_command = instruction_command.toString()
log_str = f'移动到位置:{ "姿势直线" if move_type==MoveType.WORLD else "姿势曲线"}' \
f'X:{position_instruction.m0}-' \
f'Y:{position_instruction.m1}-' \
f'Z:{position_instruction.m2}-' \
f'U:{position_instruction.m3}-' \
f'V:{position_instruction.m4}-' \
f'W:{position_instruction.m5}'
try:
print('fuck1',log_str)
self.log_signal.emit(logging.INFO, log_str)
print('fuck2',log_str)
except:
return
print(request_command)
self.robotClient.add_sendQuene(request_command)
pass
def next_start(self,reverse=False):
start_pos = self.feedConfig.feedLine.get_next_start_position(reverse)
self.feedStatus = FeedStatus(start_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if start_pos.lineType == LineType.CureMid.value:
start_pos1 = self.feedConfig.feedLine.get_next_start_position(reverse)
self.feedStatus = FeedStatus(start_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=start_pos.get_position(),speed=self.robotClient.reset_speed, move_type=MoveType.Cure, real_position1=start_pos1.get_position())
elif start_pos.lineType == LineType.WORLD.value:
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(start_pos.get_position(),self.robotClient.max_angle_interval):
self.feedStatus = None
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
else:
self.sendTargPosition(real_position=start_pos.get_position(),speed=self.robotClient.reset_speed,move_type=MoveType.AXIS)
else:
self.sendTargPosition(real_position=start_pos.get_position(),speed=self.robotClient.reset_speed)
pass
def next_take(self,reverse=False):
take_pos = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if take_pos.lineType == LineType.CureMid.value:
take_pos1 = self.feedConfig.feedLine.get_next_take_position(reverse)
self.feedStatus = FeedStatus(take_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=take_pos.get_position(), move_type=MoveType.Cure, real_position1=take_pos1.get_position(),speed=self.robotClient.feed_speed)
elif take_pos.lineType == LineType.WORLD.value:
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(take_pos.get_position(),
self.robotClient.max_angle_interval):
self.feedStatus = None
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
2025-07-29 13:16:30 +08:00
else:
2025-08-15 12:08:30 +08:00
self.sendTargPosition(real_position=take_pos.get_position(), speed=self.robotClient.feed_speed,
move_type=MoveType.AXIS)
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
else:
self.sendTargPosition(real_position=take_pos.get_position(),speed=self.robotClient.feed_speed)
pass
def next_Feed(self,reverse=False):
feed_pos = self.feedConfig.feedLine.get_next_feed_position(reverse)
self.feedStatus = FeedStatus(feed_pos.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
if feed_pos.lineType == LineType.CureMid.value:
feed_pos1 = self.feedConfig.feedLine.get_next_feed_position(reverse)
self.feedStatus = FeedStatus(feed_pos1.status) if self.feedStatus != FeedStatus.FNone else FeedStatus.FNone
self.sendTargPosition(real_position=feed_pos.get_position(), move_type=MoveType.Cure, real_position1=feed_pos1.get_position(),speed=self.robotClient.feed_speed)
elif feed_pos.lineType == LineType.WORLD.value:
if self.robotClient.status_model.getAnglePosition().is_error_angel_move(feed_pos.get_position(),
self.robotClient.max_angle_interval):
self.feedStatus = None
self.log_signal.emit(logging.ERROR, Constant.str_feed_angle_error)
else:
self.sendTargPosition(real_position=feed_pos.get_position(), speed=self.robotClient.feed_speed,
move_type=MoveType.AXIS)
2025-07-29 13:16:30 +08:00
2025-08-15 12:08:30 +08:00
else:
# if not reverse and self.feedStatus == FeedStatus.FShake:
# if not reverse:
# if self.cRis_shake.Q(self.feedStatus == FeedStatus.FShake, True):
# pass
# elif self.cRis_shake.Q(self.feedStatus == FeedStatus.FShake, False):
# self.feedStatus = FeedStatus.FShake
# self.sendTargPosition(real_position=feed_pos.get_position(),speed=self.robotClient.feed_speed)
# return
self.sendTargPosition(real_position=feed_pos.get_position(),speed=self.robotClient.feed_speed)
def get_current_position(self,is_reverse=False):
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
return self.feedConfig.feedLine.get_current_start_position(self.is_reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
return self.feedConfig.feedLine.get_current_take_position(self.is_reverse)
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
return self.feedConfig.feedLine.get_current_feed_position(self.is_reverse)
def next_position(self,reverse=False):
if self.feed_Mid_Status == FeedMidStatus.FMid_Start:
self.next_start(reverse)
if Constant.Debug1:
print('next_start')
elif self.feed_Mid_Status == FeedMidStatus.FMid_Take:
self.next_take(reverse)
if Constant.Debug1:
print('next_take')
elif self.feed_Mid_Status == FeedMidStatus.FMid_Feed:
self.next_Feed(reverse)
if Constant.Debug1:
print('next_feed')
def safe_check_columns(self):
return True
pass
def safe_check_person(self):
return True
pass
2025-07-29 13:16:30 +08:00