Files
wire_controlsystem/EMV/EMV_test.py

334 lines
10 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/12/12 14:39
# @Author : reenrr
# @File : EMV.py
# @Desc : 网络继电器控制输入、输出设备 修改了接口,需测试
'''
import socket
import binascii
import time
import logging
from typing import Union
# 网络继电器的 IP 和端口
HOST = '192.168.5.18'
PORT = 50000
# 控件命名映射
SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1
SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2
ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1
ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2
ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3
ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4
ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5
# 传感器命名映射
CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关
CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关
PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1
PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2
FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器
# 控件控制报文
valve_commands = {
SOLENOID_VALVE1: {
'open': '00000000000601050000FF00',
'close': '000000000006010500000000',
},
SOLENOID_VALVE2: {
'open': '00000000000601050001FF00',
'close': '000000000006010500010000',
},
ABSORB_SOLENOID_VALVE1: {
'open': '00000000000601050002FF00',
'close': '000000000006010500020000',
},
ABSORB_SOLENOID_VALVE2: {
'open': '00000000000601050003FF00',
'close': '000000000006010500030000',
},
ABSORB_SOLENOID_VALVE3: {
'open': '00000000000601050004FF00',
'close': '000000000006010500040000',
},
ABSORB_SOLENOID_VALVE4: {
'open': '00000000000601050005FF00',
'close': '000000000006010500050000',
},
ABSORB_SOLENOID_VALVE5: {
'open': '00000000000601050006FF00',
'close': '000000000006010500060000',
}
}
# 读取状态命令
read_status_command = {
'devices': '000000000006010100000008',
'sensors': '000000000006010200000008'
}
# 控件对应 DO 位(从低到高)
device_bit_map = {
SOLENOID_VALVE1: 0,
SOLENOID_VALVE2: 1,
ABSORB_SOLENOID_VALVE1: 2,
ABSORB_SOLENOID_VALVE2: 3,
ABSORB_SOLENOID_VALVE3: 4,
ABSORB_SOLENOID_VALVE4: 5,
ABSORB_SOLENOID_VALVE5: 6
}
device_name_map = {
SOLENOID_VALVE1: "电磁阀1",
SOLENOID_VALVE2: "电磁阀2",
ABSORB_SOLENOID_VALVE1: "吸取装置电磁阀1",
ABSORB_SOLENOID_VALVE2: "吸取装置电磁阀2",
ABSORB_SOLENOID_VALVE3: "吸取装置电磁阀3",
ABSORB_SOLENOID_VALVE4: "吸取装置电磁阀4",
ABSORB_SOLENOID_VALVE5: "吸取装置电磁阀5",
}
# 传感器对应位(从低到高)
sensor_bit_map = {
FIBER_SENSOR: 0,
PRESS_SENSOR1: 1,
PRESS_SENSOR2: 2,
CONVEYOR1_SENSOR: 4,
CONVEYOR2_SENSOR: 3,
# 根据你继电器的配置,继续添加更多传感器
}
sensor_name_map = {
FIBER_SENSOR: '光纤传感器',
PRESS_SENSOR1: '按压开关1',
PRESS_SENSOR2: '按压开关2',
CONVEYOR1_SENSOR: '传送带1开关',
CONVEYOR2_SENSOR: '传送带2开关'
}
class RelayController:
def __init__(self):
"""初始化继电器控制器"""
self.sock = None
self.is_connected = False # 长连接状态标记
# 初始化时自动尝试建立长连接
print(f"正在初始化网络继电器并建立长连接")
connect_success = self.connect()
if connect_success:
print(f"网络继电器长连接建立成功:{HOST}:{PORT}")
else:
print(f"网络继电器长连接建立失败")
def connect(self) -> bool:
"""
建立长连接
:return: True-连接成功False-连接失败
"""
if not self.is_connected:
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.sock.connect((HOST, PORT))
self.is_connected = True
print(f"长连接建立成功:{HOST}:{PORT}")
return True
except Exception as e:
print(f"长连接建立失败:{e}")
self.sock = None
self.is_connected = False
return False
return True
def disconnect(self):
"""断开长连接"""
if self.sock and self.is_connected:
try:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
print("长连接已正常断开")
except Exception as e:
print(f"断开长连接时出现异常:{e}")
finally:
self.sock = None
self.is_connected = False
def send_command(self, command: str) -> Union[bytes, bool]:
"""
将十六进制字符串转换为字节数据并发送
:param command: 十六进制字符串
:return: 响应字节数据 / False
"""
if not self.is_connected or not self.sock:
print("长连接未建立,无法发送指令")
return False
try:
byte_data = binascii.unhexlify(command)
self.sock.sendall(byte_data) # sendall确保数据全部发送
# 接收响应
response = self.sock.recv(1024)
if not response:
logging.warning("长连接接收空响应,可能连接已断开")
self.is_connected = False
return False
# print(f"收到响应: {binascii.hexlify(response)}")
# 校验响应
return response
except socket.timeout:
print("长连接发送/接收超时")
self.is_connected = False
return False
except Exception as e:
print(f"通信错误: {e}")
self.is_connected = False
return False
def get_all_device_status(self, command_type: str='devices') -> dict[str, bool]:
"""
获取所有设备/传感器状态
:param command_type: 'devices'控件 / 'sensors'传感器
:return: 状态字典 {设备名: 状态(bool)}
"""
command = read_status_command.get(command_type)
if not command:
print(f"未知的读取类型: {command_type}")
return {}
response = self.send_command(command)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9] # 状态在第10字节
status_bin = f"{status_byte:08b}"[::-1]
if command_type == 'devices':
bit_map = device_bit_map
name_map = device_name_map
elif command_type == 'sensors':
bit_map = sensor_bit_map
name_map = sensor_name_map
else:
print("不支持的映射类型")
return {}
for key, bit_index in bit_map.items():
state = status_bin[bit_index] == '1'
status_dict[key] = state
# readable = "开启" if state else "关闭"
# print(f"{device.capitalize()} 状态: {readable}")
else:
print("读取状态失败或响应无效")
return status_dict
def set_device(self, device_name: str, state: bool):
"""
设置指定输出设备DO的开关状态
:param device_name: 设备名称
:param state: 目标状态 True--打开 FALSE--关闭
"""
if device_name not in valve_commands:
raise ValueError(f"未知设备:{device_name}")
try:
current = self.get_all_device_status()
is_on = current.get(device_name, False)
if state and not is_on:
self.send_command(valve_commands[device_name]['open'])
elif not state and is_on:
self.send_command(valve_commands[device_name]['close'])
except Exception as e:
raise RuntimeError(f"设置设备 '{device_name}' 状态失败: {e}") from e
# 全局变量
# private
_GLOBAL_RELAY = RelayController()
def ng_push():
"""NG推料流程"""
try:
# 同时打开电磁阀1、2
write_do(SOLENOID_VALVE1, True)
write_do(SOLENOID_VALVE2, True)
print(f"电磁阀1、2已打开")
# 等待线条掉落
time.sleep(0.5)
write_do(SOLENOID_VALVE1, False)
write_do(SOLENOID_VALVE2, False)
print(f"电磁阀1、2已关闭")
except Exception as e:
print(f"NG推料失败:{e}")
raise RuntimeError("NG推料流程异常") from e
def write_do(device_name: str, state: bool):
"""
控制单个数字输出设备DO的开关状态
:param device_name: 设备名称
:param state: True:打开 False关闭
"""
if _GLOBAL_RELAY is None:
raise ValueError("未初始化实例")
# 验证设备是否存在
if device_name not in device_bit_map:
valid_devices = list(device_bit_map.keys())
raise ValueError(f"无效的设备名 '{device_name}'。有效设备: {valid_devices}")
# 确保已连接
if not _GLOBAL_RELAY.is_connected:
if not _GLOBAL_RELAY.connect():
raise RuntimeError("无法连接到网络继电器")
try:
_GLOBAL_RELAY.set_device(device_name, state)
except Exception as e:
raise RuntimeError(f"控制设备 '{device_name}' 失败: {e}")
def read_all_io() -> dict[str, dict[str, bool]]:
"""
读取所有DI传感器和DO设备状态
:return: {'devices': {...}, 'sensors': {...}}
"""
if _GLOBAL_RELAY is None:
raise ValueError("未初始化")
try:
devices = _GLOBAL_RELAY.get_all_device_status('devices')
sensors = _GLOBAL_RELAY.get_all_device_status('sensors')
return {'devices': devices, 'sensors': sensors}
except Exception as e:
print(f"读取IO状态失败:{e}")
raise RuntimeError("读取IO失败") from e
# ------------测试接口-------------
if __name__ == '__main__':
write_do(SOLENOID_VALVE1, True)
time.sleep(5)
io_status = read_all_io()
for name, status in io_status['devices'].items():
status_str = "开启" if status else "关闭"
print(f"{device_name_map.get(name, name)}: {status_str}")