#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/9/11 16:48 # @Author : reenrr # @File : test_01.py ''' import cv2 import time import os from PIL import Image import shutil from cls_inference.cls_inference import yolov11_cls_inference import numpy as np # ================== 配置参数(严格按“定义在前,使用在后”排序)================== # 1. 检测核心参数(先定义,后续打印和逻辑会用到) detection_interval = 10 # 每隔10秒检查一次 detection_frame_count = 3 # 每次检测抽取3帧 required_all_noready = True # 要求所有帧都为“盖板不对齐” # 2. 视频存储与摄像头基础参数 url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" output_video_dir = os.path.join("camera01_videos") # 视频保存目录 # 3. 摄像头重连参数 max_retry_seconds = 10 retry_interval_seconds = 1 # 4. 分类模型参数 cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" target_size = (640, 640) # 5. 视频录制参数 video_fps = 25 video_codec = cv2.VideoWriter_fourcc(*'mp4v') single_recording_duration = 10 # 每次录制10秒 total_target_duration = 60 # 累计目标60秒 single_recording_frames = video_fps * single_recording_duration total_target_frames = video_fps * total_target_duration def rotate_frame_180(pil_image): """将PIL图像旋转180度并转为OpenCV的BGR格式""" rotated_pil = pil_image.rotate(180, expand=True) rotated_rgb = np.array(rotated_pil) rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) return rotated_bgr if __name__ == '__main__': # 全局状态变量(断连重连后保留) total_recorded_frames = 0 # 累计录制总帧数 current_segment = 0 # 当前视频分段编号 is_recording = False # 是否正在录制 current_video_filepath = None# 当前录制视频路径 # 单次连接内的临时状态变量 video_writer = None # 视频写入对象 recorded_frames = 0 # 当前分段已录制帧数 frame_count = 0 # 摄像头总读取帧数 confirmed_frames = [] # 检测通过的确认帧(用于录制起始) last_detection_time = time.time() # 上次检测时间 detection_window_frames = [] # 检测窗口帧缓存 # 创建视频目录(确保目录存在) os.makedirs(output_video_dir, exist_ok=True) # 打印目标信息(此时detection_frame_count已提前定义) print(f"✅ 已创建/确认视频目录: {output_video_dir}") print(f"🎯 目标:累计录制{total_target_duration}秒,每次录制{single_recording_duration}秒,需{detection_frame_count}帧全为盖板不对齐") # 外层循环:处理摄像头断连与重连 while True: # 初始化摄像头连接 cap = cv2.VideoCapture(url) cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 设置RTSP缓存为5MB cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # 强制H264解码 # 摄像头连接重试逻辑 start_retry_time = time.time() while not cap.isOpened(): # 超过最大重试时间则退出程序 if time.time() - start_retry_time >= max_retry_seconds: print(f"\n❌ 已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。") # 退出前释放未关闭的视频写入器 if video_writer is not None: video_writer.release() print(f"📊 程序退出时累计录制:{total_recorded_frames/video_fps:.1f}秒") exit() # 每隔1秒重试一次 print(f"🔄 无法打开摄像头,正在尝试重新连接...(已重试{int(time.time()-start_retry_time)}秒)") time.sleep(retry_interval_seconds) cap.release() # 释放旧连接 cap = cv2.VideoCapture(url) # 重新创建连接 # 获取摄像头实际参数(可能与配置值不同) frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = cap.get(cv2.CAP_PROP_FPS) # 修正帧率(以摄像头实际帧率为准) if actual_fps > 0: video_fps = int(actual_fps) single_recording_frames = video_fps * single_recording_duration total_target_frames = video_fps * total_target_duration # 打印重连成功信息 print(f"\n✅ 摄像头重连成功(分辨率:{frame_width}x{frame_height},实际帧率:{video_fps})") print(f"📊 当前累计录制:{total_recorded_frames / video_fps:.1f}/{total_target_duration}秒") # 重连后恢复录制状态(如果断连前正在录制) if is_recording and current_video_filepath: print(f"🔄 恢复录制状态,继续录制视频:{current_video_filepath}") # 重新初始化视频写入器(确保参数与摄像头匹配) video_writer = cv2.VideoWriter( current_video_filepath, video_codec, video_fps, (frame_width, frame_height) ) # 恢复失败则重置录制状态 if not video_writer.isOpened(): print(f"⚠️ 视频写入器重新初始化失败,无法继续录制") is_recording = False video_writer = None recorded_frames = 0 # 内层循环:读取摄像头帧并处理(检测/录制) try: while True: # 读取一帧图像 ret, frame = cap.read() if not ret: print(f"\n⚠️ 读取帧失败,可能是流中断或摄像头断开") # 断连时保存当前录制进度(不释放全局状态) if video_writer is not None: video_writer.release() video_writer = None print(f"⏸️ 流中断,暂停录制(已保存当前进度)") # 重置单次连接的临时状态(全局状态保留) frame_count = 0 detection_window_frames = [] cap.release() # 释放当前摄像头连接 break # 跳出内层循环,进入重连流程 # 累计总帧数 frame_count += 1 # 预处理帧:转为RGB→PIL→旋转180度→转为BGR(适配录制和模型) rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_frame) rotated_bgr = rotate_frame_180(pil_image) # -------------------------- 1. 未录制时:执行检测逻辑 -------------------------- if not is_recording: # 将帧加入检测窗口缓存(用于10秒后的检测) detection_window_frames.append(rotated_bgr) # 限制缓存大小(避免内存占用过高,最多保留2倍检测帧数) if len(detection_window_frames) > detection_frame_count * 2: detection_window_frames = detection_window_frames[-detection_frame_count * 2:] # 触发检测的条件: # 1. 距离上次检测超过10秒;2. 检测窗口有足够帧;3. 累计录制未完成 if (time.time() - last_detection_time) >= detection_interval and \ len(detection_window_frames) >= detection_frame_count and \ total_recorded_frames < total_target_frames: try: print(f"\n==== 开始10秒间隔检测(总帧:{frame_count},累计已录:{total_recorded_frames/video_fps:.1f}秒) ====") # 从检测窗口中均匀抽取3帧(避免连续帧重复) sample_step = max(1, len(detection_window_frames) // detection_frame_count) sample_frames = detection_window_frames[::sample_step][:detection_frame_count] print(f"📋 检测窗口共{len(detection_window_frames)}帧,均匀抽取{len(sample_frames)}帧进行判断") # 统计“盖板不对齐”的帧数 noready_frame_count = 0 valid_detection = True # 标记检测是否有效(无无效帧) for idx, sample_frame in enumerate(sample_frames): # 调用模型获取分类结果 class_name = yolov11_cls_inference(cls_model_path, sample_frame, target_size) # 校验分类结果有效性 if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: print(f"❌ 抽取帧{idx+1}:分类结果无效({class_name}),本次检测失败") valid_detection = False break # 有无效帧则终止本次检测 # 统计不对齐帧 if class_name == "cover_noready": noready_frame_count += 1 print(f"✅ 抽取帧{idx+1}:分类结果={class_name}(符合条件)") else: print(f"❌ 抽取帧{idx+1}:分类结果={class_name}(不符合条件)") # 检测通过条件:所有帧有效 + 3帧全为不对齐 if valid_detection and noready_frame_count == detection_frame_count: print(f"\n✅ 本次检测通过({noready_frame_count}/{detection_frame_count}帧均为盖板不对齐)") # 保存检测通过的帧(用于录制起始,避免丢失检测阶段的画面) confirmed_frames.extend(sample_frames) # 检查磁盘空间(剩余<5GB则停止) total_disk, used_disk, free_disk = shutil.disk_usage(output_video_dir) if free_disk < 1024 * 1024 * 1024 * 5: print(f"❌ 磁盘空间严重不足(仅剩 {free_disk / (1024 ** 3):.2f} GB),停止录制并退出。") raise SystemExit(1) # 生成当前分段的视频文件名(包含时间戳和分段号) current_segment = total_recorded_frames // single_recording_frames + 1 timestamp = time.strftime("%Y%m%d_%H%M%S") current_video_filepath = os.path.join( output_video_dir, f"video_{timestamp}_part{current_segment}.mp4" ) # 初始化视频写入器 video_writer = cv2.VideoWriter( current_video_filepath, video_codec, video_fps, (frame_width, frame_height) ) if not video_writer.isOpened(): print(f"⚠️ 视频写入器初始化失败(路径:{current_video_filepath}),跳过本次录制") confirmed_frames = [] continue # 写入检测阶段的确认帧(录制起始画面) for frame in confirmed_frames: video_writer.write(frame) recorded_frames = len(confirmed_frames) is_recording = True # 标记为正在录制 # 打印录制开始信息 print(f"\n📹 开始录制第{current_segment}段视频(目标10秒)") print(f"📁 视频保存路径:{current_video_filepath}") print(f"🔢 已写入检测阶段的确认帧:{recorded_frames}帧") # 重置检测相关的临时状态 confirmed_frames = [] # 检测未通过(未满足“3帧全不对齐”或有无效帧) else: if valid_detection: print(f"\n❌ 本次检测未通过(仅{noready_frame_count}/{detection_frame_count}帧为盖板不对齐,需全部符合)") else: print(f"\n❌ 本次检测未通过(存在无效分类结果)") # 重置检测临时状态 confirmed_frames = [] # 更新检测时间,清空检测窗口(准备下一次检测) last_detection_time = time.time() detection_window_frames = [] # 捕获模型调用异常(不终止程序,仅重置检测状态) except Exception as e: print(f"\n⚠️ 分类模型调用异常: {str(e)}(总帧:{frame_count})") confirmed_frames = [] last_detection_time = time.time() detection_window_frames = [] continue # -------------------------- 2. 正在录制时:执行写入逻辑 -------------------------- if is_recording and video_writer is not None: # 写入当前帧(已预处理为旋转180度的BGR格式) video_writer.write(rotated_bgr) recorded_frames += 1 # 检查当前分段是否录制完成(达到10秒的帧数) if recorded_frames >= single_recording_frames: # 释放当前视频写入器(完成当前分段) video_writer.release() video_writer = None is_recording = False # 标记为未录制 # 更新累计录制进度 total_recorded_frames += recorded_frames actual_recording_time = recorded_frames / video_fps # 打印分段完成信息 print(f"\n✅ 第{current_segment}段视频录制完成") print(f"🔢 实际录制:{recorded_frames}帧 ≈ {actual_recording_time:.1f}秒") print(f"📊 累计录制:{total_recorded_frames/video_fps:.1f}/{total_target_duration}秒") # 检查是否达到总目标(60秒) if total_recorded_frames >= total_target_frames: print(f"\n🎉 已完成累计{total_target_duration}秒的录制目标!") # 重置累计状态(如需重复录制,保留此逻辑;如需单次录制,可添加break退出) total_recorded_frames = 0 current_segment = 0 # 重置录制相关的临时状态,准备下一次检测 recorded_frames = 0 last_detection_time = time.time() detection_window_frames = [] # 捕获用户中断(Ctrl+C) except KeyboardInterrupt: print(f"\n\n👋 用户中断程序") # 中断前保存当前录制的视频 if video_writer is not None: video_writer.release() print(f"⚠️ 已保存当前录制的视频:{current_video_filepath}") print(f"📊 中断时累计录制:{total_recorded_frames/video_fps:.1f}秒") break # 最终释放资源(无论内层循环因何退出) finally: cap.release() # 释放摄像头连接 if video_writer is not None: video_writer.release() # 释放视频写入器 print(f"\n🔌 视频流已关闭,累计已录:{total_recorded_frames/video_fps:.1f}秒") # 程序结束 print("\n📋 程序结束")