更新加入料带目标检测,判断料带到位,以及控制滚筒逻辑

This commit is contained in:
琉璃月光
2025-12-30 17:29:49 +08:00
parent d6918e90f2
commit 2028a96819
27 changed files with 1499 additions and 1224 deletions

BIN
ailai_pc/4.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 682 KiB

View File

@ -6,71 +6,79 @@ import cv2
# ======================
# 配置参数
# ======================
MODEL_PATH = '/home/hx/开发/ailai_image_obb/ailai_pc/best12.pt'
IMG_PATH = '1.jpg'
MODEL_PATH = '/home/hx/yolo/ultralytics_yolo11-main/runs/train/exp_ailai_detect2/weights/best.pt'
IMG_PATH = '4.jpg'
OUTPUT_PATH = 'output_pt.jpg'
CONF_THRESH = 0.5
IOU_THRESH = 0.45
CLASS_NAMES = ['bag']
CLASS_NAMES = ['bag', 'bag35']
# ======================
# 主函数(优化版)
# 主函数
# ======================
def main():
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"✅ 使用设备: {device}")
# 加载模型
model = YOLO(MODEL_PATH)
model.to(device)
model = YOLO(MODEL_PATH).to(device)
# 推理:获取原始结果(不立即解析)
print("➡️ 开始推理...")
results = model(IMG_PATH, imgsz=640, conf=CONF_THRESH, device=device, verbose=True)
# 获取第一张图的结果
r = results[0]
pred = r.boxes.data # GPU tensor [N,6]
# 🚀 关键:使用原始 tensor 在 GPU 上处理
# pred: [x1, y1, x2, y2, conf, cls] 形状为 [num_boxes, 6]
pred = r.boxes.data # 已经在 GPU 上,类型: torch.Tensor
# 🔍 在 GPU 上做 NMS这才是正确姿势
# 注意non_max_suppression 输入是 [batch, num_boxes, 6]
det = non_max_suppression(
pred.unsqueeze(0), # 增加 batch 维度
pred.unsqueeze(0),
conf_thres=CONF_THRESH,
iou_thres=IOU_THRESH,
classes=None,
agnostic=False,
max_det=100
)[0] # 取第一个也是唯一一个batch
)[0]
# ✅ 此时所有后处理已完成,现在才从 GPU 拷贝到 CPU
if det is not None and len(det):
det = det.cpu().numpy() # ← 只拷贝一次!
else:
det = []
if det is None or len(det) == 0:
print("❌ 未检测到任何目标")
return
# 读取图像
det = det.cpu().numpy() # 只拷贝一次
# ======================
# ⭐ 关键:取置信度最高的结果
# ======================
best_det = max(det, key=lambda x: x[4])
x1, y1, x2, y2, conf, cls_id = best_det
x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
cls_id = int(cls_id)
cls_name = CLASS_NAMES[cls_id]
print("\n🏆 置信度最高结果:")
print(f" 类别: {cls_name}")
print(f" 置信度: {conf:.3f}")
print(f" 框: [{x1}, {y1}, {x2}, {y2}]")
# ======================
# 可视化(只画最高的)
# ======================
img = cv2.imread(IMG_PATH)
if img is None:
raise FileNotFoundError(f"无法读取图像: {IMG_PATH}")
print("\n📋 检测结果:")
for *xyxy, conf, cls_id in det:
x1, y1, x2, y2 = map(int, xyxy)
cls_name = CLASS_NAMES[int(cls_id)]
print(f" 类别: {cls_name}, 置信度: {conf:.3f}, 框: [{x1}, {y1}, {x2}, {y2}]")
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = f"{cls_name} {conf:.2f}"
cv2.putText(
img,
label,
(x1, max(y1 - 10, 0)),
cv2.FONT_HERSHEY_SIMPLEX,
0.9,
(0, 255, 0),
2
)
# 画框和标签
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = f"{cls_name} {conf:.2f}"
cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
# 保存结果
cv2.imwrite(OUTPUT_PATH, img)
print(f"\n🖼️ 可视化结果已保存: {OUTPUT_PATH}")
if __name__ == '__main__':
main()
main()

View File

