Files
zjsh_video_collection/test.py
2025-09-26 20:41:44 +08:00

306 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/9/11 16:48
# @Author : reenrr
# @File : test_01.py
'''
import cv2
import time
import os
from PIL import Image
import shutil
from cls_inference.cls_inference import yolov11_cls_inference
import numpy as np
# ================== 配置参数(严格按“定义在前,使用在后”排序)==================
# 1. 检测核心参数(先定义,后续打印和逻辑会用到)
detection_interval = 10 # 每隔10秒检查一次
detection_frame_count = 3 # 每次检测抽取3帧
required_all_noready = True # 要求所有帧都为“盖板不对齐”
# 2. 视频存储与摄像头基础参数
url = "rtsp://admin:XJ123456@192.168.1.50:554/streaming/channels/101"
output_video_dir = os.path.join("camera01_videos") # 视频保存目录
# 3. 摄像头重连参数
max_retry_seconds = 10
retry_interval_seconds = 1
# 4. 分类模型参数
cls_model_path = "/userdata/data_collection/cls_inference/yolov11_cls.rknn"
target_size = (640, 640)
# 5. 视频录制参数
video_fps = 25
video_codec = cv2.VideoWriter_fourcc(*'mp4v')
single_recording_duration = 10 # 每次录制10秒
total_target_duration = 60 # 累计目标60秒
single_recording_frames = video_fps * single_recording_duration
total_target_frames = video_fps * total_target_duration
def rotate_frame_180(pil_image):
"""将PIL图像旋转180度并转为OpenCV的BGR格式"""
rotated_pil = pil_image.rotate(180, expand=True)
rotated_rgb = np.array(rotated_pil)
rotated_bgr = cv2.cvtColor(rotated_rgb, cv2.COLOR_RGB2BGR)
return rotated_bgr
if __name__ == '__main__':
# 全局状态变量(断连重连后保留)
total_recorded_frames = 0 # 累计录制总帧数
current_segment = 0 # 当前视频分段编号
is_recording = False # 是否正在录制
current_video_filepath = None# 当前录制视频路径
# 单次连接内的临时状态变量
video_writer = None # 视频写入对象
recorded_frames = 0 # 当前分段已录制帧数
frame_count = 0 # 摄像头总读取帧数
confirmed_frames = [] # 检测通过的确认帧(用于录制起始)
last_detection_time = time.time() # 上次检测时间
detection_window_frames = [] # 检测窗口帧缓存
# 创建视频目录(确保目录存在)
os.makedirs(output_video_dir, exist_ok=True)
# 打印目标信息此时detection_frame_count已提前定义
print(f"✅ 已创建/确认视频目录: {output_video_dir}")
print(f"🎯 目标:累计录制{total_target_duration}秒,每次录制{single_recording_duration}秒,需{detection_frame_count}帧全为盖板不对齐")
# 外层循环:处理摄像头断连与重连
while True:
# 初始化摄像头连接
cap = cv2.VideoCapture(url)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 设置RTSP缓存为5MB
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) # 强制H264解码
# 摄像头连接重试逻辑
start_retry_time = time.time()
while not cap.isOpened():
# 超过最大重试时间则退出程序
if time.time() - start_retry_time >= max_retry_seconds:
print(f"\n❌ 已尝试重新连接 {max_retry_seconds} 秒,仍无法获取视频流,程序退出。")
# 退出前释放未关闭的视频写入器
if video_writer is not None:
video_writer.release()
print(f"📊 程序退出时累计录制:{total_recorded_frames/video_fps:.1f}")
exit()
# 每隔1秒重试一次
print(f"🔄 无法打开摄像头,正在尝试重新连接...(已重试{int(time.time()-start_retry_time)}秒)")
time.sleep(retry_interval_seconds)
cap.release() # 释放旧连接
cap = cv2.VideoCapture(url) # 重新创建连接
# 获取摄像头实际参数(可能与配置值不同)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = cap.get(cv2.CAP_PROP_FPS)
# 修正帧率(以摄像头实际帧率为准)
if actual_fps > 0:
video_fps = int(actual_fps)
single_recording_frames = video_fps * single_recording_duration
total_target_frames = video_fps * total_target_duration
# 打印重连成功信息
print(f"\n✅ 摄像头重连成功(分辨率:{frame_width}x{frame_height},实际帧率:{video_fps}")
print(f"📊 当前累计录制:{total_recorded_frames / video_fps:.1f}/{total_target_duration}")
# 重连后恢复录制状态(如果断连前正在录制)
if is_recording and current_video_filepath:
print(f"🔄 恢复录制状态,继续录制视频:{current_video_filepath}")
# 重新初始化视频写入器(确保参数与摄像头匹配)
video_writer = cv2.VideoWriter(
current_video_filepath, video_codec, video_fps, (frame_width, frame_height)
)
# 恢复失败则重置录制状态
if not video_writer.isOpened():
print(f"⚠️ 视频写入器重新初始化失败,无法继续录制")
is_recording = False
video_writer = None
recorded_frames = 0
# 内层循环:读取摄像头帧并处理(检测/录制)
try:
while True:
# 读取一帧图像
ret, frame = cap.read()
if not ret:
print(f"\n⚠️ 读取帧失败,可能是流中断或摄像头断开")
# 断连时保存当前录制进度(不释放全局状态)
if video_writer is not None:
video_writer.release()
video_writer = None
print(f"⏸️ 流中断,暂停录制(已保存当前进度)")
# 重置单次连接的临时状态(全局状态保留)
frame_count = 0
detection_window_frames = []
cap.release() # 释放当前摄像头连接
break # 跳出内层循环,进入重连流程
# 累计总帧数
frame_count += 1
# 预处理帧转为RGB→PIL→旋转180度→转为BGR适配录制和模型
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
rotated_bgr = rotate_frame_180(pil_image)
# -------------------------- 1. 未录制时:执行检测逻辑 --------------------------
if not is_recording:
# 将帧加入检测窗口缓存用于10秒后的检测
detection_window_frames.append(rotated_bgr)
# 限制缓存大小避免内存占用过高最多保留2倍检测帧数
if len(detection_window_frames) > detection_frame_count * 2:
detection_window_frames = detection_window_frames[-detection_frame_count * 2:]
# 触发检测的条件:
# 1. 距离上次检测超过10秒2. 检测窗口有足够帧3. 累计录制未完成
if (time.time() - last_detection_time) >= detection_interval and \
len(detection_window_frames) >= detection_frame_count and \
total_recorded_frames < total_target_frames:
try:
print(f"\n==== 开始10秒间隔检测总帧{frame_count},累计已录:{total_recorded_frames/video_fps:.1f}秒) ====")
# 从检测窗口中均匀抽取3帧避免连续帧重复
sample_step = max(1, len(detection_window_frames) // detection_frame_count)
sample_frames = detection_window_frames[::sample_step][:detection_frame_count]
print(f"📋 检测窗口共{len(detection_window_frames)}帧,均匀抽取{len(sample_frames)}帧进行判断")
# 统计“盖板不对齐”的帧数
noready_frame_count = 0
valid_detection = True # 标记检测是否有效(无无效帧)
for idx, sample_frame in enumerate(sample_frames):
# 调用模型获取分类结果
class_name = yolov11_cls_inference(cls_model_path, sample_frame, target_size)
# 校验分类结果有效性
if not isinstance(class_name, str) or class_name not in ["cover_ready", "cover_noready"]:
print(f"❌ 抽取帧{idx+1}:分类结果无效({class_name}),本次检测失败")
valid_detection = False
break # 有无效帧则终止本次检测
# 统计不对齐帧
if class_name == "cover_noready":
noready_frame_count += 1
print(f"✅ 抽取帧{idx+1}:分类结果={class_name}(符合条件)")
else:
print(f"❌ 抽取帧{idx+1}:分类结果={class_name}(不符合条件)")
# 检测通过条件:所有帧有效 + 3帧全为不对齐
if valid_detection and noready_frame_count == detection_frame_count:
print(f"\n✅ 本次检测通过({noready_frame_count}/{detection_frame_count}帧均为盖板不对齐)")
# 保存检测通过的帧(用于录制起始,避免丢失检测阶段的画面)
confirmed_frames.extend(sample_frames)
# 检查磁盘空间(剩余<5GB则停止
total_disk, used_disk, free_disk = shutil.disk_usage(output_video_dir)
if free_disk < 1024 * 1024 * 1024 * 5:
print(f"❌ 磁盘空间严重不足(仅剩 {free_disk / (1024 ** 3):.2f} GB停止录制并退出。")
raise SystemExit(1)
# 生成当前分段的视频文件名(包含时间戳和分段号)
current_segment = total_recorded_frames // single_recording_frames + 1
timestamp = time.strftime("%Y%m%d_%H%M%S")
current_video_filepath = os.path.join(
output_video_dir, f"video_{timestamp}_part{current_segment}.mp4"
)
# 初始化视频写入器
video_writer = cv2.VideoWriter(
current_video_filepath, video_codec, video_fps, (frame_width, frame_height)
)
if not video_writer.isOpened():
print(f"⚠️ 视频写入器初始化失败(路径:{current_video_filepath}),跳过本次录制")
confirmed_frames = []
continue
# 写入检测阶段的确认帧(录制起始画面)
for frame in confirmed_frames:
video_writer.write(frame)
recorded_frames = len(confirmed_frames)
is_recording = True # 标记为正在录制
# 打印录制开始信息
print(f"\n📹 开始录制第{current_segment}段视频目标10秒")
print(f"📁 视频保存路径:{current_video_filepath}")
print(f"🔢 已写入检测阶段的确认帧:{recorded_frames}")
# 重置检测相关的临时状态
confirmed_frames = []
# 检测未通过未满足“3帧全不对齐”或有无效帧
else:
if valid_detection:
print(f"\n❌ 本次检测未通过(仅{noready_frame_count}/{detection_frame_count}帧为盖板不对齐,需全部符合)")
else:
print(f"\n❌ 本次检测未通过(存在无效分类结果)")
# 重置检测临时状态
confirmed_frames = []
# 更新检测时间,清空检测窗口(准备下一次检测)
last_detection_time = time.time()
detection_window_frames = []
# 捕获模型调用异常(不终止程序,仅重置检测状态)
except Exception as e:
print(f"\n⚠️ 分类模型调用异常: {str(e)}(总帧:{frame_count}")
confirmed_frames = []
last_detection_time = time.time()
detection_window_frames = []
continue
# -------------------------- 2. 正在录制时:执行写入逻辑 --------------------------
if is_recording and video_writer is not None:
# 写入当前帧已预处理为旋转180度的BGR格式
video_writer.write(rotated_bgr)
recorded_frames += 1
# 检查当前分段是否录制完成达到10秒的帧数
if recorded_frames >= single_recording_frames:
# 释放当前视频写入器(完成当前分段)
video_writer.release()
video_writer = None
is_recording = False # 标记为未录制
# 更新累计录制进度
total_recorded_frames += recorded_frames
actual_recording_time = recorded_frames / video_fps
# 打印分段完成信息
print(f"\n✅ 第{current_segment}段视频录制完成")
print(f"🔢 实际录制:{recorded_frames}帧 ≈ {actual_recording_time:.1f}")
print(f"📊 累计录制:{total_recorded_frames/video_fps:.1f}/{total_target_duration}")
# 检查是否达到总目标60秒
if total_recorded_frames >= total_target_frames:
print(f"\n🎉 已完成累计{total_target_duration}秒的录制目标!")
# 重置累计状态如需重复录制保留此逻辑如需单次录制可添加break退出
total_recorded_frames = 0
current_segment = 0
# 重置录制相关的临时状态,准备下一次检测
recorded_frames = 0
last_detection_time = time.time()
detection_window_frames = []
# 捕获用户中断Ctrl+C
except KeyboardInterrupt:
print(f"\n\n👋 用户中断程序")
# 中断前保存当前录制的视频
if video_writer is not None:
video_writer.release()
print(f"⚠️ 已保存当前录制的视频:{current_video_filepath}")
print(f"📊 中断时累计录制:{total_recorded_frames/video_fps:.1f}")
break
# 最终释放资源(无论内层循环因何退出)
finally:
cap.release() # 释放摄像头连接
if video_writer is not None:
video_writer.release() # 释放视频写入器
print(f"\n🔌 视频流已关闭,累计已录:{total_recorded_frames/video_fps:.1f}")
# 程序结束
print("\n📋 程序结束")