Compare commits

2 Commits

Author SHA1 Message Date
cbda29e270 251108 2025-11-08 20:51:29 +08:00
e54c8d6f5e 251108问题修复 2025-11-08 20:38:55 +08:00
12 changed files with 613 additions and 51 deletions

View File

@ -101,10 +101,10 @@ class RobotClient(TCPClient):
log.log_message(logging.ERROR,Constant.str_tcp_robot_data_error) log.log_message(logging.ERROR,Constant.str_tcp_robot_data_error)
return True return True
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
log.log_message(logging.WARNING,Constant.str_sys_json_error+response) log.log_message(logging.WARNING,Constant.str_sys_json_error+request_status_json)
return True return True
except Exception as e: except Exception as e:
log.log_message(logging.ERROR,f'{e}{response}') log.log_message(logging.ERROR,f'{e}{request_status_json}')
raise raise
def send_emergency_sound(self): def send_emergency_sound(self):

View File

@ -45,7 +45,7 @@ class TCPClient:
self.error_count += 1 self.error_count += 1
if self.error_count> 5: if self.error_count> 5:
print("Error: 机械臂控制程序中TCPClient is not connected") print("Error: 机械臂控制程序中TCPClient is not connected")
log.log_message(logging.ERROR,f'{Constant.str_tcp_connect_no_reply}:{str(e)}') # log.log_message(logging.ERROR,f'{Constant.str_tcp_connect_no_reply}:{str(e)}')
try: try:
self.CreatConnect() self.CreatConnect()
log.log_message(logging.INFO, Constant.str_tcp_reconnect) log.log_message(logging.INFO, Constant.str_tcp_reconnect)

View File