@ -1,135 +1,131 @@
from ultralytics import YOLO
from ultralytics.utils.ops import non_max_suppression
import torch
import cv2
import os
import time
import shutil
from pathlib import Path
# ======================
# 配置参数
# ======================
MODEL_PATH = 'detect.pt' # 你的模型路径
INPUT_FOLDER = '/home/hx/开发/ailai_image_obb/ailai_pc/train' # 输入图片文件夹
OUTPUT_FOLDER = '/home/hx/开发/ailai_image_obb/ailai_pc/results' # 输出结果文件夹(自动创建)
CONF_THRESH = 0.5
MODEL_PATH = '/home/hx/yolo/ultralytics_yolo11-main/runs/train/exp_ailai_detect/weights/best.pt'
INPUT_FOLDER = '/media/hx/04e879fa-d697-4b02-ac7e-a4148876ebb0/dataset/ailaidete/train/bag'
OUTPUT_FOLDER = '/media/hx/04e879fa-d697-4b02-ac7e-a4148876ebb0/dataset/ailaidete/train/bag'
CONF_BUCKETS = [0.93, 0.95] # ← ⭐ 自己改这里
IOU_THRESH = 0.45
CLASS_NAMES = ['bag']
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
IMG_SIZE = 640
SHOW_IMAGE = False # 是否逐张显示图像(适合调试)
# 支持的图像格式
IMG_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
# ======================
# 获取文件夹中所有图片路径
# 获取图片路径
# ======================
def get_image_paths(folder):
folder = Path(folder)
if not folder.exists():
raise FileNotFoundError(f"输入文件夹不存在: {folder}")
paths = [p for p in folder.iterdir() if p.suffix.lower() in IMG_EXTENSIONS]
if not paths:
print(f"⚠️ 在 {folder} 中未找到图片")
return sorted(paths) # 按名称排序
return sorted([p for p in folder.iterdir() if p.suffix.lower() in IMG_EXTENSIONS])
# ======================
# 主函数(批量推理)
# 防止重名覆盖
# ======================
def safe_move(src, dst_dir):
os.makedirs(dst_dir, exist_ok=True)
dst = os.path.join(dst_dir, os.path.basename(src))
if not os.path.exists(dst):
shutil.move(src, dst)
return dst
stem, suffix = os.path.splitext(os.path.basename(src))
i = 1
while True:
new_dst = os.path.join(dst_dir, f"{stem}_{i}{suffix}")
if not os.path.exists(new_dst):
shutil.move(src, new_dst)
return new_dst
i += 1
# ======================
# 根据置信度选择目录
# ======================
def get_bucket_dir(max_conf, output_root, buckets):
for th in sorted(buckets, reverse=True):
if max_conf >= th:
return os.path.join(output_root, f"bag_{th}")
return os.path.join(output_root, "delet")
# ======================
# 主逻辑
# ======================
def main():
print(f"✅ 使用设备: {DEVICE}")
# 创建输出文件夹
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
print(f"📁 输出结果将保存到: {OUTPUT_FOLDER}")
model = YOLO(MODEL_PATH).to(DEVICE)
# 加载模型
print("➡️ 加载 YOLO 模型...")
model = YOLO(MODEL_PATH)
model.to(DEVICE)
# 获取图片列表
img_paths = get_image_paths(INPUT_FOLDER)
img_paths = get_image_paths(Path(INPUT_FOLDER))
if not img_paths:
print("⚠️ 没有图片")
return
print(f"📸 共找到 {len(img_paths)} 张图片,开始批量推理...\n")
total_start_time = time.time()
print(f"📸 共 {len(img_paths)} 张图片")
print(f"📊 置信度档位: {CONF_BUCKETS}\n")
for idx, img_path in enumerate(img_paths, 1):
print(f"{'=' * 50}")
print(f"🖼️ 处理第 {idx}/{len(img_paths)}: {img_path.name}")
print(f"{'='*50}")
print(f"🖼️ {idx}/{len(img_paths)}: {img_path.name}")
# 手动计时
start_time = time.time()
# 推理verbose=True 输出内部耗时)
results = model(str(img_path), imgsz=IMG_SIZE, conf=CONF_THRESH, device=DEVICE, verbose=True)
inference_time = time.time() - start_time
results = model(
str(img_path),
imgsz=IMG_SIZE,
conf=min(CONF_BUCKETS),
device=DEVICE,
verbose=False
)
# 获取结果
r = results[0]
pred = r.boxes.data # GPU 上的原始输出
pred = r.boxes.data
# 在 GPU 上做 NMS
det = non_max_suppression(
pred.unsqueeze(0),
conf_thres=CONF_THRESH,
conf_thres=min(CONF_BUCKETS),
iou_thres=IOU_THRESH,
classes=None,
agnostic=False,
max_det=100
)[0]
# 拷贝到 CPU仅一次
if det is not None and len(det):
det = det.cpu().numpy()
else:
det = []
# 读取图像并绘制
img = cv2.imread(str(img_path))
if img is None:
print(f"❌ 无法读取图像: {img_path}")
continue
max_conf = 0.0
for *_, conf, cls_id in det:
if int(cls_id) == 0:
max_conf = max(max_conf, float(conf))
print(f"\n📋 检测结果:")
for *xyxy, conf, cls_id in det:
x1, y1, x2, y2 = map(int, xyxy)
cls_name = CLASS_NAMES[int(cls_id)]
print(f" 类别: {cls_name}, 置信度: {conf:.3f}, 框: [{x1}, {y1}, {x2}, {y2}]")
cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = f"{cls_name} {conf:.2f}"
cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
dst_dir = get_bucket_dir(max_conf, OUTPUT_FOLDER, CONF_BUCKETS)
new_path = safe_move(str(img_path), dst_dir)
# 保存结果
output_path = os.path.join(OUTPUT_FOLDER, f"result_{img_path.name}")
cv2.imwrite(output_path, img)
print(f"\n✅ 结果已保存: {output_path}")
if max_conf > 0:
print(f"✅ bag max_conf={max_conf:.3f}{os.path.basename(dst_dir)}")
else:
print("❌ 未检测到 bag")
# 显示(可选)
if SHOW_IMAGE:
cv2.imshow("Detection", img)
if cv2.waitKey(1) & 0xFF == ord('q'): # 按 Q 退出
break
print(f"🚚 已移动到: {new_path}")
print(f"⏱️ {(time.time() - start_time)*1000:.1f} ms")
# 输出总耗时
total_infer_time = time.time() - start_time
print(f"⏱️ 总处理时间: {total_infer_time * 1000:.1f}ms (推理+后处理)")
# 结束
total_elapsed = time.time() - total_start_time
print(f"\n🎉 批量推理完成!共处理 {len(img_paths)} 张图片,总耗时: {total_elapsed:.2f}")
print(
f"🚀 平均每张: {total_elapsed / len(img_paths) * 1000:.1f} ms ({1 / (total_elapsed / len(img_paths)):.1f} FPS)")
if SHOW_IMAGE:
cv2.destroyAllWindows()
print("\n🎉 全部处理完成")
if __name__ == '__main__':
main()
main()

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,180 @@
import os
import cv2
import numpy as np
from rknnlite.api import RKNNLite
# ====================== 配置 ======================
MODEL_PATH = "bag3588.rknn"
IMG_PATH = "1.jpg"
IMG_SIZE = (640, 640)
OBJ_THRESH = 0.25 # objectness * class_prob
NMS_THRESH = 0.45
CLASS_NAME = ["bag", "bag35"]
OUTPUT_DIR = "./result"
os.makedirs(OUTPUT_DIR, exist_ok=True)
VISUALIZE = True # False = 只输出类别和置信度,不保存图
# ====================== 工具函数 ======================
def softmax(x, axis=-1):
    """Numerically stable softmax along *axis* (max-shift before exp)."""
    shifted = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return shifted / np.sum(shifted, axis=axis, keepdims=True)
def letterbox_resize(image, size, bg_color=114):
    """Resize *image* preserving aspect ratio, then pad to *size* = (w, h).

    Returns (canvas, scale, dx, dy); dx/dy are the left/top padding offsets
    needed to map detections back onto the source image.
    """
    dst_w, dst_h = size
    src_h, src_w = image.shape[:2]
    scale = min(dst_w / src_w, dst_h / src_h)
    fit_w, fit_h = int(src_w * scale), int(src_h * scale)
    dx, dy = (dst_w - fit_w) // 2, (dst_h - fit_h) // 2
    canvas = np.full((dst_h, dst_w, 3), bg_color, dtype=np.uint8)
    canvas[dy:dy + fit_h, dx:dx + fit_w] = cv2.resize(image, (fit_w, fit_h))
    return canvas, scale, dx, dy
# ====================== DFL 解码 ======================
def dfl_decode(reg):
    """Decode a DFL regression vector: softmax over each side's bins, then
    the expected bin index. Returns the 4 (l, t, r, b) offsets.
    """
    bins = reg.reshape(4, -1)
    exp_bins = np.exp(bins - bins.max(axis=1, keepdims=True))
    prob = exp_bins / exp_bins.sum(axis=1, keepdims=True)
    return prob @ np.arange(bins.shape[1])
