Files
zjsh_yolov11/camera/siyi_caiji.py
琉璃月光 df7c0730f5 bushu
2025-10-21 14:11:52 +08:00

137 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
import shutil
# ================== 配置参数 ==================
rtsp_url = "rtsp://192.168.144.25:8554/main.264" # RTSP 流地址
capture_interval = 1.0 # 每隔多少秒采集一次(单位:秒)
SSIM_THRESHOLD = 0.9 # SSIM 相似度阈值,>0.9 认为太像
output_dir = os.path.join("userdata", "image") # 图片保存路径
# 灰色判断参数
GRAY_LOWER = 70
GRAY_UPPER = 230
GRAY_RATIO_THRESHOLD = 0.7
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"已创建目录: {output_dir}")
def is_large_gray(image, gray_lower=GRAY_LOWER, gray_upper=GRAY_UPPER, ratio_thresh=GRAY_RATIO_THRESHOLD):
"""
判断图片是否大面积为灰色R/G/B 都在 [gray_lower, gray_upper] 区间)
"""
img_array = np.array(image)
if len(img_array.shape) != 3 or img_array.shape[2] != 3:
return True # 非三通道图视为无效/灰色
h, w, _ = img_array.shape
total = h * w
gray_mask = (
(img_array[:, :, 0] >= gray_lower) & (img_array[:, :, 0] <= gray_upper) &
(img_array[:, :, 1] >= gray_lower) & (img_array[:, :, 1] <= gray_upper) &
(img_array[:, :, 2] >= gray_lower) & (img_array[:, :, 2] <= gray_upper)
)
gray_pixels = np.sum(gray_mask)
gray_ratio = gray_pixels / total
return gray_ratio > ratio_thresh
max_retry_seconds = 10 # 最大重试时间为10秒
retry_interval_seconds = 1 # 每隔1秒尝试重新连接一次
last_gray = None # 用于 SSIM 去重
while True: # 外层循环用于处理重新连接逻辑
cap = cv2.VideoCapture(rtsp_url)
start_time = time.time() # 记录开始尝试连接的时间
while not cap.isOpened():
if time.time() - start_time >= max_retry_seconds:
print(f"已尝试重新连接 {max_retry_seconds} 秒,但仍无法获取视频流。")
exit()
print("无法打开摄像头,正在尝试重新连接...")
time.sleep(retry_interval_seconds)
cap = cv2.VideoCapture(rtsp_url)
print("✅ 开始读取视频流...")
last_capture_time = time.time()
frame_count = 0
try:
while True:
ret, frame = cap.read()
if not ret:
print("读取帧失败,可能是流中断或摄像头断开")
cap.release()
break
current_time = time.time()
if current_time - last_capture_time < capture_interval:
continue
frame_count += 1
last_capture_time = current_time
print(f"处理帧 {frame_count}")
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
if is_large_gray(pil_image):
print(f"跳过:大面积灰色图像 (frame_{frame_count})")
continue
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if last_gray is not None:
similarity = ssim(gray, last_gray)
if similarity > SSIM_THRESHOLD:
print(f"跳过:与上一帧太相似 (SSIM={similarity:.3f})")
continue
last_gray = gray.copy()
timestamp = time.strftime("%Y%m%d_%H%M%S")
ms = int((time.time() % 1) * 1000)
filename = f"frame_{timestamp}_{ms:03d}.png"
filepath = os.path.join(output_dir, filename)
total, used, free = shutil.disk_usage(output_dir)
if free < 1024 * 1024 * 20: # 小于 20MB 就停止
print(f"❌ 磁盘空间严重不足(仅剩 {free / (1024**3):.2f} GB停止运行。")
raise SystemExit(1)
try:
pil_image.save(filepath, format='PNG')
print(f"已保存: {filepath}")
except (OSError, IOError) as e:
error_msg = str(e)
if "No space left on device" in error_msg or "disk full" in error_msg.lower() or "quota" in error_msg.lower():
print(f"磁盘空间不足,无法保存 {filepath}!错误: {e}")
print("停止程序以防止无限错误。")
raise SystemExit(1)
else:
print(f"保存失败 {filename}: {e}(非磁盘空间问题,继续运行)")
cv2.imshow('Camera Stream (Live)', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
raise KeyboardInterrupt
except KeyboardInterrupt:
print("\n用户中断")
break
finally:
cap.release()
cv2.destroyAllWindows()
print(f"视频流已关闭,共处理 {frame_count} 帧。")
print("程序结束")