import cv2 import time import os import numpy as np from PIL import Image from skimage.metrics import structural_similarity as ssim # ================== 配置区域 ================== RTSP_URL = "rtsp://admin:XJ123456@192.168.10.50:554/streaming/channels/101" SAVE_INTERVAL = 10 # 每 20 帧尝试一次处理 DISPLAY_STREAM = False # 是否显示画面 # --- 灰图过滤配置 --- GRAY_LOWER = 70 GRAY_UPPER = 230 GRAY_RATIO_THRESHOLD = 0.7 # 灰色像素占比超过此值视为灰图 # --- SSIM 去重配置 --- SSIM_THRESHOLD = 0.9 # 相似度超过此值视为重复图片 # --- 保存目录 --- OUTPUT_DIR = "camera02_save" os.makedirs(OUTPUT_DIR, exist_ok=True) # ================== 辅助函数 ================== def is_large_gray(image): """ 检测图像是否为大面积灰图 image: PIL Image 对象 (RGB) """ arr = np.array(image) # 检查 R, G, B 三个通道是否都在灰色范围内 gray_mask = ( (arr[:, :, 0] >= GRAY_LOWER) & (arr[:, :, 0] <= GRAY_UPPER) & (arr[:, :, 1] >= GRAY_LOWER) & (arr[:, :, 1] <= GRAY_UPPER) & (arr[:, :, 2] >= GRAY_LOWER) & (arr[:, :, 2] <= GRAY_UPPER) ) # 计算灰色像素占比 return np.mean(gray_mask) > GRAY_RATIO_THRESHOLD def open_camera(url): cap = cv2.VideoCapture(url) if not cap.isOpened(): return None cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000) return cap def get_valid_frame(cap): """安全读取一帧""" if cap is None or not cap.isOpened(): return None, False ret, frame = cap.read() if not ret or frame is None or frame.size == 0: return None, False return frame, True # ================== 主程序 ================== print(f"✅ 正在连接摄像头: {RTSP_URL} ...") cap = open_camera(RTSP_URL) if cap is None: print(f"❌ 连接失败!请检查 IP、账号密码或网络。") exit(1) print("📡 摄像头已连接,开始采集...") print(f" - 保存目录: {os.path.abspath(OUTPUT_DIR)}") print(f" - 灰度阈值: [{GRAY_LOWER}, {GRAY_UPPER}], 占比 > {GRAY_RATIO_THRESHOLD}") print(f" - SSIM 去重阈值: > {SSIM_THRESHOLD}") print(" - 按 Ctrl+C 停止\n") frame_count = 0 saved_count = 0 last_gray_frame = None # 用于存储上一帧的灰度图以计算 SSIM try: while True: frame, ret = get_valid_frame(cap) if not ret: print("⚠️ 读取帧失败,尝试重连...") time.sleep(2) cap.release() cap = open_camera(RTSP_URL) if cap is None: print("❌ 重连失败,退出程序。") break last_gray_frame = None # 重连后重置对比帧 continue frame_count += 1 # 间隔采样:不是指定间隔的帧直接跳过(可选显示) if frame_count % SAVE_INTERVAL != 0: if DISPLAY_STREAM: cv2.imshow("Camera Stream", frame) if cv2.waitKey(1) & 0xFF == ord('q'): break continue # --- 步骤 1: 灰图检测 --- try: # 转 RGB 供 PIL 使用 img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(img_rgb) if is_large_gray(pil_img): print(f"⏭️ 跳过:检测到灰图 (帧 {frame_count})") continue except Exception as e: print(f"⚠️ 灰图检测异常: {e}") continue # --- 步骤 2: SSIM 相似性去重 --- try: gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if last_gray_frame is not None: # 计算结构相似性 # resize 确保尺寸一致(防止分辨率动态变化导致报错) if last_gray_frame.shape != gray_curr.shape: last_gray_frame = cv2.resize(last_gray_frame, (gray_curr.shape[1], gray_curr.shape[0])) sim_score = ssim(last_gray_frame, gray_curr) if sim_score > SSIM_THRESHOLD: print(f"⏭️ 跳过:画面重复 (SSIM={sim_score:.3f}, 帧 {frame_count})") continue # 更新参考帧 last_gray_frame = gray_curr.copy() except Exception as e: print(f"⚠️ SSIM 计算异常: {e}") # 如果计算出错,可以选择跳过或强制保存,这里选择跳过以防崩溃 continue # --- 步骤 3: 保存图片 --- ts = time.strftime("%Y%m%d_%H%M%S") ms = int((time.time() % 1) * 1000) filename = f"img_{ts}_{ms:03d}.png" save_path = os.path.join(OUTPUT_DIR, filename) # 如果需要旋转/翻转,在这里操作 (例如翻转 180 度) # frame = cv2.flip(frame, -1) cv2.imwrite(save_path, frame) saved_count += 1 print(f"✅ [{saved_count}] 已保存有效图片: {filename}") if DISPLAY_STREAM: cv2.putText(frame, f"Saved: {filename}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) cv2.imshow("Camera Stream", frame) if cv2.waitKey(1) & 0xFF == ord('q'): break except KeyboardInterrupt: print("\n🛑 用户手动停止") finally: if cap: cap.release() cv2.destroyAllWindows() print(f"🔚 程序结束。共保存 {saved_count} 张图片。")