# ====================== NMS ======================
def nms(boxes, scores, thresh):
    """Greedy IoU-based non-maximum suppression; returns kept indices."""
    boxes = np.asarray(boxes, dtype=float)
    scores = np.asarray(scores)
    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    areas = (x2 - x1) * (y2 - y1)
    remaining = scores.argsort()[::-1]
    keep = []
    while remaining.size:
        best = remaining[0]
        keep.append(best)
        rest = remaining[1:]
        ix1 = np.maximum(x1[best], x1[rest])
        iy1 = np.maximum(y1[best], y1[rest])
        ix2 = np.minimum(x2[best], x2[rest])
        iy2 = np.minimum(y2[best], y2[rest])
        inter = np.maximum(0, ix2 - ix1) * np.maximum(0, iy2 - iy1)
        iou = inter / (areas[best] + areas[rest] - inter)
        remaining = rest[iou <= thresh]
    return keep
# ====================== 后处理 ======================
def post_process(outputs, scale, dx, dy):
    """Decode raw RKNN head outputs into final detections.

    The indexing assumes 9 output maps laid out [reg, cls, obj] for each of
    the three FPN levels (strides 8/16/32) — confirm against the exported
    model. scale/dx/dy are the letterbox parameters used to map boxes back
    to source-image coordinates.

    Returns (boxes_xyxy, class_ids, scores), or (None, None, None) when no
    cell clears OBJ_THRESH.
    """
    boxes_all, scores_all, classes_all = [], [], []
    strides = [8, 16, 32]
    for i, stride in enumerate(strides):
        # [0] drops the batch dimension of each per-level map.
        reg = outputs[i * 3 + 0][0]  # DFL box regression, indexed (channel, h, w)
        cls = outputs[i * 3 + 1][0]  # class probabilities, (num_classes, H, W)
        obj = outputs[i * 3 + 2][0]  # objectness, indexed (0, h, w)
        num_classes, H, W = cls.shape
        for h in range(H):
            for w in range(W):
                class_prob = cls[:, h, w]
                cls_id = int(np.argmax(class_prob))
                cls_score = class_prob[cls_id]
                obj_score = obj[0, h, w]
                # Final confidence = best class prob * objectness.
                final_score = cls_score * obj_score
                if final_score < OBJ_THRESH:
                    continue
                # DFL yields (left, top, right, bottom) distances in stride
                # units from the cell center (cx, cy).
                l, t, r, b = dfl_decode(reg[:, h, w])
                cx = (w + 0.5) * stride
                cy = (h + 0.5) * stride
                x1 = cx - l * stride
                y1 = cy - t * stride
                x2 = cx + r * stride
                y2 = cy + b * stride
                boxes_all.append([x1, y1, x2, y2])
                scores_all.append(final_score)
                classes_all.append(cls_id)
    if not boxes_all:
        return None, None, None
    keep = nms(boxes_all, scores_all, NMS_THRESH)
    boxes = np.array(boxes_all)[keep]
    scores = np.array(scores_all)[keep]
    classes = np.array(classes_all)[keep]
    # Undo the letterbox: remove padding offsets, then divide by scale.
    boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / scale
    boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / scale
    return boxes, classes, scores
# ====================== 推理入口 ======================
def detect_single_image(img_path, visualize=True):
    """Run the RKNN detector on one image.

    Returns (class_name, score) for the single highest-confidence detection,
    or (None, None) when nothing is detected.

    NOTE(review): the RKNN model is loaded and released on every call —
    hoist the init out if this is ever called in a loop.
    """
    rknn = RKNNLite()
    rknn.load_rknn(MODEL_PATH)
    rknn.init_runtime()
    img = cv2.imread(img_path)
    if img is None:
        raise FileNotFoundError(img_path)
    # Letterbox to the model input size; keep scale/offsets for box mapping.
    img_r, scale, dx, dy = letterbox_resize(img, IMG_SIZE)
    outputs = rknn.inference([np.expand_dims(img_r, 0)])
    boxes, cls_ids, scores = post_process(outputs, scale, dx, dy)
    if boxes is None or len(scores) == 0:
        print("未检测到目标")
        rknn.release()
        return None, None
    # Keep only the single best-scoring detection.
    best_idx = int(np.argmax(scores))
    best_score = float(scores[best_idx])
    best_cls_id = int(cls_ids[best_idx])
    best_cls_name = CLASS_NAME[best_cls_id]
    best_box = boxes[best_idx]
    # ======== Visualization (optional) ========
    if visualize:
        x1, y1, x2, y2 = best_box.astype(int)
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(
            img,
            f"{best_cls_name}:{best_score:.3f}",
            (x1, max(y1 - 5, 0)),  # keep the label inside the image at the top edge
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 255, 0),
            2
        )
        save_path = os.path.join(OUTPUT_DIR, os.path.basename(img_path))
        cv2.imwrite(save_path, img)
        print("可视化结果已保存:", save_path)
    rknn.release()
    return best_cls_name, best_score
# ====================== 主入口 ======================
if __name__ == "__main__":
    best_cls_name, best_score = detect_single_image(IMG_PATH, visualize=VISUALIZE)
    # Guard the no-detection case: detect_single_image returns (None, None),
    # and formatting None with ':.4f' raised a TypeError in the original.
    if best_score is None:
        print("未检测到目标")
    else:
        print(f"类别: {best_cls_name}, 置信度: {best_score:.4f}")

View File

@ -0,0 +1,223 @@
import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
import shutil
from rknnlite.api import RKNNLite
# ================== 配置参数 ==================
RTSP_URL = "rtsp://admin:ailaimiye123@192.168.0.125:554/streaming/channels/101"
SAVE_INTERVAL = 15
SSIM_THRESHOLD = 0.9
OUTPUT_DIR = "camera_test"
MODEL_PATH = "bag3568.rknn"
SHOW_WINDOW = False
GRAY_LOWER = 70
GRAY_UPPER = 230
GRAY_RATIO_THRESHOLD = 0.7
IMG_SIZE = (640, 640)
OBJ_THRESH = 0.25
NMS_THRESH = 0.45
CLASS_NAME = ["bag"]
os.makedirs(OUTPUT_DIR, exist_ok=True)
# ================== 基础工具 ==================
def softmax(x, axis=-1):
    """Stable softmax: shift by the max so exp() cannot overflow."""
    z = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return z / z.sum(axis=axis, keepdims=True)
