Files
fluent_widgets_pyside6/image_new.py

129 lines
4.3 KiB
Python
Raw Normal View History

2025-08-14 18:45:16 +08:00
import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
# ================== 配置参数 ==================
url = "rtsp://admin:XJ123456@192.168.1.51:554/streaming/channels/101"
save_interval = 15 # 每隔 N 帧处理一次(可调)
SSIM_THRESHOLD = 0.9 # SSIM 相似度阈值,>0.9 认为太像重复测试肉眼看不出区别的差不多在0.92以上肉眼看的出区别的在0.7-0.85
output_dir = os.path.join("userdata", "image") # 固定路径userdata/image 可修改
# 灰色判断参数/测试过异常的完全可删除,且不会影响正常图片
GRAY_LOWER = 70
GRAY_UPPER = 230
GRAY_RATIO_THRESHOLD = 0.7
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"已创建目录: {output_dir}")
def is_large_gray(image, gray_lower=GRAY_LOWER, gray_upper=GRAY_UPPER, ratio_thresh=GRAY_RATIO_THRESHOLD):
"""
判断图片是否大面积为灰色R/G/B 都在 [gray_lower, gray_upper] 区间
"""
img_array = np.array(image)
if len(img_array.shape) != 3 or img_array.shape[2] != 3:
return True # 非三通道图视为无效/灰色
h, w, _ = img_array.shape
total = h * w
gray_mask = (
(img_array[:, :, 0] >= gray_lower) & (img_array[:, :, 0] <= gray_upper) &
(img_array[:, :, 1] >= gray_lower) & (img_array[:, :, 1] <= gray_upper) &
(img_array[:, :, 2] >= gray_lower) & (img_array[:, :, 2] <= gray_upper)
)
gray_pixels = np.sum(gray_mask)
gray_ratio = gray_pixels / total
return gray_ratio > ratio_thresh
# ================ 主程序开始 ================
cap = cv2.VideoCapture(url)
if not cap.isOpened():
print("❌ 无法打开摄像头")
exit()
print("✅ 开始读取视频流...")
frame_count = 0
last_gray = None # 用于 SSIM 去重
try:
while True:
ret, frame = cap.read()
if not ret:
print("读取帧失败,可能是流中断或摄像头断开")
break
frame_count += 1
# 仅在指定间隔处理保存逻辑
if frame_count % save_interval != 0:
cv2.imshow('Camera Stream (Live)', frame)
if cv2.waitKey(1) == ord('q'):
break
continue
print(f"处理帧 {frame_count}")
# 转为 PIL 图像(用于后续判断和旋转)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
# STEP 1: 判断是否为大面积灰色(优先级最高)
if is_large_gray(pil_image):
print(f"跳过:大面积灰色图像 (frame_{frame_count})")
cv2.imshow('Camera Stream (Live)', frame)
if cv2.waitKey(1) == ord('q'):
break
continue
# STEP 2: 判断是否为重复帧(基于 SSIM
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if last_gray is not None:
try:
similarity = ssim(gray, last_gray)
if similarity > SSIM_THRESHOLD:
print(f"跳过:与上一帧太相似 (SSIM={similarity:.3f})")
cv2.imshow('Camera Stream (Live)', frame)
if cv2.waitKey(1) == ord('q'):
break
continue
except Exception as e:
print(f"SSIM 计算异常: {e}")
# 更新 last_gray 用于下一帧比较
last_gray = gray.copy()
# STEP 3: 旋转 180 度
rotated_pil = pil_image.rotate(180, expand=False)
# 生成文件名(时间戳 + 毫秒防重)
timestamp = time.strftime("%Y%m%d_%H%M%S")
ms = int((time.time() % 1) * 1000)
filename = f"frame_{timestamp}_{ms:03d}.jpg"
filepath = os.path.join(output_dir, filename)
# 保存图像
try:
rotated_pil.save(filepath, format='JPEG', quality=95)
print(f"已保存: {filepath}")
except Exception as e:
print(f"保存失败 {filename}: {e}")
# 显示画面
cv2.imshow('Camera Stream (Live)', frame)
if cv2.waitKey(1) == ord('q'):
break
except KeyboardInterrupt:
print("\n用户中断")
finally:
cap.release()
cv2.destroyAllWindows()
print(f"视频流已关闭,共处理 {frame_count} 帧。")