Files
ailai_image_point_diff/detect_image/image_01_3588.py
2025-12-28 00:12:46 +08:00

203 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
import shutil
from rknnlite.api import RKNNLite
# ================== Configuration ==================
# Camera stream address. It can be overridden with the RTSP_URL environment
# variable so that credentials do not have to be edited into this file; the
# literal below is only the fallback used when the variable is unset.
# NOTE(review): the fallback embeds a username and password - consider
# dropping it once every deployment provides RTSP_URL.
RTSP_URL = os.environ.get(
    "RTSP_URL",
    "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101",
)
SAVE_INTERVAL = 15        # only every SAVE_INTERVAL-th frame is processed
SSIM_THRESHOLD = 0.9      # frames more similar than this to the last kept frame are skipped
OUTPUT_DIR = "camera_test"    # directory the saved frames are written to
RKNN_MODEL = "bag3588.rknn"   # model file handed to RKNNLite.load_rknn
SHOW_WINDOW = False       # when True, frames are shown in an OpenCV window
# Gray-frame detection parameters: a pixel is "gray" when every channel lies in
# [GRAY_LOWER, GRAY_UPPER]; a frame is skipped when the gray fraction exceeds
# GRAY_RATIO_THRESHOLD.
GRAY_LOWER = 70
GRAY_UPPER = 230
GRAY_RATIO_THRESHOLD = 0.7
IMG_SIZE = (640, 640)     # model input size, unpacked as (width, height)
OBJ_THRESH = 0.001        # minimum confidence * class score for a detection
NMS_THRESH = 0.45         # not referenced by the visible code
CLASS_NAME = ["bag"]      # not referenced by the visible code
# Make sure the output directory exists before any frame is saved.
os.makedirs(OUTPUT_DIR, exist_ok=True)
# ================== 灰度判断 ==================
def is_large_gray(image):
    """Report whether most of *image* lies inside the configured gray band.

    A pixel counts as "gray" when each of its three channel values is within
    the closed range [GRAY_LOWER, GRAY_UPPER]; the channels are not compared
    with one another.

    Args:
        image: Anything ``np.array`` can convert (the caller passes a PIL
            image).

    Returns:
        True when the fraction of in-band pixels is greater than
        GRAY_RATIO_THRESHOLD, or when the converted array is not a
        three-channel H x W x 3 image; otherwise False.
    """
    pixels = np.array(image)
    has_three_channels = pixels.ndim == 3 and pixels.shape[2] == 3
    if not has_three_channels:
        # Anything that is not H x W x 3 is reported as gray.
        return True

    rows, cols = pixels.shape[:2]
    # A pixel is in-band only if all three of its channels are in range.
    in_band = np.all((pixels >= GRAY_LOWER) & (pixels <= GRAY_UPPER), axis=2)
    return in_band.sum() / (rows * cols) > GRAY_RATIO_THRESHOLD
# ================== RKNN 工具函数 ==================
def letterbox_resize(image, size, bg_color=114):
    """Scale *image* to fit inside *size* and centre it on a padded canvas.

    The aspect ratio is preserved: one scale factor is used for both axes and
    the remaining area is filled with ``bg_color``.

    Args:
        image: Source array; only its first two dimensions (height, width)
            are read here.
        size: Target ``(width, height)``.
        bg_color: Fill value for the padding.

    Returns:
        Tuple ``(canvas, scale, dx, dy)``: the padded uint8 image, the scale
        factor applied, and the left/top offsets of the resized image inside
        the canvas.
    """
    out_w, out_h = size
    src_h, src_w = image.shape[:2]

    # The tighter of the two axis ratios keeps the whole image inside the target.
    ratio = min(out_w / src_w, out_h / src_h)
    fit_w = int(src_w * ratio)
    fit_h = int(src_h * ratio)
    shrunk = cv2.resize(image, (fit_w, fit_h))

    padded = np.full((out_h, out_w, 3), bg_color, dtype=np.uint8)
    pad_x = (out_w - fit_w) // 2
    pad_y = (out_h - fit_h) // 2
    padded[pad_y:pad_y + fit_h, pad_x:pad_x + fit_w] = shrunk
    return padded, ratio, pad_x, pad_y