def letterbox_resize(image, size, bg_color=114):
    """Aspect-preserving resize plus centered padding to *size* = (w, h)."""
    out_w, out_h = size
    in_h, in_w = image.shape[:2]
    ratio = min(out_w / in_w, out_h / in_h)
    rw, rh = int(in_w * ratio), int(in_h * ratio)
    pad_x = (out_w - rw) // 2
    pad_y = (out_h - rh) // 2
    board = np.full((out_h, out_w, 3), bg_color, dtype=np.uint8)
    board[pad_y:pad_y + rh, pad_x:pad_x + rw] = cv2.resize(image, (rw, rh))
    return board, ratio, pad_x, pad_y
# ================== DFL ==================
def dfl_decode(reg):
    """Expected value of each side's DFL bin distribution -> (l, t, r, b)."""
    sides = reg.reshape(4, -1)
    shifted = np.exp(sides - sides.max(axis=1, keepdims=True))
    weights = shifted / shifted.sum(axis=1, keepdims=True)
    return (weights * np.arange(sides.shape[1])).sum(axis=1)
# ================== NMS ==================
def nms(boxes, scores, thresh):
    """Standard greedy NMS over xyxy boxes; returns the indices to keep."""
    bxs = np.array(boxes)
    scs = np.array(scores)
    left, top, right, bottom = bxs.T
    area = (right - left) * (bottom - top)
    order = scs.argsort()[::-1]
    chosen = []
    while order.size > 0:
        lead = order[0]
        chosen.append(lead)
        others = order[1:]
        ox1 = np.maximum(left[lead], left[others])
        oy1 = np.maximum(top[lead], top[others])
        ox2 = np.minimum(right[lead], right[others])
        oy2 = np.minimum(bottom[lead], bottom[others])
        overlap = np.maximum(0, ox2 - ox1) * np.maximum(0, oy2 - oy1)
        iou = overlap / (area[lead] + area[others] - overlap)
        order = others[iou <= thresh]
    return chosen
# ================== 后处理 ==================
def post_process(outputs, scale, dx, dy):
    """Decode raw RKNN outputs into (boxes_xyxy, class_ids, scores).

    Assumes 9 maps laid out [reg, cls, obj] per FPN level at strides
    8/16/32 — confirm against the exported model. Returns
    (None, None, None) when nothing clears OBJ_THRESH.
    """
    boxes_all, scores_all, classes_all = [], [], []
    strides = [8, 16, 32]
    for i, stride in enumerate(strides):
        reg = outputs[i*3 + 0][0]  # DFL regression map
        cls = outputs[i*3 + 1][0]  # class probability map, (num_classes, H, W)
        obj = outputs[i*3 + 2][0]  # objectness map
        num_classes, H, W = cls.shape
        for h in range(H):
            for w in range(W):
                class_prob = cls[:, h, w]
                cls_id = np.argmax(class_prob)
                score = class_prob[cls_id]
                obj_score = obj[0, h, w]
                # Final confidence = class prob * objectness.
                final_score = score * obj_score
                if final_score < OBJ_THRESH:
                    continue
                # DFL yields (l, t, r, b) distances in stride units from the
                # cell center.
                l, t, r, b = dfl_decode(reg[:, h, w])
                cx = (w + 0.5) * stride
                cy = (h + 0.5) * stride
                x1 = cx - l * stride
                y1 = cy - t * stride
                x2 = cx + r * stride
                y2 = cy + b * stride
                boxes_all.append([x1, y1, x2, y2])
                scores_all.append(final_score)
                classes_all.append(cls_id)
    if len(boxes_all) == 0:
        return None, None, None
    keep = nms(boxes_all, scores_all, NMS_THRESH)
    boxes = np.array(boxes_all)[keep]
    scores = np.array(scores_all)[keep]
    classes = np.array(classes_all)[keep]
    # Undo the letterbox transform back to source-image coordinates.
    boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / scale
    boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / scale
    return boxes, classes, scores
# ================== 灰度判断 ==================
def is_large_gray(image):
    """True when the fraction of pixels whose three channels all lie in
    [GRAY_LOWER, GRAY_UPPER] exceeds GRAY_RATIO_THRESHOLD.

    Inputs that are not 3-channel are treated as gray (skipped upstream).
    """
    arr = np.array(image)
    if arr.ndim != 3 or arr.shape[2] != 3:
        return True
    banded = (arr >= GRAY_LOWER) & (arr <= GRAY_UPPER)
    return banded.all(axis=2).mean() > GRAY_RATIO_THRESHOLD
# ================== RKNN 初始化 ==================
# Load the compiled .rknn model and bring up the NPU runtime once, before
# entering the capture loop.
rknn = RKNNLite()
if rknn.load_rknn(MODEL_PATH) != 0:
    raise RuntimeError("❌ RKNN 模型加载失败")
if rknn.init_runtime() != 0:
    raise RuntimeError("❌ RKNN Runtime 初始化失败")
print("✅ RKNN 初始化完成")
# ================== Video stream processing ==================
# Outer loop reconnects the RTSP stream whenever it drops; inner loop
# samples every SAVE_INTERVAL-th frame, de-duplicates via SSIM, runs the
# detector, and saves frames that contain a bag.
last_gray = None   # previous sampled frame (grayscale) for SSIM comparison
frame_count = 0
while True:
    cap = cv2.VideoCapture(RTSP_URL)
    if not cap.isOpened():
        print("❌ 无法连接 RTSP")
        time.sleep(1)
        continue
    print("✅ 开始读取视频流")
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # stream hiccup: reconnect in the outer loop
            frame_count += 1
            if SHOW_WINDOW:
                cv2.imshow("Camera", frame)
                if cv2.waitKey(1) == ord('q'):
                    raise KeyboardInterrupt
            # Only process every SAVE_INTERVAL-th frame.
            if frame_count % SAVE_INTERVAL != 0:
                continue
            print(f"处理帧 {frame_count}")
            # STEP1 gray filter (disabled; enable to skip mostly-gray frames)
            # if is_large_gray(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))):
            #     print("跳过:大面积灰色")
            #     continue
            # STEP2 SSIM de-duplication: skip frames nearly identical to the
            # previously kept one.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if last_gray is not None:
                sim = ssim(gray, last_gray)
                if sim > SSIM_THRESHOLD:
                    print(f"跳过SSIM={sim:.3f}")
                    continue
            last_gray = gray.copy()
            # STEP3 RKNN inference (same pipeline as detect_single_image)
            img_r, scale, dx, dy = letterbox_resize(frame, IMG_SIZE)
            outputs = rknn.inference([np.expand_dims(img_r, 0)])
            boxes, cls_ids, scores = post_process(outputs, scale, dx, dy)
            if boxes is None or len(boxes) == 0:
                print("跳过:未检测到 bag")
                continue
            # STEP4 abort when less than 5 GiB of disk remains
            _, _, free = shutil.disk_usage(OUTPUT_DIR)
            if free < 5 * 1024**3:
                raise SystemExit("❌ 磁盘空间不足")
            # STEP5 save the frame with a millisecond-resolution timestamp name
            ts = time.strftime("%Y%m%d_%H%M%S")
            ms = int((time.time() % 1) * 1000)
            path = os.path.join(OUTPUT_DIR, f"bag_{ts}_{ms:03d}.png")
            cv2.imwrite(path, frame)
            print(f"✅ 已保存: {path}")
    except KeyboardInterrupt:
        print("🛑 用户中断")
        break
    finally:
        cap.release()
