import time import threading from vision.visual_callback import VisualCallback, angle_visual_callback # 创建VisualCallback实例 visual_callback = VisualCallback() # 模拟safe_control_lower_close执行 def simulate_safe_close(): while True: time.sleep(1) """模拟safe_control_lower_close执行""" print("\n=== 开始模拟safe_control_lower_close执行 ===") # 1. 首先,发送一些视觉回调数据,观察正常情况下的行为 print("\n1. 正常运行阶段:") for i in range(5): angle_visual_callback(50.0, "无堆料") time.sleep(1) # 2. 模拟safe_control_lower_close开始执行 print("\n2. 模拟safe_control_lower_close开始执行:") visual_callback._is_safe_closing = True # 3. 再次发送视觉回调数据,观察是否会跳过relay操作 print("\n3. safe_control_lower_close执行中:") for i in range(5): angle_visual_callback(50.0, "无堆料") time.sleep(1) # 4. 模拟safe_control_lower_close执行完毕 print("\n4. 模拟safe_control_lower_close执行完毕:") visual_callback._is_safe_closing = False # 5. 再次发送视觉回调数据,观察是否恢复正常 print("\n5. 恢复正常运行:") for i in range(5): angle_visual_callback(50.0, "无堆料") time.sleep(1) print("\n=== 测试结束 ===") visual_callback.shutdown() # 启动测试 try: simulate_safe_close() except KeyboardInterrupt: print("\n\n接收到Ctrl+C,正在停止服务...") finally: # 确保服务正确停止 visual_callback.shutdown() visual_callback.relay_controller.close_all() print("服务已安全停止")