Files
wire_controlsystem/EMV/EMV_test.py

339 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/12/12 14:39
# @Author : reenrr
# @File : EMV.py
# @Desc : 网络继电器控制输入、输出设备 修改了接口,需测试
'''
import socket
import binascii
import time
import logging
from typing import Union
# 网络继电器的 IP 和端口
HOST = '192.168.5.18'
PORT = 50000
# 控件命名映射
SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1
SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2
ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1
ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2
ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3
ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4
ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5
# 传感器命名映射
CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关
CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关
PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1
PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2
FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器
# 控件控制报文
valve_commands = {
SOLENOID_VALVE1: {
'open': '00000000000601050000FF00',
'close': '000000000006010500000000',
},
SOLENOID_VALVE2: {
'open': '00000000000601050001FF00',
'close': '000000000006010500010000',
},
ABSORB_SOLENOID_VALVE1: {
'open': '00000000000601050002FF00',
'close': '000000000006010500020000',
},
ABSORB_SOLENOID_VALVE2: {
'open': '00000000000601050003FF00',
'close': '000000000006010500030000',
},
ABSORB_SOLENOID_VALVE3: {
'open': '00000000000601050004FF00',
'close': '000000000006010500040000',
},
ABSORB_SOLENOID_VALVE4: {
'open': '00000000000601050005FF00',
'close': '000000000006010500050000',
},
ABSORB_SOLENOID_VALVE5: {
'open': '00000000000601050006FF00',
'close': '000000000006010500060000',
}
}
# 读取状态命令
read_status_command = {
'devices': '000000000006010100000008',
'sensors': '000000000006010200000008'
}
# 控件对应 DO 位(从低到高)
device_bit_map = {
SOLENOID_VALVE1: 0,
SOLENOID_VALVE2: 1,
ABSORB_SOLENOID_VALVE1: 2,
ABSORB_SOLENOID_VALVE2: 3,
ABSORB_SOLENOID_VALVE3: 4,
ABSORB_SOLENOID_VALVE4: 5,
ABSORB_SOLENOID_VALVE5: 6
}
device_name_map = {
SOLENOID_VALVE1: "电磁阀1",
SOLENOID_VALVE2: "电磁阀2",
ABSORB_SOLENOID_VALVE1: "吸取装置电磁阀1",
ABSORB_SOLENOID_VALVE2: "吸取装置电磁阀2",
ABSORB_SOLENOID_VALVE3: "吸取装置电磁阀3",
ABSORB_SOLENOID_VALVE4: "吸取装置电磁阀4",
ABSORB_SOLENOID_VALVE5: "吸取装置电磁阀5",
}
# 传感器对应位(从低到高)
sensor_bit_map = {
FIBER_SENSOR: 0,
PRESS_SENSOR1: 1,
PRESS_SENSOR2: 2,
CONVEYOR1_SENSOR: 4,
CONVEYOR2_SENSOR: 3,
# 根据你继电器的配置,继续添加更多传感器
}
sensor_name_map = {
FIBER_SENSOR: '光纤传感器',
PRESS_SENSOR1: '按压开关1',
PRESS_SENSOR2: '按压开关2',
CONVEYOR1_SENSOR: '传送带1开关',
CONVEYOR2_SENSOR: '传送带2开关'
}
class RelayController:
def __init__(self):
"""初始化继电器控制器"""
self.sock = None
self.is_connected = False # 长连接状态标记
# 初始化时自动尝试建立长连接
print(f"正在初始化网络继电器并建立长连接")
connect_success = self.connect()
if connect_success:
print(f"网络继电器长连接建立成功:{HOST}:{PORT}")
else:
print(f"网络继电器长连接建立失败")
def connect(self) -> bool:
"""
建立长连接
:return: True-连接成功False-连接失败
"""
if not self.is_connected:
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.sock.connect((HOST, PORT))
self.is_connected = True
print(f"长连接建立成功:{HOST}:{PORT}")
return True
except Exception as e:
print(f"长连接建立失败:{e}")
self.sock = None
self.is_connected = False
return False
return True
def disconnect(self):
"""断开长连接"""
if self.sock and self.is_connected:
try:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
print("长连接已正常断开")
except Exception as e:
print(f"断开长连接时出现异常:{e}")
finally:
self.sock = None
self.is_connected = False
def send_command(self, command: str) -> Union[bytes, bool]:
"""
将十六进制字符串转换为字节数据并发送
:param command: 十六进制字符串
:return: 响应字节数据 / False
"""
if not self.is_connected or not self.sock:
print("长连接未建立,无法发送指令")
return False
try:
byte_data = binascii.unhexlify(command)
self.sock.sendall(byte_data) # sendall确保数据全部发送
# 接收响应
response = self.sock.recv(1024)
if not response:
logging.warning("长连接接收空响应,可能连接已断开")
self.is_connected = False
return False
# print(f"收到响应: {binascii.hexlify(response)}")
# 校验响应
return response
except socket.timeout:
print("长连接发送/接收超时")
self.is_connected = False
return False
except Exception as e:
print(f"通信错误: {e}")
self.is_connected = False
return False
def get_all_device_status(self, command_type: str='devices') -> dict[str, bool]:
"""
获取所有设备/传感器状态
:param command_type: 'devices'(控件) / 'sensors'(传感器)
:return: 状态字典 {设备名: 状态(bool)}
"""
command = read_status_command.get(command_type)
if not command:
print(f"未知的读取类型: {command_type}")
return {}
response = self.send_command(command)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9] # 状态在第10字节
status_bin = f"{status_byte:08b}"[::-1]
if command_type == 'devices':
bit_map = device_bit_map
name_map = device_name_map
elif command_type == 'sensors':
bit_map = sensor_bit_map
name_map = sensor_name_map
else:
print("不支持的映射类型")
return {}
for key, bit_index in bit_map.items():
state = status_bin[bit_index] == '1'
status_dict[key] = state
# readable = "开启" if state else "关闭"
# print(f"{device.capitalize()} 状态: {readable}")
else:
print("读取状态失败或响应无效")
return status_dict
def set_device(self, device_name: str, state: bool):
"""
设置指定输出设备DO的开关状态
:param device_name: 设备名称
:param state: 目标状态 True--打开 FALSE--关闭
"""
if device_name not in valve_commands:
raise ValueError(f"未知设备:{device_name}")
try:
current = self.get_all_device_status()
is_on = current.get(device_name, False)
if state and not is_on:
self.send_command(valve_commands[device_name]['open'])
elif not state and is_on:
self.send_command(valve_commands[device_name]['close'])
except Exception as e:
raise RuntimeError(f"设置设备 '{device_name}' 状态失败: {e}") from e
# 全局变量
# private
_GLOBAL_RELAY = RelayController()
def ng_push():
"""NG推料流程"""
try:
# 同时打开电磁阀1、2
write_do(SOLENOID_VALVE1, True)
write_do(SOLENOID_VALVE2, True)
print(f"电磁阀1、2已打开")
# 等待线条掉落
time.sleep(0.5)
write_do(SOLENOID_VALVE1, False)
write_do(SOLENOID_VALVE2, False)
print(f"电磁阀1、2已关闭")
except Exception as e:
print(f"NG推料失败:{e}")
raise RuntimeError("NG推料流程异常") from e
def write_do(device_name: str, state: bool):
"""
控制单个数字输出设备DO的开关状态
:param device_name: 设备名称
:param state: True:打开 False关闭
"""
if _GLOBAL_RELAY is None:
raise ValueError("未初始化实例")
# 验证设备是否存在
if device_name not in device_bit_map:
valid_devices = list(device_bit_map.keys())
raise ValueError(f"无效的设备名 '{device_name}'。有效设备: {valid_devices}")
# 确保已连接
if not _GLOBAL_RELAY.is_connected:
if not _GLOBAL_RELAY.connect():
raise RuntimeError("无法连接到网络继电器")
try:
_GLOBAL_RELAY.set_device(device_name, state)
except Exception as e:
raise RuntimeError(f"控制设备 '{device_name}' 失败: {e}")
def read_all_io() -> dict[str, dict[str, bool]]:
"""
读取所有DI传感器和DO设备状态
:return: {'devices': {...}, 'sensors': {...}}
"""
if _GLOBAL_RELAY is None:
raise ValueError("未初始化")
try:
devices = _GLOBAL_RELAY.get_all_device_status('devices')
sensors = _GLOBAL_RELAY.get_all_device_status('sensors')
return {'devices': devices, 'sensors': sensors}
except Exception as e:
print(f"读取IO状态失败:{e}")
raise RuntimeError("读取IO失败") from e
# ------------测试接口-------------
if __name__ == '__main__':
write_do(SOLENOID_VALVE1, True)
time.sleep(5)
io_status = read_all_io()
for name, status in io_status['devices'].items():
status_str = "开启" if status else "关闭"
print(f"{device_name_map.get(name, name)}: {status_str}")