Files
AutoControlSystem-G/EMV/sensors_test.py

88 lines
3.3 KiB
Python
Raw Permalink Normal View History

2025-07-29 13:16:30 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import logging
from threading import Thread
from unittest.mock import patch
# 假设你的 RelayController 类在名为 EMV 的模块中
from EMV import RelayController # 替换为你的模块名
# 设置日志格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def simulate_sensor(controller, sensor_name, active=True, duration=10):
"""
模拟指定传感器在一段时间内处于激活状态
:param controller: RelayController 实例
:param sensor_name: 要模拟的传感器名 controller.SENSOR2
:param active: 是否激活传感器信号
:param duration: 模拟运行时间
"""
logging.info(f"🧪 开始模拟传感器:{controller.sensor_name_map.get(sensor_name, sensor_name)}")
original_method = controller.get_all_device_status
def mock_get_all_device_status(command_type='devices'):
if command_type == 'sensors':
all_status = original_method(command_type)
all_status[sensor_name] = active
return all_status
return original_method(command_type)
# 确定目标函数
target_func = None
if sensor_name == controller.SENSOR1:
target_func = controller.handle_sensor1
elif sensor_name == controller.SENSOR2:
target_func = controller.handle_sensor2
else:
raise ValueError("不支持的传感器名称")
# 设置 _running 为 True确保线程能进入循环
controller._running = True
# 启动线程
sensor_thread = Thread(target=target_func, daemon=True)
sensor_thread.start()
logging.info(f"{controller.sensor_name_map[sensor_name]} 监听线程已启动")
try:
# Patch get_all_device_status 方法
with patch.object(controller, 'get_all_device_status', mock_get_all_device_status):
logging.info(f"🟢 模拟 {controller.sensor_name_map[sensor_name]} 有信号输入,持续 {duration}")
time.sleep(duration)
except Exception as e:
logging.error(f"🔴 模拟过程中发生错误: {e}")
finally:
# 停止控制器
controller._running = False
logging.info("🛑 停止控制器主循环")
# 等待线程退出
sensor_thread.join(timeout=2)
if sensor_thread.is_alive():
logging.warning("⚠️ 传感器线程未能及时退出")
else:
logging.info("✅ 传感器线程已安全退出")
if __name__ == '__main__':
# 创建控制器实例
relay_controller = RelayController()
# 打印当前配置的传感器名称映射(方便调试)
logging.info("🔧 当前传感器配置:")
for key, val in relay_controller.sensor_name_map.items():
logging.info(f" {key}: {val}")
try:
# 模拟 SENSOR2 有信号输入,运行 10 秒
logging.info("🧪 开始模拟传感器2")#修改这里
simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)#
# 可选:模拟 SENSOR1
# logging.info("🧪 开始模拟传感器1")
# simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)
except KeyboardInterrupt:
logging.info("🛑 用户中断模拟")
finally:
logging.info("🏁 模拟结束")