Files
zjsh_video_collection/video_new.py

241 lines
11 KiB
Python
Raw Permalink Normal View History

2025-09-26 20:41:44 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/11 16:48
# @Author : reenrr
# @File : video_new.py
'''
import cv2
import time
import os
from PIL import Image
import shutil # 用于检查磁盘空间
from cls_inference.cls_inference import yolov11_cls_inference
import numpy as np
# ================== 配置参数 ==================
url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101"
check_interval = 100 # 每隔 N 帧检查一次“盖板对齐”状态
output_video_dir = os.path.join("camera01_videos") # 视频保存目录
# 多帧确认参数
required_consecutive_frames = 3 # 需要连续 N 帧检测为"盖板对齐"才开始录制
# 摄像头重连参数
max_retry_seconds = 10 # 最大重试时间为10秒
retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次
# 分类模型参数
cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn" # 分类模型路径
target_size = (640, 640)
# 视频录制参数
video_fps = 25 # 视频帧率
video_codec = cv2.VideoWriter_fourcc(*'mp4v') # MP4 编码
video_duration = 60 # 每次检测到符合条件后录制的秒数
frame_per_video = video_fps * video_duration # 每个视频的总帧数
def rotate_frame_180(pil_image):
"""
统一处理将PIL图像旋转180度并转为OpenCV录制所需的BGR格式
input: pil_image (PIL.Image对象RGB格式)
output: rotated_bgr (numpy数组BGR格式旋转180度后)
"""
# 1. 旋转180度expand=True避免图像被裁剪
rotated_pil = pil_image.rotate(180, expand=True)
# 2. 转为numpy数组此时是RGB格式
rotated_rgb = np.array(rotated_pil)
# 3. 转为BGR格式OpenCV VideoWriter要求BGR输入
rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR)
return rotated_bgr
if __name__ == '__main__':
# 视频录制状态变量
is_recording = False # 是否正在录制视频
video_writer = None # 视频写入对象
recorded_frames = 0 # 当前视频已录制帧数
# 创建视频目录
os.makedirs(output_video_dir, exist_ok=True)
print(f"✅ 已创建/确认视频目录: {output_video_dir}")
while True: # 外层循环:处理摄像头断连重连
cap = cv2.VideoCapture(url)
# 设置RTSP流缓存大小
cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 5MB缓存
# 强制指定解码方式
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
start_time = time.time()
# 摄像头连接重试逻辑
while not cap.isOpened():
if time.time() - start_time >= max_retry_seconds:
print(f"已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。")
# 若退出时正在录制,先释放视频写入对象
if video_writer is not None:
video_writer.release()
video_writer = None
cap.release()
exit()
print("无法打开摄像头,正在尝试重新连接...")
time.sleep(retry_interval_seconds)
cap.release()
cap = cv2.VideoCapture(url)
# 获取摄像头实际参数
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = cap.get(cv2.CAP_PROP_FPS)
# 修正帧率
if actual_fps > 0:
video_fps = int(actual_fps)
frame_per_video = video_fps * video_duration
print(f"✅ 开始读取视频流(分辨率:{frame_width}x{frame_height},帧率:{video_fps}...")
frame_count = 0 # 总帧计数器
consecutive_ready_count = 0 # 连续"盖板对齐"计数
confirmed_frames = [] # 存储确认的"盖板对齐"帧(用于录制开始前的帧)
try:
while True:
ret, frame = cap.read()
if not ret:
print("读取帧失败,可能是流中断或摄像头断开")
# 若断连时正在录制,先释放视频写入对象
if video_writer is not None:
video_writer.release()
video_writer = None
is_recording = False
recorded_frames = 0
print("⚠️ 流中断时正在录制,已保存当前视频。")
# 重置检测状态
consecutive_ready_count = 0
confirmed_frames = []
cap.release()
break # 跳出内层循环,重新连接摄像头
frame_count += 1
# 转换为RGB并创建PIL图像用于后续处理
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
# 预先旋转180度为后续可能的录制做准备
rotated_bgr = rotate_frame_180(pil_image)
# 按间隔检测“盖板对齐”状态(仅未录制时执行)
if not is_recording and frame_count % check_interval == 0:
try:
# 调用模型判断状态(使用旋转后的图像)
class_name = yolov11_cls_inference(cls_model_path, rotated_bgr, target_size)
# 校验类别有效性
if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]:
print(f"跳过检测:模型返回无效类别({class_name} (总帧:{frame_count})")
consecutive_ready_count = 0
confirmed_frames = []
continue
# 检测到"盖板对齐"
if class_name == "cover_ready":
consecutive_ready_count += 1
# 保存当前确认的帧
confirmed_frames.append(rotated_bgr)
print(
f"检测到'盖板对齐',连续计数: {consecutive_ready_count}/{required_consecutive_frames} (总帧:{frame_count})")
# 达到所需连续帧数,开始录制
if consecutive_ready_count >= required_consecutive_frames:
# 检查磁盘空间
total, used, free = shutil.disk_usage(output_video_dir)
if free < 1024 * 1024 * 1024 * 5:
print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024 ** 3):.2f} GB停止录制。")
consecutive_ready_count = 0
confirmed_frames = []
raise SystemExit(1)
# 生成视频文件名
timestamp = time.strftime("%Y%m%d_%H%M%S")
video_filename = f"video_{timestamp}.mp4"
video_filepath = os.path.join(output_video_dir, video_filename)
# 初始化视频写入器
video_writer = cv2.VideoWriter(
video_filepath, video_codec, video_fps, (frame_width, frame_height)
)
if not video_writer.isOpened():
print(f"⚠️ 视频写入器初始化失败,无法录制视频(路径:{video_filepath}")
consecutive_ready_count = 0
confirmed_frames = []
continue
# 写入之前确认的帧
for confirmed_frame in confirmed_frames:
video_writer.write(confirmed_frame)
is_recording = True
# 已录制帧数为确认帧数量
recorded_frames = len(confirmed_frames)
print(
f"📹 开始录制视频(已连续{required_consecutive_frames}'盖板对齐',保存路径:{video_filepath}")
print(f"已写入 {recorded_frames} 帧确认帧")
# 重置计数器,为下一次检测做准备
consecutive_ready_count = 0
confirmed_frames = []
# 检测到"盖板不对齐",重置计数器
else:
print(f"盖板状态:{class_name} (总帧:{frame_count})")
if consecutive_ready_count > 0:
print(
f"检测到'盖板不对齐',重置连续计数 (当前计数: {consecutive_ready_count}/{required_consecutive_frames})")
consecutive_ready_count = 0
confirmed_frames = []
except Exception as e:
print(f"分类模型调用异常: {e}(总帧:{frame_count}")
consecutive_ready_count = 0
confirmed_frames = []
continue
# 若正在录制,持续写入旋转后的帧
if is_recording:
video_writer.write(rotated_bgr)
recorded_frames += 1
# 检查是否达到录制时长
if recorded_frames >= frame_per_video:
video_writer.release()
video_writer = None
is_recording = False
recorded_frames = 0
print(f"✅ 视频录制完成(已达 {video_duration} 秒,共录制 {frame_per_video} 帧)")
except KeyboardInterrupt:
print("\n用户中断程序")
# 中断时若正在录制,先保存视频
if video_writer is not None:
video_writer.release()
print("⚠️ 用户中断时正在录制,已保存当前视频。")
break
finally:
# 释放资源
cap.release()
cv2.destroyAllWindows()
if video_writer is not None:
video_writer.release()
is_recording = False
recorded_frames = 0
consecutive_ready_count = 0
confirmed_frames = []
print(f"视频流已关闭,共处理总帧:{frame_count}")
print("程序结束")