cv2.destroyAllWindows()
rknn.release()
print("程序结束")

View File

@ -0,0 +1,213 @@
import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from rknnlite.api import RKNNLite
import shutil
# ================== 配置 ==================
RTSP_URL = "rtsp://admin:ailaimiye123@192.168.0.125:554/streaming/channels/101"
RKNN_MODEL = "bag3568.rknn"
OUTPUT_DIR = "camera_event_capture"
CONF_THRESHOLD = 0.25 # bag 最终置信度阈值obj * class_prob
SSIM_THRESHOLD = 0.9
END_MISS_FRAMES = 30
SAVE_EVERY_N_FRAMES = 1
SHOW_WINDOW = False
IMG_SIZE = (640, 640)
CLASS_NAME = ["bag"]
NMS_THRESH = 0.45
os.makedirs(OUTPUT_DIR, exist_ok=True)
# ================== 基础工具 ==================
def softmax(x, axis=-1):
    """Softmax along *axis*, max-shifted for numerical stability."""
    stable = x - np.max(x, axis=axis, keepdims=True)
    num = np.exp(stable)
    return num / num.sum(axis=axis, keepdims=True)
def letterbox_resize(image, size, bg_color=114):
    """Scale *image* to fit *size* (w, h) and pad the rest with *bg_color*."""
    box_w, box_h = size
    img_h, img_w = image.shape[:2]
    k = min(box_w / img_w, box_h / img_h)
    sw, sh = int(img_w * k), int(img_h * k)
    off_x, off_y = (box_w - sw) // 2, (box_h - sh) // 2
    frame = np.full((box_h, box_w, 3), bg_color, dtype=np.uint8)
    frame[off_y:off_y + sh, off_x:off_x + sw] = cv2.resize(image, (sw, sh))
    return frame, k, off_x, off_y
def dfl_decode(reg):
    """Decode DFL logits into the 4 expected side offsets (l, t, r, b)."""
    dist = reg.reshape(4, -1)
    n_bins = dist.shape[1]
    e = np.exp(dist - dist.max(axis=1, keepdims=True))
    p = e / e.sum(axis=1, keepdims=True)
    return p.dot(np.arange(n_bins))
def nms(boxes, scores, thresh):
    """Greedy NMS: keep the best box, drop others overlapping above *thresh*."""
    arr = np.array(boxes)
    conf = np.array(scores)
    ax1, ay1, ax2, ay2 = arr.T
    sizes = (ax2 - ax1) * (ay2 - ay1)
    ranked = conf.argsort()[::-1]
    result = []
    while ranked.size > 0:
        winner = ranked[0]
        result.append(winner)
        rest = ranked[1:]
        ovx1 = np.maximum(ax1[winner], ax1[rest])
        ovy1 = np.maximum(ay1[winner], ay1[rest])
        ovx2 = np.minimum(ax2[winner], ax2[rest])
        ovy2 = np.minimum(ay2[winner], ay2[rest])
        shared = np.maximum(0, ovx2 - ovx1) * np.maximum(0, ovy2 - ovy1)
        ratio = shared / (sizes[winner] + sizes[rest] - shared)
        ranked = rest[ratio <= thresh]
    return result
def post_process(outputs, scale, dx, dy):
    """Decode raw RKNN outputs into (boxes_xyxy, class_ids, scores).

    Assumes 9 maps laid out [reg, cls, obj] per FPN level at strides
    8/16/32 — confirm against the exported model. Returns
    (None, None, None) when nothing clears CONF_THRESHOLD.
    """
    boxes_all, scores_all, classes_all = [], [], []
    strides = [8, 16, 32]
    for i, stride in enumerate(strides):
        reg = outputs[i*3 + 0][0]  # DFL regression map
        cls = outputs[i*3 + 1][0]  # class probability map, (num_classes, H, W)
        obj = outputs[i*3 + 2][0]  # objectness map
        num_classes, H, W = cls.shape
        for h in range(H):
            for w in range(W):
                class_prob = cls[:, h, w]
                cls_id = np.argmax(class_prob)
                score = class_prob[cls_id]
                obj_score = obj[0, h, w]
                # Final confidence = class prob * objectness.
                final_score = score * obj_score
                if final_score < CONF_THRESHOLD:
                    continue
                # DFL yields (l, t, r, b) distances in stride units from the
                # cell center.
                l, t, r, b = dfl_decode(reg[:, h, w])
                cx = (w + 0.5) * stride
                cy = (h + 0.5) * stride
                x1 = cx - l * stride
                y1 = cy - t * stride
                x2 = cx + r * stride
                y2 = cy + b * stride
                boxes_all.append([x1, y1, x2, y2])
                scores_all.append(final_score)
                classes_all.append(cls_id)
    if len(boxes_all) == 0:
        return None, None, None
    keep = nms(boxes_all, scores_all, NMS_THRESH)
    boxes = np.array(boxes_all)[keep]
    scores = np.array(scores_all)[keep]
    classes = np.array(classes_all)[keep]
    # Undo the letterbox transform back to source-image coordinates.
    boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / scale
    boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / scale
    return boxes, classes, scores
# ================== 灰度判断 ==================
def is_large_gray(image, gray_ratio_thresh=0.9):
    """True when >= *gray_ratio_thresh* of pixels are near-gray (R~G~B).

    A pixel counts as gray when 1 - (max_channel - min_channel)/255 >= 0.9,
    i.e. its channel spread is at most ~25.5. Non-RGB input is treated as
    gray so it gets skipped upstream.
    """
    arr = np.asarray(image, dtype=np.float32)
    if arr.ndim != 3 or arr.shape[2] != 3:
        return True
    spread = arr.max(axis=2) - arr.min(axis=2)
    grayness = 1.0 - spread / 255.0
    frac_gray = np.count_nonzero(grayness >= 0.9) / (arr.shape[0] * arr.shape[1])
    return frac_gray >= gray_ratio_thresh
