Files
AutoControlSystem-G/EMV/EMV_reneer_pengqi_save.py

274 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import binascii
import time
import threading
class RelayController:
    """Drive a TCP relay board that switches two conveyors, a pusher and a clamp.

    Every command is a fixed hex frame sent over a fresh TCP connection.  The
    frames look like Modbus/TCP requests (unit 0x01; function 0x05 to write a
    coil, 0x01 / 0x02 to read eight coils / eight inputs) -- confirm against
    the relay board manual before changing them.

    Two optional background threads poll the position sensors and run a fixed
    open/close sequence whenever a sensor reports a stable signal.
    """

    # Timing constants (seconds).
    _SOCKET_TIMEOUT_S = 10   # connect/recv timeout for one command
    _RETRY_DELAY_S = 5       # pause between two failed attempts
    _POLL_INTERVAL_S = 0.2   # pause between two stability polls
    _LOOP_PAUSE_S = 0.5      # pause between two passes of a sensor loop

    def __init__(self, host='192.168.0.18', port=50000):
        """Store the relay board address and build the command/lookup tables.

        Args:
            host: IP address or host name of the relay board.
            port: TCP port of the relay board.
        """
        self.host = host
        self.port = port

        # Logical names used as keys in every table below.
        self.CONVEYOR1 = 'conveyor1'
        self.PUSHER = 'pusher'
        self.CONVEYOR2 = 'conveyor2'
        self.CLAMP = 'clamp'
        self.SENSOR1 = 'sensor1'
        self.SENSOR2 = 'sensor2'

        # Hex frames that switch one output on ('open') or off ('close').
        self.valve_commands = {
            self.CONVEYOR1: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
            self.PUSHER: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
            self.CONVEYOR2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'},
            self.CLAMP: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
        }

        # Hex frames that read the eight output bits / the eight input bits.
        self.read_status_command = {
            'devices': '000000000006010100000008',
            'sensors': '000000000006010200000008',
        }

        # Bit position of each output / input inside the returned status byte.
        self.device_bit_map = {
            self.CONVEYOR1: 0,
            self.PUSHER: 1,
            self.CONVEYOR2: 2,
            self.CLAMP: 3,
        }
        self.sensor_bit_map = {
            self.SENSOR1: 0,
            self.SENSOR2: 1,
        }

        # Human-readable names (runtime strings; not used inside this class).
        self.device_name_map = {
            self.CONVEYOR1: "传送带1",
            self.PUSHER: "推板",
            self.CONVEYOR2: "传送带2",
            self.CLAMP: "机械臂",
        }
        self.sensor_name_map = {
            self.SENSOR1: '位置传感器1',
            self.SENSOR2: '位置传感器2',
        }

        # Console message printed before each switching command.
        self._action_message = {
            'open': {
                self.CONVEYOR1: "打开传送带1",
                self.PUSHER: "打开推板",
                self.CONVEYOR2: "打开传送带2",
                self.CLAMP: "启动机械臂抓夹",
            },
            'close': {
                self.CONVEYOR1: "关闭传送带1",
                self.PUSHER: "关闭推板",
                self.CONVEYOR2: "关闭传送带2",
                self.CLAMP: "停止机械臂抓夹",
            },
        }

        # Pause (seconds) after switching each device.
        self._settle_delay = {
            self.CONVEYOR1: 1,
            self.PUSHER: 0.05,
            self.CONVEYOR2: 1,
            self.CLAMP: 0.5,
        }

        self._running = False
        self._sensor1_thread = None
        self._sensor2_thread = None

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------
    def send_command_old(self, command_hex):
        """Send one frame without timeout or retry (legacy; see send_command).

        Args:
            command_hex: The frame as a hex string.

        Returns:
            The raw response bytes, or None when the exchange raised.
        """
        byte_data = binascii.unhexlify(command_hex)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.connect((self.host, self.port))
                # sendall: send() may transmit only part of the frame.
                sock.sendall(byte_data)
                response = sock.recv(1024)
                print(f"收到响应: {binascii.hexlify(response)}")
                return response
            except Exception as e:
                print(f"通信错误: {e}")
                return None

    def send_command(self, command_hex, retry_count=5, source='unknown'):
        """Send one frame, retrying on any communication error.

        A new connection is opened for each attempt.  After the last failed
        attempt `trigger_alarm` is called and the method returns at once; the
        retry pause only happens between attempts.

        Args:
            command_hex: The frame as a hex string.
            retry_count: Maximum number of attempts.  With 0 nothing is sent
                and no alarm is raised.
            source: 'sensor', 'device' or anything else; only selects the
                prefix of the console message.

        Returns:
            The raw response bytes, or None when every attempt failed.

        Raises:
            binascii.Error: If `command_hex` is not valid hex.
        """
        byte_data = binascii.unhexlify(command_hex)
        for attempt in range(retry_count):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(self._SOCKET_TIMEOUT_S)
                    sock.connect((self.host, self.port))
                    # sendall: send() may transmit only part of the frame.
                    sock.sendall(byte_data)
                    response = sock.recv(1024)
            except Exception as e:
                # Deliberately broad: this runs inside background threads and
                # must never let a transport problem kill them.
                print(f"通信错误 ({source}): {e}, 尝试重连... ({attempt + 1}/{retry_count})")
                if attempt == retry_count - 1:
                    self.trigger_alarm()
                else:
                    time.sleep(self._RETRY_DELAY_S)
                continue

            hex_response = binascii.hexlify(response).decode('utf-8')
            # The prefix tells which kind of request produced the response.
            if source == 'sensor':
                print(f"[传感器响应] {hex_response}")
            elif source == 'device':
                print(f"[设备控制响应] {hex_response}")
            else:
                print(f"[通信响应] {hex_response}")
            return response
        return None

    def trigger_alarm(self):
        """Report that the maximum number of communication retries was used up."""
        print("警告:连续多次通信失败,请检查设备连接!")
        # Further alarm actions (e-mail, SMS, audible alert, ...) can go here.

    # ------------------------------------------------------------------
    # Status reading
    # ------------------------------------------------------------------
    def _decode_status(self, response, command_type):
        """Turn a raw read response into a {name: bool} dict.

        Args:
            response: Raw response bytes, or None.
            command_type: 'devices' selects the output bit map; any other
                value selects the sensor bit map.

        Returns:
            One boolean per known name, or {} when the response is missing
            or shorter than ten bytes.
        """
        if not response or len(response) < 10:
            print(f"[{command_type}] 读取状态失败或响应无效")
            return {}
        status_byte = response[9]  # the status bits are read from the tenth byte
        bit_map = self.device_bit_map if command_type == 'devices' else self.sensor_bit_map
        return {key: bool((status_byte >> bit_index) & 1) for key, bit_index in bit_map.items()}

    def get_all_device_status(self, command_type='devices'):
        """Read all outputs ('devices') or all inputs ('sensors') in one request.

        Args:
            command_type: Key into `read_status_command`.

        Returns:
            A {name: bool} dict; {} for an unknown type or a failed read.
        """
        command = self.read_status_command.get(command_type)
        if not command:
            print(f"未知的读取类型: {command_type}")
            return {}
        # Tag the request so the console shows where the response came from.
        source = 'sensor' if command_type == 'sensors' else 'device'
        response = self.send_command(command, source=source)
        return self._decode_status(response, command_type)

    def get_device_status_old(self, name, command_type='devices'):
        """Return one status from a single read (True/False, None if unavailable)."""
        return self.get_all_device_status(command_type).get(name, None)

    @staticmethod
    def _poll_count(stable_duration, interval):
        """Return how many polls fit into `stable_duration`.

        Rounds instead of truncating, so float error such as
        0.6 / 0.2 == 2.9999999999999996 does not lose a poll.
        """
        return max(0, round(stable_duration / interval))

    def get_device_status(self, name, command_type='devices', stable_duration=1.0, max_attempts=3):
        """Return True only after the signal has been read as on several times.

        The status is polled every 0.2 s for about `stable_duration` seconds.
        A single "off" reading returns False immediately; a failed reading
        resets the consecutive-on counter and polling continues.

        Args:
            name: Device or sensor name, e.g. `self.SENSOR1`.
            command_type: 'devices' or 'sensors'.
            stable_duration: Length of the polling window in seconds.
            max_attempts: Number of "on" readings required before True is
                returned.  True is unreachable when the window holds fewer
                polls than this.

        Returns:
            True when the signal was stable, otherwise False (never None), so
            that a read failure cannot trigger an action by accident.
        """
        stable_count = 0
        interval = self._POLL_INTERVAL_S
        for _ in range(self._poll_count(stable_duration, interval)):
            status = self.get_all_device_status(command_type).get(name)
            if status is True:
                stable_count += 1
                if stable_count >= max_attempts:
                    return True
            elif status is False:
                return False
            else:
                # None means the read failed.
                print(f"[警告] 读取 {name} 状态失败,尝试重试...")
                stable_count = 0
            time.sleep(interval)
        return False

    # ------------------------------------------------------------------
    # Switching
    # ------------------------------------------------------------------
    def _switch(self, action, requested):
        """Send `action` ('open' or 'close') to every requested device.

        Args:
            action: 'open' or 'close'.
            requested: Ordered (device, flag) pairs; only devices whose flag
                is truthy are handled, in the given order.

        A device already in the target state is skipped.  When its state is
        unknown (failed status read) the command is sent anyway.
        """
        wanted = [device for device, flag in requested if flag]
        if not wanted:
            return  # nothing to do; skip the status round-trip
        target_on = action == 'open'
        status = self.get_all_device_status()
        for device in wanted:
            # Missing status defaults to "not yet in the target state".
            if status.get(device, not target_on) == target_on:
                continue
            print(self._action_message[action][device])
            self.send_command(self.valve_commands[device][action], source='device')
            time.sleep(self._settle_delay[device])

    def open(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False):
        """Switch on each selected device that is not already on."""
        self._switch('open', [
            (self.CONVEYOR1, conveyor1),
            (self.PUSHER, pusher),
            (self.CONVEYOR2, conveyor2),
            (self.CLAMP, clamp),
        ])

    def close(self, conveyor1=False, pusher=False, conveyor2=False, clamp=False):
        """Switch off each selected device that is not already off."""
        self._switch('close', [
            (self.CONVEYOR1, conveyor1),
            (self.PUSHER, pusher),
            (self.CONVEYOR2, conveyor2),
            (self.CLAMP, clamp),
        ])

    # ------------------------------------------------------------------
    # Sensor loops
    # ------------------------------------------------------------------
    def handle_sensor1(self):
        """Loop until stopped: on a SENSOR1 signal, stop conveyor 1, cycle the pusher, restart."""
        while self._running:
            try:
                if self.get_device_status(self.SENSOR1, 'sensors'):
                    print("SENSOR1 检测到信号,执行流程")
                    self.close(conveyor1=True)
                    time.sleep(2)
                    self.open(pusher=True)
                    time.sleep(5)
                    self.close(pusher=True)
                    time.sleep(2)
                    self.open(conveyor1=True)
            except Exception as e:
                print(f"SENSOR1 处理错误: {e}")
            # Outside the try so a repeating error cannot spin the loop.
            time.sleep(self._LOOP_PAUSE_S)

    def handle_sensor2(self):
        """Loop until stopped: on a SENSOR2 signal, stop conveyor 2, start the clamp, restart."""
        while self._running:
            try:
                if self.get_device_status(self.SENSOR2, 'sensors'):
                    print("SENSOR2 检测到信号,执行流程")
                    self.close(conveyor2=True)
                    time.sleep(2)
                    # NOTE(review): the clamp is never switched off again in
                    # this sequence -- confirm that is intended.
                    self.open(clamp=True)
                    time.sleep(2)
                    self.open(conveyor2=True)
            except Exception as e:
                print(f"SENSOR2 处理错误: {e}")
            # Outside the try so a repeating error cannot spin the loop.
            time.sleep(self._LOOP_PAUSE_S)

    # ------------------------------------------------------------------
    # Thread lifecycle
    # ------------------------------------------------------------------
    def _launch(self, target):
        """Start `target` in a daemon thread and return the thread."""
        worker = threading.Thread(target=target, daemon=True)
        worker.start()
        return worker

    def start_sensor1_only(self):
        """Start only the SENSOR1 loop; does nothing if loops already run."""
        if self._running:
            print("线程已经在运行")
            return
        print("启动传感器1监听线程")
        self._running = True
        self._sensor1_thread = self._launch(self.handle_sensor1)

    def start(self):
        """Start both sensor loops; does nothing if loops already run."""
        if self._running:
            print("线程已经在运行")
            return
        print("启动传感器线程")
        self._running = True
        self._sensor1_thread = self._launch(self.handle_sensor1)
        self._sensor2_thread = self._launch(self.handle_sensor2)

    def stop(self):
        """Ask the sensor loops to finish and wait for them.

        A loop only checks the flag between passes, so this can block for as
        long as one full handling sequence takes.
        """
        if not self._running:
            print("线程未在运行")
            return
        print("停止传感器线程")
        self._running = False  # the loops test this flag on every pass
        current = threading.current_thread()
        for attr in ('_sensor1_thread', '_sensor2_thread'):
            worker = getattr(self, attr)
            # Joining the calling thread would raise RuntimeError.
            if worker is not None and worker is not current:
                worker.join()
            setattr(self, attr, None)
        print("传感器线程已终止。")