Files
2025-09-26 13:32:34 +08:00

78 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# hardware/relay.py
import socket
import binascii
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
class RelayController:
    """Switch and query a network relay board using raw Modbus TCP frames.

    Each command is a fixed hex string that is decoded to bytes and sent
    over a short-lived TCP connection (one connection per command).

    Frame layout of the hex strings used here (Modbus TCP):
    transaction id (2 bytes), protocol id (2), length (2), unit id (1),
    function code (1), then the function-specific data.
    """

    # Relay identifiers, one per digital output (DO) of the board.
    DOOR_UPPER = 'door_upper'  # DO0 - upper hopper slide
    DOOR_LOWER_1 = 'door_lower_1'  # DO1 - upper hopper concrete discharge door
    DOOR_LOWER_2 = 'door_lower_2'  # DO2 - lower hopper concrete discharge door
    BREAK_ARCH_UPPER = 'break_arch_upper'  # DO3 - upper hopper arch breaker
    BREAK_ARCH_LOWER = 'break_arch_lower'  # DO4 - lower hopper arch breaker

    def __init__(self, host='192.168.0.18', port=50000, timeout=5.0):
        """Store the connection settings and build the command tables.

        Args:
            host: IP address or host name of the relay board.
            port: TCP port of the relay board.
            timeout: Seconds to wait for connect/send/receive before a
                command is treated as failed. ``None`` blocks forever.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        # Not used by the methods of this class; kept as a public attribute
        # for code that accesses it directly.
        self.modbus_client = ModbusTcpClient(host, port=port)

        # Raw socket commands per relay: function 0x05 (write single coil),
        # coil address = DO index, value FF00 = on, 0000 = off.
        self.relay_commands = {
            self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
            self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
            self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'},
            self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
            self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'},
        }

        # Status query: function 0x01 (read coils), start address 0, 8 coils.
        self.read_status_command = '000000000006010100000008'

        # Bit position of each relay inside the returned status byte.
        self.device_bit_map = {
            self.DOOR_UPPER: 0,
            self.DOOR_LOWER_1: 1,
            self.DOOR_LOWER_2: 2,
            self.BREAK_ARCH_UPPER: 3,
            self.BREAK_ARCH_LOWER: 4,
        }

    def send_command(self, command_hex):
        """Send one raw command and return the board's reply.

        Args:
            command_hex: The frame to send, as a hex string.

        Returns:
            The bytes received (at most 1024), or ``None`` when the hex
            string is invalid or the connection fails or times out.
        """
        try:
            payload = binascii.unhexlify(command_hex)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                # Without a timeout an unreachable or silent board would
                # block the caller indefinitely.
                sock.settimeout(self.timeout)
                sock.connect((self.host, self.port))
                # sendall() retries until the whole frame is written;
                # send() may transmit only part of it.
                sock.sendall(payload)
                response = sock.recv(1024)
            print(f"收到继电器响应: {binascii.hexlify(response)}")
            return response
        except (OSError, ValueError, TypeError) as e:
            # OSError covers socket failures and timeouts; ValueError and
            # TypeError cover malformed hex input.
            print(f"继电器通信错误: {e}")
            return None

    def get_status(self):
        """Read the on/off state of every mapped relay.

        Returns:
            A dict mapping each relay identifier to ``True`` (on) or
            ``False`` (off). Empty when the query failed or the reply is
            shorter than 10 bytes.
        """
        response = self.send_command(self.read_status_command)
        if not response or len(response) < 10:
            print("读取继电器状态失败")
            return {}

        # Bytes 0-6 are the header, 7 the function code, 8 the byte count;
        # byte 9 carries the coil states with coil 0 in the lowest bit.
        status_byte = response[9]
        return {
            key: bool((status_byte >> bit_index) & 1)
            for key, bit_index in self.device_bit_map.items()
        }

    def control(self, device, action):
        """Switch one relay.

        Args:
            device: A relay identifier such as ``DOOR_UPPER``.
            action: ``'open'`` or ``'close'``.

        Returns:
            ``True`` when the command was sent and a non-empty reply was
            received; ``False`` for an unknown device/action or a failed
            transmission.
        """
        command = self.relay_commands.get(device, {}).get(action)
        if command is None:
            print(f"无效设备或动作: {device}, {action}")
            return False

        print(f"控制继电器 {device} {action}")
        return bool(self.send_command(command))