Files
zjsh_video_collection/video_new_test.py
2025-09-26 20:41:44 +08:00

236 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/11 16:48
# @Author : reenrr
# @File : video_new.py
'''
import cv2
import time
import os
from PIL import Image
import shutil # 用于检查磁盘空间
from cls_inference.cls_inference import yolov11_cls_inference
import numpy as np
# ================== 配置参数 ==================
url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101"
check_interval = 100 # 每隔 N 帧检查一次“盖板对齐”状态
output_video_dir = os.path.join("camera01_videos") # 视频保存目录
# 多帧确认参数
required_consecutive_frames = 3 # 需要连续 N 帧检测为"盖板不对齐"才开始录制
# 摄像头重连参数
max_retry_seconds = 10 # 最大重试时间为10秒
retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次
# 分类模型参数
cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" # 分类模型路径
target_size = (640, 640)
# 视频录制参数
video_fps = 25 # 视频帧率
video_codec = cv2.VideoWriter_fourcc(*'mp4v') # MP4 编码
video_duration = 20 # 每次检测到符合条件后录制的秒数
frame_per_video = video_fps * video_duration # 每个视频的总帧数
def rotate_frame_180(pil_image):
"""
统一处理将PIL图像旋转180度并转为OpenCV录制所需的BGR格式
input: pil_image (PIL.Image对象RGB格式)
output: rotated_bgr (numpy数组BGR格式旋转180度后)
"""
# 1. 旋转180度expand=True避免图像被裁剪
rotated_pil = pil_image.rotate(180, expand=True)
# 2. 转为numpy数组此时是RGB格式
rotated_rgb = np.array(rotated_pil)
# 3. 转为BGR格式OpenCV VideoWriter要求BGR输入
rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR)
return rotated_bgr
if __name__ == '__main__':
# 视频录制状态变量
is_recording = False # 是否正在录制视频
video_writer = None # 视频写入对象
recorded_frames = 0 # 当前视频已录制帧数
# 创建视频目录
os.makedirs(output_video_dir, exist_ok=True)
print(f"✅ 已创建/确认视频目录: {output_video_dir}")
while True: # 外层循环:处理摄像头断连重连
cap = cv2.VideoCapture(url)
# 设置RTSP流缓存大小
cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 5MB缓存
# 强制指定解码方式
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
start_time = time.time()
# 摄像头连接重试逻辑
while not cap.isOpened():
if time.time() - start_time >= max_retry_seconds:
print(f"已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。")
# 若退出时正在录制,先释放视频写入对象
if video_writer is not None:
video_writer.release()
video_writer = None
cap.release()
exit()
print("无法打开摄像头,正在尝试重新连接...")
time.sleep(retry_interval_seconds)
cap.release()
cap = cv2.VideoCapture(url)
# 获取摄像头实际参数
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = cap.get(cv2.CAP_PROP_FPS)
# 修正帧率
if actual_fps > 0:
video_fps = int(actual_fps)
frame_per_video = video_fps * video_duration
print(f"✅ 开始读取视频流(分辨率:{frame_width}x{frame_height},帧率:{video_fps}...")
frame_count = 0 # 总帧计数器
consecutive_noready_count = 0 # 连续"盖板不对齐"计数
confirmed_frames = [] # 存储确认的"盖板不对齐"帧(用于录制开始前的帧)
try:
while True:
ret, frame = cap.read()
if not ret:
print("读取帧失败,可能是流中断或摄像头断开")
# 若断连时正在录制,先释放视频写入对象
if video_writer is not None:
video_writer.release()
video_writer = None
is_recording = False
recorded_frames = 0
print("⚠️ 流中断时正在录制,已保存当前视频。")
# 重置检测状态
consecutive_noready_count = 0
confirmed_frames = []
cap.release()
break # 跳出内层循环,重新连接摄像头
frame_count += 1
# 转换为RGB并创建PIL图像用于后续处理
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
# 预先旋转180度为后续可能的录制做准备
rotated_bgr = rotate_frame_180(pil_image)
# 按间隔检测“盖板对齐”状态(仅未录制时执行)
if not is_recording and frame_count % check_interval == 0:
try:
# 调用模型判断状态(使用旋转后的图像)
class_name = yolov11_cls_inference(cls_model_path, rotated_bgr, target_size)
# 校验类别有效性
if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]:
print(f"跳过检测:模型返回无效类别({class_name} (总帧:{frame_count})")
consecutive_noready_count = 0
confirmed_frames = []
continue
# 检测到"盖板不对齐"
if class_name == "cover_noready":
consecutive_noready_count += 1
# 保存当前确认的帧
confirmed_frames.append(rotated_bgr)
print(f"检测到'盖板不对齐',连续计数: {consecutive_noready_count}/{required_consecutive_frames} (总帧:{frame_count})")
# 达到所需连续帧数,开始录制
if consecutive_noready_count >= required_consecutive_frames:
# 检查磁盘空间
total, used, free = shutil.disk_usage(output_video_dir)
if free < 1024 * 1024 * 1024 * 5:
print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024 ** 3):.2f} GB停止录制。")
consecutive_noready_count = 0
confirmed_frames = []
raise SystemExit(1)
# 生成视频文件名
timestamp = time.strftime("%Y%m%d_%H%M%S")
video_filename = f"video_{timestamp}.mp4"
video_filepath = os.path.join(output_video_dir, video_filename)
# 初始化视频写入器
video_writer = cv2.VideoWriter(
video_filepath, video_codec, video_fps, (frame_width, frame_height)
)
if not video_writer.isOpened():
print(f"⚠️ 视频写入器初始化失败,无法录制视频(路径:{video_filepath}")
consecutive_noready_count = 0
confirmed_frames = []
continue
# 写入之前确认的帧
for confirmed_frame in confirmed_frames:
video_writer.write(confirmed_frame)
is_recording = True
# 已录制帧数为确认帧数量
recorded_frames = len(confirmed_frames)
print(f"📹 开始录制视频(已连续{required_consecutive_frames}'盖板不对齐',保存路径:{video_filepath}")
print(f"已写入 {recorded_frames} 帧确认帧")
# 重置计数器,为下一次检测做准备
consecutive_noready_count = 0
confirmed_frames = []
# 检测到"盖板对齐",重置计数器
else:
print(f"盖板状态:{class_name} (总帧:{frame_count})")
if consecutive_noready_count > 0:
print(f"检测到'盖板对齐',重置连续计数 (当前计数: {consecutive_noready_count}/{required_consecutive_frames})")
consecutive_noready_count = 0
confirmed_frames = []
except Exception as e:
print(f"分类模型调用异常: {e}(总帧:{frame_count}")
consecutive_noready_count = 0
confirmed_frames = []
continue
# 若正在录制,持续写入旋转后的帧
if is_recording:
video_writer.write(rotated_bgr)
recorded_frames += 1
# 检查是否达到录制时长
if recorded_frames >= frame_per_video:
video_writer.release()
video_writer = None
is_recording = False
recorded_frames = 0
print(f"✅ 视频录制完成(已达 {video_duration} 秒,共录制 {frame_per_video} 帧)")
except KeyboardInterrupt:
print("\n用户中断程序")
# 中断时若正在录制,先保存视频
if video_writer is not None:
video_writer.release()
print("⚠️ 用户中断时正在录制,已保存当前视频。")
break
finally:
# 释放资源
cap.release()
cv2.destroyAllWindows()
if video_writer is not None:
video_writer.release()
is_recording = False
recorded_frames = 0
consecutive_noready_count = 0
confirmed_frames = []
print(f"视频流已关闭,共处理总帧:{frame_count}")
print("程序结束")