#!/usr/bin/env python # -*- coding: utf-8 -*- import time import logging from threading import Thread from unittest.mock import patch # 假设你的 RelayController 类在名为 EMV 的模块中 from EMV import RelayController # 替换为你的模块名 # 设置日志格式 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def simulate_sensor(controller, sensor_name, active=True, duration=10): """ 模拟指定传感器在一段时间内处于激活状态。 :param controller: RelayController 实例 :param sensor_name: 要模拟的传感器名(如 controller.SENSOR2) :param active: 是否激活传感器信号 :param duration: 模拟运行时间(秒) """ logging.info(f"🧪 开始模拟传感器:{controller.sensor_name_map.get(sensor_name, sensor_name)}") original_method = controller.get_all_device_status def mock_get_all_device_status(command_type='devices'): if command_type == 'sensors': all_status = original_method(command_type) all_status[sensor_name] = active return all_status return original_method(command_type) # 确定目标函数 target_func = None if sensor_name == controller.SENSOR1: target_func = controller.handle_sensor1 elif sensor_name == controller.SENSOR2: target_func = controller.handle_sensor2 else: raise ValueError("不支持的传感器名称") # 设置 _running 为 True,确保线程能进入循环 controller._running = True # 启动线程 sensor_thread = Thread(target=target_func, daemon=True) sensor_thread.start() logging.info(f"✅ {controller.sensor_name_map[sensor_name]} 监听线程已启动") try: # Patch get_all_device_status 方法 with patch.object(controller, 'get_all_device_status', mock_get_all_device_status): logging.info(f"🟢 模拟 {controller.sensor_name_map[sensor_name]} 有信号输入,持续 {duration} 秒") time.sleep(duration) except Exception as e: logging.error(f"🔴 模拟过程中发生错误: {e}") finally: # 停止控制器 controller._running = False logging.info("🛑 停止控制器主循环") # 等待线程退出 sensor_thread.join(timeout=2) if sensor_thread.is_alive(): logging.warning("⚠️ 传感器线程未能及时退出") else: logging.info("✅ 传感器线程已安全退出") if __name__ == '__main__': # 创建控制器实例 relay_controller = RelayController() # 打印当前配置的传感器名称映射(方便调试) logging.info("🔧 当前传感器配置:") for key, val in relay_controller.sensor_name_map.items(): logging.info(f" {key}: {val}") try: # 模拟 SENSOR2 有信号输入,运行 10 秒 logging.info("🧪 开始模拟传感器2")#修改这里 simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)# # 可选:模拟 SENSOR1 # logging.info("🧪 开始模拟传感器1") # simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10) except KeyboardInterrupt: logging.info("🛑 用户中断模拟") finally: logging.info("🏁 模拟结束")