Files
2026-01-16 15:21:54 +08:00

718 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import binascii
import time
import threading
from datetime import datetime
import logging
from typing import Optional
from PySide6.QtCore import Signal, QObject
import numpy as np
from pandas.core.arrays import boolean
import Constant
class RelayController(QObject):
log_signal = Signal(int, str)
take_robot_signal = Signal()
emergency_signal = Signal(bool)
def __init__(self, host='192.168.0.18', port=50000):
super().__init__()
# ===================== 全局线程延时参数 =====================
self.sensor1_loop_delay = 0.1 # SENSOR1 线程轮询间隔(秒)
self.sensor1_error_delay = 1.0 # SENSOR1 出错或暂停时延时(秒)
self.sensor1_post_action_delay = 0.2 # SENSOR1 每次循环后延时(秒)
self.sensor2_loop_delay = 0.2 # SENSOR2 线程轮询间隔(秒)
self.sensor2_loop_lost=0.1 # SENSOR2 线程轮询间隔(秒)
# self.sensor2_loop_delay = 0.5 # SENSOR2 线程轮询间隔(秒)
self.sensor2_error_delay = 0.5 # SENSOR2 出错时延时(秒)
self.sensor2_post_action_delay = 0.2 # SENSOR2 每次循环后延时(秒)
# ===================== 全局动作延时参数 =====================
self.delay_conveyor = 0.5 # 传送带开/关动作延时(一半时间,我在控制程序和线程都加了一样的延时)
self.delay_pusher = 0.05 # 推板开/关动作延时
self.delay_clamp = 0.5 # 夹爪动作延时
self.delay_after_pusher = 5.0 # 推板推出后到重启传动带时间
self.emergency_is_pressed=False
# ===================== 传感器稳定检测参数 =====================
self.sensor_stable_duration = 1.0 # 传感器状态稳定检测时间(秒)
self.sensor_max_attempts = 3 # 连续检测次数达到此值判定有效
self.sensor1_debounce_time = 1.0 # 传感器1防抖时间
# ===================== 网络与设备映射 =====================
self.host = host
self.port = port
self.CONVEYOR1 = 'conveyor1'
self.PUSHER = 'pusher'
self.CONVEYOR2 = 'conveyor2'
self.CONVEYOR2_REVERSE = 'conveyor2_reverse'
self.CLAMP = 'clamp'
self.PUSHER1 = 'pusher1'
self.SENSOR1 = 'sensor1'
self.SENSOR2 = 'sensor2'
self.BELT = 'belt'
self.ALARM = 'alarm'
self.BLOW_SENSOR2 = 'blow_sensor2'
self.valve_commands = {
#包装机皮带
self.CONVEYOR1: {'open': '000000000006010500070000', 'close': '00000000000601050007FF00'},
# self.CONVEYOR11: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
#滚筒2000 0012正转2000 0022 2001变频器频率调整 2000正反转。
self.CONVEYOR2: {'open': '000100000006020620000012', 'close': '000100000006020620000001'},
#DO4
self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
#DO5 回 DO2推
self.PUSHER1: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'},
#D07 长皮带
self.BELT: {'open': '00000000000601050006FF00', 'close': '000000000006010500060000'},
#D01 声控报警
self.ALARM: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
#DO6 吹传感器2
self.BLOW_SENSOR2: {'open': '00000000000601050005FF00', 'close': '000000000006010500050000'},
#滚筒反转
self.CONVEYOR2_REVERSE: {'open': '000100000006020620000022', 'close': '000100000006020620000001'}
}
#devices:读取继点器的状态
#sensors 传感器的状态 D12
self.read_status_command = {
'devices': '000000000006010100000008',
'sensors': '000000000006010200000008'
}
self.device_bit_map = {
self.CONVEYOR1: 0,
self.PUSHER: 1,
self.CONVEYOR2: 2,
self.CLAMP: 3,
self.PUSHER1: 4,
self.CONVEYOR2_REVERSE: 5,
self.BELT: 6,
self.ALARM: 7,
#self.BLOW_SENSOR2: 8
}
self.sensor_bit_map = {
self.SENSOR1: 0,
self.SENSOR2: 1,
}
self.device_name_map = {
self.CONVEYOR1: "包装机皮带",
self.PUSHER: "推板开",
self.CONVEYOR2: "滚筒",
self.CLAMP: "机械臂夹爪",
self.PUSHER1: "推板关",
self.CONVEYOR2_REVERSE: "滚筒反转",
self.BELT: "皮带",
self.ALARM: "声控报警",
self.BLOW_SENSOR2: "吹传感器2"
}
self.sensor_name_map = {
self.SENSOR1: '位置传感器1',
self.SENSOR2: '位置传感器2',
}
# ===================== 状态控制变量 =====================
self._running = False #线程运行标识
self._ispause = False #线程暂停标识
self._sensor1_thread = None
self._sensor2_thread = None
self.required_codes = {'0101', '0103','0105','0107'} # 有效状态码传感器1
self.required_codes_1 = {'0102', '0103','0106','0107'} # 有效状态码传感器2
self.sensor1_triggered = False
self.sensor1_last_time = 0
self.sensor2_ready = False #默认不打开
self.motor_stopped_by_sensor2 = True
self.is_drop_35=False #是否是35码
# ===================== 基础通信方法 =====================
def send_command(self, command_hex, retry_count=2, source='unknown'):
if Constant.DebugPosition:
print(f"[发送命令] {command_hex} ({source})")
return None
byte_data = binascii.unhexlify(command_hex)
for attempt in range(retry_count):
try:
# begin_time=time.time()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(1)
sock.connect((self.host, self.port))
sock.send(byte_data)
response = sock.recv(1024)
# end_time=time.time()
# print(f"({source}) 耗时: {end_time-begin_time:.3f}秒")
# hex_response = binascii.hexlify(response).decode('utf-8')
#if source == 'sensor':
#print(f"[传感器响应] {hex_response}")
#elif source == 'device':
#print(f"[设备控制响应] {hex_response}")
#else:
#print(f"[通信响应] {hex_response}")
return response
except Exception as e:
self.log_signal.emit(logging.INFO,f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
print(f"网络继电器通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
time.sleep(1)
self.trigger_alarm()
return None
def trigger_alarm(self):
self.log_signal.emit(logging.ERROR,"警告:网络继电器连续多次通信失败,请检查设备连接!")
print("警告:网络继电器连续多次通信失败,请检查设备连接!")
# ===================== 状态读取方法 =====================
def get_all_device_status(self, command_type='devices'):
# if Constant.DebugPosition:
# return {self.SENSOR2:True}
command = self.read_status_command.get(command_type)
if not command:
print(f"未知的网络继电器读取类型: {command_type}")
return {}
source = 'sensor' if command_type == 'sensors' else 'device'
response = self.send_command(command, source=source)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9]
status_bin = f"{status_byte:08b}"[::-1]
bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map
# name_map = self.device_name_map if command_type == 'devices' else self.sensor_name_map
for key, bit_index in bit_map.items():
status_dict[key] = status_bin[bit_index] == '1'
else:
print(f"网络继电器[{command_type}] 读取状态失败或响应无效")
return status_dict
def get_all_sensor_responses(self, command_type='sensors'):
"""
获取所有传感器的原始 Modbus 响应字符串
示例:{'sensor1': '00000000000401020101', 'sensor2': '00000000000401020100'}
"""
command = self.read_status_command.get(command_type)
if not command:
print(f"未知的读取类型: {command_type}")
return {}
source = 'sensor' if command_type == 'sensors' else 'device'
response = self.send_command(command, source=source)
responses = {}
if response and len(response) >= 10:
hex_response = binascii.hexlify(response).decode('utf-8')
# print(f"[原始响应][{command_type}] {hex_response}")
# 假设传感器数据从第 9 字节开始,长度为 2 字节
for name, bit_index in self.sensor_bit_map.items():
offset = 9 + (bit_index // 8)
bit_pos = bit_index % 8
byte = response[offset]
status = (byte >> bit_pos) & 1
responses[name] = hex_response
else:
print(f"[{command_type}] 无法获取响应数据")
return responses
def get_emergency_is_pressed(self):
"获取急停信号DI 3 位为1正常DI 3为0时为按下急停状态00000000000401020107 后四位01表示一个字节最后一位07二进制对应开关量"
"按下急停为"
command = self.read_status_command.get("sensors")
response = self.send_command(command)
loc_is_pressed=False
if response and len(response) >= 10:
status_byte = response[9]
#简单验证
status_crc=response[8]
loc_is_pressed =status_crc==1 and (status_byte & 0b100) == 0 # 0b100 表示第三位为1
else:
loc_is_pressed=None
self.log_signal.emit(logging.ERROR,f"网络继电器[急停] 读取状态失败或响应无效")
print(f"网络继电器[急停] 读取状态失败或响应无效")
return loc_is_pressed
def parse_status_code(self, response):
"""
从 Modbus 响应字符串中提取状态码(后两位)
示例00000000000401020101 -> '01'
"""
if isinstance(response, str) and len(response) >= 18:
return response[16:20]
return None
def is_valid_sensor_status(self, sensor_name):
stable_count = 0
for _ in range(int(self.sensor_stable_duration / self.sensor1_loop_delay)):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
stable_count = 0
else:
status_code = self.parse_status_code(response)
if status_code in self.required_codes:
stable_count += 1
if stable_count >= self.sensor_max_attempts:
return True
else:
stable_count = 0
time.sleep(self.sensor1_loop_delay)
return False
def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1):
"""
检测在指定时间窗口内是否存在持续稳定的有效信号
参数:
sensor_name: 传感器名称
detection_duration: 总检测时间窗口(秒)默认为3秒
stability_duration: 信号需要持续稳定的时间(秒)默认为2.5秒
check_interval: 检测间隔(秒)默认为0.1秒
返回:
True: 在时间窗口内检测到持续稳定的有效信号
False: 未检测到持续稳定的有效信号
"""
stable_start_time = None # 记录首次检测到有效信号的时间
start_time = time.time()
if not self.is_valid_sensor(sensor_name):
return False # 传感器状态无效,返回
else:
stable_start_time = time.time() # 首次检测到有效信号
time.sleep(check_interval)
while time.time() - start_time < detection_duration:
temp_is_valid = self.is_valid_sensor(sensor_name)
if temp_is_valid:
if time.time() - stable_start_time >= stability_duration:
return True # 信号持续稳定达到要求时间
else:
stable_start_time = time.time() # 信号不稳定,重置计时
time.sleep(check_interval)
return False
def is_valid_sensor_status_1(self, sensor_name):
stable_count = 0
for _ in range(int(self.sensor_stable_duration / self.sensor2_loop_delay)):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
stable_count = 0
else:
status_code = self.parse_status_code(response)
if status_code in self.required_codes_1:
stable_count += 1
if stable_count >= self.sensor_max_attempts:
return True
else:
stable_count = 0
time.sleep(self.sensor2_loop_delay)
return False
def is_valid_sensor2_status_lost(self, sensor_name):
stable_count = 0
_try_nums=5 # 尝试次数
for _ in range(_try_nums):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
stable_count = 0
else:
status_code = self.parse_status_code(response)
if status_code not in self.required_codes_1:
stable_count += 1
if stable_count >= self.sensor_max_attempts:
return True
else:
stable_count = 0
time.sleep(self.sensor2_loop_lost)
return False
def is_valid_sensor(self,sensor_name):
"""
检查传感器状态是否有效
参数:
sensor_name: 传感器名称
返回:
True: 传感器状态有效
False: 传感器状态无效
"""
stable_count = 0
_try_nums=5 # 尝试次数
for _ in range(_try_nums):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
return False
else:
temp_status_code = self.parse_status_code(response)
if temp_status_code in self.required_codes_1:
stable_count += 1
if stable_count >= 3:
return True
else:
stable_count = 0
time.sleep(self.sensor2_loop_lost)
return False
def is_valid_sensor_nowait(self,sensor_name):
"""
检查传感器状态是否有效,不等待,如果第一次没有信号不等待,有信号一起一秒钟
参数:
sensor_name: 传感器名称
返回:
True: 传感器状态有效
False: 传感器状态无效
"""
stable_count = 0
_try_nums=5 # 尝试次数
for _num in range(_try_nums):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if not response:
print(f"[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
return False
else:
temp_status_code = self.parse_status_code(response)
if temp_status_code in self.required_codes_1:
stable_count += 1
if stable_count >= 3:
return True
else:
if _num==0:
#首次没有信号,直接返回
return False
else:
stable_count = 0
time.sleep(self.sensor2_loop_lost)
return False
def is_valid_sensor_stable(self,sensor_name):
"""
检查传感器状态是否有效
参数:
sensor_name: 传感器名称
返回:
True: 传感器状态有效
False: 传感器状态无效
"""
#检测一秒钟,首次没有信号直接返回
if not self.is_valid_sensor_nowait(sensor_name):
return False
#需要增加超时时间,否则会一直等待
stable_count = 0
_try_nums=10 # 尝试次数
for _ in range(_try_nums):
responses = self.get_all_sensor_responses('sensors')
response = responses.get(sensor_name)
if response:
temp_status_code = self.parse_status_code(response)
if temp_status_code in self.required_codes_1:
stable_count += 1
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}:检测到信号,已检测 {stable_count}")
if stable_count >= 8:
return True
else:
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}:未检测到信号,已检测 {stable_count}")
else:
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}:[警告] 无法获取 {sensor_name} 的响应,尝试重试...")
# else:
# stable_count = 0
time.sleep(self.sensor2_loop_delay)
return False
# def is_valid_sensor_signal_stable(self, sensor_name, detection_duration=3.0, stability_duration=2.5, check_interval=0.1):
# """
# 检测在指定时间窗口内是否存在持续稳定的有效信号
# 参数:
# sensor_name: 传感器名称
# detection_duration: 总检测时间窗口(秒)默认为3秒
# stability_duration: 信号需要持续稳定的时间(秒)默认为2.5秒
# check_interval: 检测间隔(秒)默认为0.1秒
# 返回:
# True: 在时间窗口内检测到持续稳定的有效信号
# False: 未检测到持续稳定的有效信号
# """
# stable_start_time = None # 记录首次检测到有效信号的时间
# stable_end_time = None # 记录最后检测到有效信号的时间
# start_time = time.time()
# if not self.is_valid_sensor_single(sensor_name):
# return False # 传感器状态无效,返回
# else:
# stable_start_time = time.time() # 首次检测到有效信号
# # stable_end_time = stable_start_time # 最后检测到有效信号
# time.sleep(check_interval)
# while time.time() - start_time < detection_duration:
# #1s时间
# temp_is_valid = self.is_valid_sensor_single(sensor_name)
# if temp_is_valid:
# stable_start_time=time.time()
# else:
# stable_start_time = None # 信号不稳定,重置计时
# # time.sleep(check_interval)
# if stable_start_time - start_time >= stability_duration:
# return True # 信号持续稳定达到要求时间
# else:
# return False
# ===================== 动作控制方法 =====================
def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False,belt=False,alarm=False,blow_sensor2=False):
# if Constant.DebugPosition:
# return
# status = self.get_all_device_status()
if conveyor1:
self.send_command(self.valve_commands[self.CONVEYOR1]['open'])
time.sleep(self.delay_conveyor)
if pusher:
self.send_command(self.valve_commands[self.PUSHER]['open'])
time.sleep(self.delay_pusher)
if conveyor2:
self.send_command(self.valve_commands[self.CONVEYOR2]['open'])
# time.sleep(0.01)
if clamp:
self.send_command(self.valve_commands[self.CLAMP]['open'])
time.sleep(self.delay_clamp)
if pusher1:
self.send_command(self.valve_commands[self.PUSHER1]['open'])
time.sleep(self.delay_pusher)
if conveyor2_reverse:
self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['open'])
time.sleep(self.delay_conveyor)
if belt:
self.send_command(self.valve_commands[self.BELT]['open'])
# time.sleep(self.delay_belt)
if alarm:
self.send_command(self.valve_commands[self.ALARM]['open'])
# time.sleep(self.delay_alarm)
if blow_sensor2:
self.send_command(self.valve_commands[self.BLOW_SENSOR2]['open'])
# time.sleep(self.delay_blow_sensor2)
def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False, pusher1=False, conveyor2_reverse=False,belt=False,alarm=False,blow_sensor2=False):
if conveyor1:
self.send_command(self.valve_commands[self.CONVEYOR1]['close'])
time.sleep(self.delay_conveyor)
if pusher:
self.send_command(self.valve_commands[self.PUSHER]['close'])
time.sleep(self.delay_pusher)
if conveyor2:
self.send_command(self.valve_commands[self.CONVEYOR2]['close'])
#time.sleep(self.delay_conveyor)
if clamp:
self.send_command(self.valve_commands[self.CLAMP]['close'])
time.sleep(0.05)
if pusher1:
self.send_command(self.valve_commands[self.PUSHER1]['close'])
time.sleep(self.delay_pusher)
if conveyor2_reverse:
self.send_command(self.valve_commands[self.CONVEYOR2_REVERSE]['close'])
time.sleep(self.delay_conveyor)
if belt:
self.send_command(self.valve_commands[self.BELT]['close'])
# time.sleep(self.delay_belt)
if alarm:
self.send_command(self.valve_commands[self.ALARM]['close'])
# time.sleep(self.delay_alarm)
if blow_sensor2:
self.send_command(self.valve_commands[self.BLOW_SENSOR2]['close'])
# time.sleep(self.delay_blow_sensor2)
# ===================== 传感器处理线程 =====================
def handle_sensor1(self):
while self._running:
if self._ispause and not self.sensor1_triggered:
#暂停线程, 保证线程1执行完传感器操作
time.sleep(self.sensor1_error_delay)
continue
try:
if self.is_valid_sensor_status(self.SENSOR1):
current_time = time.time()
if not self.sensor1_triggered and (
current_time - self.sensor1_last_time) > self.sensor1_debounce_time:
self.sensor1_triggered = True
self.sensor1_last_time = current_time
# 1.停止包装机皮带电机:关闭 conveyor1
self.close(conveyor1=True)
time.sleep(self.delay_conveyor)
# 2.推板开启:推出去动作
self.open(pusher=True)
time.sleep(self.delay_pusher)
self.close(pusher=True)
# 推板推出后重新启动电机时间传送带1延时5秒
time.sleep(self.delay_after_pusher)
# 4.推板关闭:推板收回来动作
self.open(pusher1=True)
time.sleep(self.delay_pusher)
self.close(pusher1=True)
# 3.重新开启包装机皮带电机: 开启conveyor1
time.sleep(self.delay_conveyor)
self.open(conveyor1=True)
time.sleep(1)
# 5. 状态检查(可选)
# status = self.get_all_device_status()
# if status.get('conveyor1') and not status.get('pusher'):
# print("流程完成1皮带运行中推板已收回")
# else:
# print("状态异常,请检查设备")
# 流程结束,重置触发标志
self.sensor1_triggered = False
time.sleep(self.sensor1_loop_delay)
except Exception as e:
print(f"SENSOR1 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR1 处理错误: {e}")
self.sensor1_triggered = False
time.sleep(self.sensor1_error_delay)
def handle_sensor2(self):
_is_pause_close=True
#是否料袋尾部(有信号--》无信号)
_is_signal=False
while self._running:
if self._ispause:
#暂停
if _is_pause_close:
self.close(conveyor2=True)
self.motor_stopped_by_sensor2 = True
# self.sensor2_ready = True #初始值
_is_pause_close=False
time.sleep(self.sensor2_error_delay)
continue
#开启线程
_is_pause_close=True
try:
# if _is_signal or self.is_valid_sensor_status_1(self.SENSOR2):
#motor_stopped_by_sensor2=False(在滚动的时候)才去检测信号,否则如果后面的料子
#有信号了不会在FPhoto后开滚筒
if (not self.motor_stopped_by_sensor2) and (_is_signal or self.is_valid_sensor_stable(self.SENSOR2)):
#检测到信号,如果之前是没有信号,关闭滚筒
#print('检查到sensor2信号')
if _is_signal and self.is_valid_sensor2_status_lost(self.SENSOR2):
self.log_signal.emit(logging.INFO,"检查到sensor2信号消失")
if self.is_drop_35:
time.sleep(3.5)
self.open(conveyor2_reverse=True)
time.sleep(2.5)
self.close(conveyor2=True)
#滚筒关闭标志
self.motor_stopped_by_sensor2 = True
# 发送信号通知机器人取走物品
self.take_robot_signal.emit()
_is_signal=False
self.sensor2_ready=False #打开滚洞标识FPhoto控制打开
else:
if self.sensor2_ready:
#只有在FPhoto处才有效
_is_signal=True
# if self.motor_stopped_by_sensor2:
# self.log_signal.emit(logging.INFO,"开滚筒2")
# print('开滚筒2开滚筒2开滚筒2开滚筒2')
# self.open(conveyor2=True)
# self.motor_stopped_by_sensor2 = False
time.sleep(0.01)
continue
elif self.sensor2_ready:
#sensor2_ready:通过Feeding:FPhoto处控制是否启动
if self.motor_stopped_by_sensor2:
self.log_signal.emit(logging.INFO,"开滚筒1")
print('开滚筒1开滚筒1开滚筒1开滚筒1')
self.open(conveyor2=True)
self.motor_stopped_by_sensor2 = False
time.sleep(0.5)
except Exception as e:
print(f"SENSOR2 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"SENSOR2 处理错误: {e}")
time.sleep(self.sensor2_error_delay)
def pause_start_sensor(self,is_pause):
"""
暂停或开启传感器线程
is_pause:True是False否
"""
self._ispause = is_pause
if is_pause:
"""暂停皮带"""
self.close(belt=True)
else:
"""开启皮带"""
self.open(belt=True)
def set_drop_35(self,is_drop_35):
"""
设置是否是35码
is_drop_35:True是False否
"""
self.is_drop_35=is_drop_35
def stop_sensor(self,sensor1_thread,sensor2_thread):
if not self._running:
print("线程未在运行")
return
print("停止传感器线程")
self._running = False
if sensor1_thread and sensor1_thread.is_alive():
sensor1_thread.join()
if sensor2_thread and sensor2_thread.is_alive():
sensor2_thread.join()
print("传感器线程已终止。")
def handle_emergency_pressed(self):
"处理急停按钮信号状态线程"
#print('检查急停按钮状态1')
while self._running:
try:
#print('检查急停按钮状态')
loc_is_pressed =self.get_emergency_is_pressed()
if loc_is_pressed is None:
time.sleep(0.5)
continue
if loc_is_pressed:
# 处理急停按钮信号状态
if not self.emergency_is_pressed:
print('急停按钮被按下')
self.log_signal.emit(logging.INFO,f"急停按钮被按下")
self.emergency_is_pressed=True
self.emergency_signal.emit(True)
else:
#print('急停按钮未被按下')
self.emergency_is_pressed=False
self.emergency_signal.emit(False)
time.sleep(0.5)
except Exception as e:
print(f"急停 处理错误: {e}")
self.log_signal.emit(logging.ERROR,f"急停线程 处理错误: {e}")
time.sleep(1)