Files
AutoControlSystem-G/EMV/sensors_test.py

88 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import logging
from threading import Thread
from unittest.mock import patch
# 假设你的 RelayController 类在名为 EMV 的模块中
from EMV import RelayController # 替换为你的模块名
# 设置日志格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def simulate_sensor(controller, sensor_name, active=True, duration=10):
"""
模拟指定传感器在一段时间内处于激活状态。
:param controller: RelayController 实例
:param sensor_name: 要模拟的传感器名(如 controller.SENSOR2
:param active: 是否激活传感器信号
:param duration: 模拟运行时间(秒)
"""
logging.info(f"🧪 开始模拟传感器:{controller.sensor_name_map.get(sensor_name, sensor_name)}")
original_method = controller.get_all_device_status
def mock_get_all_device_status(command_type='devices'):
if command_type == 'sensors':
all_status = original_method(command_type)
all_status[sensor_name] = active
return all_status
return original_method(command_type)
# 确定目标函数
target_func = None
if sensor_name == controller.SENSOR1:
target_func = controller.handle_sensor1
elif sensor_name == controller.SENSOR2:
target_func = controller.handle_sensor2
else:
raise ValueError("不支持的传感器名称")
# 设置 _running 为 True确保线程能进入循环
controller._running = True
# 启动线程
sensor_thread = Thread(target=target_func, daemon=True)
sensor_thread.start()
logging.info(f"{controller.sensor_name_map[sensor_name]} 监听线程已启动")
try:
# Patch get_all_device_status 方法
with patch.object(controller, 'get_all_device_status', mock_get_all_device_status):
logging.info(f"🟢 模拟 {controller.sensor_name_map[sensor_name]} 有信号输入,持续 {duration}")
time.sleep(duration)
except Exception as e:
logging.error(f"🔴 模拟过程中发生错误: {e}")
finally:
# 停止控制器
controller._running = False
logging.info("🛑 停止控制器主循环")
# 等待线程退出
sensor_thread.join(timeout=2)
if sensor_thread.is_alive():
logging.warning("⚠️ 传感器线程未能及时退出")
else:
logging.info("✅ 传感器线程已安全退出")
if __name__ == '__main__':
# 创建控制器实例
relay_controller = RelayController()
# 打印当前配置的传感器名称映射(方便调试)
logging.info("🔧 当前传感器配置:")
for key, val in relay_controller.sensor_name_map.items():
logging.info(f" {key}: {val}")
try:
# 模拟 SENSOR2 有信号输入,运行 10 秒
logging.info("🧪 开始模拟传感器2")#修改这里
simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)#
# 可选:模拟 SENSOR1
# logging.info("🧪 开始模拟传感器1")
# simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)
except KeyboardInterrupt:
logging.info("🛑 用户中断模拟")
finally:
logging.info("🏁 模拟结束")