@ -1,6 +1,8 @@
import copy import copy
from dis import stack_effect from dis import stack_effect
import logging import logging
import math
from pickle import FALSE
import random import random
import threading import threading
import time import time
@ -30,6 +32,8 @@ from Model.RobotModel import Instruction
from EMV.EMV import RelayController from EMV.EMV import RelayController
from CU.drop import DropPositionManager from CU.drop import DropPositionManager
from Mv3D.CameraUtil import CameraUtil from Mv3D.CameraUtil import CameraUtil
from Mv3D.calculate_diff2 import calculate_offset_from_image
class ResetStatus(Enum): class ResetStatus(Enum):
RNone = 0 RNone = 0
RStart = 1 RStart = 1
@ -204,7 +208,7 @@ class FeedLine:
f"✅ FDropBag 位置已更新: ({position.X}, {position.Y}, {position.Z}, {position.U}, {position.V}, {position.W})") f"✅ FDropBag 位置已更新: ({position.X}, {position.Y}, {position.Z}, {position.U}, {position.V}, {position.W})")
break # 假设只有一个丢包点 break # 假设只有一个丢包点
def get_drop_path(self) -> list: def get_drop_path(self,offsetX) -> list:
"""获取动态扔包路径""" """获取动态扔包路径"""
if self.drop_manager is None: if self.drop_manager is None:
@ -218,8 +222,11 @@ class FeedLine:
self.id, self.id,
self.current_index self.current_index
) )
if pos_model is None: if pos_model is None:
break break
if pos_model.status==FeedStatus.FDropBag.value:
pos_model=self.get_offset_position(pos_model,self.current_index,offsetX)
path.append(pos_model) path.append(pos_model)
return path return path
@ -245,20 +252,50 @@ class FeedLine:
self.start_to_take = self.feed_positions[index_start:index_take+1] self.start_to_take = self.feed_positions[index_start:index_take+1]
self.feeding_to_end = self.feed_positions[index_take:] self.feeding_to_end = self.feed_positions[index_take:]
def set_feeding_to_end(self): def set_feeding_to_end(self,offsetX)->bool:
"""
设置动态扔包路径
:param image_path: 偏移量图像路径
"""
for i in range(len(self.feed_positions)): for i in range(len(self.feed_positions)):
if self.feed_positions[i].status == FeedStatus.FPhoto.value: if self.feed_positions[i].status == FeedStatus.FPhoto.value:
index_take = i index_take = i
break break
index_drop=self.current_dropbag_index index_drop=self.current_dropbag_index
test_path = self.get_drop_path() test_path = self.get_drop_path(offsetX)
self.current_index+=1 self.current_index+=1
# 将总list的drop部分替换为动态路径 # 将总list的drop部分替换为动态路径
self.feed_positions = self.feed_positions[:index_drop] + test_path self.feed_positions = self.feed_positions[:index_drop] + test_path
# self.feeding_to_end = self.feed_positions[index_take:index_drop] # self.feeding_to_end = self.feed_positions[index_take:index_drop]
self.feeding_to_end = self.feed_positions[index_take:index_drop]+test_path self.feeding_to_end = self.feed_positions[index_take:index_drop]+test_path
# 计算偏移量
def get_offset_position(self,point,_current_index,offsetX):
"""
获取偏移后的坐标。
"""
_current_index=_current_index-1
if _current_index<0:
_current_index=0
loc_floor=(_current_index//5)+1
loc_bag=(_current_index%5)+1
if loc_floor in [1,3,5]:
if loc_bag in [1,2,3]:
point.X=round(point.X-offsetX, 3)
elif loc_bag==4:
point.Y=round(point.Y-offsetX, 3)
elif loc_bag==5:
point.Y=round(point.Y+offsetX, 3)
elif loc_floor in [2,4,6]:
if loc_bag in [1,2,3]:
point.X=round(point.X+offsetX, 3)
elif loc_bag==4:
point.Y=round(point.Y-offsetX, 3)
elif loc_bag==5:
point.Y=round(point.Y+offsetX, 3)
return point
class FeedingConfig: class FeedingConfig:
def __init__(self, num: int, feedLine: FeedLine, photo_locs,remain_count:int): def __init__(self, num: int, feedLine: FeedLine, photo_locs,remain_count:int):
@ -286,6 +323,8 @@ class Feeding(QObject):
log_signal = Signal(int,str) log_signal = Signal(int,str)
#码垛完成通知 #码垛完成通知
stack_finish_signal=Signal() stack_finish_signal=Signal()
#误差过大,通知用户
feed_error_signal=Signal(int,str)
def __init__(self, robotClient: RobotClient,relay_controller:RelayController): def __init__(self, robotClient: RobotClient,relay_controller:RelayController):
super().__init__() super().__init__()
@ -353,6 +392,8 @@ class Feeding(QObject):
"""接收机器人信号通知""" """接收机器人信号通知"""
self.take_sensor_signal = True self.take_sensor_signal = True
def run(self): def run(self):
self.catch.run() self.catch.run()
# 获取事件坐标 # 获取事件坐标
@ -501,13 +542,41 @@ class Feeding(QObject):
return return
time.sleep(1) # 每秒检查一次 time.sleep(1) # 每秒检查一次
loc_offsetX=0
loc_is_next=False
#第二次执行FeedStatus.FPhoto时改变码垛点 #第二次执行FeedStatus.FPhoto时改变码垛点
self.camera_img.save_img() try:
self.feedConfig.feedLine.set_feeding_to_end() loc_image_path=self.camera_img.save_img()
# self.take_photo_sigal.emit() loc_model_result = calculate_offset_from_image(loc_image_path, visualize=False)
if loc_model_result['success']:
#置信度
if loc_model_result['obj_conf']>0.5:
loc_offsetX=loc_model_result['dx_mm']
if abs(loc_model_result['dx_mm'])<200:
loc_is_next=True
print(f'{loc_image_path}:料带偏移{loc_offsetX}mm')
else:
print(f"{loc_image_path}:料带偏移过大,报警")
self.feed_error_signal.emit(1,f"{loc_image_path}:料带偏移过大,报警")
else:
print(f"{loc_image_path}:未检测到料带,置信度低")
self.feed_error_signal.emit(2,f"{loc_image_path}:未检测到料带,置信度低")
else:
print(f"{loc_image_path}:计算偏移失败视觉返回False")
self.feed_error_signal.emit(2,f"{loc_image_path}:计算偏移失败视觉返回False")
# loc_is_next=False
loc_offsetX=0
except Exception as e:
print(f"视频模型异常:{loc_image_path}",e)
# loc_is_next=False
loc_offsetX=0
self.feed_error_signal.emit(2,"视频模型异常")
self.log_signal.emit(logging.ERROR, f"视频模型异常:{e}")
finally:
if loc_is_next:
self.feedConfig.feedLine.set_feeding_to_end(loc_offsetX)
self.next_position() self.next_position()
self.log_signal.emit(logging.INFO, Constant.str_sys_runing2) self.log_signal.emit(logging.INFO, Constant.str_sys_runing2)
# self.feedStatus = FeedStatus.FTake
elif self.feedStatus == FeedStatus.FTake: elif self.feedStatus == FeedStatus.FTake:
self.log_signal.emit(logging.INFO, Constant.str_feed_take) self.log_signal.emit(logging.INFO, Constant.str_feed_take)

View File

@ -53,6 +53,7 @@ str_feed_drop_mid = '移动到码垛中间点位置'
str_feed_drop_reset = '移动到码垛复位位置' str_feed_drop_reset = '移动到码垛复位位置'
str_feed_broken = '移动到破袋位置' str_feed_broken = '移动到破袋位置'
str_feed_takePhoto_fail = '识别图像失败' str_feed_takePhoto_fail = '识别图像失败'
str_image_model_fail = '摄像头识别料带失败'
str_feed_takePhoto_success = '识别图像成功' str_feed_takePhoto_success = '识别图像成功'
str_feed_takePhoto_new_line = '新的一排袋识别' str_feed_takePhoto_new_line = '新的一排袋识别'
str_feed_takePhoto_line = '一排袋最高的识别' str_feed_takePhoto_line = '一排袋最高的识别'

View File

@ -14,6 +14,8 @@ import Constant
class RelayController(QObject): class RelayController(QObject):
log_signal = Signal(int, str) log_signal = Signal(int, str)
take_robot_signal = Signal() take_robot_signal = Signal()
emergency_signal = Signal(bool)
def __init__(self, host='192.168.0.18', port=50000): def __init__(self, host='192.168.0.18', port=50000):
super().__init__() super().__init__()
# ===================== 全局线程延时参数 ===================== # ===================== 全局线程延时参数 =====================
@ -32,6 +34,8 @@ class RelayController(QObject):
self.delay_clamp = 0.5 # 夹爪动作延时 self.delay_clamp = 0.5 # 夹爪动作延时
self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间 self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间
self.emergency_is_pressed=False
# ===================== 传感器稳定检测参数 ===================== # ===================== 传感器稳定检测参数 =====================
self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒) self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒)
self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效 self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效
@ -49,8 +53,11 @@ class RelayController(QObject):
self.PUSHER1 = 'pusher1' self.PUSHER1 = 'pusher1'
self.SENSOR1 = 'sensor1' self.SENSOR1 = 'sensor1'
self.SENSOR2 = 'sensor2' self.SENSOR2 = 'sensor2'
self.BELT = 'belt'
self.ALARM = 'alarm'
self.valve_commands = { self.valve_commands = {
#包装机皮带
self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'}, self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'},
# self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, # self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
@ -60,6 +67,10 @@ class RelayController(QObject):
self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
#DO5 回 DO2推 #DO5 回 DO2推
self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}, self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'},
#D07 长皮带
self.BELT: {'open': '00000000000601050006FF00', 'close': '000000000006010500060000'},
#D01 声控报警
self.ALARM: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
#滚筒反转 #滚筒反转
self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'} self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'}
} }
@ -77,7 +88,9 @@ class RelayController(QObject):
self.CONVEYOR2: 2, self.CONVEYOR2: 2,
self.CLAMP: 3, self.CLAMP: 3,
self.PUSHER1: 4, self.PUSHER1: 4,
self.CONVEYOR2_REVERSE: 5 self.CONVEYOR2_REVERSE: 5,
self.BELT: 6,
self.ALARM: 7,
} }
self.sensor_bit_map = { self.sensor_bit_map = {
@ -91,7 +104,9 @@ class RelayController(QObject):
self.CONVEYOR2: "传送带2", self.CONVEYOR2: "传送带2",
self.CLAMP: "机械臂夹爪", self.CLAMP: "机械臂夹爪",
self.PUSHER1: "推板关", self.PUSHER1: "推板关",
self.CONVEYOR2_REVERSE: "传送带2反转" self.CONVEYOR2_REVERSE: "传送带2反转",
self.BELT: "皮带",
self.ALARM: "声控报警",
} }
self.sensor_name_map = { self.sensor_name_map = {
@ -105,8 +120,8 @@ class RelayController(QObject):
self._sensor1_thread = None self._sensor1_thread = None
self._sensor2_thread = None self._sensor2_thread = None
self.required_codes = {'0101', '0103'} # 有效状态码传感器1 self.required_codes = {'0101', '0103','0105','0107'} # 有效状态码传感器1
self.required_codes_1 = {'0102', '0103'} # 有效状态码传感器2 self.required_codes_1 = {'0102', '0103','0106','0107'} # 有效状态码传感器2
self.sensor1_triggered = False self.sensor1_triggered = False
self.sensor1_last_time = 0 self.sensor1_last_time = 0
@ -117,7 +132,7 @@ class RelayController(QObject):
# ===================== 基础通信方法 ===================== # ===================== 基础通信方法 =====================
def send_command(self, command_hex, retry_count=2, source='unknown'): def send_command(self, command_hex, retry_count=2, source='unknown'):
if Constant.DebugPosition: if Constant.DebugPosition:
# print(f"[发送命令] {command_hex} ({source})") print(f"[发送命令] {command_hex} ({source})")
return None return None
byte_data = binascii.unhexlify(command_hex) byte_data = binascii.unhexlify(command_hex)
for attempt in range(retry_count): for attempt in range(retry_count):
@ -136,12 +151,14 @@ class RelayController(QObject):
#print(f"[通信响应] {hex_response}") #print(f"[通信响应] {hex_response}")
return response return response
except Exception as e: except Exception as e:
self.log_signal.emit(logging.INFO,f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})") print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
time.sleep(5) time.sleep(5)
self.trigger_alarm() self.trigger_alarm()
return None return None
def trigger_alarm(self): def trigger_alarm(self):
self.log_signal.emit(logging.ERROR,"警告:网络继电器连续多次通信失败,请检查设备连接!")
print("警告:网络继电器连续多次通信失败,请检查设备连接!") print("警告:网络继电器连续多次通信失败,请检查设备连接!")
# ===================== 状态读取方法 ===================== # ===================== 状态读取方法 =====================
@ -199,6 +216,25 @@ class RelayController(QObject):
print(f"[{command_type}] 无法获取响应数据") print(f"[{command_type}] 无法获取响应数据")
return responses return responses
def get_emergency_is_pressed(self)->bool:
"获取急停信号DI 3 位为1正常DI 3为0时为按下急停状态00000000000401020107 后四位01表示一个字节最后一位07二进制对应开关量"
"按下急停为"
command = self.read_status_command.get("sensors")
response = self.send_command(command)
loc_is_pressed=False
if response and len(response) >= 10:
status_byte = response[9]
#简单验证
status_crc=response[8]
loc_is_pressed =status_crc==1 and (status_byte & 0b100) == 0 # 0b100 表示第三位为1
else:
self.log_signal.emit(logging.ERROR,f"网络继电器[急停] 读取状态失败或响应无效")
print(f"网络继电器[急停] 读取状态失败或响应无效")
return loc_is_pressed
def parse_status_code(self, response): def parse_status_code(self, response):
""" """
从 Modbus 响应字符串中提取状态码(后两位) 从 Modbus 响应字符串中提取状态码(后两位)
@ -268,8 +304,67 @@ class RelayController(QObject):
time.sleep(self.sensor2_loop_lost) time.sleep(self.sensor2_loop_lost)
return False return False
def is_valid_sensor(self,sensor_name):
"""
检查传感器状态是否有效
参数:
sensor_name: 传感器名称
返回:
True: 传感器状态有效
False: 传感器状态无效
"""
stable_count = 0
_try_nums=5 # 尝试次数
for _ in range(_try_nums):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
return False
else:
temp_status_code = self.parse_status_code(response)
if temp_status_code in self.required_codes_1:
stable_count += 1
if stable_count >= 3:
return True
else:
stable_count = 0
time.sleep(self.sensor2_loop_lost)
return False
def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1):
"""
检测在指定时间窗口内是否存在持续稳定的有效信号
参数:
sensor_name: 传感器名称
detection_duration: 总检测时间窗口(秒)默认为3秒
stability_duration: 信号需要持续稳定的时间(秒)默认为2.5秒
check_interval: 检测间隔(秒)默认为0.1秒
返回:
True: 在时间窗口内检测到持续稳定的有效信号
False: 未检测到持续稳定的有效信号
"""
stable_start_time = None # 记录首次检测到有效信号的时间
start_time = time.time()
if not self.is_valid_sensor(sensor_name):
return False # 传感器状态无效,返回
else:
stable_start_time = time.time() # 首次检测到有效信号
time.sleep(check_interval)
while time.time() - start_time < detection_duration:
temp_is_valid = self.is_valid_sensor(sensor_name)
if temp_is_valid:
if time.time() - stable_start_time >= stability_duration:
return True # 信号持续稳定达到要求时间
else:
stable_start_time = None # 信号不稳定,重置计时
time.sleep(check_interval)
return False
# ===================== 动作控制方法 ===================== # ===================== 动作控制方法 =====================
def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False): def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False,belt=False,alarm=False):
if Constant.DebugPosition: if Constant.DebugPosition:
return return
status = self.get_all_device_status() status = self.get_all_device_status()
@ -291,8 +386,14 @@ class RelayController(QObject):
if conveyor2_reverse: if conveyor2_reverse:
self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['open']) self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['open'])
time.sleep(self.delay_conveyor) time.sleep(self.delay_conveyor)
if belt and not status.get(self.BELT, False):
self.send_command(self.valve_commands[self.BELT]['open'])
# time.sleep(self.delay_belt)
if alarm and not status.get(self.ALARM, False):
self.send_command(self.valve_commands[self.ALARM]['open'])
# time.sleep(self.delay_alarm)
def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False): def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False,belt=False,alarm=False):
if conveyor1: if conveyor1:
self.send_command(self.valve_commands[self.CONVEYOR1]['close']) self.send_command(self.valve_commands[self.CONVEYOR1]['close'])
time.sleep(self.delay_conveyor) time.sleep(self.delay_conveyor)
@ -311,6 +412,12 @@ class RelayController(QObject):
if conveyor2_reverse: if conveyor2_reverse:
self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['close']) self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['close'])
time.sleep(self.delay_conveyor) time.sleep(self.delay_conveyor)
if belt:
self.send_command(self.valve_commands[self.BELT]['close'])
# time.sleep(self.delay_belt)
if alarm:
self.send_command(self.valve_commands[self.ALARM]['close'])
# time.sleep(self.delay_alarm)
# ===================== 传感器处理线程 ===================== # ===================== 传感器处理线程 =====================
def handle_sensor1(self): def handle_sensor1(self):
@ -346,16 +453,17 @@ class RelayController(QObject):
time.sleep(1) time.sleep(1)
# 5. 状态检查(可选) # 5. 状态检查(可选)
status = self.get_all_device_status() # status = self.get_all_device_status()
if status.get('conveyor1') and not status.get('pusher'): # if status.get('conveyor1') and not status.get('pusher'):
print("流程完成1皮带运行中推板已收回") # print("流程完成1皮带运行中推板已收回")
else: # else:
print("状态异常,请检查设备") # print("状态异常,请检查设备")
# 流程结束,重置触发标志 # 流程结束,重置触发标志
self.sensor1_triggered = False self.sensor1_triggered = False
time.sleep(self.sensor1_loop_delay) time.sleep(self.sensor1_loop_delay)
except Exception as e: except Exception as e:
print(f"SENSOR1 处理错误: {e}") print(f"SENSOR1 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR1 处理错误: {e}")
self.sensor1_triggered = False self.sensor1_triggered = False
time.sleep(self.sensor1_error_delay) time.sleep(self.sensor1_error_delay)
@ -391,6 +499,8 @@ class RelayController(QObject):
_is_signal=False _is_signal=False
self.sensor2_ready=False #打开滚洞标识 self.sensor2_ready=False #打开滚洞标识
else: else:
if self.sensor2_ready:
#只有在FPhoto处才有效
_is_signal=True _is_signal=True
# time.sleep(0.1) # time.sleep(0.1)
continue continue
@ -404,6 +514,7 @@ class RelayController(QObject):
time.sleep(2) time.sleep(2)
except Exception as e: except Exception as e:
print(f"SENSOR2 处理错误: {e}") print(f"SENSOR2 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR2 处理错误: {e}")
time.sleep(self.sensor2_error_delay) time.sleep(self.sensor2_error_delay)
def pause_start_sensor(self,is_pause): def pause_start_sensor(self,is_pause):
@ -413,6 +524,7 @@ class RelayController(QObject):
""" """
self._ispause = is_pause self._ispause = is_pause
def stop_sensor(self,sensor1_thread,sensor2_thread): def stop_sensor(self,sensor1_thread,sensor2_thread):
if not self._running: if not self._running:
print("线程未在运行") print("线程未在运行")
@ -424,3 +536,22 @@ class RelayController(QObject):
if sensor2_thread and sensor2_thread.is_alive(): if sensor2_thread and sensor2_thread.is_alive():
sensor2_thread.join() sensor2_thread.join()
print("传感器线程已终止。") print("传感器线程已终止。")
def handle_emergency_pressed(self):
"处理急停按钮信号状态线程"
while self._running:
try:
loc_is_pressed =self.get_emergency_is_pressed()
if loc_is_pressed:
# 处理急停按钮信号状态
self.log_signal.emit(logging.INFO,f"急停按钮被按下")
self.emergency_is_pressed=True
self.emergency_signal.emit(True)
else:
self.emergency_is_pressed=False
self.emergency_signal.emit(False)
time.sleep(0.5)
except Exception as e:
print(f"急停 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"急停线程 处理错误: {e}")
time.sleep(2)

View File

@ -3,6 +3,7 @@ import cv2
import os import os
import time import time
from datetime import datetime from datetime import datetime
from typing import Optional
import numpy as np import numpy as np
import shutil # 用于检查磁盘空间 import shutil # 用于检查磁盘空间
@ -159,17 +160,18 @@ class CameraUtil:
print(f"保存图像失败: {e}") print(f"保存图像失败: {e}")
return None return None
def save_img(self): def save_img(self)->Optional[str]:
""" """
保存当前帧 保存当前帧
:return: None :return: 保存的文件路径
""" """
if self.camera is None: if self.camera is None:
self.setup_capture() self.setup_capture()
frame = self.capture_frame() frame = self.capture_frame()
if frame is not None: if frame is not None:
self.save_frame(frame) loc_filepath=self.save_frame(frame)
self.release() self.release()
return loc_filepath
else: else:
print("无法捕获图像帧") print("无法捕获图像帧")
return None return None

237
Mv3D/calculate_diff2.py Normal file
View File

@ -0,0 +1,237 @@
import cv2
import numpy as np
import os
# ====================== 配置区 ======================
MODEL_PATH = "./Mv3D/point.rknn"
OUTPUT_DIR = "./output_rknn"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# 固定参考点(像素坐标)
FIXED_REF_POINT = (535, 605)
# mm/px 缩放因子(根据标定数据填写)
width_mm = 70.0
width_px = 42
SCALE_X = width_mm / float(width_px)
height_mm = 890.0
height_px = 507
SCALE_Y = height_mm / float(height_px)
print(f"Scale factors: SCALE_X={SCALE_X:.3f} mm/px, SCALE_Y={SCALE_Y:.3f} mm/px")
# 输入尺寸
IMG_SIZE = (640, 640)
def letterbox_resize(image, size, bg_color=114):
target_w, target_h = size
h, w = image.shape[:2]
scale = min(target_w / w, target_h / h)
new_w, new_h = int(w * scale), int(h * scale)
resized = cv2.resize(image, (new_w, new_h))
canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8)
dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2
canvas[dy:dy + new_h, dx:dx + new_w] = resized
return canvas, scale, dx, dy
def safe_sigmoid(x):
x = np.clip(x, -50, 50)
return 1.0 / (1.0 + np.exp(-x))
def softmax(x):
x = x - np.max(x)
e = np.exp(x)
return e / e.sum()
def dfl_to_xywh(loc, grid_x, grid_y, stride):
"""将 DFL 输出解析为 xywh"""
xywh_ = np.zeros(4)
xywh = np.zeros(4)
# 每个维度 16 bins 做 softmax
for i in range(4):
l = loc[i * 16:(i + 1) * 16]
l = softmax(l)
xywh_[i] = sum([j * l[j] for j in range(16)])
# 对应公式
xywh_[0] = (grid_x + 0.5) - xywh_[0]
xywh_[1] = (grid_y + 0.5) - xywh_[1]
xywh_[2] = (grid_x + 0.5) + xywh_[2]
xywh_[3] = (grid_y + 0.5) + xywh_[3]
# 转成中心点 + 宽高
xywh[0] = ((xywh_[0] + xywh_[2]) / 2) * stride
xywh[1] = ((xywh_[1] + xywh_[3]) / 2) * stride
xywh[2] = (xywh_[2] - xywh_[0]) * stride
xywh[3] = (xywh_[3] - xywh_[1]) * stride
# 转为左上角坐标
xywh[0] = xywh[0] - xywh[2] / 2
xywh[1] = xywh[1] - xywh[3] / 2
return xywh
def parse_pose_outputs(outputs, conf_threshold=0.5, dx=0, dy=0, scale=1.0):
"""
完整解析 RKNN YOLO-Pose 输出
返回 keypoints, class_id, obj_conf, bbox已映射回原图
"""
boxes = []
obj_confs = []
class_ids = []
# 遍历前三个输出 tensor (det 输出)
for idx in range(3):
det = np.array(outputs[idx])[0] # (C,H,W)
C, H, W = det.shape
num_classes = C - 64 # 前64通道为 DFL bbox
stride = 640 // H
for h in range(H):
for w in range(W):
for c in range(num_classes):
conf = safe_sigmoid(det[64 + c, h, w])
if conf >= conf_threshold:
loc = det[:64, h, w].astype(np.float32)
xywh = dfl_to_xywh(loc, w, h, stride)
boxes.append(xywh)
obj_confs.append(conf)
class_ids.append(c)
if not obj_confs:
best_box = np.array([0, 0, 0, 0])
class_id = -1
obj_conf = 0.0
else:
max_idx = np.argmax(obj_confs)
best_box = boxes[max_idx]
class_id = class_ids[max_idx]
obj_conf = obj_confs[max_idx]
# 🔹 bbox 坐标映射回原图
x, y, w, h = best_box
x = (x - dx) / scale
y = (y - dy) / scale
w = w / scale
h = h / scale
best_box = np.array([x, y, w, h])
# 🔹 关键点解析
kpt_output = np.array(outputs[3])[0] # (num_kpts, 3, num_anchor)
confs = kpt_output[:, 2, :]
best_anchor_idx = np.argmax(np.mean(confs, axis=0))
kpt_data = kpt_output[:, :, best_anchor_idx]
keypoints = []
for i in range(kpt_data.shape[0]):
x_img, y_img, vis_conf_raw = kpt_data[i]
vis_prob = safe_sigmoid(vis_conf_raw)
x_orig = (x_img - dx) / scale
y_orig = (y_img - dy) / scale
keypoints.append([x_orig, y_orig, vis_prob])
return np.array(keypoints), class_id, obj_conf, best_box
def compute_offset(keypoints, fixed_point, scale_x, scale_y):
if len(keypoints) < 2:
return None
p1, p2 = keypoints[0], keypoints[1]
cx = (p1[0] + p2[0]) / 2.0
cy = (p1[1] + p2[1]) / 2.0
dx_mm = (cx - fixed_point[0]) * scale_x
dy_mm = (cy - fixed_point[1]) * scale_y
return cx, cy, dx_mm, dy_mm
def visualize_result(image, keypoints, bbox, fixed_point, offset_info, save_path):
vis = image.copy()
colors = [(0, 0, 255), (0, 255, 255)]
cx, cy, dx_mm, dy_mm = offset_info
fx, fy = map(int, fixed_point)
# 绘制关键点
for i, (x, y, conf) in enumerate(keypoints[:2]):
if conf > 0.5:
cv2.circle(vis, (int(x), int(y)), 6, colors[i], -1)
if len(keypoints) >= 2:
cv2.line(vis,
(int(keypoints[0][0]), int(keypoints[0][1])),
(int(keypoints[1][0]), int(keypoints[1][1])),
(0, 255, 0), 2)
# 绘制 bbox
x, y, w, h = bbox
cv2.rectangle(vis, (int(x), int(y)), (int(x + w), int(y + h)), (255, 0, 0), 2)
# 绘制中心点
cv2.circle(vis, (int(cx), int(cy)), 10, (0, 255, 0), 3)
cv2.circle(vis, (fx, fy), 12, (255, 255, 0), 3)
cv2.arrowedLine(vis, (fx, fy), (int(cx), int(cy)), (255, 255, 0), 2, tipLength=0.05)
cv2.putText(vis, f"DeltaX={dx_mm:+.1f}mm", (fx + 30, fy - 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
cv2.putText(vis, f"DeltaY={dy_mm:+.1f}mm", (fx + 30, fy + 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
cv2.imwrite(save_path, vis)
def calculate_offset_from_image(image_path, visualize=False):
from rknnlite.api import RKNNLite
orig = cv2.imread(image_path)
if orig is None:
return {'success': False, 'message': f'Failed to load image: {image_path}'}
img_resized, scale, dx, dy = letterbox_resize(orig, IMG_SIZE)
infer_img = np.expand_dims(img_resized[..., ::-1], 0).astype(np.uint8)
rknn = RKNNLite(verbose=False)
ret = rknn.load_rknn(MODEL_PATH)
if ret != 0:
return {'success': False, 'message': 'Failed to load RKNN model'}
try:
rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0)
outputs = rknn.inference([infer_img])
finally:
rknn.release()
try:
keypoints, class_id, obj_conf, bbox = parse_pose_outputs(outputs, dx=dx, dy=dy, scale=scale)
except Exception as e:
return {'success': False, 'message': f'Parse error: {str(e)}'}
offset_info = compute_offset(keypoints, FIXED_REF_POINT, SCALE_X, SCALE_Y)
if offset_info is None:
return {'success': False, 'message': 'Not enough keypoints'}
cx, cy, dx_mm, dy_mm = offset_info
if visualize:
vis_save_path = os.path.join(OUTPUT_DIR, f"result_{os.path.basename(image_path)}")
visualize_result(orig, keypoints, bbox, FIXED_REF_POINT, offset_info, vis_save_path)
return {'success': True, 'dx_mm': dx_mm, 'dy_mm': dy_mm,
'cx': cx, 'cy': cy, 'class_id': class_id,
'obj_conf': obj_conf, 'bbox': bbox,
'message': 'Success'}
# ====================== 使用示例 ======================
if __name__ == "__main__":
image_path = "11.jpg"
result = calculate_offset_from_image(image_path, visualize=True)
if result['success']:
print(f"Center point: ({result['cx']:.1f}, {result['cy']:.1f})")
print(f"Offset: DeltaX={result['dx_mm']:+.2f} mm, DeltaY={result['dy_mm']:+.2f} mm")
print(f"Class ID: {result['class_id']}, Confidence: {result['obj_conf']:.3f}")
print(f"BBox: {result['bbox']}")
else:
print("Error:", result['message'])

BIN
Mv3D/point.rknn Normal file

Binary file not shown.

View File

@ -47,7 +47,7 @@ photo_v5 = 0.0
photo_w5 = 1.0 photo_w5 = 1.0
linecount = 2 linecount = 2
remain_linename = 1 remain_linename = 1
remain_count = 10 remain_count = 0
maduo_count = 30 maduo_count = 30
io_take_addr = 8 io_take_addr = 8
io_zip_addr = 11 io_zip_addr = 11

View File

@ -55,6 +55,15 @@ class Logger(QObject):
handler.setFormatter(formatter) handler.setFormatter(formatter)
self.logger_file_info.addHandler(handler) self.logger_file_info.addHandler(handler)
# 添加错误专用日志文件
error_handler = TimedRotatingFileHandler(
file_path.replace('.log', '_error.log'),
when='D', interval=1, backupCount=30, encoding='utf-8')
error_handler.suffix = "%Y-%m-%d"
error_handler.setLevel(logging.ERROR) # 只处理错误级别
error_handler.setFormatter(formatter)
self.logger_file_info.addHandler(error_handler)
def _process_logs(self): def _process_logs(self):
while True: while True:
time.sleep(0.1) time.sleep(0.1)

102
main.py
View File

@ -61,8 +61,11 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.drop_position_manager = DropPositionManager() self.drop_position_manager = DropPositionManager()
self.sensor_thread = None self.sensor_thread = None
self.sensor2_thread = None self.sensor2_thread = None
self.emergency_thread = None
self.last_status_printed = None self.last_status_printed = None
self.last_pause_printed = None self.last_pause_printed = None
#判断是否按下急停按钮
self.is_emergency_pressed = False
self.cur_pushbutton_num = None self.cur_pushbutton_num = None
self.pushbutton_num_style = """ self.pushbutton_num_style = """
QPushButton { background-color: #101F3F;color:#ffffff } QPushButton { background-color: #101F3F;color:#ffffff }
@ -751,6 +754,9 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# self.camera_img.update_camera_image.connect(self.updateUI_label_detection) # self.camera_img.update_camera_image.connect(self.updateUI_label_detection)
self.feeding.log_signal.connect(self.log_message) self.feeding.log_signal.connect(self.log_message)
self.feeding.stack_finish_signal.connect(self.stack_finish) self.feeding.stack_finish_signal.connect(self.stack_finish)
self.relay_controller.log_signal.connect(self.log_message)
self.relay_controller.emergency_signal.connect(self.emergency_press_notify)
self.feeding.feed_error_signal.connect(self.feed_error_notify)
# self.camera_img=CameraImg(self.feeding) # self.camera_img=CameraImg(self.feeding)
# self.camera_img.update_camera_image.connect(self.updateUI_label_detection) # self.camera_img.update_camera_image.connect(self.updateUI_label_detection)
@ -941,6 +947,13 @@ class MainWindow(QMainWindow, Ui_MainWindow):
def start_Runing(self): def start_Runing(self):
#多线程启动 #多线程启动
self.emergency_thread = Thread(
target=self.relay_controller.handle_emergency_pressed,
name="emergency",
daemon=True
)
self.emergency_thread.start()
self.main_threading = Thread(target=self.run,name="feeding")#主循环 self.main_threading = Thread(target=self.run,name="feeding")#主循环
self.robot_connect_threading = Thread(target=self.robotClient.run,name="robot_connect") self.robot_connect_threading = Thread(target=self.robotClient.run,name="robot_connect")
# self.main_UI_threading = Thread(target=self.updateUI,name="updateUI") # self.main_UI_threading = Thread(target=self.updateUI,name="updateUI")
@ -1005,6 +1018,10 @@ class MainWindow(QMainWindow, Ui_MainWindow):
return return
def send_startFeed_button_click(self): def send_startFeed_button_click(self):
if self.is_emergency_pressed:
self.show_infomessage_box("急停按钮已按下,不能启动,请先松开急停按钮")
return
if self.feeding.feedStatus != FeedStatus.FNone: if self.feeding.feedStatus != FeedStatus.FNone:
self.show_infomessage_box("正在执行") self.show_infomessage_box("正在执行")
return return
@ -1372,7 +1389,7 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.feeding.feedStatus = FeedStatus.FNone self.feeding.feedStatus = FeedStatus.FNone
self.feeding.reset_status = ResetStatus.RNone self.feeding.reset_status = ResetStatus.RNone
self.relay_controller.pause_start_sensor(True) self.relay_controller.pause_start_sensor(True)
self.feeding.pause = True #暂停,停止滚去等 self.feeding.pause = True #暂停
stop_command = CMDRequest() stop_command = CMDRequest()
stop_command.cmdData.append("actionStop") stop_command.cmdData.append("actionStop")
stop_command.cmdData.append("1") stop_command.cmdData.append("1")
@ -1384,12 +1401,17 @@ class MainWindow(QMainWindow, Ui_MainWindow):
log.log_message(logging.INFO, Constant.str_sys_emergencyStop) log.log_message(logging.INFO, Constant.str_sys_emergencyStop)
def send_emergency_alarm_command(self): def send_emergency_alarm_command(self):
if self.is_emergency_pressed:
self.show_infomessage_box("急停按钮已按下,不能停止,请先松开急停按钮")
return
"""停止当前工作,需要启动""" """停止当前工作,需要启动"""
msg_box_person = QMessageBox() msg_box_person = QMessageBox()
msg_box_person.setIcon(QMessageBox.Icon.Question) msg_box_person.setIcon(QMessageBox.Icon.Question)
msg_box_person.setText("您确定要停止当前工作吗?") msg_box_person.setText("您确定要停止当前工作吗?")
msg_box_person.setWindowTitle("提示") msg_box_person.setWindowTitle("提示")
font = msg_box_person.font()
font.setPointSize(20)
msg_box_person.setFont(font)
msg_box_person.addButton("确定", PySide6.QtWidgets.QMessageBox.ButtonRole.AcceptRole) msg_box_person.addButton("确定", PySide6.QtWidgets.QMessageBox.ButtonRole.AcceptRole)
cancel_button = msg_box_person.addButton("取消", PySide6.QtWidgets.QMessageBox.ButtonRole.RejectRole) cancel_button = msg_box_person.addButton("取消", PySide6.QtWidgets.QMessageBox.ButtonRole.RejectRole)
msg_box_person.exec() msg_box_person.exec()
@ -1401,6 +1423,16 @@ class MainWindow(QMainWindow, Ui_MainWindow):
def send_pauseFeed_button_click(self): def send_pauseFeed_button_click(self):
if self.is_emergency_pressed:
self.show_infomessage_box("急停按钮已按下,不能暂停,请先松开急停按钮")
return
if self.feeding.feedStatus == FeedStatus.FNone:
self.show_infomessage_box("当前没有运行中的工作,不能暂停")
return
self.send_pauseFeed_click()
pass
def send_pauseFeed_click(self):
if self.pushButton_pauseFeed.text() == '暂停': if self.pushButton_pauseFeed.text() == '暂停':
self.relay_controller.pause_start_sensor(True) self.relay_controller.pause_start_sensor(True)
icon9 = QIcon(QIcon.fromTheme(QIcon.ThemeIcon.SystemReboot)) icon9 = QIcon(QIcon.fromTheme(QIcon.ThemeIcon.SystemReboot))
@ -1423,8 +1455,6 @@ class MainWindow(QMainWindow, Ui_MainWindow):
log.log_message(logging.INFO, Constant.str_sys_start_tool) log.log_message(logging.INFO, Constant.str_sys_start_tool)
self.feeding.pause = False self.feeding.pause = False
pass
def send_tabWidget_control_change(self): def send_tabWidget_control_change(self):
if self.robotClient.status_model.curMode != 7: if self.robotClient.status_model.curMode != 7:
@ -1635,6 +1665,9 @@ class MainWindow(QMainWindow, Ui_MainWindow):
else: else:
self.set_run_status_button(False) self.set_run_status_button(False)
if self.is_emergency_pressed:
self.stackedWidget_num.setCurrentIndex(1)
else:
if self.feeding.feedStatus == FeedStatus.FNone: if self.feeding.feedStatus == FeedStatus.FNone:
self.stackedWidget_num.setCurrentIndex(0) self.stackedWidget_num.setCurrentIndex(0)
else: else:
@ -1675,16 +1708,53 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# self.send_pause_command(True) # self.send_pause_command(True)
# self.feeding.pause = True # self.feeding.pause = True
# self.relay_controller.pause_start_sensor(True) # self.relay_controller.pause_start_sensor(True)
self.stop_all_work()
msg_box_finish = QMessageBox() msg_box_finish = QMessageBox()
msg_box_finish.setIcon(QMessageBox.Icon.Warning) msg_box_finish.setIcon(QMessageBox.Icon.Warning)
msg_box_finish.setText("码垛完成,请移走拖板") msg_box_finish.setText("码垛完成,请移走拖板")
msg_box_finish.setWindowTitle("提示") msg_box_finish.setWindowTitle("提示")
msg_box_finish.addButton("确定", QMessageBox.AcceptRole) msg_box_finish.addButton("确定", QMessageBox.AcceptRole)
loc_font=QFont("Microsoft YaHei UI",20) loc_font=QFont("Microsoft YaHei UI",20)
msg_box_finish.setFont(font) msg_box_finish.setFont(loc_font)
result=msg_box_finish.exec() result=msg_box_finish.exec()
self.label_remain_num.setText(str(self.feeding.feedConfig.remain_count)) self.label_remain_num.setText(str(self.feeding.feedConfig.remain_count))
self.stop_all_work()
def feed_error_notify(self,error_code,error_msg):
"""
feeding 报警处理,打开声控
"""
self.relay_controller.open(alarm=True)
self.send_pauseFeed_click()
msg_box_finish = QMessageBox()
msg_box_finish.setIcon(QMessageBox.Icon.Warning)
msg_box_finish.setText(error_msg)
msg_box_finish.setWindowTitle("提示")
msg_box_finish.addButton("确定", QMessageBox.AcceptRole)
loc_font=QFont("Microsoft YaHei UI",20)
msg_box_finish.setFont(loc_font)
result=msg_box_finish.exec()
if error_code==1:
msg_box_finish.setText("是否打开滚筒让料带滚出?")
msg_box_finish.addButton("取消", QMessageBox.RejectRole)
result=msg_box_finish.exec()
if result == QMessageBox.AcceptRole:
self.relay_controller.open(conveyor2=True)
def emergency_press_notify(self,emergency_pressed):
"""
急停按钮按下
"""
if emergency_pressed:
if not self.is_emergency_pressed:
self.send_pauseFeed_click()
self.is_emergency_pressed = True
self.label_button_status.setText("急停中,禁止操作")
else:
if self.is_emergency_pressed:
self.label_button_status.setText("已暂停,可操作工控机")
self.is_emergency_pressed = False
def updateUI_IOPanel(self): def updateUI_IOPanel(self):
@ -1921,12 +1991,13 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.robotClient.add_sendQuene(request_command) self.robotClient.add_sendQuene(request_command)
def send_clear_alarm_command(self, pause: bool): def send_clear_alarm_command(self, pause: bool):
self.send_clear_auto_command() self.relay_controller.close(alarm=True)
pause_command = CMDRequest() # self.send_clear_auto_command()
pause_command.cmdData.append("StopButton") # pause_command = CMDRequest()
pause_command.cmdData.append("1") # pause_command.cmdData.append("StopButton")
request_command = pause_command.toString() # pause_command.cmdData.append("1")
self.robotClient.add_sendQuene(request_command) # request_command = pause_command.toString()
# self.robotClient.add_sendQuene(request_command)
def send_switch_tool_command(self): def send_switch_tool_command(self):
switch_command = CMDRequest() switch_command = CMDRequest()
@ -2201,6 +2272,9 @@ class MainWindow(QMainWindow, Ui_MainWindow):
msg_box.setText(message) msg_box.setText(message)
msg_box.setIcon(QMessageBox.Icon.Information) msg_box.setIcon(QMessageBox.Icon.Information)
msg_box.setStandardButtons(QMessageBox.StandardButton.Ok) msg_box.setStandardButtons(QMessageBox.StandardButton.Ok)
font = msg_box.font()
font.setPointSize(20)
msg_box.setFont(font)
msg_box.show() msg_box.show()
def send_click_change_stackView(self,index): def send_click_change_stackView(self,index):
self.stackedWidget_view.setCurrentIndex(index) self.stackedWidget_view.setCurrentIndex(index)
@ -2303,7 +2377,9 @@ def handle_exception(exc_type, exc_value, exc_tb):
if __name__ == "__main__": if __name__ == "__main__":
app = MyApplication(sys.argv) app = MyApplication(sys.argv)
window = MainWindow() window = MainWindow()
window.show() # window.show()
# window.showFullScreen()
window.showMaximized()
sys.excepthook = handle_exception sys.excepthook = handle_exception
try : try :
sys.exit(app.exec()) sys.exit(app.exec())

View File

@ -0,0 +1,37 @@
import os
import sys
import time
from Mv3D.CameraUtil import CameraUtil
from Mv3D.calculate_diff2 import calculate_offset_from_image
if __name__=='__main__':
camera_img=CameraUtil()
loc_image_path=camera_img.save_img()
print(f"保存图片路径:{loc_image_path}")
try:
loc_model_result = calculate_offset_from_image(loc_image_path, visualize=False)
if loc_model_result['success']:
#置信度
if loc_model_result['obj_conf']>0.5:
loc_offsetX=loc_model_result['dx_mm']
if abs(loc_model_result['dx_mm'])<200:
loc_is_next=True
print(f'{loc_image_path}:料带偏移{loc_offsetX}mm')
else:
print(f"{loc_image_path}:料带偏移过大,报警")
# self.feed_error_signal.emit(1,f"{loc_image_path}:料带偏移过大,报警")
else:
print(f"{loc_image_path}:未检测到料带,置信度低")
# self.feed_error_signal.emit(2,f"{loc_image_path}:未检测到料带,置信度低")
else:
print(f"{loc_image_path}:计算偏移失败视觉返回False")
# self.feed_error_signal.emit(2,f"{loc_image_path}:计算偏移失败视觉返回False")
# loc_is_next=False
loc_offsetX=0
except Exception as e:
print(f"{loc_image_path}:计算偏移失败:{e}")
time.sleep(10)