# ================== RKNN 初始化 ==================
# ================== RKNN init ==================
# NOTE(review): these asserts disappear under `python -O`; consider raising
# RuntimeError instead, as the sibling sampler script does.
rknn = RKNNLite()
assert rknn.load_rknn(RKNN_MODEL) == 0, "RKNN 模型加载失败"
assert rknn.init_runtime() == 0, "RKNN Runtime 初始化失败"
print("✅ RKNN 初始化完成")
# ================== Video stream & state machine ==================
cap = cv2.VideoCapture(RTSP_URL)
assert cap.isOpened(), "RTSP 连接失败"
print("🎥 视频流已连接")
# Two-state capture machine: IDLE waits for a bag to appear; CAPTURING
# saves frames of the event into a per-session directory until the bag has
# been missing for END_MISS_FRAMES consecutive frames.
STATE_IDLE = 0
STATE_CAPTURING = 1
state = STATE_IDLE
miss_count = 0       # consecutive bag-less frames while capturing
save_idx = 0         # frames saved in the current session
session_dir = None   # output directory of the current session
session_id = 0
last_gray = None     # previous frame (grayscale) for SSIM de-dup
frame_count = 0
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            time.sleep(0.2)  # transient read failure: retry
            continue
        frame_count += 1
        if SHOW_WINDOW:
            cv2.imshow("Camera", frame)
            if cv2.waitKey(1) == ord('q'):
                break
        # ---------- gray filter (disabled) ----------
        # pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        # if is_large_gray(pil_img):
        #     continue
        # ---------- SSIM: only de-duplicate while idle ----------
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if last_gray is not None and state == STATE_IDLE:
            if ssim(gray, last_gray) > SSIM_THRESHOLD:
                continue
        last_gray = gray.copy()
        # ---------- RKNN inference ----------
        img_r, scale, dx, dy = letterbox_resize(frame, IMG_SIZE)
        outputs = rknn.inference([np.expand_dims(img_r, 0)])
        boxes, cls_ids, scores = post_process(outputs, scale, dx, dy)
        has_bag = boxes is not None and len(boxes) > 0
        # ---------- state machine ----------
        if state == STATE_IDLE:
            if has_bag:
                # Bag appeared: open a new session directory, start capturing.
                # NOTE(review): the frame that triggered the session is not
                # saved; saving starts from the next frame — confirm intended.
                session_id += 1
                ts = time.strftime("%Y%m%d_%H%M%S")
                session_dir = os.path.join(OUTPUT_DIR, f"session_{session_id:04d}_{ts}")
                os.makedirs(session_dir, exist_ok=True)
                print("🚀 进入采集")
                state = STATE_CAPTURING
                miss_count = 0
                save_idx = 0
        else:
            if has_bag:
                miss_count = 0
            else:
                miss_count += 1
            if save_idx % SAVE_EVERY_N_FRAMES == 0:
                fname = f"{save_idx:06d}.png"
                cv2.imwrite(os.path.join(session_dir, fname), frame)
            save_idx += 1
            if miss_count >= END_MISS_FRAMES:
                # Bag gone long enough: close the session and go idle.
                print(f"🛑 退出采集,保存 {save_idx}")
                state = STATE_IDLE
                miss_count = 0
                session_dir = None
except KeyboardInterrupt:
    print("\n🛑 用户退出")
finally:
    cap.release()
    cv2.destroyAllWindows()
    rknn.release()
    print("程序结束")

View File

@ -0,0 +1,171 @@
import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
import shutil
from rknnlite.api import RKNNLite
# ================== 配置参数 ==================
RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101"
RKNN_MODEL = "bag3588.rknn"
SAVE_INTERVAL = 15
SSIM_THRESHOLD = 0.9
OUTPUT_DIR = "camera_test"
SHOW_WINDOW = False
GRAY_LOWER = 70
GRAY_UPPER = 230
GRAY_RATIO_THRESHOLD = 0.7
IMG_SIZE = (640, 640)
OBJ_THRESH = 0.25
CLASS_NAME = ["bag"]
os.makedirs(OUTPUT_DIR, exist_ok=True)
# ================== 灰度判断 ==================
def is_large_gray(image):
    """True when most pixels sit inside the configured mid-gray band
    ([GRAY_LOWER, GRAY_UPPER] on every channel). Non-3-channel inputs are
    treated as gray.
    """
    arr = np.array(image)
    if arr.ndim != 3 or arr.shape[2] != 3:
        return True
    h, w = arr.shape[:2]
    within = np.ones((h, w), dtype=bool)
    for ch in range(3):
        within &= (arr[:, :, ch] >= GRAY_LOWER) & (arr[:, :, ch] <= GRAY_UPPER)
    return within.sum() / (h * w) > GRAY_RATIO_THRESHOLD
# ================== RKNN 工具 ==================
def letterbox_resize(image, size, bg_color=114):
    """Fit *image* into *size* = (w, h) keeping aspect ratio; pad the rest."""
    want_w, want_h = size
    have_h, have_w = image.shape[:2]
    factor = min(want_w / have_w, want_h / have_h)
    new_w, new_h = int(have_w * factor), int(have_h * factor)
    left = (want_w - new_w) // 2
    top = (want_h - new_h) // 2
    padded = np.full((want_h, want_w, 3), bg_color, dtype=np.uint8)
    padded[top:top + new_h, left:left + new_w] = cv2.resize(image, (new_w, new_h))
    return padded, factor, left, top
# ================== DFL ==================
def dfl_numpy(position):
    """DFL decode: softmax over each side's bins, then the expected bin index.

    position: (n, 4*mc, h, w) raw logits -> returns (n, 4, h, w).

    Fix: the original computed np.exp(y) twice and without a max shift,
    which overflows to inf/nan for large logits; softmax is shift-invariant,
    so subtracting the per-distribution max is safe and stable.
    """
    n, c, h, w = position.shape
    mc = c // 4
    y = position.reshape(n, 4, mc, h, w)
    y = y - y.max(axis=2, keepdims=True)  # numerical stability shift
    e = np.exp(y)
    prob = e / e.sum(axis=2, keepdims=True)
    acc = np.arange(mc).reshape(1, 1, mc, 1, 1)
    return np.sum(prob * acc, axis=2)