def dfl_numpy(position):
    """Turn per-side bin logits into expected bin indices.

    The channel axis is split into 4 groups of ``c // 4`` bins. Within each
    group a softmax is taken over the bins and the result is the
    softmax-weighted mean of the bin indices ``0 .. c // 4 - 1``.

    Args:
        position: Array of shape ``(n, c, h, w)``.

    Returns:
        Array of shape ``(n, 4, h, w)`` holding one expected value per group.
    """
    n, c, h, w = position.shape
    p_num = 4
    mc = c // p_num
    logits = position.reshape(n, p_num, mc, h, w)
    if not np.issubdtype(logits.dtype, np.floating):
        # Integer input: go to float before shifting so the subtraction below
        # cannot wrap around on unsigned types.
        logits = logits.astype(np.float64)

    # Subtracting the per-group maximum leaves the softmax unchanged but keeps
    # np.exp from overflowing to inf (which would yield NaN after division).
    shifted = logits - np.max(logits, axis=2, keepdims=True)
    weights = np.exp(shifted)
    probs = weights / np.sum(weights, axis=2, keepdims=True)

    bin_index = np.arange(mc).reshape(1, 1, mc, 1, 1)
    return np.sum(probs * bin_index, axis=2)
def box_process(position):
    """Decode one output branch into corner coordinates in input-image pixels.

    Each grid cell contributes its centre ``(col + 0.5, row + 0.5)``; the
    first two decoded offsets are subtracted from it and the last two are
    added to it, and both results are multiplied by the branch stride.

    Args:
        position: Array of shape ``(n, c, grid_h, grid_w)`` as accepted by
            ``dfl_numpy``.

    Returns:
        Array of shape ``(n, 4, grid_h, grid_w)``: the two "minus" coordinates
        followed by the two "plus" coordinates.
    """
    grid_h, grid_w = position.shape[2:4]

    # Per-cell (col, row) indices, shaped (1, 2, grid_h, grid_w).
    rows, cols = np.indices((grid_h, grid_w))
    cell_xy = np.stack((cols, rows))[np.newaxis]

    # Pixels per grid cell, derived from the configured input size.
    stride = np.array(
        [IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]
    ).reshape(1, 2, 1, 1)

    offsets = dfl_numpy(position)
    corner_min = (cell_xy + 0.5 - offsets[:, 0:2, :, :]) * stride
    corner_max = (cell_xy + 0.5 + offsets[:, 2:4, :, :]) * stride
    return np.concatenate((corner_min, corner_max), axis=1)
def filter_boxes(boxes, box_confidences, box_class_probs):
    """Tell whether any candidate reaches the OBJ_THRESH score.

    The score of a candidate is its confidence multiplied by its highest
    class probability.

    Args:
        boxes: Candidate boxes; reshaped to ``(-1, 4)`` but otherwise unused.
        box_confidences: One confidence per candidate (flattened here).
        box_class_probs: Two-dimensional ``(candidates, classes)`` scores.

    Returns:
        True when at least one candidate scores >= OBJ_THRESH, otherwise
        None. No boxes are returned; only presence is reported.
    """
    boxes = boxes.reshape(-1, 4)  # result is not used further
    confidences = box_confidences.reshape(-1)
    probs = np.array(box_class_probs)

    # Best class per candidate and the probability that goes with it.
    best_class = np.argmax(probs, axis=-1)
    best_prob = probs[np.arange(len(best_class)), best_class]

    passed = (confidences * best_prob) >= OBJ_THRESH
    return True if passed.any() else None
def post_process(outputs, scale, dx, dy):
    """Reduce the raw model outputs to a "target present" flag.

    Three branches are read from *outputs*, each occupying three consecutive
    entries: index ``3*i`` is decoded by ``box_process``, ``3*i + 1`` is used
    as class probabilities and ``3*i + 2`` as box confidence.
    NOTE(review): this ordering is assumed from the indexing below - confirm
    it against the exported model.

    Args:
        outputs: Sequence of at least nine NCHW arrays from the model.
        scale: Letterbox scale factor; not used here.
        dx: Letterbox horizontal offset; not used here.
        dy: Letterbox vertical offset; not used here.

    Returns:
        Whatever ``filter_boxes`` returns: True if a target is found, else
        None.
    """
    def to_rows(tensor):
        # NCHW -> one row per (n, y, x) location with channels as columns.
        channels_last = tensor.transpose(0, 2, 3, 1)
        return channels_last.reshape(-1, channels_last.shape[3])

    branch_starts = range(0, 9, 3)
    boxes = np.concatenate(
        [to_rows(box_process(outputs[k])) for k in branch_starts]
    )
    box_conf = np.concatenate([to_rows(outputs[k + 2]) for k in branch_starts])
    class_probs = np.concatenate(
        [to_rows(outputs[k + 1]) for k in branch_starts]
    )
    return filter_boxes(boxes, box_conf, class_probs)
