88 lines
3.3 KiB
Python
88 lines
3.3 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
import time
|
||
import logging
|
||
from threading import Thread
|
||
from unittest.mock import patch
|
||
|
||
# 假设你的 RelayController 类在名为 EMV 的模块中
|
||
from EMV import RelayController # 替换为你的模块名
|
||
|
||
# 设置日志格式
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
def simulate_sensor(controller, sensor_name, active=True, duration=10):
|
||
"""
|
||
模拟指定传感器在一段时间内处于激活状态。
|
||
:param controller: RelayController 实例
|
||
:param sensor_name: 要模拟的传感器名(如 controller.SENSOR2)
|
||
:param active: 是否激活传感器信号
|
||
:param duration: 模拟运行时间(秒)
|
||
"""
|
||
logging.info(f"🧪 开始模拟传感器:{controller.sensor_name_map.get(sensor_name, sensor_name)}")
|
||
original_method = controller.get_all_device_status
|
||
|
||
def mock_get_all_device_status(command_type='devices'):
|
||
if command_type == 'sensors':
|
||
all_status = original_method(command_type)
|
||
all_status[sensor_name] = active
|
||
return all_status
|
||
return original_method(command_type)
|
||
|
||
# 确定目标函数
|
||
target_func = None
|
||
if sensor_name == controller.SENSOR1:
|
||
target_func = controller.handle_sensor1
|
||
elif sensor_name == controller.SENSOR2:
|
||
target_func = controller.handle_sensor2
|
||
else:
|
||
raise ValueError("不支持的传感器名称")
|
||
|
||
# 设置 _running 为 True,确保线程能进入循环
|
||
controller._running = True
|
||
|
||
# 启动线程
|
||
sensor_thread = Thread(target=target_func, daemon=True)
|
||
sensor_thread.start()
|
||
logging.info(f"✅ {controller.sensor_name_map[sensor_name]} 监听线程已启动")
|
||
|
||
try:
|
||
# Patch get_all_device_status 方法
|
||
with patch.object(controller, 'get_all_device_status', mock_get_all_device_status):
|
||
logging.info(f"🟢 模拟 {controller.sensor_name_map[sensor_name]} 有信号输入,持续 {duration} 秒")
|
||
time.sleep(duration)
|
||
except Exception as e:
|
||
logging.error(f"🔴 模拟过程中发生错误: {e}")
|
||
finally:
|
||
# 停止控制器
|
||
controller._running = False
|
||
logging.info("🛑 停止控制器主循环")
|
||
# 等待线程退出
|
||
sensor_thread.join(timeout=2)
|
||
if sensor_thread.is_alive():
|
||
logging.warning("⚠️ 传感器线程未能及时退出")
|
||
else:
|
||
logging.info("✅ 传感器线程已安全退出")
|
||
|
||
if __name__ == '__main__':
|
||
# 创建控制器实例
|
||
relay_controller = RelayController()
|
||
|
||
# 打印当前配置的传感器名称映射(方便调试)
|
||
logging.info("🔧 当前传感器配置:")
|
||
for key, val in relay_controller.sensor_name_map.items():
|
||
logging.info(f" {key}: {val}")
|
||
|
||
try:
|
||
# 模拟 SENSOR2 有信号输入,运行 10 秒
|
||
logging.info("🧪 开始模拟传感器2")#修改这里
|
||
simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)#
|
||
|
||
# 可选:模拟 SENSOR1
|
||
# logging.info("🧪 开始模拟传感器1")
|
||
# simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)
|
||
|
||
except KeyboardInterrupt:
|
||
logging.info("🛑 用户中断模拟")
|
||
finally:
|
||
logging.info("🏁 模拟结束") |