from EMV import RelayController import time import threading relay_controller = RelayController() # 实例化控制器 def test_device(device_name, action): """ 测试指定设备的开/关操作,并读取传感器状态 :param device_name: str,设备名(conveyor1, conveyor2, pusher, clamp) :param action: str,操作(open, close) """ device_map = { 'conveyor1': { 'open': lambda: relay_controller.open(conveyor1=True), 'close': lambda: relay_controller.close(conveyor1=True) }, 'conveyor2': { 'open': lambda: relay_controller.open(conveyor2=True), 'close': lambda: relay_controller.close(conveyor2=True) }, 'conveyor2_reverse': { 'open': lambda: relay_controller.open(conveyor2_reverse=True), 'close': lambda: relay_controller.close(conveyor2_reverse=True) }, 'pusher': { 'open': lambda: relay_controller.open(pusher=True), 'close': lambda: relay_controller.close(pusher=True) }, 'pusher1': { 'open': lambda: relay_controller.open(pusher1=True), 'close': lambda: relay_controller.close(pusher1=True) }, 'clamp': { 'open': lambda: relay_controller.open(clamp=True), 'close': lambda: relay_controller.close(clamp=True) } } if device_name not in device_map: print(f"❌ 未知设备: {device_name}") return if action not in ['open', 'close']: print(f"❌ 未知操作: {action}") return print(f"\n🧪 正在测试设备: {device_name},操作: {action}") device_map[device_name][action]() # 可选:等待一段时间后读取传感器状态 time.sleep(1) print("📊 当前传感器状态:") status = relay_controller.get_all_sensor_responses(command_type='sensors') print(status) # ✅ 示例调用(你可以取消注释你想测试的部分) if __name__ == "__main__": #已完成测试 #test_device('conveyor2', 'open') #test_device('conveyor2', 'close') ''' test_device('D', 'close') sensors = relay_controller.get_all_device_status('sensors') sensor2_value = sensors.get(relay_controller.SENSOR2, False) print(sensor2_value) ''' #test_device('pusher', 'open') #time.sleep(0.1) #test_device('pusher', 'close') #test_device('pusher1', 'open') #time.sleep(0.1) #test_device('pusher1', 'close') # 已完成测试 #test_device('clamp', 'open') # test_device('clamp', 'close') # test_device('conveyor2', 'close') # sensors = relay_controller.get_all_device_status() # print(sensors) # time.sleep(3) # test_device('conveyor2', 'open') test_device('conveyor2_reverse', 'open') # time.sleep(3) # test_device('conveyor2', 'open') test_device('conveyor2', 'close') # sensors = relay_controller.get_all_device_status('sensors') # sensor2_value = sensors.get(relay_controller.SENSOR2, False) # relay_controller._running=True # relay_controller.handle_sensor2() # test_device('conveyor2', 'close') # while True: # if relay_controller.is_valid_sensor_status_1('sensor2'): # test_device('conveyor2', 'close') # time.sleep(4) # test_device('conveyor2_reverse', 'open') # time.sleep(3) # test_device('conveyor2', 'open') # test_device('conveyor1', 'open') # test_device('conveyor1', 'close') # relay_controller._running=True # relay_controller.handle_sensor1() # test_device('conveyor1', 'open') # while True: # if relay_controller.is_valid_sensor_status(relay_controller.SENSOR1): # test_device('conveyor1', 'close') # time.sleep(0.1) print('aaaaa')