import cv2 import time import os import numpy as np from PIL import Image from skimage.metrics import structural_similarity as ssim # ================== 配置参数 ================== url = "rtsp://admin:XJ123456@192.168.1.51:554/streaming/channels/101" save_interval = 15 # 每隔 N 帧处理一次(可调) SSIM_THRESHOLD = 0.9 # SSIM 相似度阈值,>0.9 认为太像(重复,测试肉眼看不出区别的差不多在0.92以上,肉眼看的出区别的在0.7-0.85) output_dir = os.path.join("userdata", "image") # 固定路径:userdata/image 可修改 # 灰色判断参数/测试过异常的完全可删除,且不会影响正常图片 GRAY_LOWER = 70 GRAY_UPPER = 230 GRAY_RATIO_THRESHOLD = 0.7 # 创建输出目录 if not os.path.exists(output_dir): os.makedirs(output_dir) print(f"已创建目录: {output_dir}") def is_large_gray(image, gray_lower=GRAY_LOWER, gray_upper=GRAY_UPPER, ratio_thresh=GRAY_RATIO_THRESHOLD): """ 判断图片是否大面积为灰色(R/G/B 都在 [gray_lower, gray_upper] 区间) """ img_array = np.array(image) if len(img_array.shape) != 3 or img_array.shape[2] != 3: return True # 非三通道图视为无效/灰色 h, w, _ = img_array.shape total = h * w gray_mask = ( (img_array[:, :, 0] >= gray_lower) & (img_array[:, :, 0] <= gray_upper) & (img_array[:, :, 1] >= gray_lower) & (img_array[:, :, 1] <= gray_upper) & (img_array[:, :, 2] >= gray_lower) & (img_array[:, :, 2] <= gray_upper) ) gray_pixels = np.sum(gray_mask) gray_ratio = gray_pixels / total return gray_ratio > ratio_thresh # ================ 主程序开始 ================ cap = cv2.VideoCapture(url) if not cap.isOpened(): print("❌ 无法打开摄像头") exit() print("✅ 开始读取视频流...") frame_count = 0 last_gray = None # 用于 SSIM 去重 try: while True: ret, frame = cap.read() if not ret: print("读取帧失败,可能是流中断或摄像头断开") break frame_count += 1 # 仅在指定间隔处理保存逻辑 if frame_count % save_interval != 0: cv2.imshow('Camera Stream (Live)', frame) if cv2.waitKey(1) == ord('q'): break continue print(f"处理帧 {frame_count}") # 转为 PIL 图像(用于后续判断和旋转) rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_frame) # STEP 1: 判断是否为大面积灰色(优先级最高) if is_large_gray(pil_image): print(f"跳过:大面积灰色图像 (frame_{frame_count})") cv2.imshow('Camera Stream (Live)', frame) if cv2.waitKey(1) == ord('q'): break continue # STEP 2: 判断是否为重复帧(基于 SSIM) gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if last_gray is not None: try: similarity = ssim(gray, last_gray) if similarity > SSIM_THRESHOLD: print(f"跳过:与上一帧太相似 (SSIM={similarity:.3f})") cv2.imshow('Camera Stream (Live)', frame) if cv2.waitKey(1) == ord('q'): break continue except Exception as e: print(f"SSIM 计算异常: {e}") # 更新 last_gray 用于下一帧比较 last_gray = gray.copy() # STEP 3: 旋转 180 度 rotated_pil = pil_image.rotate(180, expand=False) # 生成文件名(时间戳 + 毫秒防重) timestamp = time.strftime("%Y%m%d_%H%M%S") ms = int((time.time() % 1) * 1000) filename = f"frame_{timestamp}_{ms:03d}.jpg" filepath = os.path.join(output_dir, filename) # 保存图像 try: rotated_pil.save(filepath, format='JPEG', quality=95) print(f"已保存: {filepath}") except Exception as e: print(f"保存失败 {filename}: {e}") # 显示画面 cv2.imshow('Camera Stream (Live)', frame) if cv2.waitKey(1) == ord('q'): break except KeyboardInterrupt: print("\n用户中断") finally: cap.release() cv2.destroyAllWindows() print(f"视频流已关闭,共处理 {frame_count} 帧。")