import time import threading import sys import os # 添加项目根目录到sys.path sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from visual_callback import VisualCallback # 创建VisualCallback实例 visual_callback = VisualCallback() # 模拟safe_control_lower_close执行 def simulate_safe_close(): """模拟safe_control_lower_close执行""" print("\n=== 开始模拟safe_control_lower_close执行 ===") # 直接调用safe_control_lower_close函数 # 注意:这里需要访问_run_feed方法中的内部函数,所以我们需要一个间接的方式来测试 # 我们可以通过修改标志位来模拟这个过程 # 1. 首先,让线程循环运行一段时间,观察正常情况下的行为 print("\n1. 正常运行阶段 (5秒):") time.sleep(5) # 2. 模拟safe_control_lower_close开始执行 print("\n2. 模拟safe_control_lower_close开始执行:") visual_callback._is_safe_closing = True # 3. 让线程循环运行一段时间,观察是否会跳过relay操作 print("\n3. safe_control_lower_close执行中 (5秒):") time.sleep(5) # 4. 模拟safe_control_lower_close执行完毕 print("\n4. 模拟safe_control_lower_close执行完毕:") visual_callback._is_safe_closing = False # 5. 再次观察正常运行 print("\n5. 恢复正常运行 (5秒):") time.sleep(5) print("\n=== 测试结束 ===") # 创建测试线程 test_thread = threading.Thread(target=simulate_safe_close) test_thread.start() # 主线程等待测试线程结束 test_thread.join() # 关闭视觉回调实例 visual_callback.shutdown()