def box_process(position):
    """Decode one FPN level's DFL output into xyxy boxes in letterboxed-image
    pixels.

    position: (n, 4*mc, grid_h, grid_w) raw regression logits.
    Returns (n, 4, grid_h, grid_w) ordered (x1, y1, x2, y2).

    Fix: the stride vector now matches the (col=x, row=y) grid ordering —
    x-stride = IMG_SIZE[0]//grid_w, y-stride = IMG_SIZE[1]//grid_h. The
    original had the two swapped, which only worked because IMG_SIZE is
    square.
    """
    grid_h, grid_w = position.shape[2:4]
    col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h))
    col = col.reshape(1, 1, grid_h, grid_w)
    row = row.reshape(1, 1, grid_h, grid_w)
    grid = np.concatenate((col, row), axis=1)  # axis 1 = (x, y)
    stride = np.array([
        IMG_SIZE[0] // grid_w,  # x stride (width / grid width)
        IMG_SIZE[1] // grid_h,  # y stride (height / grid height)
    ]).reshape(1, 2, 1, 1)
    offsets = dfl_numpy(position)
    box_xy1 = grid + 0.5 - offsets[:, 0:2, :, :]
    box_xy2 = grid + 0.5 + offsets[:, 2:4, :, :]
    return np.concatenate((box_xy1 * stride, box_xy2 * stride), axis=1)
# ================== 核心修改:只用 cls 置信度 ==================
def has_bag_from_outputs(outputs):
    """Cheap presence check: True when any cell's class score reaches
    OBJ_THRESH on any of the three FPN levels (no box decoding, no NMS).

    NOTE(review): assumes outputs are laid out [reg, cls, obj] per level, so
    the cls map sits at index i*3 + 1 — confirm against the RKNN model.
    """
    return any(outputs[level * 3 + 1][0].max() >= OBJ_THRESH for level in range(3))
# ================== RKNN 初始化 ==================
# Bring up the NPU runtime once before the capture loop; NPU_CORE_AUTO lets
# the driver pick a core on multi-core NPUs.
rknn = RKNNLite()
if rknn.load_rknn(RKNN_MODEL) != 0:
    raise RuntimeError("❌ RKNN 模型加载失败")
if rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) != 0:
    raise RuntimeError("❌ RKNN Runtime 初始化失败")
print("✅ RKNN 初始化完成")
# ================== Video stream processing ==================
# Outer loop reconnects a dropped RTSP stream; inner loop samples every
# SAVE_INTERVAL-th frame, de-duplicates with SSIM, and saves frames where
# the cheap class-map check (has_bag_from_outputs) fires.
last_gray = None
frame_count = 0
while True:
    cap = cv2.VideoCapture(RTSP_URL)
    if not cap.isOpened():
        print("❌ 无法连接 RTSP")
        time.sleep(1)
        continue
    print("✅ 开始读取视频流")
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # stream dropped -> reconnect in the outer loop
            frame_count += 1
            if SHOW_WINDOW:
                cv2.imshow("Camera", frame)
                if cv2.waitKey(1) == ord('q'):
                    raise KeyboardInterrupt
            if frame_count % SAVE_INTERVAL != 0:
                continue  # sample only every SAVE_INTERVAL-th frame
            print(f"处理帧 {frame_count}")
            # STEP1 gray filter (disabled)
            # if is_large_gray(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))):
            #     print("跳过:大面积灰色")
            #     continue
            # STEP2 SSIM de-duplication against the last kept frame
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if last_gray is not None:
                sim = ssim(gray, last_gray)
                if sim > SSIM_THRESHOLD:
                    print(f"跳过SSIM={sim:.3f}")
                    continue
            last_gray = gray.copy()
            # STEP3 RKNN inference (class-map check only, no box decoding)
            img_r, scale, dx, dy = letterbox_resize(frame, IMG_SIZE)
            outputs = rknn.inference([np.expand_dims(img_r, 0)])
            if not has_bag_from_outputs(outputs):
                print("跳过:未检测到 bag")
                continue
            # STEP4 abort when less than 5 GiB of disk remains
            _, _, free = shutil.disk_usage(OUTPUT_DIR)
            if free < 5 * 1024**3:
                raise SystemExit("❌ 磁盘空间不足")
            # STEP5 save with a millisecond timestamp in the name
            ts = time.strftime("%Y%m%d_%H%M%S")
            ms = int((time.time() % 1) * 1000)
            path = os.path.join(OUTPUT_DIR, f"bag_{ts}_{ms:03d}.png")
            cv2.imwrite(path, frame)
            print(f"✅ 已保存: {path}")
    except KeyboardInterrupt:
        print("🛑 用户中断")
        break
    finally:
        cap.release()
cv2.destroyAllWindows()
rknn.release()
print("程序结束")

View File

@ -0,0 +1,209 @@
import cv2
import time
import os
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from rknnlite.api import RKNNLite
# ================== 配置 ==================
RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101"
RKNN_MODEL = "bag3588.rknn"
OUTPUT_DIR = "camera_event_capture"
CONF_THRESHOLD = 0.25 # ← bag class prob 阈值(真实置信度)
SSIM_THRESHOLD = 0.9
END_MISS_FRAMES = 30
SAVE_EVERY_N_FRAMES = 1
SHOW_WINDOW = False
IMG_SIZE = (640, 640)
CLASS_NAME = ["bag"]
os.makedirs(OUTPUT_DIR, exist_ok=True)
# =====================================================
# 灰度判断≥90% 像素为灰色R≈G≈B
# =====================================================
def is_large_gray(image, gray_ratio_thresh=0.9):
    """True when at least *gray_ratio_thresh* of pixels are near-gray.

    Pixel "grayness" = 1 - (max_channel - min_channel)/255; a pixel counts
    when grayness >= 0.9. Non-RGB arrays are treated as gray.
    """
    data = np.array(image).astype(np.float32)
    if data.ndim != 3 or data.shape[2] != 3:
        return True
    chan_spread = data.max(axis=2) - data.min(axis=2)
    near_gray = (1.0 - chan_spread / 255.0) >= 0.9
    return near_gray.sum() / near_gray.size >= gray_ratio_thresh
# =====================================================
# RKNN 推理工具
# =====================================================
def letterbox_resize(image, size, bg_color=114):
    """Aspect-preserving resize of *image*, centered on a (h, w, 3) canvas of
    *size* = (w, h) filled with *bg_color*. Returns (canvas, scale, dx, dy).
    """
    goal_w, goal_h = size
    cur_h, cur_w = image.shape[:2]
    scale = min(goal_w / cur_w, goal_h / cur_h)
    fit_w, fit_h = int(cur_w * scale), int(cur_h * scale)
    dx = (goal_w - fit_w) // 2
    dy = (goal_h - fit_h) // 2
    canvas = np.full((goal_h, goal_w, 3), bg_color, dtype=np.uint8)
    canvas[dy:dy + fit_h, dx:dx + fit_w] = cv2.resize(image, (fit_w, fit_h))
    return canvas, scale, dx, dy
