import os import cv2 from rknnlite.api import RKNNLite # classify_single_image, StableClassJudge, CLASS_NAMES 已在 muju_cls_rknn 中定义 from muju_cls_rknn import classify_single_image, StableClassJudge, CLASS_NAMES def run_stable_classification_loop( model_path, roi_file, image_source, stable_frames=3, display_scale=0.5, # 显示缩放比例(0.5 = 显示为原来 50%) show_window=False # 是否显示窗口 ): """ image_source: cv2.VideoCapture 对象 """ judge = StableClassJudge( stable_frames=stable_frames, ignore_class=2 # 忽略“有遮挡”类别参与稳定判断 ) cap = image_source if not hasattr(cap, "read"): raise TypeError("image_source 必须是 cv2.VideoCapture 实例") # 可选:创建可缩放窗口 if show_window: cv2.namedWindow("RTSP Stream - Press 'q' to quit", cv2.WINDOW_NORMAL) while True: ret, frame = cap.read() if not ret: print("无法读取视频帧(可能是流断开或结束)") break # 上下左右翻转 frame = cv2.flip(frame, -1) # --------------------------- # 单帧推理 # --------------------------- result = classify_single_image(frame, model_path, roi_file) class_id = result["class_id"] class_name = result["class"] score = result["score"] print(f"[FRAME] {class_name} | conf={score:.3f}") # --------------------------- # 稳定判断 # --------------------------- stable_class_id = judge.update(class_id) if stable_class_id is not None: print(f"\n稳定输出: {CLASS_NAMES[stable_class_id]}\n") # --------------------------- # 显示画面(缩小窗口) # --------------------------- if show_window: h, w = frame.shape[:2] display_frame = cv2.resize( frame, (int(w * display_scale), int(h * display_scale)), interpolation=cv2.INTER_AREA ) cv2.imshow("RTSP Stream - Press 'q' to quit", display_frame) if cv2.waitKey(1) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() if __name__ == "__main__": # --------------------------- # 配置参数 # --------------------------- MODEL_PATH = "muju_cls.rknn" ROI_FILE = "./roi_coordinates/muju_roi.txt" RTSP_URL = "rtsp://admin:XJ123456@192.168.250.61:554/streaming/channels/101" STABLE_FRAMES = 3 DISPLAY_SCALE = 0.5 # 显示窗口缩放比例 SHOW_WINDOW = False # 部署时改成 False,测试的时候打开 # --------------------------- # 打开 RTSP 视频流 # --------------------------- print(f"正在连接 RTSP 流: {RTSP_URL}") cap = cv2.VideoCapture(RTSP_URL) # 降低 RTSP 延迟(部分摄像头支持) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) if not cap.isOpened(): print("无法打开 RTSP 流,请检查网络、账号密码或 URL") exit(1) print("RTSP 流连接成功,开始推理...") # --------------------------- # 启动稳定分类循环三帧稳定判断 # --------------------------- run_stable_classification_loop( model_path=MODEL_PATH, roi_file=ROI_FILE, image_source=cap, stable_frames=STABLE_FRAMES, display_scale=DISPLAY_SCALE, show_window=SHOW_WINDOW )