199 lines
5.4 KiB
Python
199 lines
5.4 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
'''
|
|||
|
|
# @Time : 2025/7/1 11:07
|
|||
|
|
# @Author : reenrr
|
|||
|
|
# @File : network_controller.py
|
|||
|
|
'''
|
|||
|
|
import socket
|
|||
|
|
import binascii
|
|||
|
|
import time
|
|||
|
|
|
|||
|
|
# 网络继电器的 IP 和端口
|
|||
|
|
HOST = '192.168.0.18'
|
|||
|
|
PORT = 50000
|
|||
|
|
|
|||
|
|
# 控件命名映射
|
|||
|
|
CONVEYOR1 = 'conveyor1'
|
|||
|
|
PUSHER = 'pusher'
|
|||
|
|
CONVEYOR2 = 'conveyor2'
|
|||
|
|
CLAMP = 'clamp' # 机械臂抓夹,初始状态是开
|
|||
|
|
|
|||
|
|
# 传感器命名映射
|
|||
|
|
SENSOR1 = 'sensor1'
|
|||
|
|
SENSOR2 = 'sensor2'
|
|||
|
|
|
|||
|
|
# 按钮控制报文
|
|||
|
|
valve_commands = {
|
|||
|
|
CONVEYOR1: {
|
|||
|
|
'open': '00000000000601050000FF00',
|
|||
|
|
'close': '000000000006010500000000',
|
|||
|
|
},
|
|||
|
|
PUSHER: {
|
|||
|
|
'open': '00000000000601050001FF00',
|
|||
|
|
'close': '000000000006010500010000',
|
|||
|
|
},
|
|||
|
|
CONVEYOR2: {
|
|||
|
|
'open': '00000000000601050002FF00',
|
|||
|
|
'close': '000000000006010500020000',
|
|||
|
|
},
|
|||
|
|
CLAMP: {
|
|||
|
|
'open': '00000000000601050003FF00',
|
|||
|
|
'close': '000000000006010500030000',
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
sensors_commands = {
|
|||
|
|
SENSOR1: {
|
|||
|
|
'signal': '00000000000401020101',
|
|||
|
|
'nosignal': '00000000000401020100',
|
|||
|
|
},
|
|||
|
|
SENSOR2: {
|
|||
|
|
'signal': '00000000000401020102',
|
|||
|
|
'nosignal': '00000000000401020100',
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 读取状态命令
|
|||
|
|
read_status_command = {
|
|||
|
|
'devices':'000000000006010100000008',
|
|||
|
|
'sensors':'000000000006010200000008'
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 控件对应 DO 位(从低到高)
|
|||
|
|
device_bit_map = {
|
|||
|
|
CONVEYOR1: 0,
|
|||
|
|
PUSHER: 1,
|
|||
|
|
CONVEYOR2: 2,
|
|||
|
|
CLAMP: 3,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
device_name_map = {
|
|||
|
|
CONVEYOR1: "传送带1",
|
|||
|
|
PUSHER: "推板",
|
|||
|
|
CONVEYOR2: "传送带2",
|
|||
|
|
CLAMP: "机械臂抓夹"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 传感器对应位(从低到高)
|
|||
|
|
sensor_bit_map = {
|
|||
|
|
SENSOR1: 0,
|
|||
|
|
SENSOR2: 1,
|
|||
|
|
# 根据你继电器的配置,继续添加更多传感器
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
sensor_name_map = {
|
|||
|
|
SENSOR1: '位置传感器1',
|
|||
|
|
SENSOR2: '位置传感器2',
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 将十六进制字符串转换为字节数据并发送
|
|||
|
|
def send_command(command):
|
|||
|
|
byte_data = binascii.unhexlify(command)
|
|||
|
|
|
|||
|
|
# 创建套接字并连接到继电器
|
|||
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|||
|
|
try:
|
|||
|
|
sock.connect((HOST, PORT))
|
|||
|
|
sock.send(byte_data)
|
|||
|
|
# 接收响应
|
|||
|
|
response = sock.recv(1024)
|
|||
|
|
# print(f"收到响应: {binascii.hexlify(response)}")
|
|||
|
|
# 校验响应
|
|||
|
|
return response
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"通信错误: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def get_all_device_status(command_type='devices'):
|
|||
|
|
command = read_status_command.get(command_type)
|
|||
|
|
if not command:
|
|||
|
|
print(f"未知的读取类型: {command_type}")
|
|||
|
|
return {}
|
|||
|
|
|
|||
|
|
response = send_command(command)
|
|||
|
|
status_dict = {}
|
|||
|
|
|
|||
|
|
if response and len(response) >= 10:
|
|||
|
|
status_byte = response[9] # 状态在第10字节
|
|||
|
|
status_bin = f"{status_byte:08b}"[::-1]
|
|||
|
|
|
|||
|
|
if command_type == 'devices':
|
|||
|
|
bit_map = device_bit_map
|
|||
|
|
name_map = device_name_map
|
|||
|
|
elif command_type == 'sensors':
|
|||
|
|
bit_map = sensor_bit_map
|
|||
|
|
name_map = sensor_name_map
|
|||
|
|
else:
|
|||
|
|
print("不支持的映射类型")
|
|||
|
|
return{}
|
|||
|
|
|
|||
|
|
for key, bit_index in bit_map.items():
|
|||
|
|
state = status_bin[bit_index] == '1'
|
|||
|
|
status_dict[key] = state
|
|||
|
|
# readable = "开启" if state else "关闭"
|
|||
|
|
# print(f"{device.capitalize()} 状态: {readable}")
|
|||
|
|
else:
|
|||
|
|
print("读取状态失败或响应无效")
|
|||
|
|
|
|||
|
|
return status_dict
|
|||
|
|
|
|||
|
|
# 获取单个控件状态(True: 开启, False: 关闭, None: 无法读取)
|
|||
|
|
def get_device_status(device_name, command_type='devices'):
|
|||
|
|
status = get_all_device_status(command_type)
|
|||
|
|
return status.get(device_name, None)
|
|||
|
|
|
|||
|
|
# 根据状态决定是否执行开操作
|
|||
|
|
def open(conveyor1=False, pusher=False, conveyor2=False, clamp=False):
|
|||
|
|
status = get_all_device_status()
|
|||
|
|
|
|||
|
|
if conveyor1 and not status.get(CONVEYOR1, False):
|
|||
|
|
print("打开传送带1")
|
|||
|
|
send_command(valve_commands[CONVEYOR1]['open'])
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
if pusher and not status.get(PUSHER, False):
|
|||
|
|
print("打开推板")
|
|||
|
|
send_command(valve_commands[PUSHER]['open'])
|
|||
|
|
time.sleep(0.05)
|
|||
|
|
|
|||
|
|
if conveyor2 and not status.get(CONVEYOR2, False):
|
|||
|
|
print("打开传送带2")
|
|||
|
|
send_command(valve_commands[CONVEYOR2]['open'])
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
if clamp and not status.get(CLAMP, False):
|
|||
|
|
print("启动机械臂")
|
|||
|
|
send_command(valve_commands[CLAMP]['open'])
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
|
|||
|
|
# 根据状态决定是否执行关操作
|
|||
|
|
def close(conveyor1=False, pusher=False, conveyor2=False, clamp=False):
|
|||
|
|
status = get_all_device_status()
|
|||
|
|
|
|||
|
|
if conveyor1 and status.get(CONVEYOR1, True):
|
|||
|
|
print("关闭传送带1")
|
|||
|
|
send_command(valve_commands[CONVEYOR1]['close'])
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
if pusher and status.get(PUSHER, True):
|
|||
|
|
print("关闭推板")
|
|||
|
|
send_command(valve_commands[PUSHER]['close'])
|
|||
|
|
time.sleep(0.05)
|
|||
|
|
|
|||
|
|
if conveyor2 and status.get(CONVEYOR2, True):
|
|||
|
|
print("关闭传送带2")
|
|||
|
|
send_command(valve_commands[CONVEYOR2]['close'])
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
if clamp and status.get(CLAMP, True):
|
|||
|
|
print("停止机械臂")
|
|||
|
|
send_command(valve_commands[CLAMP]['close'])
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|