Files

129 lines
4.2 KiB
Python
Raw Permalink Normal View History

2025-12-11 08:37:09 +08:00
# realtime_from_gui_logic.py
import cv2
import time
import threading
from threading import Lock
import obb_angle # 确保可导入
# =============== 完全复制自你 GUI 中验证有效的 VideoStream ===============
class VideoStream:
def __init__(self, rtsp_url, name="Stream"):
self.rtsp_url = rtsp_url
self.name = name
self.cap = None
self.frame = None
self.timestamp = 0
self.lock = Lock()
self.running = False
self.thread = None
def start(self):
self.running = True
self.thread = threading.Thread(target=self.update, args=(), daemon=True)
self.thread.start()
return self
def update(self):
while self.running:
if self.cap is None or not self.cap.isOpened():
print(f"[{self.name}] 正在连接 RTSP: {self.rtsp_url}")
try:
# 关键:显式使用 CAP_FFMPEG
self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap.set(cv2.CAP_PROP_TCP_NODELAY, 1)
# 可选:强制设置解码器(某些平台需要)
# self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('H', '2', '6', '4'))
except Exception as e:
print(f"[{self.name}] 连接失败: {e}")
time.sleep(1)
continue
ret, frame = self.cap.read()
if ret:
with self.lock:
self.frame = frame.copy()
self.timestamp = time.time()
else:
print(f"[{self.name}] 读取失败,准备重连...")
if self.cap:
self.cap.release()
self.cap = None
time.sleep(1)
def read(self):
with self.lock:
if self.frame is not None and (time.time() - self.timestamp) < 1.5:
return self.frame.copy()
else:
return None
def stop(self):
self.running = False
if self.thread is not None:
self.thread.join(timeout=2)
if self.cap is not None:
self.cap.release()
print(f"[{self.name}] 视频流已停止")
# =============== 主程序 ===============
if __name__ == "__main__":
RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101"
MODEL_PATH = "./obb.rknn"
# 设置 obb_angle 行为
obb_angle.DRAW_RESULT = True
obb_angle.SAVE_PATH = None
# 启动流(和 GUI 完全一致)
stream = VideoStream(RTSP_URL, "Camera01").start()
print("🚀 使用 GUI 验证过的实时流逻辑启动...")
last_time = time.time()
window_name = "Realtime OBB - From GUI Logic"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 800, 600) # 宽 800高 600
try:
while True:
frame = stream.read()
if frame is None:
time.sleep(0.01)
continue
# 上下翻转
frame = cv2.flip(frame, 0)
# 推理
angle_result, vis_frame = obb_angle.detect_two_box_angle(MODEL_PATH, frame)
# 打印角度
if angle_result is not None:
if isinstance(angle_result, (list, tuple)) and len(angle_result) >= 2:
a1, a2 = angle_result[0], angle_result[1]
diff = abs(a1 - a2)
diff = min(diff, 360 - diff)
print(f"Box1: {a1:.1f}°, Box2: {a2:.1f}°, 夹角: {diff:.1f}°")
else:
print(f"角度: {angle_result:.1f}°")
else:
print("未检测到目标")
# 显示(窗口已提前创建并设好大小)
cv2.imshow(window_name, vis_frame)
# FPS 估算0
curr_time = time.time()
fps = 1.0 / (curr_time - last_time + 1e-6)
last_time = curr_time
print(f"FPS: {fps:.1f}")
if cv2.waitKey(1) == ord('q'):
break
except KeyboardInterrupt:
pass
finally:
stream.stop()
cv2.destroyAllWindows()