# ================== RKNN initialisation ==================
# Create the RKNNLite wrapper, load the compiled model, then bring up the
# runtime; a non-zero status from either step aborts start-up.
rknn = RKNNLite()
_load_status = rknn.load_rknn(RKNN_MODEL)
if _load_status != 0:
    raise RuntimeError("❌ RKNN 模型加载失败")
_runtime_status = rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO)
if _runtime_status != 0:
    raise RuntimeError("❌ RKNN Runtime 初始化失败")
print("✅ RKNN 初始化完成")
# ================== Video stream processing ==================
# Pipeline for every SAVE_INTERVAL-th frame:
#   1. skip frames that are mostly gray,
#   2. skip frames too similar (SSIM) to the last frame that passed step 1-2,
#   3. run the RKNN model and skip frames without a detection,
#   4. stop if the output disk is nearly full,
#   5. save the original frame as PNG.
# The outer loop reconnects whenever reading from the stream fails.
max_retry_seconds = 10
retry_interval_seconds = 1
last_gray = None
frame_count = 0
try:
    while True:
        cap = cv2.VideoCapture(RTSP_URL)
        start_time = time.time()
        while not cap.isOpened():
            if time.time() - start_time >= max_retry_seconds:
                print("❌ 无法连接 RTSP")
                # SystemExit (rather than the site-provided exit()) so the
                # outer finally still releases the RKNN runtime.
                raise SystemExit(1)
            time.sleep(retry_interval_seconds)
            cap = cv2.VideoCapture(RTSP_URL)
        print("✅ 开始读取视频流")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("❌ 读取失败")
                    break
                frame_count += 1
                if SHOW_WINDOW:
                    cv2.imshow("Camera", frame)
                    if cv2.waitKey(1) == ord('q'):
                        # Treat 'q' in the preview window like Ctrl-C.
                        raise KeyboardInterrupt
                if frame_count % SAVE_INTERVAL != 0:
                    continue
                print(f"处理帧 {frame_count}")

                # STEP 1: gray-frame filter
                pil_image = Image.fromarray(
                    cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                )
                if is_large_gray(pil_image):
                    print("跳过:大面积灰色")
                    continue

                # STEP 2: SSIM de-duplication against the last kept frame
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                if last_gray is not None:
                    sim = ssim(gray, last_gray)
                    if sim > SSIM_THRESHOLD:
                        print(f"跳过SSIM={sim:.3f}")
                        continue
                last_gray = gray.copy()

                # STEP 3: RKNN inference, only to decide whether a bag is present
                # NOTE(review): the frame is fed to the model in BGR channel
                # order - confirm the model was exported for BGR input.
                img_resized, scale, dx, dy = letterbox_resize(frame, IMG_SIZE)
                input_data = np.expand_dims(img_resized, 0)
                outputs = rknn.inference(inputs=[input_data])
                has_bag = post_process(outputs, scale, dx, dy)
                if not has_bag:
                    print("跳过:未检测到 bag")
                    continue

                # STEP 4: disk-space check (stop below 5 GiB free)
                _, _, free = shutil.disk_usage(OUTPUT_DIR)
                if free < 5 * 1024 ** 3:
                    print("❌ 磁盘空间不足")
                    raise SystemExit(1)

                # STEP 5: save the original frame
                # One clock read feeds both the seconds and the milliseconds
                # so the two parts of the filename cannot disagree.
                now = time.time()
                ts = time.strftime("%Y%m%d_%H%M%S", time.localtime(now))
                ms = int((now % 1) * 1000)
                filename = f"bag_{ts}_{ms:03d}.png"
                path = os.path.join(OUTPUT_DIR, filename)
                # imwrite reports failure through its return value.
                if not cv2.imwrite(path, frame):
                    print(f"❌ 保存失败: {path}")
                    continue
                print(f"✅ 已保存: {path}")
        except KeyboardInterrupt:
            print("\n🛑 用户中断")
            break
        finally:
            cap.release()
            cv2.destroyAllWindows()
            print(f"视频流关闭,共处理 {frame_count}")
        # The stream dropped: pause briefly so reconnect attempts do not spin.
        time.sleep(retry_interval_seconds)
finally:
    # Release the RKNN runtime on every exit path, including SystemExit.
    rknn.release()
print("程序结束")