Files
2026-03-10 16:51:57 +08:00

118 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import cv2
from rknnlite.api import RKNNLite
# classify_single_image, StableClassJudge and CLASS_NAMES are defined in zdb_cls_rknn
from zdb_cls_rknn import classify_single_image, StableClassJudge, CLASS_NAMES
def run_stable_classification_loop(
    model_path,
    roi_file,
    image_source,
    stable_frames=3,
    display_scale=0.5,  # display scale factor; 0.5 shows the frame at 50% size
    show_window=False,  # whether to open a preview window
):
    """Classify frames from a video source and report stable class decisions.

    Every frame read from ``image_source`` is passed to
    ``classify_single_image`` together with ``model_path`` and ``roi_file``.
    The per-frame class id is fed to a ``StableClassJudge``; whenever the
    judge returns a class id (anything other than ``None``) the matching
    ``CLASS_NAMES`` entry is printed.

    The loop ends when a frame cannot be read, or when ``q`` is pressed in
    the preview window (only with ``show_window=True``). The capture is
    always released on exit, including when an exception or Ctrl-C
    interrupts the loop.

    Args:
        model_path: Forwarded unchanged to ``classify_single_image``.
        roi_file: Forwarded unchanged to ``classify_single_image``.
        image_source: An opened ``cv2.VideoCapture`` (any object exposing
            ``read()`` and ``release()`` is accepted).
        stable_frames: Forwarded to ``StableClassJudge(stable_frames=...)``.
        display_scale: Scale factor applied to the frame before display.
            Only used when ``show_window`` is true; must then be > 0.
        show_window: When true, show each frame in a resizable window.

    Raises:
        TypeError: If ``image_source`` has no ``read`` attribute.
        ValueError: If ``show_window`` is true and ``display_scale`` <= 0.
    """
    window_title = "RTSP Stream - Press 'q' to quit"

    judge = StableClassJudge(
        stable_frames=stable_frames,
        ignore_class=2,  # class 2 ("occluded") is excluded from the stability decision
    )

    cap = image_source
    if not hasattr(cap, "read"):
        raise TypeError("image_source 必须是 cv2.VideoCapture 实例")
    if show_window and display_scale <= 0:
        raise ValueError("display_scale must be > 0 when show_window is enabled")

    try:
        # Optional: create a resizable preview window.
        if show_window:
            cv2.namedWindow(window_title, cv2.WINDOW_NORMAL)

        while True:
            ret, frame = cap.read()
            # Some backends can hand back an empty frame; treat it like a
            # failed read instead of crashing inside the classifier.
            if not ret or frame is None:
                print("无法读取视频帧(可能是流断开或结束)")
                break

            # ---------------------------
            # Single-frame inference
            # ---------------------------
            # NOTE(review): model_path is passed on every frame; whether the
            # model is reloaded per call depends on classify_single_image,
            # which is not visible here - confirm.
            result = classify_single_image(frame, model_path, roi_file)
            class_id = result["class_id"]
            class_name = result["class"]
            score = result["score"]
            print(f"[FRAME] {class_name} | conf={score:.3f}")

            # ---------------------------
            # Stability decision
            # ---------------------------
            stable_class_id = judge.update(class_id)
            if stable_class_id is not None:
                print(f"\n稳定输出: {CLASS_NAMES[stable_class_id]}\n")

            # ---------------------------
            # Preview (down-scaled window)
            # ---------------------------
            if show_window:
                h, w = frame.shape[:2]
                # Clamp to 1 px so a very small scale never yields a 0-sized
                # target, which cv2.resize rejects.
                target_size = (
                    max(1, int(w * display_scale)),
                    max(1, int(h * display_scale)),
                )
                display_frame = cv2.resize(
                    frame,
                    target_size,
                    interpolation=cv2.INTER_AREA,
                )
                cv2.imshow(window_title, display_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        # Release the stream even if inference or display raised.
        cap.release()
        # Only touch the GUI layer when a window was actually created:
        # OpenCV builds without GUI support raise on destroyAllWindows().
        if show_window:
            cv2.destroyAllWindows()
if __name__ == "__main__":
    # Script entry point: open the RTSP stream and run the stable
    # classification loop on it until the stream ends.
    from urllib.parse import urlsplit

    # ---------------------------
    # Configuration
    # ---------------------------
    MODEL_PATH = "zdb_cls.rknn"
    ROI_FILE = "./roi_coordinates/zdb_roi.txt"
    # NOTE(review): credentials are hard-coded in the URL; consider loading
    # them from configuration or the environment instead.
    RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101"
    STABLE_FRAMES = 3
    DISPLAY_SCALE = 0.5  # preview window scale factor
    SHOW_WINDOW = False  # keep False for deployment; enable while testing

    # Strip the "user:password@" part so the credentials never reach the log.
    _url_parts = urlsplit(RTSP_URL)
    _loggable_url = _url_parts._replace(
        netloc=_url_parts.netloc.rpartition("@")[2]
    ).geturl()

    # ---------------------------
    # Open the RTSP video stream
    # ---------------------------
    print(f"正在连接 RTSP 流: {_loggable_url}")
    cap = cv2.VideoCapture(RTSP_URL)
    if not cap.isOpened():
        print("无法打开 RTSP 流,请检查网络、账号密码或 URL")
        cap.release()
        # SystemExit instead of exit(): the latter is injected by the
        # `site` module and is not guaranteed to exist (e.g. python -S).
        raise SystemExit(1)

    # Reduce RTSP latency (only honoured by some cameras/backends).
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    print("RTSP 流连接成功,开始推理...")

    # ---------------------------
    # Run the stable classification loop (stability judged over
    # STABLE_FRAMES frames)
    # ---------------------------
    run_stable_classification_loop(
        model_path=MODEL_PATH,
        roi_file=ROI_FILE,
        image_source=cap,
        stable_frames=STABLE_FRAMES,
        display_scale=DISPLAY_SCALE,
        show_window=SHOW_WINDOW,
    )