Files
Feeding_control_system/hardware/relay.py
2025-12-28 17:20:02 +08:00

315 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# hardware/relay.py
import socket
import binascii
import time
import threading
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
from config.settings import app_set_config
class RelayController:
    """Drive the feeding system's relay outputs over Modbus TCP.

    Every relay is switched by sending a pre-built "write single coil"
    frame over a short-lived raw TCP socket.  Higher-level helpers combine
    those writes with fixed delays to open/close the hopper doors, pulse
    the vibrators and ring the bell.
    """

    # Device identifiers (keys of ``relay_commands``).
    RING = 'ring'                                # alarm bell
    UPPER_TO_JBL = 'upper_to_jbl'                # upper hopper -> mixing plant
    UPPER_TO_ZD = 'upper_to_zd'                  # upper hopper -> vibration room
    DOOR_LOWER_OPEN = 'door_lower_open'          # lower hopper discharge door: open
    # Lower hopper discharge door: close.  Original note: the close signal
    # can be used once the door angle is below 7.5.
    DOOR_LOWER_CLOSE = 'door_lower_close'
    DOOR_UPPER_OPEN = 'door_upper_open'          # upper hopper door: open
    DOOR_UPPER_CLOSE = 'door_upper_close'        # upper hopper door: close
    BREAK_ARCH_UPPER = 'break_arch_upper'        # upper hopper vibrator
    BREAK_ARCH_LOWER = 'break_arch_lower'        # lower hopper vibrator
    DIRECT_LOWER_FRONT = 'direct_lower_front'    # lower hopper: move forward
    DIRECT_LOWER_BEHIND = 'direct_lower_behind'  # lower hopper: move backward
    DIRECT_LOWER_TOP = 'direct_lower_top'        # lower hopper: move up
    DIRECT_LOWER_BELOW = 'direct_lower_below'    # lower hopper: move down

    # Seconds to wait for the relay module before giving up on a command.
    DEFAULT_TIMEOUT = 3.0

    # Zero-based coil address written for each device.  The original wiring
    # note used one-based DO numbers (coil address + 1): lower hopper door
    # DO7 open / DO9 close; DO12 backward, DO13 up, DO14 forward, DO15 down.
    _COIL_ADDRESSES = {
        RING: 0x00,
        UPPER_TO_JBL: 0x01,
        UPPER_TO_ZD: 0x02,
        DOOR_LOWER_OPEN: 0x06,
        DOOR_LOWER_CLOSE: 0x08,
        DOOR_UPPER_OPEN: 0x03,
        DOOR_UPPER_CLOSE: 0x04,
        BREAK_ARCH_UPPER: 0x0A,
        BREAK_ARCH_LOWER: 0x05,
        DIRECT_LOWER_FRONT: 0x0D,
        DIRECT_LOWER_BEHIND: 0x0B,
        DIRECT_LOWER_TOP: 0x0C,
        DIRECT_LOWER_BELOW: 0x0E,
    }

    # (energized seconds, pause seconds) for each pulse of the upper door
    # "open" relay in ``control_upper_open``.  ``None`` means no pause.
    _UPPER_OPEN_PULSES = (
        ((0.2, 1),) * 2          # two short pulses
        + ((0.5, 0.1),) * 4      # first burst
        + ((0.5, 8),)            # last pulse of the burst, then hold 8 s
        + ((0.5, 0.1),) * 6      # second burst
        + ((0.5, None),)         # final pulse, nothing after it
    )

    def __init__(self, host='192.168.250.62', port=50000, timeout=DEFAULT_TIMEOUT):
        """Store the connection settings and build the command tables.

        Args:
            host: IP address of the relay module.
            port: TCP port of the relay module.
            timeout: Socket timeout in seconds applied to every command.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # Not used by any method of this class; kept as a public attribute.
        self.modbus_client = ModbusTcpClient(host, port=port)

        # Hex-encoded raw frames, per device and per action ('open'/'close').
        self.relay_commands = self._build_relay_commands()

        # Function 0x01 (read coils): 8 coils starting at address 0.
        self.read_status_command = '000000000006010100000008'

        # Bit of the status byte reported for each device by ``get_status``.
        # NOTE(review): bits 3 and 4 are the coil addresses that
        # ``relay_commands`` assigns to DOOR_UPPER_OPEN / DOOR_UPPER_CLOSE,
        # while the vibrators are driven at coils 0x0A and 0x05 (0x0A is
        # outside the 8 coils read).  Confirm the intended mapping.
        self.device_bit_map = {
            self.RING: 0,
            self.UPPER_TO_JBL: 1,
            self.UPPER_TO_ZD: 2,
            self.BREAK_ARCH_UPPER: 3,
            self.BREAK_ARCH_LOWER: 4
        }

        # Serializes the blocking door sequences that acquire it.
        self.door_control_lock = threading.Lock()

    @staticmethod
    def _coil_frame(address, energize):
        """Return the hex frame that writes one coil.

        Layout: transaction id 0000, protocol id 0000, length 0006,
        unit 01, function 05, coil address, then FF00 (on) or 0000 (off).
        """
        return f"0000000000060105{address:04X}{'FF00' if energize else '0000'}"

    @classmethod
    def _build_relay_commands(cls):
        """Build the ``{device: {'open': frame, 'close': frame}}`` table."""
        return {
            device: {
                'open': cls._coil_frame(address, True),
                'close': cls._coil_frame(address, False),
            }
            for device, address in cls._COIL_ADDRESSES.items()
        }

    def send_command(self, command_hex):
        """Send one hex-encoded frame over a fresh TCP connection.

        Args:
            command_hex: Frame as a hexadecimal string.

        Returns:
            The raw response bytes (up to 1024), or ``None`` if anything
            failed.  Failures are printed, never raised.
        """
        # NOTE: a debug-mode short-circuit (app_set_config.debug_mode) used
        # to sit here and is currently disabled.
        try:
            payload = binascii.unhexlify(command_hex)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                # Without a timeout an unreachable module would block the
                # caller (and any lock it holds) indefinitely.
                sock.settimeout(self.timeout)
                sock.connect((self.host, self.port))
                # sendall: send() may transmit only part of the frame.
                sock.sendall(payload)
                return sock.recv(1024)
        except Exception as e:  # best-effort boundary: report and carry on
            print(f"继电器通信错误: {e}")
            return None

    def get_status(self):
        """Read the relay states.

        Returns:
            ``{device: bool}`` for every device in ``device_bit_map``, or an
            empty dict when the read failed or the response is too short.
        """
        response = self.send_command(self.read_status_command)
        # 7-byte header + function code + byte count precede the data byte.
        if not response or len(response) < 10:
            print("读取继电器状态失败")
            return {}
        status_byte = response[9]
        return {
            device: bool((status_byte >> bit) & 1)
            for device, bit in self.device_bit_map.items()
        }

    def control(self, device, action):
        """Send the ``action`` ('open' or 'close') frame for ``device``.

        Unknown devices or actions are reported with a printed message and
        nothing is sent.
        """
        frame = self.relay_commands.get(device, {}).get(action)
        if frame is None:
            print(f"无效设备或动作: {device}, {action}")
            return
        self.send_command(frame)

    def control_upper_close(self):
        """Start closing the upper hopper door without blocking.

        Releases the "open" relay, energizes the "close" relay, and lets a
        daemon thread release the "close" relay 16 seconds later.
        """
        self.control(self.DOOR_UPPER_OPEN, 'close')
        self.control(self.DOOR_UPPER_CLOSE, 'open')
        threading.Thread(target=self._close_upper_s, daemon=True, name="close_upper_s").start()

    def control_upper_close_after(self):
        """Release the upper "open" relay, then run the staged close in a thread.

        The staged sequence is ``_close_upper_after_s``.
        """
        self.control(self.DOOR_UPPER_OPEN, 'close')
        threading.Thread(target=self._close_upper_after_s, daemon=True, name="close_upper_after_s").start()

    def control_lower_close(self):
        """Close the lower hopper door, blocking for about 3 seconds.

        Runs under ``door_control_lock``.
        """
        thread_name = threading.current_thread().name
        print(f"[{thread_name}] 尝试控制下料斗关闭")
        with self.door_control_lock:
            print(f"[{thread_name}] 获得下料斗控制锁,执行关闭操作")
            self.control(self.DOOR_LOWER_OPEN, 'close')
            self.control(self.DOOR_LOWER_CLOSE, 'open')
            time.sleep(3)
            self.control(self.DOOR_LOWER_CLOSE, 'close')
            print(f"[{thread_name}] 下料斗关闭完成,释放控制锁")

    def control_upper_open_sync(self, duration):
        """Energize the upper "open" relay for ``duration`` seconds (blocking)."""
        self.control(self.DOOR_UPPER_CLOSE, 'close')
        self.control(self.DOOR_UPPER_OPEN, 'open')
        time.sleep(duration)
        self.control(self.DOOR_UPPER_OPEN, 'close')

    def control_upper_close_sync(self, duration=5):
        """Energize the upper "close" relay for ``duration`` seconds (blocking).

        Runs under ``door_control_lock``, the same lock
        ``control_lower_close`` takes, so the two sequences never overlap.

        This class used to define the method twice; the later, locked
        definition was the effective one.  It is kept here, together with
        the ``duration=5`` default of the shadowed definition.
        """
        thread_name = threading.current_thread().name
        print(f"[{thread_name}] 尝试执行上料斗同步关闭,实际操作下料斗")
        with self.door_control_lock:
            print(f"[{thread_name}] 获得下料斗控制锁,执行同步关闭操作")
            self.control(self.DOOR_UPPER_OPEN, 'close')
            self.control(self.DOOR_UPPER_CLOSE, 'open')
            time.sleep(duration)
            self.control(self.DOOR_UPPER_CLOSE, 'close')
            print(f"[{thread_name}] 同步关闭操作完成,释放控制锁")

    def control_upper_open(self):
        """Open the upper hopper door with a fixed series of pulses (blocking).

        The "close" relay is released first (original note: only then does
        the open signal take effect).  The "open" relay is then pulsed
        according to ``_UPPER_OPEN_PULSES``.
        """
        self.control(self.DOOR_UPPER_CLOSE, 'close')
        for on_seconds, pause_seconds in self._UPPER_OPEN_PULSES:
            self.control(self.DOOR_UPPER_OPEN, 'open')
            time.sleep(on_seconds)
            self.control(self.DOOR_UPPER_OPEN, 'close')
            if pause_seconds is not None:
                time.sleep(pause_seconds)

    def control_ring_open(self):
        """Switch the bell on; a daemon thread switches it off 3 seconds later."""
        self.control(self.RING, 'open')
        threading.Thread(target=self._close_ring, daemon=True, name="_close_ring").start()

    def _close_upper_s(self):
        """Release the upper "close" relay after 16 seconds."""
        time.sleep(16)
        self.control(self.DOOR_UPPER_CLOSE, 'close')
        print("上料斗关闭完成")

    def _close_upper_after_s(self):
        """Staged upper-door close, interleaved with vibration.

        Vibrate 5 s, pulse the "close" relay for 1 s, vibrate 5 s twice,
        start an 8 s background vibration, then hold the "close" relay for
        20 s.
        """
        self.control_arch_upper_open_sync(5)
        self.control(self.DOOR_UPPER_CLOSE, 'open')
        time.sleep(1)
        self.control(self.DOOR_UPPER_CLOSE, 'close')
        self.control_arch_upper_open_sync(5)
        self.control_arch_upper_open_sync(5)
        self.control_arch_upper_open_async(8)
        self.control(self.DOOR_UPPER_CLOSE, 'open')
        time.sleep(20)
        self.control(self.DOOR_UPPER_CLOSE, 'close')
        print("上料斗关闭完成")

    def _close_lower_5s(self):
        """Release the lower "close" relay after 6 seconds.

        Not called anywhere in this class.
        """
        time.sleep(6)
        self.control(self.DOOR_LOWER_CLOSE, 'close')

    def _close_ring(self):
        """Switch the bell off after 3 seconds."""
        time.sleep(3)
        self.control(self.RING, 'close')

    def control_arch_lower_open(self):
        """Start the lower vibrator; a daemon thread stops it 3 seconds later."""
        self.control(self.BREAK_ARCH_LOWER, 'open')
        threading.Thread(target=self._close_break_arch_lower, daemon=True, name="_close_break_arch_lower").start()

    def control_arch_lower_open_sync(self, duration):
        """Run the lower vibrator for ``duration`` seconds (blocking)."""
        self.control(self.BREAK_ARCH_LOWER, 'open')
        time.sleep(duration)
        self.control(self.BREAK_ARCH_LOWER, 'close')

    def control_arch_upper_open_sync(self, duration):
        """Run the upper vibrator for ``duration`` seconds (blocking)."""
        self.control(self.BREAK_ARCH_UPPER, 'open')
        time.sleep(duration)
        self.control(self.BREAK_ARCH_UPPER, 'close')

    def _close_break_arch_lower(self):
        """Stop the lower vibrator after 3 seconds."""
        time.sleep(3)
        self.control(self.BREAK_ARCH_LOWER, 'close')

    def control_arch_upper_open_async(self, delay_seconds: float = 15):
        """Start the upper vibrator and stop it from a daemon thread.

        Args:
            delay_seconds: Seconds to keep vibrating before the relay is
                released (default 15).
        """
        self.control(self.BREAK_ARCH_UPPER, 'open')
        threading.Thread(
            target=self._close_break_arch_upper,
            args=(delay_seconds,),
            daemon=True,
            name="_close_break_arch_upper",
        ).start()

    def _close_break_arch_upper(self, delay_seconds: float = 15):
        """Stop the upper vibrator after ``delay_seconds`` seconds."""
        time.sleep(delay_seconds)
        print(f"上料斗振动关闭完成,延迟{delay_seconds}")
        self.control(self.BREAK_ARCH_UPPER, 'close')

    def close_all(self):
        """Release the transfer, vibrator, bell and door relays.

        NOTE(review): the four DIRECT_LOWER_* relays are not released here;
        confirm whether that is intentional.
        """
        for device in (
            self.UPPER_TO_JBL,
            self.UPPER_TO_ZD,
            self.BREAK_ARCH_UPPER,
            self.BREAK_ARCH_LOWER,
            self.RING,
            self.DOOR_LOWER_OPEN,
            self.DOOR_LOWER_CLOSE,
            self.DOOR_UPPER_OPEN,
            self.DOOR_UPPER_CLOSE,
        ):
            self.control(device, 'close')