import socket import binascii import time # 网络继电器的 IP 和端口 HOST = "192.168.58.18" PORT = 50000 # 电磁阀控制报文 valve_commands = { 1: { "open": "00000000000601050000FF00", "close": "000000000006010500000000", }, 2: { "open": "00000000000601050001FF00", "close": "000000000006010500010000", }, 3: { "open": "00000000000601050002FF00", "close": "000000000006010500020000", }, } # 将十六进制字符串转换为字节数据并发送 def send_command(command): byte_data = binascii.unhexlify(command) # 创建套接字并连接到继电器 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: sock.connect((HOST, PORT)) sock.send(byte_data) # 接收响应 response = sock.recv(1024) # print(f"收到响应: {binascii.hexlify(response)}") # 校验响应 if response == byte_data: # print("命令成功下发,继电器已执行操作。") return True else: print("命令下发失败,响应与请求不符。") return False except Exception as e: print(f"通信错误: {e}") return False # 控制电磁阀打开 def open(grasp, shake, throw): if grasp: # print("打开电磁阀 1") if send_command(valve_commands[1]["open"]): time.sleep(1) if shake: # print("打开电磁阀 2") if send_command(valve_commands[2]["open"]): time.sleep(0.05) if throw: print("打开电磁阀 3") if send_command(valve_commands[3]["open"]): time.sleep(0.5) # 控制电磁阀关闭 def close(grasp, shake, throw): if grasp: # print("关闭电磁阀 1") if send_command(valve_commands[1]["close"]): time.sleep(1) if shake: # print("关闭电磁阀 2") if send_command(valve_commands[2]["close"]): time.sleep(0.05) if throw: # print("关闭电磁阀 3") if send_command(valve_commands[3]["close"]): time.sleep(0.5) # 关闭电磁阀 # open(False, False, True) # 参数传True和False # close(True,False,True) # for i in range(10): # open(False,True,True) # close(True,True,True)