#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/9/11 16:48 # @Author : reenrr # @File : test_01.py # @Description: 结构化重构 - 录制6个10秒子视频(累计60秒) ''' import cv2 import time import os from PIL import Image import shutil from cls_inference.cls_inference import yolov11_cls_inference import numpy as np class VideoRecorder: """视频录制器类:封装配置、状态和核心逻辑""" def __init__(self): # -------------------------- 1. 配置参数(集中管理,便于修改) -------------------------- self.config = { # 检测参数 "detection_interval": 10, # 检测间隔(秒) "detection_frame_count": 3, # 每次检测抽帧数量 "required_all_noready": True, # 需所有帧为“盖板不对齐” # 摄像头参数 "rtsp_url": "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101", "output_dir": "camera01_videos", # 视频保存目录 "max_retry_sec": 10, # 摄像头重连超时(秒) "retry_interval_sec": 1, # 重连间隔(秒) # 模型参数 "cls_model_path": "/userdata/data_collection/cls_inference/yolov11_cls.rknn", "model_target_size": (640, 640), # 录制参数 "video_fps": 25, # 初始帧率 "video_codec": cv2.VideoWriter_fourcc(*'mp4v'), "single_duration": 10, # 单段视频时长(秒) "total_duration": 60, # 总目标时长(秒) "min_valid_duration": 10 # 最小有效视频时长(秒) } # 计算衍生配置(避免重复计算) self.config["single_frames"] = self.config["video_fps"] * self.config["single_duration"] self.config["total_frames"] = self.config["video_fps"] * self.config["total_duration"] # -------------------------- 2. 全局状态(断连后需保留) -------------------------- self.state = { "total_recorded_frames": 0, # 累计录制帧数 "current_segment": 0, # 当前分段编号 "is_recording": False, # 是否正在录制 "current_video_path": None, # 当前分段视频路径 "cached_frames": [], # 断连时缓存的帧 "last_resolution": (0, 0), # 上次摄像头分辨率(宽,高) "recording_start_time": 0 # 当前分段录制开始时间 } # -------------------------- 3. 临时状态(单次连接内有效,断连后重置) -------------------------- self.temp = { "cap": None, # 摄像头对象 "video_writer": None, # 视频写入对象 "recorded_frames": 0, # 当前分段已录帧数 "frame_count": 0, # 摄像头总读帧数 "confirmed_frames": [], # 检测通过的起始帧 "last_detection_time": time.time(),# 上次检测时间 "detection_window": [] # 检测用帧缓存窗口 } def init_environment(self): """初始化环境:创建目录、打印配置信息""" # 创建视频保存目录 os.makedirs(self.config["output_dir"], exist_ok=True) # 打印目标信息 print("=" * 60) print(f"✅ 环境初始化完成") print(f"📁 视频保存目录:{os.path.abspath(self.config['output_dir'])}") print(f"🎯 录制目标:{self.config['total_duration']}秒({self.config['total_duration']//self.config['single_duration']}段×{self.config['single_duration']}秒)") print(f"🔍 检测条件:每{self.config['detection_interval']}秒检测,需{self.config['detection_frame_count']}帧全为'盖板不对齐'") print("=" * 60) def rotate_frame(self, pil_image): """工具函数:将PIL图像旋转180度并转为OpenCV的BGR格式""" rotated_pil = pil_image.rotate(180, expand=True) rotated_rgb = np.array(rotated_pil) return cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) def connect_camera(self): """摄像头连接:含重连逻辑,返回是否连接成功""" # 初始化摄像头 self.temp["cap"] = cv2.VideoCapture(self.config["rtsp_url"]) self.temp["cap"].set(cv2.CAP_PROP_BUFFERSIZE, 5) # 设置RTSP缓存 self.temp["cap"].set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # 强制H264解码 # 重连逻辑 start_retry_time = time.time() while not self.temp["cap"].isOpened(): # 超时退出 if time.time() - start_retry_time >= self.config["max_retry_sec"]: print(f"\n❌ 摄像头重连超时({self.config['max_retry_sec']}秒),程序退出") self.release_resources() # 释放资源 print(f"📊 退出时累计录制:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}秒") return False # 重试提示 retry_sec = int(time.time() - start_retry_time) print(f"🔄 正在重连摄像头(已重试{retry_sec}秒)...") time.sleep(self.config["retry_interval_sec"]) # 释放旧连接,重新初始化 self.temp["cap"].release() self.temp["cap"] = cv2.VideoCapture(self.config["rtsp_url"]) # 连接成功:获取实际摄像头参数 frame_width = int(self.temp["cap"].get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(self.temp["cap"].get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.temp["cap"].get(cv2.CAP_PROP_FPS) # 修正帧率(以摄像头实际帧率为准) if actual_fps > 0: self.config["video_fps"] = int(actual_fps) self.config["single_frames"] = self.config["video_fps"] * self.config["single_duration"] self.config["total_frames"] = self.config["video_fps"] * self.config["total_duration"] # 检查分辨率变化 resolution_changed = False if self.state["last_resolution"] != (0, 0): if (frame_width, frame_height) != self.state["last_resolution"]: print(f"⚠️ 摄像头分辨率变化:{self.state['last_resolution']} → ({frame_width},{frame_height})") resolution_changed = True # 分辨率变化:重置录制状态 self.state["is_recording"] = False self.state["cached_frames"] = [] self.temp["detection_window"] = [] # 更新分辨率记录 self.state["last_resolution"] = (frame_width, frame_height) # 打印连接成功信息 print(f"\n✅ 摄像头连接成功") print(f"📊 分辨率:{frame_width}×{frame_height} | 实际帧率:{self.config['video_fps']}fps") print(f"📈 当前累计进度:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}/{self.config['total_duration']}秒") return True, resolution_changed def restore_recording(self, resolution_changed): """重连后恢复录制状态:重新初始化写入器并写入缓存帧""" if not (self.state["is_recording"] and self.state["current_video_path"] and not resolution_changed): return # 无需恢复 print(f"🔄 恢复录制:{self.state['current_video_path']}") # 重新初始化视频写入器 frame_width, frame_height = self.state["last_resolution"] self.temp["video_writer"] = cv2.VideoWriter( self.state["current_video_path"], self.config["video_codec"], self.config["video_fps"], (frame_width, frame_height) ) # 恢复失败:重置状态 if not self.temp["video_writer"].isOpened(): print(f"⚠️ 视频写入器恢复失败,放弃当前分段") self.state["is_recording"] = False self.temp["recorded_frames"] = 0 self.state["cached_frames"] = [] return # 写入缓存帧 if self.state["cached_frames"]: print(f"🔄 恢复缓存的{len(self.state['cached_frames'])}帧") for frame in self.state["cached_frames"]: self.temp["video_writer"].write(frame) self.temp["recorded_frames"] = len(self.state["cached_frames"]) self.state["cached_frames"] = [] # 清空缓存 def run_detection(self): """执行检测逻辑:从缓存窗口抽帧,调用模型判断是否开始录制""" # 检测触发条件:时间间隔达标 + 缓存帧足够 + 未录满总目标 if (time.time() - self.temp["last_detection_time"] < self.config["detection_interval"]) or \ (len(self.temp["detection_window"]) < self.config["detection_frame_count"]) or \ (self.state["total_recorded_frames"] >= self.config["total_frames"]): return print(f"\n==== 开始检测(总读帧:{self.temp['frame_count']} | 累计进度:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}秒) ====") # 均匀抽帧(避免连续帧重复) sample_step = max(1, len(self.temp["detection_window"]) // self.config["detection_frame_count"]) sample_frames = self.temp["detection_window"][::sample_step][:self.config["detection_frame_count"]] print(f"📋 检测窗口:{len(self.temp['detection_window'])}帧 → 抽帧:{len(sample_frames)}帧") # 模型分类:统计“盖板不对齐”帧数 noready_count = 0 valid_detection = True for idx, frame in enumerate(sample_frames): try: class_name = yolov11_cls_inference( self.config["cls_model_path"], frame, self.config["model_target_size"] ) except Exception as e: print(f"❌ 模型调用异常:{str(e)}") valid_detection = False break # 校验分类结果有效性 if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: print(f"❌ 抽帧{idx+1}:分类结果无效({class_name})") valid_detection = False break # 统计结果 if class_name == "cover_noready": noready_count += 1 print(f"✅ 抽帧{idx+1}:{class_name}(符合条件)") else: print(f"❌ 抽帧{idx+1}:{class_name}(不符合)") # 检测未通过:重置状态 if not valid_detection or noready_count != self.config["detection_frame_count"]: if valid_detection: print(f"❌ 检测未通过(仅{noready_count}/{self.config['detection_frame_count']}帧符合条件)") else: print(f"❌ 检测未通过(存在无效结果)") self.temp["confirmed_frames"] = [] self.temp["last_detection_time"] = time.time() self.temp["detection_window"] = [] return # 检测通过:准备开始录制 print(f"✅ 检测通过!准备录制新分段") self.temp["confirmed_frames"] = sample_frames # 保存起始帧 self.start_recording() # 启动录制 # 重置检测状态 self.temp["last_detection_time"] = time.time() self.temp["detection_window"] = [] def start_recording(self): """启动新分段录制:初始化写入器、写入起始帧""" # 检查磁盘空间 total_disk, used_disk, free_disk = shutil.disk_usage(self.config["output_dir"]) if free_disk < 1024 * 1024 * 1024 * 5: # 剩余<5GB print(f"❌ 磁盘空间不足(仅剩{free_disk/(1024**3):.2f}GB),退出程序") self.release_resources() raise SystemExit(1) # 生成分段视频路径(含时间戳和分段号) self.state["current_segment"] = self.state["total_recorded_frames"] // self.config["single_frames"] + 1 timestamp = time.strftime("%Y%m%d_%H%M%S") self.state["current_video_path"] = os.path.join( self.config["output_dir"], f"video_{timestamp}_part{self.state['current_segment']}.mp4" ) # 初始化视频写入器 frame_width, frame_height = self.state["last_resolution"] self.temp["video_writer"] = cv2.VideoWriter( self.state["current_video_path"], self.config["video_codec"], self.config["video_fps"], (frame_width, frame_height) ) # 写入器初始化失败:跳过本次录制 if not self.temp["video_writer"].isOpened(): print(f"⚠️ 视频写入器初始化失败(路径:{self.state['current_video_path']})") self.temp["confirmed_frames"] = [] return # 写入检测通过的起始帧 for frame in self.temp["confirmed_frames"]: self.temp["video_writer"].write(frame) self.temp["recorded_frames"] = len(self.temp["confirmed_frames"]) self.state["is_recording"] = True self.state["recording_start_time"] = time.time() # 打印录制启动信息 print(f"\n📹 开始录制第{self.state['current_segment']}段视频(目标{self.config['single_duration']}秒)") print(f"📁 视频路径:{self.state['current_video_path']}") print(f"🔢 已写入起始帧:{self.temp['recorded_frames']}帧") self.temp["confirmed_frames"] = [] # 清空起始帧缓存 def process_recording(self, rotated_frame): """处理录制逻辑:写入当前帧,判断分段是否完成""" if not (self.state["is_recording"] and self.temp["video_writer"]): return # 写入当前帧 self.temp["video_writer"].write(rotated_frame) self.temp["recorded_frames"] += 1 # 检查分段是否完成(帧数达标 或 时间达标,取其一) actual_duration = time.time() - self.state["recording_start_time"] if self.temp["recorded_frames"] >= self.config["single_frames"] or actual_duration >= self.config["single_duration"]: self.finish_segment() def finish_segment(self): """完成当前分段录制:释放写入器、更新累计进度、检查总目标""" # 释放当前分段写入器 self.temp["video_writer"].release() self.temp["video_writer"] = None self.state["is_recording"] = False # 计算实际录制信息 actual_duration = self.temp["recorded_frames"] / self.config["video_fps"] self.state["total_recorded_frames"] += self.temp["recorded_frames"] # 打印分段完成信息 print(f"\n✅ 第{self.state['current_segment']}段录制完成") print(f"🔍 实际录制:{self.temp['recorded_frames']}帧 ≈ {actual_duration:.1f}秒(目标{self.config['single_duration']}秒)") print(f"📊 累计进度:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}/{self.config['total_duration']}秒") # 检查是否达到总目标 if self.state["total_recorded_frames"] >= self.config["total_frames"]: print(f"\n🎉 已完成{self.config['total_duration']}秒录制目标!") # 重置累计状态(如需重复录制,保留此逻辑;单次录制可改为退出) self.state["total_recorded_frames"] = 0 self.state["current_segment"] = 0 # 重置录制临时状态 self.temp["recorded_frames"] = 0 self.temp["last_detection_time"] = time.time() self.temp["detection_window"] = [] self.state["recording_start_time"] = 0 def handle_disconnect(self): """处理摄像头断连:缓存帧、释放资源""" print(f"\n⚠️ 摄像头断连(读取帧失败)") # 缓存当前录制的帧 if self.state["is_recording"]: # 收集已录帧(含当前分段所有帧) if self.temp["video_writer"]: self.temp["video_writer"].release() self.temp["video_writer"] = None # 缓存已录帧(重连后恢复) self.state["cached_frames"] = self.temp["detection_window"].copy() # 用检测窗口缓存当前帧 print(f"⏸️ 缓存{len(self.state['cached_frames'])}帧,重连后继续录制") # 重置单次连接临时状态 self.temp["frame_count"] = 0 self.temp["detection_window"] = [] if self.temp["cap"]: self.temp["cap"].release() self.temp["cap"] = None def release_resources(self): """释放所有资源:摄像头、视频写入器""" if self.temp["cap"] and self.temp["cap"].isOpened(): self.temp["cap"].release() if self.temp["video_writer"]: self.temp["video_writer"].release() print(f"\n🔌 所有资源已释放") def run(self): """主运行逻辑:初始化→循环(连接→录制→断连处理)""" # 初始化环境 self.init_environment() try: while True: # 1. 连接摄像头(含重连) connect_success, resolution_changed = self.connect_camera() if not connect_success: break # 连接失败,退出程序 # 2. 重连后恢复录制状态 self.restore_recording(resolution_changed) # 3. 主循环:读取帧→处理 last_frame_time = time.time() while True: # 读取帧 ret, frame = self.temp["cap"].read() if not ret: self.handle_disconnect() break # 断连,跳出内层循环重连 # 打印实时帧率(每10帧更新一次) current_time = time.time() fps = 1 / (current_time - last_frame_time) if (current_time - last_frame_time) > 0 else 0 last_frame_time = current_time if self.temp["frame_count"] % 10 == 0 and self.temp["frame_count"] > 0: print(f"📊 帧率:{fps:.1f} | 总读帧:{self.temp['frame_count']}", end='\r') self.temp["frame_count"] += 1 # 帧预处理(旋转180度) rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) rotated_frame = self.rotate_frame(Image.fromarray(rgb_frame)) # 加入检测窗口缓存(用于检测和断连缓存) self.temp["detection_window"].append(rotated_frame) # 限制缓存大小(避免内存溢出) if len(self.temp["detection_window"]) > self.config["detection_frame_count"] * 2: self.temp["detection_window"] = self.temp["detection_window"][-self.config["detection_frame_count"] * 2:] # 未录制:执行检测 if not self.state["is_recording"]: self.run_detection() # 正在录制:执行写入 else: self.process_recording(rotated_frame) # 捕获用户中断(Ctrl+C) except KeyboardInterrupt: print(f"\n\n👋 用户主动中断程序") # 处理未完成视频 if self.state["is_recording"] and self.state["current_video_path"] and os.path.exists(self.state["current_video_path"]): duration = self.temp["recorded_frames"] / self.config["video_fps"] if duration < self.config["min_valid_duration"]: os.remove(self.state["current_video_path"]) print(f"🗑️ 删除不足{self.config['min_valid_duration']}秒的视频:{self.state['current_video_path']}") else: print(f"⚠️ 保存未完成视频:{self.state['current_video_path']}({duration:.1f}秒)") print(f"📊 中断时累计录制:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}秒") # 捕获其他异常 except Exception as e: print(f"\n⚠️ 程序异常:{str(e)}") # 最终释放资源 finally: self.release_resources() print("\n📋 程序结束") if __name__ == '__main__': # 初始化并运行录制器 recorder = VideoRecorder() recorder.run()