Files
AutoControlSystem-G/EMV/all_sensors_device_test.py
2026-01-16 15:21:54 +08:00

163 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from EMV import RelayController
import time
from datetime import datetime
import threading
relay_controller = RelayController()  # Controller instance shared by test_device() and the __main__ script
# Device names accepted by test_device(). Each name is forwarded as a keyword
# flag (``<name>=True``) to ``relay_controller.open()`` / ``.close()``.
_TESTABLE_DEVICES = (
    'conveyor1',
    'conveyor2',
    'conveyor2_reverse',
    'pusher',
    'pusher1',
    'clamp',
    'alarm',
    'belt',
    'blow_sensor2',
)

# Actions accepted by test_device(). Each one is the name of the
# relay_controller method that gets called.
_DEVICE_ACTIONS = ('open', 'close')


def test_device(device_name, action):
    """Switch one device on or off, then print the current sensor status.

    Calls ``relay_controller.open(<device_name>=True)`` or
    ``relay_controller.close(<device_name>=True)``, waits one second, and
    prints the result of
    ``relay_controller.get_all_sensor_responses(command_type='sensors')``.

    An unknown device name or action only prints an error message; the
    controller is not touched in that case.

    :param device_name: one of conveyor1, conveyor2, conveyor2_reverse,
        pusher, pusher1, clamp, alarm, belt, blow_sensor2
    :param action: 'open' or 'close'
    :return: None
    """
    # Validate the device first, then the action (same order as the messages
    # a caller would expect to see).
    if device_name not in _TESTABLE_DEVICES:
        print(f"❌ 未知设备: {device_name}")
        return
    if action not in _DEVICE_ACTIONS:
        print(f"❌ 未知操作: {action}")
        return

    print(f"\n🧪 正在测试设备: {device_name},操作: {action}")
    # Same call as e.g. relay_controller.open(conveyor1=True), built from the
    # validated names instead of a hand-written lambda per device/action.
    getattr(relay_controller, action)(**{device_name: True})

    # Wait a moment before polling the sensors.
    time.sleep(1)
    print("📊 当前传感器状态:")
    status = relay_controller.get_all_sensor_responses(command_type='sensors')
    print(status)


# The helper's name starts with ``test_`` and this file's name ends in
# ``_test.py``, so pytest would otherwise collect it as a test case and fail
# on the missing ``device_name`` / ``action`` fixtures.
test_device.__test__ = False
# Example invocations: uncomment the calls you want to run.
# NOTE(review): indentation is missing in this copy of the file, so which
# statements belong to the `if` body and to the `while True:` loop below
# cannot be determined from here — confirm against the original file.
if __name__ == "__main__":
# Already tested:
#test_device('conveyor2', 'open')
#test_device('conveyor2', 'close')
# The bare string literal below holds disabled code kept for reference; its
# contents are not executed. Note that 'D' is not a device name accepted by
# test_device().
'''
test_device('D', 'close')
sensors = relay_controller.get_all_device_status('sensors')
sensor2_value = sensors.get(relay_controller.SENSOR2, False)
print(sensor2_value)
'''
#test_device('pusher', 'open')
#time.sleep(0.1)
#test_device('pusher', 'close')
#test_device('pusher1', 'open')
#time.sleep(0.1)
#test_device('pusher1', 'close')
# Already tested:
#test_device('clamp', 'open')
# test_device('clamp', 'close')
# test_device('conveyor2', 'close')
# sensors = relay_controller.get_all_device_status()
# print(sensors)
# time.sleep(3)
# Active test: switch conveyor2 on and off in an endless loop (no break);
# every test_device() call also waits one second and prints the sensor status.
while True:
test_device('conveyor2', 'open')
# test_device('belt', 'close')
# time.sleep(3)
test_device('conveyor2', 'close')
# test_device('belt', 'close')
# while True:
# responses = relay_controller.get_all_sensor_responses('sensors')
# response = responses.get(relay_controller.SENSOR2)
# if not response:
# print(f"[警告] 无法获取 {relay_controller.SENSOR2} 的响应,尝试重试...")
# else:
# status_code = relay_controller.parse_status_code(response)
# if status_code in relay_controller.required_codes_1:
# print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'收到有效状态码信号')
# else:
# print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'无效状态码信号')
# time.sleep(0.2)
# relay_controller._running=True
# relay_controller.sensor2_ready=True
# relay_controller.handle_sensor2()
# test_device('blow_sensor2', 'open')
# test_device('blow_sensor2', 'close')
# while True:
# if relay_controller.is_valid_sensor_status_1('sensor2'):
# test_device('conveyor2', 'close')
# time.sleep(4)
# test_device('conveyor2_reverse', 'open')
# time.sleep(3)
# test_device('conveyor2', 'open')
# test_device('conveyor1', 'open')
# test_device('conveyor1', 'close')
# relay_controller._running=True
# relay_controller.handle_sensor1()
# test_device('conveyor1', 'open')
# while True:
# if relay_controller.is_valid_sensor_status(relay_controller.SENSOR1):
# test_device('conveyor1', 'close')
# time.sleep(0.1)
# relay_controller._running=True
# relay_controller.handle_emergency_pressed()
# NOTE(review): if the two statements below sit after the `while True:` loop
# rather than inside it, they are unreachable because the loop never exits
# normally — confirm their placement.
time.sleep(1000000)
print('aaaaa')