# realtime_from_gui_logic.py import cv2 import time import threading from threading import Lock import obb_angle # 确保可导入 # =============== 完全复制自你 GUI 中验证有效的 VideoStream =============== class VideoStream: def __init__(self, rtsp_url, name="Stream"): self.rtsp_url = rtsp_url self.name = name self.cap = None self.frame = None self.timestamp = 0 self.lock = Lock() self.running = False self.thread = None def start(self): self.running = True self.thread = threading.Thread(target=self.update, args=(), daemon=True) self.thread.start() return self def update(self): while self.running: if self.cap is None or not self.cap.isOpened(): print(f"[{self.name}] 正在连接 RTSP: {self.rtsp_url}") try: # 关键:显式使用 CAP_FFMPEG self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.cap.set(cv2.CAP_PROP_TCP_NODELAY, 1) # 可选:强制设置解码器(某些平台需要) # self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('H', '2', '6', '4')) except Exception as e: print(f"[{self.name}] 连接失败: {e}") time.sleep(1) continue ret, frame = self.cap.read() if ret: with self.lock: self.frame = frame.copy() self.timestamp = time.time() else: print(f"[{self.name}] 读取失败,准备重连...") if self.cap: self.cap.release() self.cap = None time.sleep(1) def read(self): with self.lock: if self.frame is not None and (time.time() - self.timestamp) < 1.5: return self.frame.copy() else: return None def stop(self): self.running = False if self.thread is not None: self.thread.join(timeout=2) if self.cap is not None: self.cap.release() print(f"[{self.name}] 视频流已停止") # =============== 主程序 =============== if __name__ == "__main__": RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101" MODEL_PATH = "./obb.rknn" # 设置 obb_angle 行为 obb_angle.DRAW_RESULT = True obb_angle.SAVE_PATH = None # 启动流(和 GUI 完全一致) stream = VideoStream(RTSP_URL, "Camera01").start() print("🚀 使用 GUI 验证过的实时流逻辑启动...") last_time = time.time() window_name = "Realtime OBB - From GUI Logic" cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) cv2.resizeWindow(window_name, 800, 600) # 宽 800,高 600 try: while True: frame = stream.read() if frame is None: time.sleep(0.01) continue # 上下翻转 frame = cv2.flip(frame, 0) # 推理 angle_result, vis_frame = obb_angle.detect_two_box_angle(MODEL_PATH, frame) # 打印角度 if angle_result is not None: if isinstance(angle_result, (list, tuple)) and len(angle_result) >= 2: a1, a2 = angle_result[0], angle_result[1] diff = abs(a1 - a2) diff = min(diff, 360 - diff) print(f"Box1: {a1:.1f}°, Box2: {a2:.1f}°, 夹角: {diff:.1f}°") else: print(f"角度: {angle_result:.1f}°") else: print("未检测到目标") # 显示(窗口已提前创建并设好大小) cv2.imshow(window_name, vis_frame) # FPS 估算0 curr_time = time.time() fps = 1.0 / (curr_time - last_time + 1e-6) last_time = curr_time print(f"FPS: {fps:.1f}") if cv2.waitKey(1) == ord('q'): break except KeyboardInterrupt: pass finally: stream.stop() cv2.destroyAllWindows()