def dfl_numpy(position):
    """DFL decode: softmax over each side's bins, then the expected bin index.

    position: (n, 4*mc, h, w) logits -> (n, 4, h, w).

    Fix: shift by the per-distribution max before exp(); the raw np.exp(y)
    form overflowed for large logits and computed exp twice.
    """
    n, c, h, w = position.shape
    p_num = 4
    mc = c // p_num
    y = position.reshape(n, p_num, mc, h, w)
    y = y - y.max(axis=2, keepdims=True)  # softmax is shift-invariant
    e = np.exp(y)
    prob = e / e.sum(axis=2, keepdims=True)
    acc = np.arange(mc).reshape(1, 1, mc, 1, 1)
    return np.sum(prob * acc, axis=2)
def box_process(position):
    """Decode one FPN level's DFL output into xyxy boxes in letterboxed-image
    pixels. position: (n, 4*mc, grid_h, grid_w) -> (n, 4, grid_h, grid_w).

    Fix: stride components now match the (col=x, row=y) grid ordering —
    x-stride = IMG_SIZE[0]//grid_w and y-stride = IMG_SIZE[1]//grid_h. The
    original mixed the indices, which only worked because IMG_SIZE is square.
    """
    grid_h, grid_w = position.shape[2:4]
    col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h))
    col = col.reshape(1, 1, grid_h, grid_w)
    row = row.reshape(1, 1, grid_h, grid_w)
    grid = np.concatenate((col, row), axis=1)  # axis 1 = (x, y)
    stride = np.array([IMG_SIZE[0] // grid_w, IMG_SIZE[1] // grid_h]).reshape(1, 2, 1, 1)
    offsets = dfl_numpy(position)
    box_xy1 = grid + 0.5 - offsets[:, 0:2, :, :]
    box_xy2 = grid + 0.5 + offsets[:, 2:4, :, :]
    return np.concatenate((box_xy1 * stride, box_xy2 * stride), axis=1)
# =====================================================
# ✅ 关键修改:只用 class prob 作为置信度
# =====================================================
def filter_boxes(box_class_probs):
    """Return True when any anchor's bag class-prob reaches CONF_THRESHOLD.

    rknn_model_zoo-style head: there is no objectness branch, so the class
    probability is the final confidence; only class 0 ("bag") exists.
    """
    scores = np.asarray(box_class_probs)[:, 0]
    return np.any(scores >= CONF_THRESHOLD)
def post_process(outputs):
    """Return True when any cell on any FPN level reaches the bag threshold.

    Only the class-probability maps (outputs[i*3 + 1]) matter for this
    boolean presence check. Fix: the original also decoded every level's
    boxes via box_process() into a list it never used — pure wasted work,
    now removed; the returned value is unchanged.
    """
    def flatten(x):
        # (n, c, h, w) -> (n*h*w, c): one row of class probs per cell.
        x = x.transpose(0, 2, 3, 1)
        return x.reshape(-1, x.shape[3])
    class_probs = np.concatenate([flatten(outputs[i * 3 + 1]) for i in range(3)])
    return filter_boxes(class_probs)
# =====================================================
# RKNN 初始化
# =====================================================
# =====================================================
# RKNN init
# NOTE(review): asserts vanish under `python -O`; prefer raising
# RuntimeError as the sibling scripts do.
# =====================================================
rknn = RKNNLite()
assert rknn.load_rknn(RKNN_MODEL) == 0, "RKNN 模型加载失败"
assert rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) == 0, "RKNN Runtime 初始化失败"
print("✅ RKNN 初始化完成")
# =====================================================
# Video stream & two-state capture machine
# =====================================================
cap = cv2.VideoCapture(RTSP_URL)
assert cap.isOpened(), "RTSP 连接失败"
print("🎥 视频流已连接")
# IDLE: wait for a bag to appear; CAPTURING: save frames into a session
# directory until the bag is absent for END_MISS_FRAMES consecutive frames.
STATE_IDLE = 0
STATE_CAPTURING = 1
state = STATE_IDLE
miss_count = 0       # consecutive bag-less frames while capturing
save_idx = 0         # frames saved in the current session
session_dir = None   # output directory of the current session
session_id = 0
last_gray = None     # previous frame (grayscale) for SSIM de-dup
frame_count = 0
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            time.sleep(0.2)  # transient read failure: retry
            continue
        frame_count += 1
        if SHOW_WINDOW:
            cv2.imshow("Camera", frame)
            if cv2.waitKey(1) == ord('q'):
                break
        # ---------- gray filter: skip mostly-gray (empty belt) frames ----------
        pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if is_large_gray(pil_img):
            continue
        # ---------- SSIM de-dup, only while idle ----------
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if last_gray is not None and state == STATE_IDLE:
            if ssim(gray, last_gray) > SSIM_THRESHOLD:
                continue
        last_gray = gray.copy()
        # ---------- RKNN inference: boolean bag-presence check ----------
        img_resized, _, _, _ = letterbox_resize(frame, IMG_SIZE)
        outputs = rknn.inference(inputs=[np.expand_dims(img_resized, 0)])
        has_bag = post_process(outputs)
        # ---------- state machine ----------
        if state == STATE_IDLE:
            if has_bag:
                # New event: open a session directory and start capturing.
                session_id += 1
                ts = time.strftime("%Y%m%d_%H%M%S")
                session_dir = os.path.join(OUTPUT_DIR, f"session_{session_id:04d}_{ts}")
                os.makedirs(session_dir, exist_ok=True)
                print("🚀 进入采集")
                state = STATE_CAPTURING
                miss_count = 0
                save_idx = 0
        else:  # STATE_CAPTURING
            if has_bag:
                miss_count = 0
            else:
                miss_count += 1
            if save_idx % SAVE_EVERY_N_FRAMES == 0:
                fname = f"{save_idx:06d}.png"
                cv2.imwrite(os.path.join(session_dir, fname), frame)
            save_idx += 1
            if miss_count >= END_MISS_FRAMES:
                # Bag absent long enough: close the session and go idle.
                print(f"🛑 退出采集,保存 {save_idx}")
                state = STATE_IDLE
                miss_count = 0
                session_dir = None
except KeyboardInterrupt:
    print("\n🛑 用户退出")
finally:
    cap.release()
    cv2.destroyAllWindows()
    rknn.release()
    print("程序结束")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 586 KiB

After

Width:  |  Height:  |  Size: 692 KiB