"""Manual test bench for the devices driven through ``EMV.RelayController``.

Run this file directly and uncomment the calls in the ``__main__`` section
for the device you want to exercise.
"""
import os
import sys

# Put the parent of this script's directory first on sys.path; presumably
# that is where the ``EMV`` module lives (must happen before the import).
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

from EMV import RelayController
import time
from datetime import datetime  # referenced by the commented-out experiments below
import threading

relay_controller = RelayController()  # Instantiate the controller

# Device keywords that test_device() forwards to RelayController.open() /
# RelayController.close() as ``<name>=True``.
_DEVICE_NAMES = (
    'conveyor1',
    'conveyor2',
    'conveyor2_reverse',
    'pusher',
    'pusher1',
    'clamp',
    'alarm',
    'belt',
    'blow_sensor2',
)
_ACTIONS = ('open', 'close')


def test_device(device_name: str, action: str) -> None:
    """Run one open/close operation on a device, then print the sensor status.

    An unknown device or action is reported on stdout and the call returns
    without sending anything to the controller.

    :param device_name: str, one of the names listed in ``_DEVICE_NAMES``
    :param action: str, operation to perform (``'open'`` or ``'close'``)
    :return: None
    """
    if device_name not in _DEVICE_NAMES:
        print(f"❌ 未知设备: {device_name}")
        return

    if action not in _ACTIONS:
        print(f"❌ 未知操作: {action}")
        return

    print(f"\n🧪 正在测试设备: {device_name},操作: {action}")
    # Same call as relay_controller.open(<device>=True) or
    # relay_controller.close(<device>=True), without one lambda per pair.
    getattr(relay_controller, action)(**{device_name: True})

    # Wait a moment, then read back the sensor status.
    time.sleep(1)
    print("📊 当前传感器状态:")
    status = relay_controller.get_all_sensor_responses(command_type='sensors')
    print(status)


# Example calls (uncomment the part you want to test)
if __name__ == "__main__":
    # Already tested
    # test_device('conveyor2', 'open')
    # test_device('conveyor2', 'close')

    # test_device('D', 'close')
    # sensors = relay_controller.get_all_device_status('sensors')
    # sensor2_value = sensors.get(relay_controller.SENSOR2, False)
    # print(sensor2_value)

    # test_device('pusher', 'open')
    # time.sleep(0.1)
    # test_device('pusher', 'close')

    # test_device('pusher1', 'open')
    # time.sleep(0.1)
    # test_device('pusher1', 'close')

    # Already tested
    # test_device('clamp', 'open')
    # test_device('clamp', 'close')

    # test_device('conveyor2', 'close')
    # sensors = relay_controller.get_all_device_status()
    # print(sensors)
    # time.sleep(3)

    try:
        # Cycle conveyor2 open/close until interrupted.
        while True:
            test_device('conveyor2', 'open')
            # test_device('belt', 'close')
            # time.sleep(3)
            test_device('conveyor2', 'close')
            # test_device('belt', 'close')
    finally:
        # The loop only ends through Ctrl+C or an error, possibly right after
        # an 'open'; send a final close so conveyor2 ends in the 'close'
        # state. The original exception still propagates afterwards.
        relay_controller.close(conveyor2=True)

    # while True:
    #     responses = relay_controller.get_all_sensor_responses('sensors')
    #     response = responses.get(relay_controller.SENSOR2)
    #     if not response:
    #         print(f"[警告] 无法获取 {relay_controller.SENSOR2} 的响应,尝试重试...")
    #     else:
    #         status_code = relay_controller.parse_status_code(response)
    #         if status_code in relay_controller.required_codes_1:
    #             print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'收到有效状态码信号')
    #         else:
    #             print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]+'无效状态码信号')
    #     time.sleep(0.2)

    # relay_controller._running=True
    # relay_controller.sensor2_ready=True
    # relay_controller.handle_sensor2()

    # test_device('blow_sensor2', 'open')
    # test_device('blow_sensor2', 'close')

    # while True:
    #     if relay_controller.is_valid_sensor_status_1('sensor2'):
    #         test_device('conveyor2', 'close')
    #         time.sleep(4)
    #         test_device('conveyor2_reverse', 'open')
    #         time.sleep(3)
    #         test_device('conveyor2', 'open')

    # test_device('conveyor1', 'open')
    # test_device('conveyor1', 'close')

    # relay_controller._running=True
    # relay_controller.handle_sensor1()
    # test_device('conveyor1', 'open')
    # while True:
    #     if relay_controller.is_valid_sensor_status(relay_controller.SENSOR1):
    #         test_device('conveyor1', 'close')
    #     time.sleep(0.1)

    # relay_controller._running=True
    # relay_controller.handle_emergency_pressed()

    # NOTE(review): unreachable while the endless loop above is enabled; it
    # looks like a keep-alive for the commented-out handler experiments --
    # confirm before removing.
    time.sleep(1000000)
    print('aaaaa')