EMV延迟参数集成,旧EMV在同名文件夹下EMV_old_save
This commit is contained in:
88
EMV/sensors_test.py
Normal file
88
EMV/sensors_test.py
Normal file
@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
import time
|
||||
import logging
|
||||
from threading import Thread
|
||||
from unittest.mock import patch
|
||||
|
||||
# 假设你的 RelayController 类在名为 EMV 的模块中
|
||||
from EMV import RelayController # 替换为你的模块名
|
||||
|
||||
# 设置日志格式
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
def simulate_sensor(controller, sensor_name, active=True, duration=10):
|
||||
"""
|
||||
模拟指定传感器在一段时间内处于激活状态。
|
||||
:param controller: RelayController 实例
|
||||
:param sensor_name: 要模拟的传感器名(如 controller.SENSOR2)
|
||||
:param active: 是否激活传感器信号
|
||||
:param duration: 模拟运行时间(秒)
|
||||
"""
|
||||
logging.info(f"🧪 开始模拟传感器:{controller.sensor_name_map.get(sensor_name, sensor_name)}")
|
||||
original_method = controller.get_all_device_status
|
||||
|
||||
def mock_get_all_device_status(command_type='devices'):
|
||||
if command_type == 'sensors':
|
||||
all_status = original_method(command_type)
|
||||
all_status[sensor_name] = active
|
||||
return all_status
|
||||
return original_method(command_type)
|
||||
|
||||
# 确定目标函数
|
||||
target_func = None
|
||||
if sensor_name == controller.SENSOR1:
|
||||
target_func = controller.handle_sensor1
|
||||
elif sensor_name == controller.SENSOR2:
|
||||
target_func = controller.handle_sensor2
|
||||
else:
|
||||
raise ValueError("不支持的传感器名称")
|
||||
|
||||
# 设置 _running 为 True,确保线程能进入循环
|
||||
controller._running = True
|
||||
|
||||
# 启动线程
|
||||
sensor_thread = Thread(target=target_func, daemon=True)
|
||||
sensor_thread.start()
|
||||
logging.info(f"✅ {controller.sensor_name_map[sensor_name]} 监听线程已启动")
|
||||
|
||||
try:
|
||||
# Patch get_all_device_status 方法
|
||||
with patch.object(controller, 'get_all_device_status', mock_get_all_device_status):
|
||||
logging.info(f"🟢 模拟 {controller.sensor_name_map[sensor_name]} 有信号输入,持续 {duration} 秒")
|
||||
time.sleep(duration)
|
||||
except Exception as e:
|
||||
logging.error(f"🔴 模拟过程中发生错误: {e}")
|
||||
finally:
|
||||
# 停止控制器
|
||||
controller._running = False
|
||||
logging.info("🛑 停止控制器主循环")
|
||||
# 等待线程退出
|
||||
sensor_thread.join(timeout=2)
|
||||
if sensor_thread.is_alive():
|
||||
logging.warning("⚠️ 传感器线程未能及时退出")
|
||||
else:
|
||||
logging.info("✅ 传感器线程已安全退出")
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 创建控制器实例
|
||||
relay_controller = RelayController()
|
||||
|
||||
# 打印当前配置的传感器名称映射(方便调试)
|
||||
logging.info("🔧 当前传感器配置:")
|
||||
for key, val in relay_controller.sensor_name_map.items():
|
||||
logging.info(f" {key}: {val}")
|
||||
|
||||
try:
|
||||
# 模拟 SENSOR2 有信号输入,运行 10 秒
|
||||
logging.info("🧪 开始模拟传感器2")#修改这里
|
||||
simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)#
|
||||
|
||||
# 可选:模拟 SENSOR1
|
||||
# logging.info("🧪 开始模拟传感器1")
|
||||
# simulate_sensor(relay_controller, relay_controller.SENSOR1, active=True, duration=10)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info("🛑 用户中断模拟")
|
||||
finally:
|
||||
logging.info("🏁 模拟结束")
|
||||
Reference in New Issue
Block a user