#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/9/11 16:48 # @Author : reenrr # @File : video_new.py ''' import cv2 import time import os from PIL import Image import shutil # 用于检查磁盘空间 from cls_inference.cls_inference import yolov11_cls_inference import numpy as np # ================== 配置参数 ================== url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" check_interval = 100 # 每隔 N 帧检查一次“盖板对齐”状态 output_video_dir = os.path.join("camera01_videos") # 视频保存目录 # 多帧确认参数 required_consecutive_frames = 3 # 需要连续 N 帧检测为"盖板对齐"才开始录制 # 摄像头重连参数 max_retry_seconds = 10 # 最大重试时间为10秒 retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次 # 分类模型参数 cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" # 分类模型路径 target_size = (640, 640) # 视频录制参数 video_fps = 25 # 视频帧率 video_codec = cv2.VideoWriter_fourcc(*'mp4v') # MP4 编码 video_duration = 60 # 每次检测到符合条件后录制的秒数 frame_per_video = video_fps * video_duration # 每个视频的总帧数 def rotate_frame_180(pil_image): """ 统一处理:将PIL图像旋转180度,并转为OpenCV录制所需的BGR格式 input: pil_image (PIL.Image对象,RGB格式) output: rotated_bgr (numpy数组,BGR格式,旋转180度后) """ # 1. 旋转180度(expand=True避免图像被裁剪) rotated_pil = pil_image.rotate(180, expand=True) # 2. 转为numpy数组(此时是RGB格式) rotated_rgb = np.array(rotated_pil) # 3. 转为BGR格式(OpenCV VideoWriter要求BGR输入) rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) return rotated_bgr if __name__ == '__main__': # 视频录制状态变量 is_recording = False # 是否正在录制视频 video_writer = None # 视频写入对象 recorded_frames = 0 # 当前视频已录制帧数 # 创建视频目录 os.makedirs(output_video_dir, exist_ok=True) print(f"✅ 已创建/确认视频目录: {output_video_dir}") while True: # 外层循环:处理摄像头断连重连 cap = cv2.VideoCapture(url) # 设置RTSP流缓存大小 cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 5MB缓存 # 强制指定解码方式 cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) start_time = time.time() # 摄像头连接重试逻辑 while not cap.isOpened(): if time.time() - start_time >= max_retry_seconds: print(f"已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。") # 若退出时正在录制,先释放视频写入对象 if video_writer is not None: video_writer.release() video_writer = None cap.release() exit() print("无法打开摄像头,正在尝试重新连接...") time.sleep(retry_interval_seconds) cap.release() cap = cv2.VideoCapture(url) # 获取摄像头实际参数 frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = cap.get(cv2.CAP_PROP_FPS) # 修正帧率 if actual_fps > 0: video_fps = int(actual_fps) frame_per_video = video_fps * video_duration print(f"✅ 开始读取视频流(分辨率:{frame_width}x{frame_height},帧率:{video_fps})...") frame_count = 0 # 总帧计数器 consecutive_ready_count = 0 # 连续"盖板对齐"计数 confirmed_frames = [] # 存储确认的"盖板对齐"帧(用于录制开始前的帧) try: while True: ret, frame = cap.read() if not ret: print("读取帧失败,可能是流中断或摄像头断开") # 若断连时正在录制,先释放视频写入对象 if video_writer is not None: video_writer.release() video_writer = None is_recording = False recorded_frames = 0 print("⚠️ 流中断时正在录制,已保存当前视频。") # 重置检测状态 consecutive_ready_count = 0 confirmed_frames = [] cap.release() break # 跳出内层循环,重新连接摄像头 frame_count += 1 # 转换为RGB并创建PIL图像(用于后续处理) rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_frame) # 预先旋转180度(为后续可能的录制做准备) rotated_bgr = rotate_frame_180(pil_image) # 按间隔检测“盖板对齐”状态(仅未录制时执行) if not is_recording and frame_count % check_interval == 0: try: # 调用模型判断状态(使用旋转后的图像) class_name = yolov11_cls_inference(cls_model_path, rotated_bgr, target_size) # 校验类别有效性 if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: print(f"跳过检测:模型返回无效类别({class_name}) (总帧:{frame_count})") consecutive_ready_count = 0 confirmed_frames = [] continue # 检测到"盖板对齐" if class_name == "cover_ready": consecutive_ready_count += 1 # 保存当前确认的帧 confirmed_frames.append(rotated_bgr) print( f"检测到'盖板对齐',连续计数: {consecutive_ready_count}/{required_consecutive_frames} (总帧:{frame_count})") # 达到所需连续帧数,开始录制 if consecutive_ready_count >= required_consecutive_frames: # 检查磁盘空间 total, used, free = shutil.disk_usage(output_video_dir) if free < 1024 * 1024 * 1024 * 5: print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024 ** 3):.2f} GB),停止录制。") consecutive_ready_count = 0 confirmed_frames = [] raise SystemExit(1) # 生成视频文件名 timestamp = time.strftime("%Y%m%d_%H%M%S") video_filename = f"video_{timestamp}.mp4" video_filepath = os.path.join(output_video_dir, video_filename) # 初始化视频写入器 video_writer = cv2.VideoWriter( video_filepath, video_codec, video_fps, (frame_width, frame_height) ) if not video_writer.isOpened(): print(f"⚠️ 视频写入器初始化失败,无法录制视频(路径:{video_filepath})") consecutive_ready_count = 0 confirmed_frames = [] continue # 写入之前确认的帧 for confirmed_frame in confirmed_frames: video_writer.write(confirmed_frame) is_recording = True # 已录制帧数为确认帧数量 recorded_frames = len(confirmed_frames) print( f"📹 开始录制视频(已连续{required_consecutive_frames}帧'盖板对齐',保存路径:{video_filepath})") print(f"已写入 {recorded_frames} 帧确认帧") # 重置计数器,为下一次检测做准备 consecutive_ready_count = 0 confirmed_frames = [] # 检测到"盖板不对齐",重置计数器 else: print(f"盖板状态:{class_name} (总帧:{frame_count})") if consecutive_ready_count > 0: print( f"检测到'盖板不对齐',重置连续计数 (当前计数: {consecutive_ready_count}/{required_consecutive_frames})") consecutive_ready_count = 0 confirmed_frames = [] except Exception as e: print(f"分类模型调用异常: {e}(总帧:{frame_count})") consecutive_ready_count = 0 confirmed_frames = [] continue # 若正在录制,持续写入旋转后的帧 if is_recording: video_writer.write(rotated_bgr) recorded_frames += 1 # 检查是否达到录制时长 if recorded_frames >= frame_per_video: video_writer.release() video_writer = None is_recording = False recorded_frames = 0 print(f"✅ 视频录制完成(已达 {video_duration} 秒,共录制 {frame_per_video} 帧)") except KeyboardInterrupt: print("\n用户中断程序") # 中断时若正在录制,先保存视频 if video_writer is not None: video_writer.release() print("⚠️ 用户中断时正在录制,已保存当前视频。") break finally: # 释放资源 cap.release() cv2.destroyAllWindows() if video_writer is not None: video_writer.release() is_recording = False recorded_frames = 0 consecutive_ready_count = 0 confirmed_frames = [] print(f"视频流已关闭,共处理总帧:{frame_count}") print("程序结束")