commit 8c28da93006e62432641eedf4187873a5fb42f49 Author: pengqi Date: Fri Sep 26 20:41:44 2025 +0800 V1.0 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/data_collection.iml b/.idea/data_collection.iml new file mode 100644 index 0000000..4d81b84 --- /dev/null +++ b/.idea/data_collection.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..78dfc6d --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,18 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..d30a556 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..48dd15c --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/cls_inference/cls_inference.py b/cls_inference/cls_inference.py new file mode 100644 index 0000000..87ae78a --- /dev/null +++ b/cls_inference/cls_inference.py @@ -0,0 +1,166 @@ +import cv2 +import numpy as np +import platform +from .labels import labels # 确保这个文件存在 +from rknnlite.api import RKNNLite + + +# ------------------- 核心:全局变量存储RKNN模型实例(确保只加载一次) ------------------- +# 初始化为None,首次调用时加载模型,后续直接复用 +_global_rknn_instance = None + +# device tree for RK356x/RK3576/RK3588 +DEVICE_COMPATIBLE_NODE = '/proc/device-tree/compatible' + +def get_host(): + # get platform and device type + system = platform.system() + machine = platform.machine() + os_machine = system + '-' + machine + if os_machine == 'Linux-aarch64': + try: + with open(DEVICE_COMPATIBLE_NODE) as f: + device_compatible_str = f.read() + if 'rk3562' in device_compatible_str: + host = 'RK3562' + elif 'rk3576' in device_compatible_str: + host = 'RK3576' + elif 'rk3588' in device_compatible_str: + host = 'RK3588' + else: + host = 'RK3566_RK3568' + except IOError: + print('Read device node {} failed.'.format(DEVICE_COMPATIBLE_NODE)) + exit(-1) + else: + host = os_machine + return host + +def get_top1_class_str(result): + """ + 从推理结果中提取出得分最高的类别,并返回字符串 + + 参数: + result (list): 模型推理输出结果(格式需与原函数一致,如 [np.ndarray]) + 返回: + str:得分最高类别的格式化字符串 + 若推理失败,返回错误提示字符串 + """ + if result is None: + print("Inference failed: result is None") + return + + # 解析推理输出(与原逻辑一致:展平输出为1维数组) + output = result[0].reshape(-1) + + # 获取得分最高的类别索引(np.argmax 直接返回最大值索引,比排序更高效) + top1_index = np.argmax(output) + + # 处理标签(确保索引在 labels 列表范围内,避免越界) + if 0 <= top1_index < len(labels): + top1_class_name = labels[top1_index] + else: + top1_class_name = "Unknown Class" # 应对索引异常的边界情况 + + # 5. 格式化返回字符串(包含索引、得分、类别名称,得分保留6位小数) + return top1_class_name + +def preprocess(raw_image, target_size=(640, 640)): + """ + 读取图像并执行预处理(BGR转RGB、调整尺寸、添加Batch维度) + + 参数: + image_path (str): 图像文件的完整路径(如 "C:/test.jpg" 或 "/home/user/test.jpg") + target_size (tuple): 预处理后图像的目标尺寸,格式为 (width, height),默认 (640, 640) + 返回: + img (numpy.ndarray): 预处理后的图像 + 异常: + FileNotFoundError: 图像路径不存在或无法读取时抛出 + ValueError: 图像读取成功但为空(如文件损坏)时抛出 + """ + # img = cv2.cvtColor(raw_image, cv2.COLOR_BGR2RGB) + # 调整尺寸 + + img = cv2.resize(raw_image, target_size) + img = np.expand_dims(img, 0) # 添加batch维度 + + return img + +# ------------------- 新增:模型初始化函数(控制只加载一次) ------------------- +def init_rknn_model(model_path): + """ + 初始化RKNN模型(全局唯一实例): + - 首次调用:加载模型+初始化运行时,返回模型实例 + - 后续调用:直接返回已加载的全局实例,避免重复加载 + """ + global _global_rknn_instance # 声明使用全局变量 + + # 若模型未加载过,执行加载逻辑 + if _global_rknn_instance is None: + # 1. 创建RKNN实例(关闭内置日志) + rknn_lite = RKNNLite(verbose=False) + + # 2. 加载RKNN模型 + ret = rknn_lite.load_rknn(model_path) + if ret != 0: + print(f'[ERROR] Load CLS_RKNN model failed (code: {ret})') + exit(ret) + + # 3. 初始化运行时(绑定NPU核心0) + ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_0) + if ret != 0: + print(f'[ERROR] Init CLS_RKNN runtime failed (code: {ret})') + exit(ret) + + # 4. 将加载好的实例赋值给全局变量 + _global_rknn_instance = rknn_lite + print(f'[INFO] CLS_RKNN model loaded successfully (path: {model_path})') + + return _global_rknn_instance + +def yolov11_cls_inference(model_path, raw_image, target_size=(640, 640)): + """ + 根据平台进行推理,并返回最终的分类结果 + + 参数: + model_path (str): RKNN模型文件路径 + image_path (str): 图像文件的完整路径(如 "C:/test.jpg" 或 "/home/user/test.jpg") + target_size (tuple): 预处理后图像的目标尺寸,格式为 (width, height),默认 (640, 640) + """ + rknn_model = model_path + + img = preprocess(raw_image, target_size) + + # 只加载一次模型,避免重复加载 + rknn = init_rknn_model(rknn_model) + if rknn is None: + return None, img + outputs = rknn.inference([img]) + + # Show the classification results + class_name = get_top1_class_str(outputs) + + # rknn_lite.release() + + return class_name + +if __name__ == '__main__': + + # 调用yolov11_cls_inference函数(target_size使用默认值640x640,也可显式传参如(112,112)) + image_path = "/userdata/reenrr/inference_with_lite/cover_ready.jpg" + bgr_image = cv2.imread(image_path) + if bgr_image is None: + print(f"Failed to read image from {image_path}") + exit(-1) + + rgb_frame = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB) + print(f"Read image from {image_path}, shape: {rgb_frame.shape}") + + result = yolov11_cls_inference( + model_path="/userdata/PyQt_main_test/app/view/yolo/yolov11_cls.rknn", + raw_image=rgb_frame, + target_size=(640, 640) + ) + # 打印最终结果 + print(f"\n最终分类结果:{result}") + diff --git a/cls_inference/labels.py b/cls_inference/labels.py new file mode 100644 index 0000000..4ed38b9 --- /dev/null +++ b/cls_inference/labels.py @@ -0,0 +1,6 @@ +# the labels come from synset.txt, download link: https://s3.amazonaws.com/onnx-model-zoo/synset.txt + +labels = \ +{0: 'cover_noready', + 1: 'cover_ready' +} \ No newline at end of file diff --git a/cls_inference/yolov11_cls.rknn b/cls_inference/yolov11_cls.rknn new file mode 100644 index 0000000..eca9557 Binary files /dev/null and b/cls_inference/yolov11_cls.rknn differ diff --git a/image_new.py b/image_new.py new file mode 100644 index 0000000..0a73341 --- /dev/null +++ b/image_new.py @@ -0,0 +1,157 @@ +import cv2 +import time +import os +import numpy as np +from PIL import Image +from skimage.metrics import structural_similarity as ssim +import shutil # 新增:用于检查磁盘空间 + +# ================== 配置参数 ================== +url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" +save_interval = 15 # 每隔 N 帧处理一次(可调) +SSIM_THRESHOLD = 0.9 # SSIM 相似度阈值,>0.9 认为太像 +output_dir = os.path.join("camera01") # 固定路径:userdata/image + +# 灰色判断参数 +GRAY_LOWER = 70 +GRAY_UPPER = 230 +GRAY_RATIO_THRESHOLD = 0.7 + +# 创建输出目录 +if not os.path.exists(output_dir): + os.makedirs(output_dir) + print(f"已创建目录: {output_dir}") + +def is_large_gray(image, gray_lower=GRAY_LOWER, gray_upper=GRAY_UPPER, ratio_thresh=GRAY_RATIO_THRESHOLD): + """ + 判断图片是否大面积为灰色(R/G/B 都在 [gray_lower, gray_upper] 区间) + """ + img_array = np.array(image) + if len(img_array.shape) != 3 or img_array.shape[2] != 3: + return True # 非三通道图视为无效/灰色 + + h, w, _ = img_array.shape + total = h * w + + gray_mask = ( + (img_array[:, :, 0] >= gray_lower) & (img_array[:, :, 0] <= gray_upper) & + (img_array[:, :, 1] >= gray_lower) & (img_array[:, :, 1] <= gray_upper) & + (img_array[:, :, 2] >= gray_lower) & (img_array[:, :, 2] <= gray_upper) + ) + gray_pixels = np.sum(gray_mask) + gray_ratio = gray_pixels / total + + return gray_ratio > ratio_thresh + +max_retry_seconds = 10 # 最大重试时间为10秒 +retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次 + +while True: # 外层循环用于处理重新连接逻辑 + cap = cv2.VideoCapture(url) + start_time = time.time() # 记录开始尝试连接的时间 + + while not cap.isOpened(): # 如果无法打开摄像头,则进入重试逻辑 + if time.time() - start_time >= max_retry_seconds: + print(f"已尝试重新连接 {max_retry_seconds} 秒,但仍无法获取视频流。") + exit() + + print("无法打开摄像头,正在尝试重新连接...") + time.sleep(retry_interval_seconds) # 等待一段时间后再次尝试 + cap = cv2.VideoCapture(url) + + print("✅ 开始读取视频流...") + + frame_count = 0 + last_gray = None # 用于 SSIM 去重 + + try: + while True: + ret, frame = cap.read() + if not ret: + print("读取帧失败,可能是流中断或摄像头断开") + cap.release() # 释放资源以便重新连接 + break # 跳出内层循环尝试重新连接 + + frame_count += 1 + + # 仅在指定间隔处理保存逻辑 + if frame_count % save_interval != 0: + cv2.imshow('Camera Stream (Live)', frame) + if cv2.waitKey(1) == ord('q'): + raise KeyboardInterrupt + continue + + print(f"处理帧 {frame_count}") + + # 转为 PIL 图像(用于后续判断和旋转) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + pil_image = Image.fromarray(rgb_frame) + + # STEP 1: 判断是否为大面积灰色(优先级最高) + if is_large_gray(pil_image): + print(f"跳过:大面积灰色图像 (frame_{frame_count})") + cv2.imshow('Camera Stream (Live)', frame) + if cv2.waitKey(1) == ord('q'): + raise KeyboardInterrupt + continue + + # STEP 2: 判断是否为重复帧(基于 SSIM) + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + if last_gray is not None: + try: + similarity = ssim(gray, last_gray) + if similarity > SSIM_THRESHOLD: + print(f"跳过:与上一帧太相似 (SSIM={similarity:.3f})") + cv2.imshow('Camera Stream (Live)', frame) + if cv2.waitKey(1) == ord('q'): + raise KeyboardInterrupt + continue + except Exception as e: + print(f"SSIM 计算异常: {e}") + + # 更新 last_gray 用于下一帧比较 + last_gray = gray.copy() + + # STEP 3: 旋转 180 度 + rotated_pil = pil_image.rotate(180, expand=False) + + # 生成文件名(时间戳 + 毫秒防重),使用 .png 扩展名 + timestamp = time.strftime("%Y%m%d_%H%M%S") + ms = int((time.time() % 1) * 1000) + filename = f"frame_{timestamp}_{ms:03d}.png" + filepath = os.path.join(output_dir, filename) + + # ✅ 新增:检查磁盘可用空间 + total, used, free = shutil.disk_usage(output_dir) + if free < 1024 * 1024 * 1024* 5: # 小于 5GB 就停止 + print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024**3):.2f} GB),停止运行。") + raise SystemExit(1) + + # 保存图像为 PNG 格式(无损) + try: + rotated_pil.save(filepath, format='PNG') + print(f"已保存: {filepath}") + except (OSError, IOError) as e: + error_msg = str(e) + if "No space left on device" in error_msg or "disk full" in error_msg.lower() or "quota" in error_msg.lower(): + print(f"磁盘空间不足,无法保存 {filepath}!错误: {e}") + print("停止程序以防止无限错误。") + raise SystemExit(1) # 或者使用break来跳出循环 + else: + print(f"保存失败 {filename}: {e}(非磁盘空间问题,继续运行)") + + # 显示画面 + cv2.imshow('Camera Stream (Live)', frame) + if cv2.waitKey(1) == ord('q'): + raise KeyboardInterrupt + + except KeyboardInterrupt: + print("\n用户中断") + break # 跳出外层循环并退出程序 + + finally: + cap.release() + cv2.destroyAllWindows() + print(f"视频流已关闭,共处理 {frame_count} 帧。") + +print("程序结束") diff --git a/merge_video.py b/merge_video.py new file mode 100644 index 0000000..31d7e6c --- /dev/null +++ b/merge_video.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/25 15:30 +# @Author : reenrr +# @File : merge_video.py +''' +# !/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/09/25 +# @Author : reenrr +# @File : video_merger.py +# 功能描述:基于 FFmpeg 合并多个视频文件(支持跨格式、进度显示) +# 依赖:需先安装 FFmpeg 并配置环境变量 +''' +import os +import subprocess +import platform +from typing import List + + +class VideoMerger: + def __init__(self): + # 检查 FFmpeg 是否安装(通过调用 ffmpeg -version 验证) + self._check_ffmpeg_installed() + + def _check_ffmpeg_installed(self) -> None: + """检查 FFmpeg 是否已安装并配置环境变量""" + try: + # 调用 FFmpeg 版本命令,无异常则说明安装成功 + subprocess.run( + ["ffmpeg", "-version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + except FileNotFoundError: + raise EnvironmentError( + "未找到 FFmpeg!请先安装 FFmpeg 并配置系统环境变量。\n" + "下载地址:https://ffmpeg.org/download.html\n" + "Windows 配置:将 FFmpeg 的 bin 目录添加到 PATH 环境变量;\n" + "macOS:使用 brew install ffmpeg;\n" + "Linux:使用 sudo apt install ffmpeg(Ubuntu)。" + ) + except subprocess.CalledProcessError: + raise RuntimeError("FFmpeg 安装异常,请重新安装。") + + def _create_video_list_file(self, video_paths: List[str], list_file_path: str = "video_list.txt") -> str: + """ + 创建 FFmpeg 所需的视频列表文件(用于合并多个视频) + FFmpeg 合并需通过文本文件指定视频路径,格式为:file '视频路径' + """ + # 检查所有视频文件是否存在 + for idx, path in enumerate(video_paths): + if not os.path.exists(path): + raise FileNotFoundError(f"第 {idx + 1} 个视频文件不存在:{path}") + # 检查是否为文件(而非目录) + if not os.path.isfile(path): + raise IsADirectoryError(f"第 {idx + 1} 个路径不是文件:{path}") + + # 写入视频列表(处理路径中的特殊字符,兼容 Windows 反斜杠) + with open(list_file_path, "w", encoding="utf-8") as f: + for path in video_paths: + # 统一路径分隔符为正斜杠(FFmpeg 兼容正斜杠) + normalized_path = path.replace("\\", "/") + # 写入格式:file '路径'(单引号避免路径含空格/特殊字符) + f.write(f"file '{normalized_path}'\n") + + return list_file_path + + def merge_videos(self, + video_paths: List[str], + output_path: str = "merged_output.mp4", + overwrite: bool = False) -> None: + """ + 合并多个视频文件 + :param video_paths: 待合并的视频路径列表(顺序即合并顺序) + :param output_path: 输出视频路径(默认当前目录 merged_output.mp4) + :param overwrite: 是否覆盖已存在的输出文件(默认不覆盖) + """ + # 1. 基础参数校验 + if len(video_paths) < 2: + raise ValueError("至少需要 2 个视频文件才能合并!") + + # 2. 处理输出文件(若已存在且不允许覆盖,直接退出) + if os.path.exists(output_path) and not overwrite: + raise FileExistsError(f"输出文件已存在:{output_path},若需覆盖请设置 overwrite=True。") + + # 3. 创建 FFmpeg 视频列表文件 + list_file = self._create_video_list_file(video_paths) + print(f"已创建视频列表文件:{list_file}") + + # 4. 构造 FFmpeg 合并命令 + # -f concat:使用 concat 协议(合并多个文件) + # -safe 0:允许处理任意路径的文件(避免 FFmpeg 限制相对路径) + # -i {list_file}:输入视频列表文件 + # -c copy:直接复制音视频流(不重新编码,速度快;若格式不兼容会自动 fallback 编码) + # -y:强制覆盖输出文件(仅 overwrite=True 时启用) + ffmpeg_cmd = [ + "ffmpeg", + "-f", "concat", + "-safe", "0", + "-i", list_file, + "-c", "copy", # 核心参数:复制流(快速合并),若格式不兼容可删除此句(自动编码) + "-hide_banner", # 隐藏 FFmpeg 版本横幅(简化输出) + "-loglevel", "info" # 显示进度和关键日志(可改为 warning 减少输出) + ] + # 若允许覆盖,添加 -y 参数 + if overwrite: + ffmpeg_cmd.append("-y") + # 添加输出路径 + ffmpeg_cmd.append(output_path) + + try: + print(f"\n开始合并视频(共 {len(video_paths)} 个):") + for idx, path in enumerate(video_paths, 1): + print(f" {idx}. {os.path.basename(path)}") + print(f"\n输出路径:{os.path.abspath(output_path)}") + print("=" * 50) + + # 5. 执行 FFmpeg 命令(实时打印进度) + process = subprocess.Popen( + ffmpeg_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # 合并 stdout 和 stderr,统一捕获进度 + text=True, + bufsize=1 # 行缓冲,实时输出日志 + ) + + # 实时打印 FFmpeg 输出(显示合并进度) + for line in process.stdout: + print(line.strip(), end="\r" if "time=" in line else "\n") # 进度行覆盖显示,其他行换行 + + # 等待命令执行完成,获取返回码 + process.wait() + if process.returncode == 0: + print("\n" + "=" * 50) + print(f"视频合并成功!输出文件:{os.path.abspath(output_path)}") + else: + raise RuntimeError(f"FFmpeg 执行失败(返回码:{process.returncode}),请检查日志排查问题。") + + finally: + # 6. 清理临时文件(视频列表文件) + if os.path.exists(list_file): + os.remove(list_file) + print(f"\n已清理临时列表文件:{list_file}") + + +if __name__ == "__main__": + # -------------------------- 配置参数(根据需求修改) -------------------------- + # 1. 待合并的视频路径列表(顺序即合并顺序,支持绝对路径/相对路径) + # 示例1:相对路径(视频文件与脚本在同一目录) + # VIDEO_PATHS = [ + # "video1.mp4", + # "video2.mkv", + # "video3.avi" + # ] + # 示例2:绝对路径(Windows 需用双反斜杠或正斜杠) + VIDEO_PATHS = [ + r"C:\Project\zjsh\data_collection\data_collection\camera01_videos\video_20250925_062802_part1.mp4", # Windows 绝对路径(r 表示原始字符串,避免转义) + r"C:\Project\zjsh\data_collection\data_collection\camera01_videos\video_20250925_062905_part2.mp4" + ] + # 2. 输出视频路径(默认当前目录,可自定义) + OUTPUT_PATH = "merged_video.mp4" + # 3. 是否覆盖已存在的输出文件(True=覆盖,False=不覆盖) + OVERWRITE = True + + # -------------------------- 执行合并 -------------------------- + try: + merger = VideoMerger() + merger.merge_videos( + video_paths=VIDEO_PATHS, + output_path=OUTPUT_PATH, + overwrite=OVERWRITE + ) + except Exception as e: + print(f"\n合并失败:{str(e)}") \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..f39b04f --- /dev/null +++ b/test.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/11 16:48 +# @Author : reenrr +# @File : test_01.py +''' +import cv2 +import time +import os +from PIL import Image +import shutil +from cls_inference.cls_inference import yolov11_cls_inference +import numpy as np + +# ================== 配置参数(严格按“定义在前,使用在后”排序)================== +# 1. 检测核心参数(先定义,后续打印和逻辑会用到) +detection_interval = 10 # 每隔10秒检查一次 +detection_frame_count = 3 # 每次检测抽取3帧 +required_all_noready = True # 要求所有帧都为“盖板不对齐” + +# 2. 视频存储与摄像头基础参数 +url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" +output_video_dir = os.path.join("camera01_videos") # 视频保存目录 + +# 3. 摄像头重连参数 +max_retry_seconds = 10 +retry_interval_seconds = 1 + +# 4. 分类模型参数 +cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" +target_size = (640, 640) + +# 5. 视频录制参数 +video_fps = 25 +video_codec = cv2.VideoWriter_fourcc(*'mp4v') +single_recording_duration = 10 # 每次录制10秒 +total_target_duration = 60 # 累计目标60秒 +single_recording_frames = video_fps * single_recording_duration +total_target_frames = video_fps * total_target_duration + + +def rotate_frame_180(pil_image): + """将PIL图像旋转180度并转为OpenCV的BGR格式""" + rotated_pil = pil_image.rotate(180, expand=True) + rotated_rgb = np.array(rotated_pil) + rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) + return rotated_bgr + + +if __name__ == '__main__': + # 全局状态变量(断连重连后保留) + total_recorded_frames = 0 # 累计录制总帧数 + current_segment = 0 # 当前视频分段编号 + is_recording = False # 是否正在录制 + current_video_filepath = None# 当前录制视频路径 + + # 单次连接内的临时状态变量 + video_writer = None # 视频写入对象 + recorded_frames = 0 # 当前分段已录制帧数 + frame_count = 0 # 摄像头总读取帧数 + confirmed_frames = [] # 检测通过的确认帧(用于录制起始) + last_detection_time = time.time() # 上次检测时间 + detection_window_frames = [] # 检测窗口帧缓存 + + # 创建视频目录(确保目录存在) + os.makedirs(output_video_dir, exist_ok=True) + # 打印目标信息(此时detection_frame_count已提前定义) + print(f"✅ 已创建/确认视频目录: {output_video_dir}") + print(f"🎯 目标:累计录制{total_target_duration}秒,每次录制{single_recording_duration}秒,需{detection_frame_count}帧全为盖板不对齐") + + # 外层循环:处理摄像头断连与重连 + while True: + # 初始化摄像头连接 + cap = cv2.VideoCapture(url) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 设置RTSP缓存为5MB + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # 强制H264解码 + + # 摄像头连接重试逻辑 + start_retry_time = time.time() + while not cap.isOpened(): + # 超过最大重试时间则退出程序 + if time.time() - start_retry_time >= max_retry_seconds: + print(f"\n❌ 已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。") + # 退出前释放未关闭的视频写入器 + if video_writer is not None: + video_writer.release() + print(f"📊 程序退出时累计录制:{total_recorded_frames/video_fps:.1f}秒") + exit() + + # 每隔1秒重试一次 + print(f"🔄 无法打开摄像头,正在尝试重新连接...(已重试{int(time.time()-start_retry_time)}秒)") + time.sleep(retry_interval_seconds) + cap.release() # 释放旧连接 + cap = cv2.VideoCapture(url) # 重新创建连接 + + # 获取摄像头实际参数(可能与配置值不同) + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = cap.get(cv2.CAP_PROP_FPS) + # 修正帧率(以摄像头实际帧率为准) + if actual_fps > 0: + video_fps = int(actual_fps) + single_recording_frames = video_fps * single_recording_duration + total_target_frames = video_fps * total_target_duration + # 打印重连成功信息 + print(f"\n✅ 摄像头重连成功(分辨率:{frame_width}x{frame_height},实际帧率:{video_fps})") + print(f"📊 当前累计录制:{total_recorded_frames / video_fps:.1f}/{total_target_duration}秒") + + # 重连后恢复录制状态(如果断连前正在录制) + if is_recording and current_video_filepath: + print(f"🔄 恢复录制状态,继续录制视频:{current_video_filepath}") + # 重新初始化视频写入器(确保参数与摄像头匹配) + video_writer = cv2.VideoWriter( + current_video_filepath, video_codec, video_fps, (frame_width, frame_height) + ) + # 恢复失败则重置录制状态 + if not video_writer.isOpened(): + print(f"⚠️ 视频写入器重新初始化失败,无法继续录制") + is_recording = False + video_writer = None + recorded_frames = 0 + + # 内层循环:读取摄像头帧并处理(检测/录制) + try: + while True: + # 读取一帧图像 + ret, frame = cap.read() + if not ret: + print(f"\n⚠️ 读取帧失败,可能是流中断或摄像头断开") + # 断连时保存当前录制进度(不释放全局状态) + if video_writer is not None: + video_writer.release() + video_writer = None + print(f"⏸️ 流中断,暂停录制(已保存当前进度)") + # 重置单次连接的临时状态(全局状态保留) + frame_count = 0 + detection_window_frames = [] + cap.release() # 释放当前摄像头连接 + break # 跳出内层循环,进入重连流程 + + # 累计总帧数 + frame_count += 1 + + # 预处理帧:转为RGB→PIL→旋转180度→转为BGR(适配录制和模型) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + pil_image = Image.fromarray(rgb_frame) + rotated_bgr = rotate_frame_180(pil_image) + + # -------------------------- 1. 未录制时:执行检测逻辑 -------------------------- + if not is_recording: + # 将帧加入检测窗口缓存(用于10秒后的检测) + detection_window_frames.append(rotated_bgr) + # 限制缓存大小(避免内存占用过高,最多保留2倍检测帧数) + if len(detection_window_frames) > detection_frame_count * 2: + detection_window_frames = detection_window_frames[-detection_frame_count * 2:] + + # 触发检测的条件: + # 1. 距离上次检测超过10秒;2. 检测窗口有足够帧;3. 累计录制未完成 + if (time.time() - last_detection_time) >= detection_interval and \ + len(detection_window_frames) >= detection_frame_count and \ + total_recorded_frames < total_target_frames: + + try: + print(f"\n==== 开始10秒间隔检测(总帧:{frame_count},累计已录:{total_recorded_frames/video_fps:.1f}秒) ====") + # 从检测窗口中均匀抽取3帧(避免连续帧重复) + sample_step = max(1, len(detection_window_frames) // detection_frame_count) + sample_frames = detection_window_frames[::sample_step][:detection_frame_count] + print(f"📋 检测窗口共{len(detection_window_frames)}帧,均匀抽取{len(sample_frames)}帧进行判断") + + # 统计“盖板不对齐”的帧数 + noready_frame_count = 0 + valid_detection = True # 标记检测是否有效(无无效帧) + for idx, sample_frame in enumerate(sample_frames): + # 调用模型获取分类结果 + class_name = yolov11_cls_inference(cls_model_path, sample_frame, target_size) + + # 校验分类结果有效性 + if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: + print(f"❌ 抽取帧{idx+1}:分类结果无效({class_name}),本次检测失败") + valid_detection = False + break # 有无效帧则终止本次检测 + + # 统计不对齐帧 + if class_name == "cover_noready": + noready_frame_count += 1 + print(f"✅ 抽取帧{idx+1}:分类结果={class_name}(符合条件)") + else: + print(f"❌ 抽取帧{idx+1}:分类结果={class_name}(不符合条件)") + + # 检测通过条件:所有帧有效 + 3帧全为不对齐 + if valid_detection and noready_frame_count == detection_frame_count: + print(f"\n✅ 本次检测通过({noready_frame_count}/{detection_frame_count}帧均为盖板不对齐)") + # 保存检测通过的帧(用于录制起始,避免丢失检测阶段的画面) + confirmed_frames.extend(sample_frames) + + # 检查磁盘空间(剩余<5GB则停止) + total_disk, used_disk, free_disk = shutil.disk_usage(output_video_dir) + if free_disk < 1024 * 1024 * 1024 * 5: + print(f"❌ 磁盘空间严重不足(仅剩 {free_disk / (1024 ** 3):.2f} GB),停止录制并退出。") + raise SystemExit(1) + + # 生成当前分段的视频文件名(包含时间戳和分段号) + current_segment = total_recorded_frames // single_recording_frames + 1 + timestamp = time.strftime("%Y%m%d_%H%M%S") + current_video_filepath = os.path.join( + output_video_dir, f"video_{timestamp}_part{current_segment}.mp4" + ) + + # 初始化视频写入器 + video_writer = cv2.VideoWriter( + current_video_filepath, video_codec, video_fps, (frame_width, frame_height) + ) + if not video_writer.isOpened(): + print(f"⚠️ 视频写入器初始化失败(路径:{current_video_filepath}),跳过本次录制") + confirmed_frames = [] + continue + + # 写入检测阶段的确认帧(录制起始画面) + for frame in confirmed_frames: + video_writer.write(frame) + recorded_frames = len(confirmed_frames) + is_recording = True # 标记为正在录制 + + # 打印录制开始信息 + print(f"\n📹 开始录制第{current_segment}段视频(目标10秒)") + print(f"📁 视频保存路径:{current_video_filepath}") + print(f"🔢 已写入检测阶段的确认帧:{recorded_frames}帧") + + # 重置检测相关的临时状态 + confirmed_frames = [] + + # 检测未通过(未满足“3帧全不对齐”或有无效帧) + else: + if valid_detection: + print(f"\n❌ 本次检测未通过(仅{noready_frame_count}/{detection_frame_count}帧为盖板不对齐,需全部符合)") + else: + print(f"\n❌ 本次检测未通过(存在无效分类结果)") + # 重置检测临时状态 + confirmed_frames = [] + + # 更新检测时间,清空检测窗口(准备下一次检测) + last_detection_time = time.time() + detection_window_frames = [] + + # 捕获模型调用异常(不终止程序,仅重置检测状态) + except Exception as e: + print(f"\n⚠️ 分类模型调用异常: {str(e)}(总帧:{frame_count})") + confirmed_frames = [] + last_detection_time = time.time() + detection_window_frames = [] + continue + + # -------------------------- 2. 正在录制时:执行写入逻辑 -------------------------- + if is_recording and video_writer is not None: + # 写入当前帧(已预处理为旋转180度的BGR格式) + video_writer.write(rotated_bgr) + recorded_frames += 1 + + # 检查当前分段是否录制完成(达到10秒的帧数) + if recorded_frames >= single_recording_frames: + # 释放当前视频写入器(完成当前分段) + video_writer.release() + video_writer = None + is_recording = False # 标记为未录制 + + # 更新累计录制进度 + total_recorded_frames += recorded_frames + actual_recording_time = recorded_frames / video_fps + + # 打印分段完成信息 + print(f"\n✅ 第{current_segment}段视频录制完成") + print(f"🔢 实际录制:{recorded_frames}帧 ≈ {actual_recording_time:.1f}秒") + print(f"📊 累计录制:{total_recorded_frames/video_fps:.1f}/{total_target_duration}秒") + + # 检查是否达到总目标(60秒) + if total_recorded_frames >= total_target_frames: + print(f"\n🎉 已完成累计{total_target_duration}秒的录制目标!") + # 重置累计状态(如需重复录制,保留此逻辑;如需单次录制,可添加break退出) + total_recorded_frames = 0 + current_segment = 0 + + # 重置录制相关的临时状态,准备下一次检测 + recorded_frames = 0 + last_detection_time = time.time() + detection_window_frames = [] + + # 捕获用户中断(Ctrl+C) + except KeyboardInterrupt: + print(f"\n\n👋 用户中断程序") + # 中断前保存当前录制的视频 + if video_writer is not None: + video_writer.release() + print(f"⚠️ 已保存当前录制的视频:{current_video_filepath}") + print(f"📊 中断时累计录制:{total_recorded_frames/video_fps:.1f}秒") + break + + # 最终释放资源(无论内层循环因何退出) + finally: + cap.release() # 释放摄像头连接 + if video_writer is not None: + video_writer.release() # 释放视频写入器 + print(f"\n🔌 视频流已关闭,累计已录:{total_recorded_frames/video_fps:.1f}秒") + + # 程序结束 + print("\n📋 程序结束") \ No newline at end of file diff --git a/test_01.py b/test_01.py new file mode 100644 index 0000000..a61bd86 --- /dev/null +++ b/test_01.py @@ -0,0 +1,423 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/11 16:48 +# @Author : reenrr +# @File : test_01.py +# @Description: 结构化重构 - 录制6个10秒子视频(累计60秒) +''' +import cv2 +import time +import os +from PIL import Image +import shutil +from cls_inference.cls_inference import yolov11_cls_inference +import numpy as np + + +class VideoRecorder: + """视频录制器类:封装配置、状态和核心逻辑""" + def __init__(self): + # -------------------------- 1. 配置参数(集中管理,便于修改) -------------------------- + self.config = { + # 检测参数 + "detection_interval": 10, # 检测间隔(秒) + "detection_frame_count": 3, # 每次检测抽帧数量 + "required_all_noready": True, # 需所有帧为“盖板不对齐” + # 摄像头参数 + "rtsp_url": "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101", + "output_dir": "camera01_videos", # 视频保存目录 + "max_retry_sec": 10, # 摄像头重连超时(秒) + "retry_interval_sec": 1, # 重连间隔(秒) + # 模型参数 + "cls_model_path": "/userdata/data_collection/cls_inference/yolov11_cls.rknn", + "model_target_size": (640, 640), + # 录制参数 + "video_fps": 25, # 初始帧率 + "video_codec": cv2.VideoWriter_fourcc(*'mp4v'), + "single_duration": 10, # 单段视频时长(秒) + "total_duration": 60, # 总目标时长(秒) + "min_valid_duration": 10 # 最小有效视频时长(秒) + } + # 计算衍生配置(避免重复计算) + self.config["single_frames"] = self.config["video_fps"] * self.config["single_duration"] + self.config["total_frames"] = self.config["video_fps"] * self.config["total_duration"] + + # -------------------------- 2. 全局状态(断连后需保留) -------------------------- + self.state = { + "total_recorded_frames": 0, # 累计录制帧数 + "current_segment": 0, # 当前分段编号 + "is_recording": False, # 是否正在录制 + "current_video_path": None, # 当前分段视频路径 + "cached_frames": [], # 断连时缓存的帧 + "last_resolution": (0, 0), # 上次摄像头分辨率(宽,高) + "recording_start_time": 0 # 当前分段录制开始时间 + } + + # -------------------------- 3. 临时状态(单次连接内有效,断连后重置) -------------------------- + self.temp = { + "cap": None, # 摄像头对象 + "video_writer": None, # 视频写入对象 + "recorded_frames": 0, # 当前分段已录帧数 + "frame_count": 0, # 摄像头总读帧数 + "confirmed_frames": [], # 检测通过的起始帧 + "last_detection_time": time.time(),# 上次检测时间 + "detection_window": [] # 检测用帧缓存窗口 + } + + def init_environment(self): + """初始化环境:创建目录、打印配置信息""" + # 创建视频保存目录 + os.makedirs(self.config["output_dir"], exist_ok=True) + # 打印目标信息 + print("=" * 60) + print(f"✅ 环境初始化完成") + print(f"📁 视频保存目录:{os.path.abspath(self.config['output_dir'])}") + print(f"🎯 录制目标:{self.config['total_duration']}秒({self.config['total_duration']//self.config['single_duration']}段×{self.config['single_duration']}秒)") + print(f"🔍 检测条件:每{self.config['detection_interval']}秒检测,需{self.config['detection_frame_count']}帧全为'盖板不对齐'") + print("=" * 60) + + def rotate_frame(self, pil_image): + """工具函数:将PIL图像旋转180度并转为OpenCV的BGR格式""" + rotated_pil = pil_image.rotate(180, expand=True) + rotated_rgb = np.array(rotated_pil) + return cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) + + def connect_camera(self): + """摄像头连接:含重连逻辑,返回是否连接成功""" + # 初始化摄像头 + self.temp["cap"] = cv2.VideoCapture(self.config["rtsp_url"]) + self.temp["cap"].set(cv2.CAP_PROP_BUFFERSIZE, 5) # 设置RTSP缓存 + self.temp["cap"].set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # 强制H264解码 + + # 重连逻辑 + start_retry_time = time.time() + while not self.temp["cap"].isOpened(): + # 超时退出 + if time.time() - start_retry_time >= self.config["max_retry_sec"]: + print(f"\n❌ 摄像头重连超时({self.config['max_retry_sec']}秒),程序退出") + self.release_resources() # 释放资源 + print(f"📊 退出时累计录制:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}秒") + return False + + # 重试提示 + retry_sec = int(time.time() - start_retry_time) + print(f"🔄 正在重连摄像头(已重试{retry_sec}秒)...") + time.sleep(self.config["retry_interval_sec"]) + # 释放旧连接,重新初始化 + self.temp["cap"].release() + self.temp["cap"] = cv2.VideoCapture(self.config["rtsp_url"]) + + # 连接成功:获取实际摄像头参数 + frame_width = int(self.temp["cap"].get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(self.temp["cap"].get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = self.temp["cap"].get(cv2.CAP_PROP_FPS) + + # 修正帧率(以摄像头实际帧率为准) + if actual_fps > 0: + self.config["video_fps"] = int(actual_fps) + self.config["single_frames"] = self.config["video_fps"] * self.config["single_duration"] + self.config["total_frames"] = self.config["video_fps"] * self.config["total_duration"] + + # 检查分辨率变化 + resolution_changed = False + if self.state["last_resolution"] != (0, 0): + if (frame_width, frame_height) != self.state["last_resolution"]: + print(f"⚠️ 摄像头分辨率变化:{self.state['last_resolution']} → ({frame_width},{frame_height})") + resolution_changed = True + # 分辨率变化:重置录制状态 + self.state["is_recording"] = False + self.state["cached_frames"] = [] + self.temp["detection_window"] = [] + + # 更新分辨率记录 + self.state["last_resolution"] = (frame_width, frame_height) + + # 打印连接成功信息 + print(f"\n✅ 摄像头连接成功") + print(f"📊 分辨率:{frame_width}×{frame_height} | 实际帧率:{self.config['video_fps']}fps") + print(f"📈 当前累计进度:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}/{self.config['total_duration']}秒") + return True, resolution_changed + + def restore_recording(self, resolution_changed): + """重连后恢复录制状态:重新初始化写入器并写入缓存帧""" + if not (self.state["is_recording"] and self.state["current_video_path"] and not resolution_changed): + return # 无需恢复 + + print(f"🔄 恢复录制:{self.state['current_video_path']}") + # 重新初始化视频写入器 + frame_width, frame_height = self.state["last_resolution"] + self.temp["video_writer"] = cv2.VideoWriter( + self.state["current_video_path"], + self.config["video_codec"], + self.config["video_fps"], + (frame_width, frame_height) + ) + + # 恢复失败:重置状态 + if not self.temp["video_writer"].isOpened(): + print(f"⚠️ 视频写入器恢复失败,放弃当前分段") + self.state["is_recording"] = False + self.temp["recorded_frames"] = 0 + self.state["cached_frames"] = [] + return + + # 写入缓存帧 + if self.state["cached_frames"]: + print(f"🔄 恢复缓存的{len(self.state['cached_frames'])}帧") + for frame in self.state["cached_frames"]: + self.temp["video_writer"].write(frame) + self.temp["recorded_frames"] = len(self.state["cached_frames"]) + self.state["cached_frames"] = [] # 清空缓存 + + def run_detection(self): + """执行检测逻辑:从缓存窗口抽帧,调用模型判断是否开始录制""" + # 检测触发条件:时间间隔达标 + 缓存帧足够 + 未录满总目标 + if (time.time() - self.temp["last_detection_time"] < self.config["detection_interval"]) or \ + (len(self.temp["detection_window"]) < self.config["detection_frame_count"]) or \ + (self.state["total_recorded_frames"] >= self.config["total_frames"]): + return + + print(f"\n==== 开始检测(总读帧:{self.temp['frame_count']} | 累计进度:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}秒) ====") + # 均匀抽帧(避免连续帧重复) + sample_step = max(1, len(self.temp["detection_window"]) // self.config["detection_frame_count"]) + sample_frames = self.temp["detection_window"][::sample_step][:self.config["detection_frame_count"]] + print(f"📋 检测窗口:{len(self.temp['detection_window'])}帧 → 抽帧:{len(sample_frames)}帧") + + # 模型分类:统计“盖板不对齐”帧数 + noready_count = 0 + valid_detection = True + for idx, frame in enumerate(sample_frames): + try: + class_name = yolov11_cls_inference( + self.config["cls_model_path"], frame, self.config["model_target_size"] + ) + except Exception as e: + print(f"❌ 模型调用异常:{str(e)}") + valid_detection = False + break + + # 校验分类结果有效性 + if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: + print(f"❌ 抽帧{idx+1}:分类结果无效({class_name})") + valid_detection = False + break + + # 统计结果 + if class_name == "cover_noready": + noready_count += 1 + print(f"✅ 抽帧{idx+1}:{class_name}(符合条件)") + else: + print(f"❌ 抽帧{idx+1}:{class_name}(不符合)") + + # 检测未通过:重置状态 + if not valid_detection or noready_count != self.config["detection_frame_count"]: + if valid_detection: + print(f"❌ 检测未通过(仅{noready_count}/{self.config['detection_frame_count']}帧符合条件)") + else: + print(f"❌ 检测未通过(存在无效结果)") + self.temp["confirmed_frames"] = [] + self.temp["last_detection_time"] = time.time() + self.temp["detection_window"] = [] + return + + # 检测通过:准备开始录制 + print(f"✅ 检测通过!准备录制新分段") + self.temp["confirmed_frames"] = sample_frames # 保存起始帧 + self.start_recording() # 启动录制 + + # 重置检测状态 + self.temp["last_detection_time"] = time.time() + self.temp["detection_window"] = [] + + def start_recording(self): + """启动新分段录制:初始化写入器、写入起始帧""" + # 检查磁盘空间 + total_disk, used_disk, free_disk = shutil.disk_usage(self.config["output_dir"]) + if free_disk < 1024 * 1024 * 1024 * 5: # 剩余<5GB + print(f"❌ 磁盘空间不足(仅剩{free_disk/(1024**3):.2f}GB),退出程序") + self.release_resources() + raise SystemExit(1) + + # 生成分段视频路径(含时间戳和分段号) + self.state["current_segment"] = self.state["total_recorded_frames"] // self.config["single_frames"] + 1 + timestamp = time.strftime("%Y%m%d_%H%M%S") + self.state["current_video_path"] = os.path.join( + self.config["output_dir"], f"video_{timestamp}_part{self.state['current_segment']}.mp4" + ) + + # 初始化视频写入器 + frame_width, frame_height = self.state["last_resolution"] + self.temp["video_writer"] = cv2.VideoWriter( + self.state["current_video_path"], + self.config["video_codec"], + self.config["video_fps"], + (frame_width, frame_height) + ) + + # 写入器初始化失败:跳过本次录制 + if not self.temp["video_writer"].isOpened(): + print(f"⚠️ 视频写入器初始化失败(路径:{self.state['current_video_path']})") + self.temp["confirmed_frames"] = [] + return + + # 写入检测通过的起始帧 + for frame in self.temp["confirmed_frames"]: + self.temp["video_writer"].write(frame) + self.temp["recorded_frames"] = len(self.temp["confirmed_frames"]) + self.state["is_recording"] = True + self.state["recording_start_time"] = time.time() + + # 打印录制启动信息 + print(f"\n📹 开始录制第{self.state['current_segment']}段视频(目标{self.config['single_duration']}秒)") + print(f"📁 视频路径:{self.state['current_video_path']}") + print(f"🔢 已写入起始帧:{self.temp['recorded_frames']}帧") + self.temp["confirmed_frames"] = [] # 清空起始帧缓存 + + def process_recording(self, rotated_frame): + """处理录制逻辑:写入当前帧,判断分段是否完成""" + if not (self.state["is_recording"] and self.temp["video_writer"]): + return + + # 写入当前帧 + self.temp["video_writer"].write(rotated_frame) + self.temp["recorded_frames"] += 1 + + # 检查分段是否完成(帧数达标 或 时间达标,取其一) + actual_duration = time.time() - self.state["recording_start_time"] + if self.temp["recorded_frames"] >= self.config["single_frames"] or actual_duration >= self.config["single_duration"]: + self.finish_segment() + + def finish_segment(self): + """完成当前分段录制:释放写入器、更新累计进度、检查总目标""" + # 释放当前分段写入器 + self.temp["video_writer"].release() + self.temp["video_writer"] = None + self.state["is_recording"] = False + + # 计算实际录制信息 + actual_duration = self.temp["recorded_frames"] / self.config["video_fps"] + self.state["total_recorded_frames"] += self.temp["recorded_frames"] + + # 打印分段完成信息 + print(f"\n✅ 第{self.state['current_segment']}段录制完成") + print(f"🔍 实际录制:{self.temp['recorded_frames']}帧 ≈ {actual_duration:.1f}秒(目标{self.config['single_duration']}秒)") + print(f"📊 累计进度:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}/{self.config['total_duration']}秒") + + # 检查是否达到总目标 + if self.state["total_recorded_frames"] >= self.config["total_frames"]: + print(f"\n🎉 已完成{self.config['total_duration']}秒录制目标!") + # 重置累计状态(如需重复录制,保留此逻辑;单次录制可改为退出) + self.state["total_recorded_frames"] = 0 + self.state["current_segment"] = 0 + + # 重置录制临时状态 + self.temp["recorded_frames"] = 0 + self.temp["last_detection_time"] = time.time() + self.temp["detection_window"] = [] + self.state["recording_start_time"] = 0 + + def handle_disconnect(self): + """处理摄像头断连:缓存帧、释放资源""" + print(f"\n⚠️ 摄像头断连(读取帧失败)") + # 缓存当前录制的帧 + if self.state["is_recording"]: + # 收集已录帧(含当前分段所有帧) + if self.temp["video_writer"]: + self.temp["video_writer"].release() + self.temp["video_writer"] = None + # 缓存已录帧(重连后恢复) + self.state["cached_frames"] = self.temp["detection_window"].copy() # 用检测窗口缓存当前帧 + print(f"⏸️ 缓存{len(self.state['cached_frames'])}帧,重连后继续录制") + + # 重置单次连接临时状态 + self.temp["frame_count"] = 0 + self.temp["detection_window"] = [] + if self.temp["cap"]: + self.temp["cap"].release() + self.temp["cap"] = None + + def release_resources(self): + """释放所有资源:摄像头、视频写入器""" + if self.temp["cap"] and self.temp["cap"].isOpened(): + self.temp["cap"].release() + if self.temp["video_writer"]: + self.temp["video_writer"].release() + print(f"\n🔌 所有资源已释放") + + def run(self): + """主运行逻辑:初始化→循环(连接→录制→断连处理)""" + # 初始化环境 + self.init_environment() + + try: + while True: + # 1. 连接摄像头(含重连) + connect_success, resolution_changed = self.connect_camera() + if not connect_success: + break # 连接失败,退出程序 + + # 2. 重连后恢复录制状态 + self.restore_recording(resolution_changed) + + # 3. 主循环:读取帧→处理 + last_frame_time = time.time() + while True: + # 读取帧 + ret, frame = self.temp["cap"].read() + if not ret: + self.handle_disconnect() + break # 断连,跳出内层循环重连 + + # 打印实时帧率(每10帧更新一次) + current_time = time.time() + fps = 1 / (current_time - last_frame_time) if (current_time - last_frame_time) > 0 else 0 + last_frame_time = current_time + if self.temp["frame_count"] % 10 == 0 and self.temp["frame_count"] > 0: + print(f"📊 帧率:{fps:.1f} | 总读帧:{self.temp['frame_count']}", end='\r') + self.temp["frame_count"] += 1 + + # 帧预处理(旋转180度) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + rotated_frame = self.rotate_frame(Image.fromarray(rgb_frame)) + + # 加入检测窗口缓存(用于检测和断连缓存) + self.temp["detection_window"].append(rotated_frame) + # 限制缓存大小(避免内存溢出) + if len(self.temp["detection_window"]) > self.config["detection_frame_count"] * 2: + self.temp["detection_window"] = self.temp["detection_window"][-self.config["detection_frame_count"] * 2:] + + # 未录制:执行检测 + if not self.state["is_recording"]: + self.run_detection() + # 正在录制:执行写入 + else: + self.process_recording(rotated_frame) + + # 捕获用户中断(Ctrl+C) + except KeyboardInterrupt: + print(f"\n\n👋 用户主动中断程序") + # 处理未完成视频 + if self.state["is_recording"] and self.state["current_video_path"] and os.path.exists(self.state["current_video_path"]): + duration = self.temp["recorded_frames"] / self.config["video_fps"] + if duration < self.config["min_valid_duration"]: + os.remove(self.state["current_video_path"]) + print(f"🗑️ 删除不足{self.config['min_valid_duration']}秒的视频:{self.state['current_video_path']}") + else: + print(f"⚠️ 保存未完成视频:{self.state['current_video_path']}({duration:.1f}秒)") + print(f"📊 中断时累计录制:{self.state['total_recorded_frames']/self.config['video_fps']:.1f}秒") + + # 捕获其他异常 + except Exception as e: + print(f"\n⚠️ 程序异常:{str(e)}") + + # 最终释放资源 + finally: + self.release_resources() + print("\n📋 程序结束") + + +if __name__ == '__main__': + # 初始化并运行录制器 + recorder = VideoRecorder() + recorder.run() \ No newline at end of file diff --git a/test_02.py b/test_02.py new file mode 100644 index 0000000..31acce1 --- /dev/null +++ b/test_02.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/11 16:48 +# @Author : reenrr +# @File : test_01.py +# @Description: 录制60秒视频,存在6个子视频,每个子视频10秒(一定会是10秒) +''' +import cv2 +import time +import os +from PIL import Image +import shutil +from cls_inference.cls_inference import yolov11_cls_inference +import numpy as np + +# ================== 配置参数(严格按“定义在前,使用在后”排序)================== +# 1. 检测核心参数(先定义,后续打印和逻辑会用到) +detection_interval = 10 # 每隔10秒检查一次 +detection_frame_count = 3 # 每次检测抽取3帧 +required_all_noready = True # 要求所有帧都为“盖板不对齐” + +# 2. 视频存储与摄像头基础参数 +url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" +output_video_dir = os.path.join("camera01_videos") # 视频保存目录 + +# 3. 摄像头重连参数 +max_retry_seconds = 10 +retry_interval_seconds = 1 + +# 4. 分类模型参数 +cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" +target_size = (640, 640) + +# 5. 视频录制参数 +video_fps = 25 +video_codec = cv2.VideoWriter_fourcc(*'mp4v') +single_recording_duration = 10 # 每次录制10秒 +total_target_duration = 60 # 累计目标60秒 +single_recording_frames = video_fps * single_recording_duration +total_target_frames = video_fps * total_target_duration + +# 6.最小有效视频时长,低于此值的视频会被删除 +min_valid_video_duration = 10 + +def rotate_frame_180(pil_image): + """将PIL图像旋转180度并转为OpenCV的BGR格式""" + rotated_pil = pil_image.rotate(180, expand=True) + rotated_rgb = np.array(rotated_pil) + rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) + return rotated_bgr + + +if __name__ == '__main__': + # 全局状态变量(断连重连后保留) + total_recorded_frames = 0 # 累计录制总帧数 + current_segment = 0 # 当前视频分段编号 + is_recording = False # 是否正在录制 + current_video_filepath = None# 当前录制视频路径 + global_cached_frames = [] # 断连时缓存的已录的帧 + last_width, last_height = 0, 0 # 上次摄像头参数(用于断连后恢复) + recording_start_time = 0 # 录制开始时间 + + # 单次连接内的临时状态变量 + video_writer = None # 视频写入对象 + recorded_frames = 0 # 当前分段已录制帧数 + frame_count = 0 # 摄像头总读取帧数 + confirmed_frames = [] # 检测通过的确认帧(用于录制起始) + last_detection_time = time.time() # 上次检测时间 + detection_window_frames = [] # 检测窗口帧缓存 + recorded_frames_list = [] # 已录制的帧列表(用于断连后恢复) + + # 创建视频目录(确保目录存在) + os.makedirs(output_video_dir, exist_ok=True) + # 打印目标信息(此时detection_frame_count已提前定义) + print(f"✅ 已创建/确认视频目录: {output_video_dir}") + print(f"🎯 目标:累计录制{total_target_duration}秒,每次录制{single_recording_duration}秒,需{detection_frame_count}帧全为盖板不对齐") + + # 外层循环:处理摄像头断连与重连 + while True: + # 初始化摄像头连接 + cap = cv2.VideoCapture(url) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 设置RTSP缓存为5MB + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # 强制H264解码 + + # 摄像头连接重试逻辑 + start_retry_time = time.time() + while not cap.isOpened(): + # 超过最大重试时间则退出程序 + if time.time() - start_retry_time >= max_retry_seconds: + print(f"\n❌ 已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。") + # 退出前释放未关闭的视频写入器 + if video_writer is not None: + video_writer.release() + print(f"📊 程序退出时累计录制:{total_recorded_frames/video_fps:.1f}秒") + exit() + + # 每隔1秒重试一次 + print(f"🔄 无法打开摄像头,正在尝试重新连接...(已重试{int(time.time()-start_retry_time)}秒)") + time.sleep(retry_interval_seconds) + cap.release() # 释放旧连接 + cap = cv2.VideoCapture(url) # 重新创建连接 + + # 获取摄像头实际参数(可能与配置值不同) + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = cap.get(cv2.CAP_PROP_FPS) + # 修正帧率(以摄像头实际帧率为准) + if actual_fps > 0: + video_fps = int(actual_fps) + single_recording_frames = video_fps * single_recording_duration + total_target_frames = video_fps * total_target_duration + + # 检查分辨率是否变化 + resolution_changed = False + if last_width > 0 and last_height > 0: + if frame_width != last_width or frame_height != last_height: + print(f"⚠️ 摄像头分辨率变化({last_width}×{last_height}→{frame_width}×{frame_height})") + resolution_changed = True + # 分辨率变化时重置录制状态 + is_recording = False + global_cached_frames = [] + recorded_frames_list = [] + + # 更新分辨率记录 + last_width, last_height = frame_width, frame_height + + # 打印重连成功信息 + print(f"\n✅ 摄像头重连成功(分辨率:{frame_width}x{frame_height},实际帧率:{video_fps})") + print(f"📊 当前累计录制:{total_recorded_frames / video_fps:.1f}/{total_target_duration}秒") + + # 重连后恢复录制状态(如果断连前正在录制且分辨率未变化) + if is_recording and current_video_filepath and not resolution_changed: + print(f"🔄 恢复录制状态,继续录制视频:{current_video_filepath}") + # 重新初始化视频写入器(确保参数与摄像头匹配) + video_writer = cv2.VideoWriter( + current_video_filepath, video_codec, video_fps, (frame_width, frame_height) + ) + # 恢复失败则重置录制状态 + if not video_writer.isOpened(): + print(f"⚠️ 视频写入器重新初始化失败,无法继续录制") + is_recording = False + video_writer = None + recorded_frames = 0 + recorded_frames_list = [] + else: + # 写入缓存的帧 + if len(global_cached_frames) > 0: + print(f"🔄 恢复缓存的{len(global_cached_frames)}帧") + for frame in global_cached_frames: + video_writer.write(frame) + recorded_frames = len(global_cached_frames) + recorded_frames_list = global_cached_frames.copy() + global_cached_frames = [] + + # 内层循环:读取摄像头帧并处理(检测/录制) + try: + last_frame_time = time.time() + while True: + # 读取一帧图像 + ret, frame = cap.read() + if not ret: + print(f"\n⚠️ 读取帧失败,可能是流中断或摄像头断开") + # 断连时保存当前录制进度(不释放全局状态) + if video_writer is not None: + video_writer.release() + video_writer = None + if is_recording: + global_cached_frames = recorded_frames_list.copy() + print(f"⏸️ 流中断,缓存{len(global_cached_frames)}帧,重连后继续录制") + # 重置单次连接的临时状态(全局状态保留) + frame_count = 0 + detection_window_frames = [] + cap.release() # 释放当前摄像头连接 + break # 跳出内层循环,进入重连流程 + + # 计算并打印实时帧率 + current_time = time.time() + fps = 1 / (current_time - last_frame_time) if (current_time - last_frame_time) > 0 else 0 + last_frame_time = current_time + if frame_count % 10 == 0 and frame_count > 0: + print(f"📊 实时帧率: {fps:.1f}fps, 累计帧: {frame_count}", end='\r') + + # 累计总帧数 + frame_count += 1 + + # 预处理帧:转为RGB→PIL→旋转180度→转为BGR(适配录制和模型) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + pil_image = Image.fromarray(rgb_frame) + rotated_bgr = rotate_frame_180(pil_image) + + # -------------------------- 1. 未录制时:执行检测逻辑 -------------------------- + if not is_recording: + # 将帧加入检测窗口缓存(用于10秒后的检测) + detection_window_frames.append(rotated_bgr) + # 限制缓存大小(避免内存占用过高,最多保留2倍检测帧数) + if len(detection_window_frames) > detection_frame_count * 2: + detection_window_frames = detection_window_frames[-detection_frame_count * 2:] + + # 触发检测的条件: + # 1. 距离上次检测超过10秒;2. 检测窗口有足够帧;3. 累计录制未完成 + if (time.time() - last_detection_time) >= detection_interval and \ + len(detection_window_frames) >= detection_frame_count and \ + total_recorded_frames < total_target_frames: + + try: + print(f"\n==== 开始10秒间隔检测(总帧:{frame_count},累计已录:{total_recorded_frames/video_fps:.1f}秒) ====") + # 从检测窗口中均匀抽取3帧(避免连续帧重复) + sample_step = max(1, len(detection_window_frames) // detection_frame_count) + sample_frames = detection_window_frames[::sample_step][:detection_frame_count] + print(f"📋 检测窗口共{len(detection_window_frames)}帧,均匀抽取{len(sample_frames)}帧进行判断") + + # 统计“盖板不对齐”的帧数 + noready_frame_count = 0 + valid_detection = True # 标记检测是否有效(无无效帧) + for idx, sample_frame in enumerate(sample_frames): + # 调用模型获取分类结果 + class_name = yolov11_cls_inference(cls_model_path, sample_frame, target_size) + + # 校验分类结果有效性 + if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: + print(f"❌ 抽取帧{idx+1}:分类结果无效({class_name}),本次检测失败") + valid_detection = False + break # 有无效帧则终止本次检测 + + # 统计不对齐帧 + if class_name == "cover_noready": + noready_frame_count += 1 + print(f"✅ 抽取帧{idx+1}:分类结果={class_name}(符合条件)") + else: + print(f"❌ 抽取帧{idx+1}:分类结果={class_name}(不符合条件)") + + # 检测通过条件:所有帧有效 + 3帧全为不对齐 + if valid_detection and noready_frame_count == detection_frame_count: + print(f"\n✅ 本次检测通过({noready_frame_count}/{detection_frame_count}帧均为盖板不对齐)") + # 保存检测通过的帧(用于录制起始,避免丢失检测阶段的画面) + confirmed_frames.extend(sample_frames) + + # 检查磁盘空间(剩余<5GB则停止) + total_disk, used_disk, free_disk = shutil.disk_usage(output_video_dir) + if free_disk < 1024 * 1024 * 1024 * 5: + print(f"❌ 磁盘空间严重不足(仅剩 {free_disk / (1024 ** 3):.2f} GB),停止录制并退出。") + raise SystemExit(1) + + # 生成当前分段的视频文件名(包含时间戳和分段号) + current_segment = total_recorded_frames // single_recording_frames + 1 + timestamp = time.strftime("%Y%m%d_%H%M%S") + current_video_filepath = os.path.join( + output_video_dir, f"video_{timestamp}_part{current_segment}.mp4" + ) + + # 初始化视频写入器 + video_writer = cv2.VideoWriter( + current_video_filepath, video_codec, video_fps, (frame_width, frame_height) + ) + if not video_writer.isOpened(): + print(f"⚠️ 视频写入器初始化失败(路径:{current_video_filepath}),跳过本次录制") + confirmed_frames = [] + recorded_frames_List = [] + continue + + # 写入检测阶段的确认帧(录制起始画面) + for frame in confirmed_frames: + video_writer.write(frame) + recorded_frames = len(confirmed_frames) + recorded_frames_list = confirmed_frames.copy() + is_recording = True # 标记为正在录制 + recording_start_time = time.time() + + # 打印录制开始信息 + print(f"\n📹 开始录制第{current_segment}段视频(目标10秒)") + print(f"📁 视频保存路径:{current_video_filepath}") + print(f"🔢 已写入检测阶段的确认帧:{recorded_frames}帧") + + # 重置检测相关的临时状态 + confirmed_frames = [] + + # 检测未通过(未满足“3帧全不对齐”或有无效帧) + else: + if valid_detection: + print(f"\n❌ 本次检测未通过(仅{noready_frame_count}/{detection_frame_count}帧为盖板不对齐,需全部符合)") + else: + print(f"\n❌ 本次检测未通过(存在无效分类结果)") + # 重置检测临时状态 + confirmed_frames = [] + + # 更新检测时间,清空检测窗口(准备下一次检测) + last_detection_time = time.time() + detection_window_frames = [] + + # 捕获模型调用异常(不终止程序,仅重置检测状态) + except Exception as e: + print(f"\n⚠️ 分类模型调用异常: {str(e)}(总帧:{frame_count})") + confirmed_frames = [] + last_detection_time = time.time() + detection_window_frames = [] + continue + + # -------------------------- 2. 正在录制时:执行写入逻辑 -------------------------- + if is_recording and video_writer is not None: + # 写入当前帧(已预处理为旋转180度的BGR格式) + video_writer.write(rotated_bgr) + recorded_frames += 1 + recorded_frames_list.append(rotated_bgr) + + # 检查当前分段是否录制完成(达到10秒的帧数) + actual_recording_duration = (time.time() - recording_start_time) + if actual_recording_duration >= single_recording_frames or recorded_frames >= single_recording_frames: + # 释放当前视频写入器(完成当前分段) + video_writer.release() + video_writer = None + is_recording = False # 标记为未录制 + + # 更新累计录制进度 + total_recorded_frames += recorded_frames + + # 打印分段完成信息 + print(f"\n✅ 第{current_segment}段视频录制完成") + print(f"🔢 实际录制:{recorded_frames}帧 ≈ {actual_recording_duration:.1f}秒") + print(f"📊 累计录制:{total_recorded_frames/video_fps:.1f}/{total_target_duration}秒") + + # 检查是否达到总目标(60秒) + if total_recorded_frames >= total_target_frames: + print(f"\n🎉 已完成累计{total_target_duration}秒的录制目标!") + # 重置累计状态(如需重复录制,保留此逻辑;如需单次录制,可添加break退出) + total_recorded_frames = 0 + current_segment = 0 + + # 重置录制相关的临时状态,准备下一次检测 + recorded_frames = 0 + recorded_frames_list = [] + last_detection_time = time.time() + detection_window_frames = [] + recording_start_time = 0 + + # 捕获用户中断(Ctrl+C) + except KeyboardInterrupt: + print(f"\n\n👋 用户中断程序") + # 中断前保存当前录制的视频 + if video_writer is not None: + video_writer.release() + # 检查视频时长,过短则删除 + if is_recording and recorded_frames > 0: + duration = recorded_frames / video_fps + if duration < min_valid_video_duration: + print(f"🗑️ 删除不足{min_valid_video_duration}秒的视频:{current_video_filepath}") + os.remove(current_video_filepath) + else: + print(f"⚠️ 已保存当前录制的视频:{current_video_filepath}") + print(f"📊 中断时累计录制:{total_recorded_frames/video_fps:.1f}秒") + break + + # 捕获所有其他异常 + except Exception as e: + print(f"\n⚠️ 录制过程异常:{str(e)}") + if is_recording and video_writer is not None: + video_writer.release() + # 检查视频时长,过短则删除 + if recorded_frames > 0: + duration = recorded_frames / video_fps + if duration < min_valid_video_duration: + print(f"🗑️ 删除不足{min_valid_video_duration}秒的无效视频:{current_video_filepath}") + try: + os.remove(current_video_filepath) + except: + pass + else: + print(f"⚠️ 保存未完成视频:{current_video_filepath}(时长:{duration:.1f}秒)") + # 重置录制状态 + is_recording = False + recorded_frames = 0 + recorded_frames_list = [] + global_cached_frames = [] + cap.release() + continue + + # 最终释放资源(无论内层循环因何退出) + finally: + if 'cap' in locals() and cap.isOpened(): + cap.release() # 释放摄像头连接 + if video_writer is not None: + video_writer.release() # 释放视频写入器 + print(f"\n🔌 视频流已关闭,累计已录:{total_recorded_frames/video_fps:.1f}秒") + + # 程序结束 + print("\n📋 程序结束") \ No newline at end of file diff --git a/video_new.py b/video_new.py new file mode 100644 index 0000000..fafdb08 --- /dev/null +++ b/video_new.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/11 16:48 +# @Author : reenrr +# @File : video_new.py +''' +import cv2 +import time +import os +from PIL import Image +import shutil # 用于检查磁盘空间 +from cls_inference.cls_inference import yolov11_cls_inference +import numpy as np + +# ================== 配置参数 ================== +url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" +check_interval = 100 # 每隔 N 帧检查一次“盖板对齐”状态 +output_video_dir = os.path.join("camera01_videos") # 视频保存目录 + +# 多帧确认参数 +required_consecutive_frames = 3 # 需要连续 N 帧检测为"盖板对齐"才开始录制 + +# 摄像头重连参数 +max_retry_seconds = 10 # 最大重试时间为10秒 +retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次 + +# 分类模型参数 +cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" # 分类模型路径 +target_size = (640, 640) + +# 视频录制参数 +video_fps = 25 # 视频帧率 +video_codec = cv2.VideoWriter_fourcc(*'mp4v') # MP4 编码 +video_duration = 60 # 每次检测到符合条件后录制的秒数 +frame_per_video = video_fps * video_duration # 每个视频的总帧数 + + +def rotate_frame_180(pil_image): + """ + 统一处理:将PIL图像旋转180度,并转为OpenCV录制所需的BGR格式 + input: pil_image (PIL.Image对象,RGB格式) + output: rotated_bgr (numpy数组,BGR格式,旋转180度后) + """ + # 1. 旋转180度(expand=True避免图像被裁剪) + rotated_pil = pil_image.rotate(180, expand=True) + # 2. 转为numpy数组(此时是RGB格式) + rotated_rgb = np.array(rotated_pil) + # 3. 转为BGR格式(OpenCV VideoWriter要求BGR输入) + rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) + return rotated_bgr + + +if __name__ == '__main__': + # 视频录制状态变量 + is_recording = False # 是否正在录制视频 + video_writer = None # 视频写入对象 + recorded_frames = 0 # 当前视频已录制帧数 + + # 创建视频目录 + os.makedirs(output_video_dir, exist_ok=True) + print(f"✅ 已创建/确认视频目录: {output_video_dir}") + + while True: # 外层循环:处理摄像头断连重连 + cap = cv2.VideoCapture(url) + + # 设置RTSP流缓存大小 + cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 5MB缓存 + # 强制指定解码方式 + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) + + start_time = time.time() + + # 摄像头连接重试逻辑 + while not cap.isOpened(): + if time.time() - start_time >= max_retry_seconds: + print(f"已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。") + # 若退出时正在录制,先释放视频写入对象 + if video_writer is not None: + video_writer.release() + video_writer = None + cap.release() + exit() + + print("无法打开摄像头,正在尝试重新连接...") + time.sleep(retry_interval_seconds) + cap.release() + cap = cv2.VideoCapture(url) + + # 获取摄像头实际参数 + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = cap.get(cv2.CAP_PROP_FPS) + # 修正帧率 + if actual_fps > 0: + video_fps = int(actual_fps) + frame_per_video = video_fps * video_duration + print(f"✅ 开始读取视频流(分辨率:{frame_width}x{frame_height},帧率:{video_fps})...") + + frame_count = 0 # 总帧计数器 + consecutive_ready_count = 0 # 连续"盖板对齐"计数 + confirmed_frames = [] # 存储确认的"盖板对齐"帧(用于录制开始前的帧) + + try: + while True: + ret, frame = cap.read() + if not ret: + print("读取帧失败,可能是流中断或摄像头断开") + # 若断连时正在录制,先释放视频写入对象 + if video_writer is not None: + video_writer.release() + video_writer = None + is_recording = False + recorded_frames = 0 + print("⚠️ 流中断时正在录制,已保存当前视频。") + # 重置检测状态 + consecutive_ready_count = 0 + confirmed_frames = [] + cap.release() + break # 跳出内层循环,重新连接摄像头 + + frame_count += 1 + + # 转换为RGB并创建PIL图像(用于后续处理) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + pil_image = Image.fromarray(rgb_frame) + # 预先旋转180度(为后续可能的录制做准备) + rotated_bgr = rotate_frame_180(pil_image) + + # 按间隔检测“盖板对齐”状态(仅未录制时执行) + if not is_recording and frame_count % check_interval == 0: + try: + # 调用模型判断状态(使用旋转后的图像) + class_name = yolov11_cls_inference(cls_model_path, rotated_bgr, target_size) + + # 校验类别有效性 + if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: + print(f"跳过检测:模型返回无效类别({class_name}) (总帧:{frame_count})") + consecutive_ready_count = 0 + confirmed_frames = [] + continue + + # 检测到"盖板对齐" + if class_name == "cover_ready": + consecutive_ready_count += 1 + # 保存当前确认的帧 + confirmed_frames.append(rotated_bgr) + print( + f"检测到'盖板对齐',连续计数: {consecutive_ready_count}/{required_consecutive_frames} (总帧:{frame_count})") + + # 达到所需连续帧数,开始录制 + if consecutive_ready_count >= required_consecutive_frames: + # 检查磁盘空间 + total, used, free = shutil.disk_usage(output_video_dir) + if free < 1024 * 1024 * 1024 * 5: + print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024 ** 3):.2f} GB),停止录制。") + consecutive_ready_count = 0 + confirmed_frames = [] + raise SystemExit(1) + + # 生成视频文件名 + timestamp = time.strftime("%Y%m%d_%H%M%S") + video_filename = f"video_{timestamp}.mp4" + video_filepath = os.path.join(output_video_dir, video_filename) + + # 初始化视频写入器 + video_writer = cv2.VideoWriter( + video_filepath, video_codec, video_fps, (frame_width, frame_height) + ) + if not video_writer.isOpened(): + print(f"⚠️ 视频写入器初始化失败,无法录制视频(路径:{video_filepath})") + consecutive_ready_count = 0 + confirmed_frames = [] + continue + + # 写入之前确认的帧 + for confirmed_frame in confirmed_frames: + video_writer.write(confirmed_frame) + + is_recording = True + # 已录制帧数为确认帧数量 + recorded_frames = len(confirmed_frames) + print( + f"📹 开始录制视频(已连续{required_consecutive_frames}帧'盖板对齐',保存路径:{video_filepath})") + print(f"已写入 {recorded_frames} 帧确认帧") + + # 重置计数器,为下一次检测做准备 + consecutive_ready_count = 0 + confirmed_frames = [] + + # 检测到"盖板不对齐",重置计数器 + else: + print(f"盖板状态:{class_name} (总帧:{frame_count})") + if consecutive_ready_count > 0: + print( + f"检测到'盖板不对齐',重置连续计数 (当前计数: {consecutive_ready_count}/{required_consecutive_frames})") + consecutive_ready_count = 0 + confirmed_frames = [] + + except Exception as e: + print(f"分类模型调用异常: {e}(总帧:{frame_count})") + consecutive_ready_count = 0 + confirmed_frames = [] + continue + + # 若正在录制,持续写入旋转后的帧 + if is_recording: + video_writer.write(rotated_bgr) + recorded_frames += 1 + + # 检查是否达到录制时长 + if recorded_frames >= frame_per_video: + video_writer.release() + video_writer = None + is_recording = False + recorded_frames = 0 + print(f"✅ 视频录制完成(已达 {video_duration} 秒,共录制 {frame_per_video} 帧)") + + except KeyboardInterrupt: + print("\n用户中断程序") + # 中断时若正在录制,先保存视频 + if video_writer is not None: + video_writer.release() + print("⚠️ 用户中断时正在录制,已保存当前视频。") + break + + finally: + # 释放资源 + cap.release() + cv2.destroyAllWindows() + if video_writer is not None: + video_writer.release() + is_recording = False + recorded_frames = 0 + consecutive_ready_count = 0 + confirmed_frames = [] + print(f"视频流已关闭,共处理总帧:{frame_count}") + + print("程序结束") + diff --git a/video_new_test.py b/video_new_test.py new file mode 100644 index 0000000..3d68563 --- /dev/null +++ b/video_new_test.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +''' +# @Time : 2025/9/11 16:48 +# @Author : reenrr +# @File : video_new.py +''' +import cv2 +import time +import os +from PIL import Image +import shutil # 用于检查磁盘空间 +from cls_inference.cls_inference import yolov11_cls_inference +import numpy as np + +# ================== 配置参数 ================== +url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101" +check_interval = 100 # 每隔 N 帧检查一次“盖板对齐”状态 +output_video_dir = os.path.join("camera01_videos") # 视频保存目录 + +# 多帧确认参数 +required_consecutive_frames = 3 # 需要连续 N 帧检测为"盖板不对齐"才开始录制 + +# 摄像头重连参数 +max_retry_seconds = 10 # 最大重试时间为10秒 +retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次 + +# 分类模型参数 +cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" # 分类模型路径 +target_size = (640, 640) + +# 视频录制参数 +video_fps = 25 # 视频帧率 +video_codec = cv2.VideoWriter_fourcc(*'mp4v') # MP4 编码 +video_duration = 20 # 每次检测到符合条件后录制的秒数 +frame_per_video = video_fps * video_duration # 每个视频的总帧数 + +def rotate_frame_180(pil_image): + """ + 统一处理:将PIL图像旋转180度,并转为OpenCV录制所需的BGR格式 + input: pil_image (PIL.Image对象,RGB格式) + output: rotated_bgr (numpy数组,BGR格式,旋转180度后) + """ + # 1. 旋转180度(expand=True避免图像被裁剪) + rotated_pil = pil_image.rotate(180, expand=True) + # 2. 转为numpy数组(此时是RGB格式) + rotated_rgb = np.array(rotated_pil) + # 3. 转为BGR格式(OpenCV VideoWriter要求BGR输入) + rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR) + return rotated_bgr + +if __name__ == '__main__': + # 视频录制状态变量 + is_recording = False # 是否正在录制视频 + video_writer = None # 视频写入对象 + recorded_frames = 0 # 当前视频已录制帧数 + + # 创建视频目录 + os.makedirs(output_video_dir, exist_ok=True) + print(f"✅ 已创建/确认视频目录: {output_video_dir}") + + while True: # 外层循环:处理摄像头断连重连 + cap = cv2.VideoCapture(url) + + # 设置RTSP流缓存大小 + cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 5MB缓存 + # 强制指定解码方式 + cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) + + start_time = time.time() + + # 摄像头连接重试逻辑 + while not cap.isOpened(): + if time.time() - start_time >= max_retry_seconds: + print(f"已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。") + # 若退出时正在录制,先释放视频写入对象 + if video_writer is not None: + video_writer.release() + video_writer = None + cap.release() + exit() + + print("无法打开摄像头,正在尝试重新连接...") + time.sleep(retry_interval_seconds) + cap.release() + cap = cv2.VideoCapture(url) + + # 获取摄像头实际参数 + frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = cap.get(cv2.CAP_PROP_FPS) + # 修正帧率 + if actual_fps > 0: + video_fps = int(actual_fps) + frame_per_video = video_fps * video_duration + print(f"✅ 开始读取视频流(分辨率:{frame_width}x{frame_height},帧率:{video_fps})...") + + frame_count = 0 # 总帧计数器 + consecutive_noready_count = 0 # 连续"盖板不对齐"计数 + confirmed_frames = [] # 存储确认的"盖板不对齐"帧(用于录制开始前的帧) + + try: + while True: + ret, frame = cap.read() + if not ret: + print("读取帧失败,可能是流中断或摄像头断开") + # 若断连时正在录制,先释放视频写入对象 + if video_writer is not None: + video_writer.release() + video_writer = None + is_recording = False + recorded_frames = 0 + print("⚠️ 流中断时正在录制,已保存当前视频。") + # 重置检测状态 + consecutive_noready_count = 0 + confirmed_frames = [] + cap.release() + break # 跳出内层循环,重新连接摄像头 + + frame_count += 1 + + # 转换为RGB并创建PIL图像(用于后续处理) + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + pil_image = Image.fromarray(rgb_frame) + # 预先旋转180度(为后续可能的录制做准备) + rotated_bgr = rotate_frame_180(pil_image) + + # 按间隔检测“盖板对齐”状态(仅未录制时执行) + if not is_recording and frame_count % check_interval == 0: + try: + # 调用模型判断状态(使用旋转后的图像) + class_name = yolov11_cls_inference(cls_model_path, rotated_bgr, target_size) + + # 校验类别有效性 + if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]: + print(f"跳过检测:模型返回无效类别({class_name}) (总帧:{frame_count})") + consecutive_noready_count = 0 + confirmed_frames = [] + continue + + # 检测到"盖板不对齐" + if class_name == "cover_noready": + consecutive_noready_count += 1 + # 保存当前确认的帧 + confirmed_frames.append(rotated_bgr) + print(f"检测到'盖板不对齐',连续计数: {consecutive_noready_count}/{required_consecutive_frames} (总帧:{frame_count})") + + # 达到所需连续帧数,开始录制 + if consecutive_noready_count >= required_consecutive_frames: + # 检查磁盘空间 + total, used, free = shutil.disk_usage(output_video_dir) + if free < 1024 * 1024 * 1024 * 5: + print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024 ** 3):.2f} GB),停止录制。") + consecutive_noready_count = 0 + confirmed_frames = [] + raise SystemExit(1) + + # 生成视频文件名 + timestamp = time.strftime("%Y%m%d_%H%M%S") + video_filename = f"video_{timestamp}.mp4" + video_filepath = os.path.join(output_video_dir, video_filename) + + # 初始化视频写入器 + video_writer = cv2.VideoWriter( + video_filepath, video_codec, video_fps, (frame_width, frame_height) + ) + if not video_writer.isOpened(): + print(f"⚠️ 视频写入器初始化失败,无法录制视频(路径:{video_filepath})") + consecutive_noready_count = 0 + confirmed_frames = [] + continue + + # 写入之前确认的帧 + for confirmed_frame in confirmed_frames: + video_writer.write(confirmed_frame) + + is_recording = True + # 已录制帧数为确认帧数量 + recorded_frames = len(confirmed_frames) + print(f"📹 开始录制视频(已连续{required_consecutive_frames}帧'盖板不对齐',保存路径:{video_filepath})") + print(f"已写入 {recorded_frames} 帧确认帧") + + # 重置计数器,为下一次检测做准备 + consecutive_noready_count = 0 + confirmed_frames = [] + + # 检测到"盖板对齐",重置计数器 + else: + print(f"盖板状态:{class_name} (总帧:{frame_count})") + if consecutive_noready_count > 0: + print(f"检测到'盖板对齐',重置连续计数 (当前计数: {consecutive_noready_count}/{required_consecutive_frames})") + consecutive_noready_count = 0 + confirmed_frames = [] + + except Exception as e: + print(f"分类模型调用异常: {e}(总帧:{frame_count})") + consecutive_noready_count = 0 + confirmed_frames = [] + continue + + # 若正在录制,持续写入旋转后的帧 + if is_recording: + video_writer.write(rotated_bgr) + recorded_frames += 1 + + # 检查是否达到录制时长 + if recorded_frames >= frame_per_video: + video_writer.release() + video_writer = None + is_recording = False + recorded_frames = 0 + print(f"✅ 视频录制完成(已达 {video_duration} 秒,共录制 {frame_per_video} 帧)") + + except KeyboardInterrupt: + print("\n用户中断程序") + # 中断时若正在录制,先保存视频 + if video_writer is not None: + video_writer.release() + print("⚠️ 用户中断时正在录制,已保存当前视频。") + break + + finally: + # 释放资源 + cap.release() + cv2.destroyAllWindows() + if video_writer is not None: + video_writer.release() + is_recording = False + recorded_frames = 0 + consecutive_noready_count = 0 + confirmed_frames = [] + print(f"视频流已关闭,共处理总帧:{frame_count}") + + print("程序结束") +