Files
Feeding_control_system/src/divices/relay.py
2025-09-13 11:00:17 +08:00

85 lines
3.2 KiB
Python

import socket
import binascii
import time
from src.utils.logger import app_logger
class RelayController:
"""网络继电器控制器"""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.logger = app_logger.getChild('RelayController')
# 设备DO端口映射 (将在初始化时从配置加载)
self.device_mapping = {}
# 继电器控制命令
self.relay_commands = {
0: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'},
3: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
4: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}
}
# 读取状态命令
self.read_status_command = '000000000006010100000008'
def set_device_mapping(self, mapping: dict):
"""设置设备映射"""
self.device_mapping = mapping
def send_command(self, command_hex: str):
"""发送命令到网络继电器"""
try:
byte_data = binascii.unhexlify(command_hex)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(5) # 设置超时
sock.connect((self.host, self.port))
sock.send(byte_data)
response = sock.recv(1024)
self.logger.debug(f"收到继电器响应: {binascii.hexlify(response)}")
return response
except Exception as e:
self.logger.error(f"继电器通信错误: {e}")
return None
def get_status(self):
"""获取继电器状态"""
response = self.send_command(self.read_status_command)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9]
status_bin = f"{status_byte:08b}"[::-1]
# 根据设备映射返回状态
for device_name, bit_index in self.device_mapping.items():
status_dict[device_name] = status_bin[bit_index] == '1'
else:
self.logger.warning("读取继电器状态失败或响应无效")
return status_dict
def control(self, device_name: str, action: str):
"""控制继电器开关"""
if device_name not in self.device_mapping:
self.logger.error(f"无效的继电器设备: {device_name}")
return False
bit_index = self.device_mapping[device_name]
if bit_index not in self.relay_commands:
self.logger.error(f"不支持的DO端口: {bit_index}")
return False
if action not in self.relay_commands[bit_index]:
self.logger.error(f"无效的继电器动作: {action}")
return False
self.logger.info(f"控制继电器 {device_name} ({bit_index}) {action}")
result = self.send_command(self.relay_commands[bit_index][action])
time.sleep(0.1) # 短暂延迟确保命令执行
return result is not None