diff --git a/ailai_pc/4.jpg b/ailai_pc/4.jpg new file mode 100644 index 0000000..cdadc33 Binary files /dev/null and b/ailai_pc/4.jpg differ diff --git a/ailai_pc/detet_pc.py b/ailai_pc/detet_pc.py index ea56eb8..b20ab26 100644 --- a/ailai_pc/detet_pc.py +++ b/ailai_pc/detet_pc.py @@ -6,71 +6,79 @@ import cv2 # ====================== # 配置参数 # ====================== -MODEL_PATH = '/home/hx/开发/ailai_image_obb/ailai_pc/best12.pt' -IMG_PATH = '1.jpg' +MODEL_PATH = '/home/hx/yolo/ultralytics_yolo11-main/runs/train/exp_ailai_detect2/weights/best.pt' +IMG_PATH = '4.jpg' OUTPUT_PATH = 'output_pt.jpg' CONF_THRESH = 0.5 IOU_THRESH = 0.45 -CLASS_NAMES = ['bag'] +CLASS_NAMES = ['bag', 'bag35'] # ====================== -# 主函数(优化版) +# 主函数 # ====================== def main(): device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f"✅ 使用设备: {device}") - # 加载模型 - model = YOLO(MODEL_PATH) - model.to(device) + model = YOLO(MODEL_PATH).to(device) - # 推理:获取原始结果(不立即解析) print("➡️ 开始推理...") results = model(IMG_PATH, imgsz=640, conf=CONF_THRESH, device=device, verbose=True) - # 获取第一张图的结果 r = results[0] + pred = r.boxes.data # GPU tensor [N,6] - # 🚀 关键:使用原始 tensor 在 GPU 上处理 - # pred: [x1, y1, x2, y2, conf, cls] 形状为 [num_boxes, 6] - pred = r.boxes.data # 已经在 GPU 上,类型: torch.Tensor - - # 🔍 在 GPU 上做 NMS(这才是正确姿势) - # 注意:non_max_suppression 输入是 [batch, num_boxes, 6] det = non_max_suppression( - pred.unsqueeze(0), # 增加 batch 维度 + pred.unsqueeze(0), conf_thres=CONF_THRESH, iou_thres=IOU_THRESH, classes=None, agnostic=False, max_det=100 - )[0] # 取第一个(也是唯一一个)batch + )[0] - # ✅ 此时所有后处理已完成,现在才从 GPU 拷贝到 CPU - if det is not None and len(det): - det = det.cpu().numpy() # ← 只拷贝一次! - else: - det = [] + if det is None or len(det) == 0: + print("❌ 未检测到任何目标") + return - # 读取图像 + det = det.cpu().numpy() # 只拷贝一次 + + # ====================== + # ⭐ 关键:取置信度最高的结果 + # ====================== + best_det = max(det, key=lambda x: x[4]) + + x1, y1, x2, y2, conf, cls_id = best_det + x1, y1, x2, y2 = map(int, [x1, y1, x2, y2]) + cls_id = int(cls_id) + cls_name = CLASS_NAMES[cls_id] + + print("\n🏆 置信度最高结果:") + print(f" 类别: {cls_name}") + print(f" 置信度: {conf:.3f}") + print(f" 框: [{x1}, {y1}, {x2}, {y2}]") + + # ====================== + # 可视化(只画最高的) + # ====================== img = cv2.imread(IMG_PATH) if img is None: raise FileNotFoundError(f"无法读取图像: {IMG_PATH}") - print("\n📋 检测结果:") - for *xyxy, conf, cls_id in det: - x1, y1, x2, y2 = map(int, xyxy) - cls_name = CLASS_NAMES[int(cls_id)] - print(f" 类别: {cls_name}, 置信度: {conf:.3f}, 框: [{x1}, {y1}, {x2}, {y2}]") + cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2) + label = f"{cls_name} {conf:.2f}" + cv2.putText( + img, + label, + (x1, max(y1 - 10, 0)), + cv2.FONT_HERSHEY_SIMPLEX, + 0.9, + (0, 255, 0), + 2 + ) - # 画框和标签 - cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2) - label = f"{cls_name} {conf:.2f}" - cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) - - # 保存结果 cv2.imwrite(OUTPUT_PATH, img) print(f"\n🖼️ 可视化结果已保存: {OUTPUT_PATH}") if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/ailai_pc/detet_pc_f.py b/ailai_pc/detet_pc_f.py index f2c7658..0bd795e 100644 --- a/ailai_pc/detet_pc_f.py +++ b/ailai_pc/detet_pc_f.py @@ -1,135 +1,131 @@ from ultralytics import YOLO from ultralytics.utils.ops import non_max_suppression import torch -import cv2 import os import time +import shutil from pathlib import Path # ====================== # 配置参数 # ====================== -MODEL_PATH = 'detect.pt' # 你的模型路径 -INPUT_FOLDER = '/home/hx/开发/ailai_image_obb/ailai_pc/train' # 输入图片文件夹 -OUTPUT_FOLDER = '/home/hx/开发/ailai_image_obb/ailai_pc/results' # 输出结果文件夹(自动创建) -CONF_THRESH = 0.5 +MODEL_PATH = '/home/hx/yolo/ultralytics_yolo11-main/runs/train/exp_ailai_detect/weights/best.pt' +INPUT_FOLDER = '/media/hx/04e879fa-d697-4b02-ac7e-a4148876ebb0/dataset/ailaidete/train/bag' +OUTPUT_FOLDER = '/media/hx/04e879fa-d697-4b02-ac7e-a4148876ebb0/dataset/ailaidete/train/bag' + +CONF_BUCKETS = [0.93, 0.95] # ← ⭐ 自己改这里 IOU_THRESH = 0.45 CLASS_NAMES = ['bag'] + DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' IMG_SIZE = 640 -SHOW_IMAGE = False # 是否逐张显示图像(适合调试) -# 支持的图像格式 IMG_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'} # ====================== -# 获取文件夹中所有图片路径 +# 获取图片路径 # ====================== def get_image_paths(folder): folder = Path(folder) - if not folder.exists(): - raise FileNotFoundError(f"输入文件夹不存在: {folder}") - paths = [p for p in folder.iterdir() if p.suffix.lower() in IMG_EXTENSIONS] - if not paths: - print(f"⚠️ 在 {folder} 中未找到图片") - return sorted(paths) # 按名称排序 + return sorted([p for p in folder.iterdir() if p.suffix.lower() in IMG_EXTENSIONS]) # ====================== -# 主函数(批量推理) +# 防止重名覆盖 +# ====================== +def safe_move(src, dst_dir): + os.makedirs(dst_dir, exist_ok=True) + dst = os.path.join(dst_dir, os.path.basename(src)) + if not os.path.exists(dst): + shutil.move(src, dst) + return dst + + stem, suffix = os.path.splitext(os.path.basename(src)) + i = 1 + while True: + new_dst = os.path.join(dst_dir, f"{stem}_{i}{suffix}") + if not os.path.exists(new_dst): + shutil.move(src, new_dst) + return new_dst + i += 1 + + +# ====================== +# 根据置信度选择目录 +# ====================== +def get_bucket_dir(max_conf, output_root, buckets): + for th in sorted(buckets, reverse=True): + if max_conf >= th: + return os.path.join(output_root, f"bag_{th}") + return os.path.join(output_root, "delet") + + +# ====================== +# 主逻辑 # ====================== def main(): print(f"✅ 使用设备: {DEVICE}") - # 创建输出文件夹 - os.makedirs(OUTPUT_FOLDER, exist_ok=True) - print(f"📁 输出结果将保存到: {OUTPUT_FOLDER}") + model = YOLO(MODEL_PATH).to(DEVICE) - # 加载模型 - print("➡️ 加载 YOLO 模型...") - model = YOLO(MODEL_PATH) - model.to(DEVICE) - - # 获取图片列表 - img_paths = get_image_paths(INPUT_FOLDER) + img_paths = get_image_paths(Path(INPUT_FOLDER)) if not img_paths: + print("⚠️ 没有图片") return - print(f"📸 共找到 {len(img_paths)} 张图片,开始批量推理...\n") - - total_start_time = time.time() + print(f"📸 共 {len(img_paths)} 张图片") + print(f"📊 置信度档位: {CONF_BUCKETS}\n") for idx, img_path in enumerate(img_paths, 1): - print(f"{'=' * 50}") - print(f"🖼️ 处理第 {idx}/{len(img_paths)} 张: {img_path.name}") + print(f"{'='*50}") + print(f"🖼️ {idx}/{len(img_paths)}: {img_path.name}") - # 手动计时 start_time = time.time() - # 推理(verbose=True 输出内部耗时) - results = model(str(img_path), imgsz=IMG_SIZE, conf=CONF_THRESH, device=DEVICE, verbose=True) - inference_time = time.time() - start_time + results = model( + str(img_path), + imgsz=IMG_SIZE, + conf=min(CONF_BUCKETS), + device=DEVICE, + verbose=False + ) - # 获取结果 r = results[0] - pred = r.boxes.data # GPU 上的原始输出 + pred = r.boxes.data - # 在 GPU 上做 NMS det = non_max_suppression( pred.unsqueeze(0), - conf_thres=CONF_THRESH, + conf_thres=min(CONF_BUCKETS), iou_thres=IOU_THRESH, classes=None, agnostic=False, max_det=100 )[0] - # 拷贝到 CPU(仅一次) if det is not None and len(det): det = det.cpu().numpy() else: det = [] - # 读取图像并绘制 - img = cv2.imread(str(img_path)) - if img is None: - print(f"❌ 无法读取图像: {img_path}") - continue + max_conf = 0.0 + for *_, conf, cls_id in det: + if int(cls_id) == 0: + max_conf = max(max_conf, float(conf)) - print(f"\n📋 检测结果:") - for *xyxy, conf, cls_id in det: - x1, y1, x2, y2 = map(int, xyxy) - cls_name = CLASS_NAMES[int(cls_id)] - print(f" 类别: {cls_name}, 置信度: {conf:.3f}, 框: [{x1}, {y1}, {x2}, {y2}]") - cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2) - label = f"{cls_name} {conf:.2f}" - cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) + dst_dir = get_bucket_dir(max_conf, OUTPUT_FOLDER, CONF_BUCKETS) + new_path = safe_move(str(img_path), dst_dir) - # 保存结果 - output_path = os.path.join(OUTPUT_FOLDER, f"result_{img_path.name}") - cv2.imwrite(output_path, img) - print(f"\n✅ 结果已保存: {output_path}") + if max_conf > 0: + print(f"✅ bag max_conf={max_conf:.3f} → {os.path.basename(dst_dir)}") + else: + print("❌ 未检测到 bag") - # 显示(可选) - if SHOW_IMAGE: - cv2.imshow("Detection", img) - if cv2.waitKey(1) & 0xFF == ord('q'): # 按 Q 退出 - break + print(f"🚚 已移动到: {new_path}") + print(f"⏱️ {(time.time() - start_time)*1000:.1f} ms") - # 输出总耗时 - total_infer_time = time.time() - start_time - print(f"⏱️ 总处理时间: {total_infer_time * 1000:.1f}ms (推理+后处理)") - - # 结束 - total_elapsed = time.time() - total_start_time - print(f"\n🎉 批量推理完成!共处理 {len(img_paths)} 张图片,总耗时: {total_elapsed:.2f} 秒") - print( - f"🚀 平均每张: {total_elapsed / len(img_paths) * 1000:.1f} ms ({1 / (total_elapsed / len(img_paths)):.1f} FPS)") - - if SHOW_IMAGE: - cv2.destroyAllWindows() + print("\n🎉 全部处理完成") if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/detect_image/bag3568.rknn b/ailai_pc/image_capture_detect/bag3568.rknn similarity index 52% rename from detect_image/bag3568.rknn rename to ailai_pc/image_capture_detect/bag3568.rknn index 9c261e6..2ffac1b 100644 Binary files a/detect_image/bag3568.rknn and b/ailai_pc/image_capture_detect/bag3568.rknn differ diff --git a/ailai_pc/image_capture_detect/bag3588.rknn b/ailai_pc/image_capture_detect/bag3588.rknn new file mode 100644 index 0000000..b003b38 Binary files /dev/null and b/ailai_pc/image_capture_detect/bag3588.rknn differ diff --git a/ailai_pc/image_capture_detect/bag_detect.py b/ailai_pc/image_capture_detect/bag_detect.py new file mode 100644 index 0000000..ad56e23 --- /dev/null +++ b/ailai_pc/image_capture_detect/bag_detect.py @@ -0,0 +1,180 @@ +import os +import cv2 +import numpy as np +from rknnlite.api import RKNNLite + +# ====================== 配置 ====================== +MODEL_PATH = "bag3588.rknn" +IMG_PATH = "1.jpg" +IMG_SIZE = (640, 640) + +OBJ_THRESH = 0.25 # objectness * class_prob +NMS_THRESH = 0.45 + +CLASS_NAME = ["bag", "bag35"] + +OUTPUT_DIR = "./result" +os.makedirs(OUTPUT_DIR, exist_ok=True) + +VISUALIZE = True # False = 只输出类别和置信度,不保存图 + +# ====================== 工具函数 ====================== +def softmax(x, axis=-1): + x = x - np.max(x, axis=axis, keepdims=True) + exp_x = np.exp(x) + return exp_x / np.sum(exp_x, axis=axis, keepdims=True) + +def letterbox_resize(image, size, bg_color=114): + target_w, target_h = size + h, w = image.shape[:2] + scale = min(target_w / w, target_h / h) + + new_w, new_h = int(w * scale), int(h * scale) + resized = cv2.resize(image, (new_w, new_h)) + + canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) + dx = (target_w - new_w) // 2 + dy = (target_h - new_h) // 2 + canvas[dy:dy + new_h, dx:dx + new_w] = resized + + return canvas, scale, dx, dy + +# ====================== DFL 解码 ====================== +def dfl_decode(reg): + reg = reg.reshape(4, -1) + prob = softmax(reg, axis=1) + acc = np.arange(reg.shape[1]) + return np.sum(prob * acc, axis=1) + +# ====================== NMS ====================== +def nms(boxes, scores, thresh): + boxes = np.array(boxes) + scores = np.array(scores) + + x1, y1, x2, y2 = boxes.T + areas = (x2 - x1) * (y2 - y1) + order = scores.argsort()[::-1] + + keep = [] + while order.size > 0: + i = order[0] + keep.append(i) + + xx1 = np.maximum(x1[i], x1[order[1:]]) + yy1 = np.maximum(y1[i], y1[order[1:]]) + xx2 = np.minimum(x2[i], x2[order[1:]]) + yy2 = np.minimum(y2[i], y2[order[1:]]) + + inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1) + iou = inter / (areas[i] + areas[order[1:]] - inter) + + order = order[1:][iou <= thresh] + + return keep + +# ====================== 后处理 ====================== +def post_process(outputs, scale, dx, dy): + boxes_all, scores_all, classes_all = [], [], [] + + strides = [8, 16, 32] + + for i, stride in enumerate(strides): + reg = outputs[i * 3 + 0][0] + cls = outputs[i * 3 + 1][0] + obj = outputs[i * 3 + 2][0] + + num_classes, H, W = cls.shape + + for h in range(H): + for w in range(W): + class_prob = cls[:, h, w] + cls_id = int(np.argmax(class_prob)) + cls_score = class_prob[cls_id] + + obj_score = obj[0, h, w] + final_score = cls_score * obj_score + + if final_score < OBJ_THRESH: + continue + + l, t, r, b = dfl_decode(reg[:, h, w]) + + cx = (w + 0.5) * stride + cy = (h + 0.5) * stride + + x1 = cx - l * stride + y1 = cy - t * stride + x2 = cx + r * stride + y2 = cy + b * stride + + boxes_all.append([x1, y1, x2, y2]) + scores_all.append(final_score) + classes_all.append(cls_id) + + if not boxes_all: + return None, None, None + + keep = nms(boxes_all, scores_all, NMS_THRESH) + + boxes = np.array(boxes_all)[keep] + scores = np.array(scores_all)[keep] + classes = np.array(classes_all)[keep] + + boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / scale + boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / scale + + return boxes, classes, scores + +# ====================== 推理入口 ====================== +def detect_single_image(img_path, visualize=True): + rknn = RKNNLite() + rknn.load_rknn(MODEL_PATH) + rknn.init_runtime() + + img = cv2.imread(img_path) + if img is None: + raise FileNotFoundError(img_path) + + img_r, scale, dx, dy = letterbox_resize(img, IMG_SIZE) + outputs = rknn.inference([np.expand_dims(img_r, 0)]) + + boxes, cls_ids, scores = post_process(outputs, scale, dx, dy) + + if boxes is None or len(scores) == 0: + print("未检测到目标") + rknn.release() + return None, None + + best_idx = int(np.argmax(scores)) + best_score = float(scores[best_idx]) + best_cls_id = int(cls_ids[best_idx]) + best_cls_name = CLASS_NAME[best_cls_id] + best_box = boxes[best_idx] + + # ======== 可视化(可选) ======== + if visualize: + x1, y1, x2, y2 = best_box.astype(int) + cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2) + cv2.putText( + img, + f"{best_cls_name}:{best_score:.3f}", + (x1, max(y1 - 5, 0)), + cv2.FONT_HERSHEY_SIMPLEX, + 0.6, + (0, 255, 0), + 2 + ) + + save_path = os.path.join(OUTPUT_DIR, os.path.basename(img_path)) + cv2.imwrite(save_path, img) + print("可视化结果已保存:", save_path) + + rknn.release() + return best_cls_name, best_score + +# ====================== 主入口 ====================== +if __name__ == "__main__": + best_cls_name, best_score = detect_single_image(IMG_PATH, visualize=VISUALIZE) + # ======== 只输出你要的 ======== + print(f"类别: {best_cls_name}, 置信度: {best_score:.4f}") + diff --git a/ailai_pc/image_capture_detect/capture-image_1.py b/ailai_pc/image_capture_detect/capture-image_1.py new file mode 100644 index 0000000..b0dc154 --- /dev/null +++ b/ailai_pc/image_capture_detect/capture-image_1.py @@ -0,0 +1,223 @@ +import cv2 +import time +import os +import numpy as np +from PIL import Image +from skimage.metrics import structural_similarity as ssim +import shutil +from rknnlite.api import RKNNLite + +# ================== 配置参数 ================== +RTSP_URL = "rtsp://admin:ailaimiye123@192.168.0.125:554/streaming/channels/101" +SAVE_INTERVAL = 15 +SSIM_THRESHOLD = 0.9 +OUTPUT_DIR = "camera_test" +MODEL_PATH = "bag3568.rknn" +SHOW_WINDOW = False + +GRAY_LOWER = 70 +GRAY_UPPER = 230 +GRAY_RATIO_THRESHOLD = 0.7 + +IMG_SIZE = (640, 640) +OBJ_THRESH = 0.25 +NMS_THRESH = 0.45 +CLASS_NAME = ["bag"] + +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# ================== 基础工具 ================== +def softmax(x, axis=-1): + x = x - np.max(x, axis=axis, keepdims=True) + exp_x = np.exp(x) + return exp_x / np.sum(exp_x, axis=axis, keepdims=True) + +def letterbox_resize(image, size, bg_color=114): + target_w, target_h = size + h, w = image.shape[:2] + scale = min(target_w / w, target_h / h) + new_w, new_h = int(w * scale), int(h * scale) + resized = cv2.resize(image, (new_w, new_h)) + canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) + dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 + canvas[dy:dy+new_h, dx:dx+new_w] = resized + return canvas, scale, dx, dy + +# ================== DFL ================== +def dfl_decode(reg): + reg = reg.reshape(4, -1) + prob = softmax(reg, axis=1) + acc = np.arange(reg.shape[1]) + return np.sum(prob * acc, axis=1) + +# ================== NMS ================== +def nms(boxes, scores, thresh): + boxes = np.array(boxes) + scores = np.array(scores) + + x1, y1, x2, y2 = boxes.T + areas = (x2 - x1) * (y2 - y1) + order = scores.argsort()[::-1] + + keep = [] + while order.size > 0: + i = order[0] + keep.append(i) + + xx1 = np.maximum(x1[i], x1[order[1:]]) + yy1 = np.maximum(y1[i], y1[order[1:]]) + xx2 = np.minimum(x2[i], x2[order[1:]]) + yy2 = np.minimum(y2[i], y2[order[1:]]) + + inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1) + iou = inter / (areas[i] + areas[order[1:]] - inter) + + order = order[1:][iou <= thresh] + + return keep + +# ================== 后处理 ================== +def post_process(outputs, scale, dx, dy): + boxes_all, scores_all, classes_all = [], [], [] + strides = [8, 16, 32] + + for i, stride in enumerate(strides): + reg = outputs[i*3 + 0][0] + cls = outputs[i*3 + 1][0] + obj = outputs[i*3 + 2][0] + + num_classes, H, W = cls.shape + for h in range(H): + for w in range(W): + class_prob = cls[:, h, w] + cls_id = np.argmax(class_prob) + score = class_prob[cls_id] + + obj_score = obj[0, h, w] + final_score = score * obj_score + if final_score < OBJ_THRESH: + continue + + l, t, r, b = dfl_decode(reg[:, h, w]) + cx = (w + 0.5) * stride + cy = (h + 0.5) * stride + + x1 = cx - l * stride + y1 = cy - t * stride + x2 = cx + r * stride + y2 = cy + b * stride + + boxes_all.append([x1, y1, x2, y2]) + scores_all.append(final_score) + classes_all.append(cls_id) + + if len(boxes_all) == 0: + return None, None, None + + keep = nms(boxes_all, scores_all, NMS_THRESH) + boxes = np.array(boxes_all)[keep] + scores = np.array(scores_all)[keep] + classes = np.array(classes_all)[keep] + + boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / scale + boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / scale + + return boxes, classes, scores + +# ================== 灰度判断 ================== +def is_large_gray(image): + img = np.array(image) + if img.ndim != 3 or img.shape[2] != 3: + return True + h, w, _ = img.shape + gray_mask = ( + (img[:,:,0] >= GRAY_LOWER) & (img[:,:,0] <= GRAY_UPPER) & + (img[:,:,1] >= GRAY_LOWER) & (img[:,:,1] <= GRAY_UPPER) & + (img[:,:,2] >= GRAY_LOWER) & (img[:,:,2] <= GRAY_UPPER) + ) + return gray_mask.sum() / (h * w) > GRAY_RATIO_THRESHOLD + +# ================== RKNN 初始化 ================== +rknn = RKNNLite() +if rknn.load_rknn(MODEL_PATH) != 0: + raise RuntimeError("❌ RKNN 模型加载失败") +if rknn.init_runtime() != 0: + raise RuntimeError("❌ RKNN Runtime 初始化失败") +print("✅ RKNN 初始化完成") + +# ================== 视频流处理 ================== +last_gray = None +frame_count = 0 + +while True: + cap = cv2.VideoCapture(RTSP_URL) + if not cap.isOpened(): + print("❌ 无法连接 RTSP") + time.sleep(1) + continue + + print("✅ 开始读取视频流") + + try: + while True: + ret, frame = cap.read() + if not ret: + break + + frame_count += 1 + + if SHOW_WINDOW: + cv2.imshow("Camera", frame) + if cv2.waitKey(1) == ord('q'): + raise KeyboardInterrupt + + if frame_count % SAVE_INTERVAL != 0: + continue + + print(f"处理帧 {frame_count}") + + # STEP1 灰度过滤(可启用) + # if is_large_gray(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))): + # print("跳过:大面积灰色") + # continue + + # STEP2 SSIM 去重 + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + if last_gray is not None: + sim = ssim(gray, last_gray) + if sim > SSIM_THRESHOLD: + print(f"跳过:SSIM={sim:.3f}") + continue + last_gray = gray.copy() + + # STEP3 RKNN 推理(和 detect_single_image 一样) + img_r, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) + outputs = rknn.inference([np.expand_dims(img_r, 0)]) + boxes, cls_ids, scores = post_process(outputs, scale, dx, dy) + + if boxes is None or len(boxes) == 0: + print("跳过:未检测到 bag") + continue + + # STEP4 磁盘检查 + _, _, free = shutil.disk_usage(OUTPUT_DIR) + if free < 5 * 1024**3: + raise SystemExit("❌ 磁盘空间不足") + + # STEP5 保存 + ts = time.strftime("%Y%m%d_%H%M%S") + ms = int((time.time() % 1) * 1000) + path = os.path.join(OUTPUT_DIR, f"bag_{ts}_{ms:03d}.png") + cv2.imwrite(path, frame) + print(f"✅ 已保存: {path}") + + except KeyboardInterrupt: + print("🛑 用户中断") + break + finally: + cap.release() + cv2.destroyAllWindows() + +rknn.release() +print("程序结束") + diff --git a/ailai_pc/image_capture_detect/capture-image_2.py b/ailai_pc/image_capture_detect/capture-image_2.py new file mode 100644 index 0000000..6014b23 --- /dev/null +++ b/ailai_pc/image_capture_detect/capture-image_2.py @@ -0,0 +1,213 @@ +import cv2 +import time +import os +import numpy as np +from PIL import Image +from skimage.metrics import structural_similarity as ssim +from rknnlite.api import RKNNLite +import shutil + +# ================== 配置 ================== +RTSP_URL = "rtsp://admin:ailaimiye123@192.168.0.125:554/streaming/channels/101" +RKNN_MODEL = "bag3568.rknn" +OUTPUT_DIR = "camera_event_capture" + +CONF_THRESHOLD = 0.25 # bag 最终置信度阈值(obj * class_prob) +SSIM_THRESHOLD = 0.9 + +END_MISS_FRAMES = 30 +SAVE_EVERY_N_FRAMES = 1 +SHOW_WINDOW = False + +IMG_SIZE = (640, 640) +CLASS_NAME = ["bag"] +NMS_THRESH = 0.45 + +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# ================== 基础工具 ================== +def softmax(x, axis=-1): + x = x - np.max(x, axis=axis, keepdims=True) + exp_x = np.exp(x) + return exp_x / np.sum(exp_x, axis=axis, keepdims=True) + +def letterbox_resize(image, size, bg_color=114): + target_w, target_h = size + h, w = image.shape[:2] + scale = min(target_w / w, target_h / h) + new_w, new_h = int(w * scale), int(h * scale) + resized = cv2.resize(image, (new_w, new_h)) + canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) + dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 + canvas[dy:dy+new_h, dx:dx+new_w] = resized + return canvas, scale, dx, dy + +def dfl_decode(reg): + reg = reg.reshape(4, -1) + prob = softmax(reg, axis=1) + acc = np.arange(reg.shape[1]) + return np.sum(prob * acc, axis=1) + +def nms(boxes, scores, thresh): + boxes = np.array(boxes) + scores = np.array(scores) + x1, y1, x2, y2 = boxes.T + areas = (x2 - x1) * (y2 - y1) + order = scores.argsort()[::-1] + keep = [] + while order.size > 0: + i = order[0] + keep.append(i) + xx1 = np.maximum(x1[i], x1[order[1:]]) + yy1 = np.maximum(y1[i], y1[order[1:]]) + xx2 = np.minimum(x2[i], x2[order[1:]]) + yy2 = np.minimum(y2[i], y2[order[1:]]) + inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1) + iou = inter / (areas[i] + areas[order[1:]] - inter) + order = order[1:][iou <= thresh] + return keep + +def post_process(outputs, scale, dx, dy): + boxes_all, scores_all, classes_all = [], [], [] + strides = [8, 16, 32] + + for i, stride in enumerate(strides): + reg = outputs[i*3 + 0][0] + cls = outputs[i*3 + 1][0] + obj = outputs[i*3 + 2][0] + + num_classes, H, W = cls.shape + for h in range(H): + for w in range(W): + class_prob = cls[:, h, w] + cls_id = np.argmax(class_prob) + score = class_prob[cls_id] + obj_score = obj[0, h, w] + final_score = score * obj_score + if final_score < CONF_THRESHOLD: + continue + l, t, r, b = dfl_decode(reg[:, h, w]) + cx = (w + 0.5) * stride + cy = (h + 0.5) * stride + x1 = cx - l * stride + y1 = cy - t * stride + x2 = cx + r * stride + y2 = cy + b * stride + boxes_all.append([x1, y1, x2, y2]) + scores_all.append(final_score) + classes_all.append(cls_id) + + if len(boxes_all) == 0: + return None, None, None + + keep = nms(boxes_all, scores_all, NMS_THRESH) + boxes = np.array(boxes_all)[keep] + scores = np.array(scores_all)[keep] + classes = np.array(classes_all)[keep] + boxes[:, [0,2]] = (boxes[:, [0,2]] - dx) / scale + boxes[:, [1,3]] = (boxes[:, [1,3]] - dy) / scale + return boxes, classes, scores + +# ================== 灰度判断 ================== +def is_large_gray(image, gray_ratio_thresh=0.9): + img = np.array(image).astype(np.float32) + if img.ndim != 3 or img.shape[2] != 3: + return True + b, g, r = img[:,:,0], img[:,:,1], img[:,:,2] + max_c = np.maximum(np.maximum(r,g), b) + min_c = np.minimum(np.minimum(r,g), b) + gray_ratio = 1.0 - (max_c - min_c)/255.0 + gray_pixels = np.sum(gray_ratio >= 0.9) + total_pixels = img.shape[0]*img.shape[1] + return (gray_pixels/total_pixels) >= gray_ratio_thresh + +# ================== RKNN 初始化 ================== +rknn = RKNNLite() +assert rknn.load_rknn(RKNN_MODEL) == 0, "RKNN 模型加载失败" +assert rknn.init_runtime() == 0, "RKNN Runtime 初始化失败" +print("✅ RKNN 初始化完成") + +# ================== 视频流 & 状态机 ================== +cap = cv2.VideoCapture(RTSP_URL) +assert cap.isOpened(), "RTSP 连接失败" +print("🎥 视频流已连接") + +STATE_IDLE = 0 +STATE_CAPTURING = 1 + +state = STATE_IDLE +miss_count = 0 +save_idx = 0 +session_dir = None +session_id = 0 +last_gray = None +frame_count = 0 + +try: + while True: + ret, frame = cap.read() + if not ret: + time.sleep(0.2) + continue + frame_count += 1 + + if SHOW_WINDOW: + cv2.imshow("Camera", frame) + if cv2.waitKey(1) == ord('q'): + break + + # ---------- 灰度过滤 ---------- + #pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + #if is_large_gray(pil_img): + # continue + + # ---------- SSIM ---------- + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + if last_gray is not None and state == STATE_IDLE: + if ssim(gray, last_gray) > SSIM_THRESHOLD: + continue + last_gray = gray.copy() + + # ---------- RKNN 推理 ---------- + img_r, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) + outputs = rknn.inference([np.expand_dims(img_r, 0)]) + boxes, cls_ids, scores = post_process(outputs, scale, dx, dy) + has_bag = boxes is not None and len(boxes) > 0 + + # ---------- 状态机 ---------- + if state == STATE_IDLE: + if has_bag: + session_id += 1 + ts = time.strftime("%Y%m%d_%H%M%S") + session_dir = os.path.join(OUTPUT_DIR, f"session_{session_id:04d}_{ts}") + os.makedirs(session_dir, exist_ok=True) + print("🚀 进入采集") + state = STATE_CAPTURING + miss_count = 0 + save_idx = 0 + else: + if has_bag: + miss_count = 0 + else: + miss_count += 1 + + if save_idx % SAVE_EVERY_N_FRAMES == 0: + fname = f"{save_idx:06d}.png" + cv2.imwrite(os.path.join(session_dir, fname), frame) + save_idx += 1 + + if miss_count >= END_MISS_FRAMES: + print(f"🛑 退出采集,保存 {save_idx} 帧") + state = STATE_IDLE + miss_count = 0 + session_dir = None + +except KeyboardInterrupt: + print("\n🛑 用户退出") + +finally: + cap.release() + cv2.destroyAllWindows() + rknn.release() + print("程序结束") + diff --git a/detect_image/image_01_3588.py b/ailai_pc/image_capture_detect/image_01_3588.py similarity index 52% rename from detect_image/image_01_3588.py rename to ailai_pc/image_capture_detect/image_01_3588.py index de9cde6..d822dbf 100644 --- a/detect_image/image_01_3588.py +++ b/ailai_pc/image_capture_detect/image_01_3588.py @@ -9,20 +9,18 @@ from rknnlite.api import RKNNLite # ================== 配置参数 ================== RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101" +RKNN_MODEL = "bag3588.rknn" SAVE_INTERVAL = 15 SSIM_THRESHOLD = 0.9 OUTPUT_DIR = "camera_test" -RKNN_MODEL = "bag3588.rknn" SHOW_WINDOW = False -# 灰度判断参数 GRAY_LOWER = 70 GRAY_UPPER = 230 GRAY_RATIO_THRESHOLD = 0.7 IMG_SIZE = (640, 640) -OBJ_THRESH = 0.001 -NMS_THRESH = 0.45 +OBJ_THRESH = 0.25 CLASS_NAME = ["bag"] os.makedirs(OUTPUT_DIR, exist_ok=True) @@ -34,13 +32,13 @@ def is_large_gray(image): return True h, w, _ = img.shape gray_mask = ( - (img[:, :, 0] >= GRAY_LOWER) & (img[:, :, 0] <= GRAY_UPPER) & - (img[:, :, 1] >= GRAY_LOWER) & (img[:, :, 1] <= GRAY_UPPER) & - (img[:, :, 2] >= GRAY_LOWER) & (img[:, :, 2] <= GRAY_UPPER) + (img[:,:,0] >= GRAY_LOWER) & (img[:,:,0] <= GRAY_UPPER) & + (img[:,:,1] >= GRAY_LOWER) & (img[:,:,1] <= GRAY_UPPER) & + (img[:,:,2] >= GRAY_LOWER) & (img[:,:,2] <= GRAY_UPPER) ) return gray_mask.sum() / (h * w) > GRAY_RATIO_THRESHOLD -# ================== RKNN 工具函数 ================== +# ================== RKNN 工具 ================== def letterbox_resize(image, size, bg_color=114): target_w, target_h = size h, w = image.shape[:2] @@ -49,14 +47,14 @@ def letterbox_resize(image, size, bg_color=114): resized = cv2.resize(image, (new_w, new_h)) canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 - canvas[dy:dy + new_h, dx:dx + new_w] = resized + canvas[dy:dy+new_h, dx:dx+new_w] = resized return canvas, scale, dx, dy +# ================== DFL ================== def dfl_numpy(position): n, c, h, w = position.shape - p_num = 4 - mc = c // p_num - y = position.reshape(n, p_num, mc, h, w) + mc = c // 4 + y = position.reshape(n, 4, mc, h, w) y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) acc = np.arange(mc).reshape(1,1,mc,1,1) return np.sum(y * acc, axis=2) @@ -67,41 +65,26 @@ def box_process(position): col = col.reshape(1,1,grid_h,grid_w) row = row.reshape(1,1,grid_h,grid_w) grid = np.concatenate((col,row), axis=1) - stride = np.array([IMG_SIZE[1]//grid_h, IMG_SIZE[0]//grid_w]).reshape(1,2,1,1) + stride = np.array([ + IMG_SIZE[1]//grid_h, + IMG_SIZE[0]//grid_w + ]).reshape(1,2,1,1) + position = dfl_numpy(position) - box_xy = grid + 0.5 - position[:,0:2,:,:] + box_xy1 = grid + 0.5 - position[:,0:2,:,:] box_xy2 = grid + 0.5 + position[:,2:4,:,:] - return np.concatenate((box_xy*stride, box_xy2*stride), axis=1) + return np.concatenate((box_xy1*stride, box_xy2*stride), axis=1) -def filter_boxes(boxes, box_confidences, box_class_probs): - boxes = boxes.reshape(-1,4) - box_confidences = box_confidences.reshape(-1) - box_class_probs = np.array(box_class_probs) - - class_ids = np.argmax(box_class_probs, axis=-1) - class_scores = box_class_probs[np.arange(len(class_ids)), class_ids] - scores = box_confidences * class_scores - - mask = scores >= OBJ_THRESH - if np.sum(mask) == 0: - return None - return True # 只需要判断是否有目标 - -def post_process(outputs, scale, dx, dy): - boxes_list, conf_list, class_list = [], [], [] +# ================== 核心修改:只用 cls 置信度 ================== +def has_bag_from_outputs(outputs): + """ + 只判断是否存在 cls_prob >= OBJ_THRESH + """ for i in range(3): - boxes_list.append(box_process(outputs[i*3])) - conf_list.append(outputs[i*3+2]) - class_list.append(outputs[i*3+1]) - - def flatten(x): - x = x.transpose(0,2,3,1) - return x.reshape(-1,x.shape[3]) - - boxes = np.concatenate([flatten(b) for b in boxes_list]) - box_conf = np.concatenate([flatten(c) for c in conf_list]) - class_probs = np.concatenate([flatten(c) for c in class_list]) - return filter_boxes(boxes, box_conf, class_probs) + cls_map = outputs[i*3 + 1][0] # (1,H,W) + if cls_map.max() >= OBJ_THRESH: + return True + return False # ================== RKNN 初始化 ================== rknn = RKNNLite() @@ -112,22 +95,15 @@ if rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) != 0: print("✅ RKNN 初始化完成") # ================== 视频流处理 ================== -max_retry_seconds = 10 -retry_interval_seconds = 1 - last_gray = None frame_count = 0 while True: cap = cv2.VideoCapture(RTSP_URL) - start_time = time.time() - - while not cap.isOpened(): - if time.time() - start_time >= max_retry_seconds: - print("❌ 无法连接 RTSP") - exit(1) - time.sleep(retry_interval_seconds) - cap = cv2.VideoCapture(RTSP_URL) + if not cap.isOpened(): + print("❌ 无法连接 RTSP") + time.sleep(1) + continue print("✅ 开始读取视频流") @@ -135,7 +111,6 @@ while True: while True: ret, frame = cap.read() if not ret: - print("❌ 读取失败") break frame_count += 1 @@ -150,13 +125,12 @@ while True: print(f"处理帧 {frame_count}") - # STEP1: 灰度过滤 - pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - if is_large_gray(pil_image): - print("跳过:大面积灰色") - continue + # STEP1 灰度过滤 + #if is_large_gray(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))): + #print("跳过:大面积灰色") + #continue - # STEP2: SSIM 去重 + # STEP2 SSIM 去重 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if last_gray is not None: sim = ssim(gray, last_gray) @@ -165,37 +139,32 @@ while True: continue last_gray = gray.copy() - # STEP3: RKNN 推理,只判断是否有 bag - img_resized, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) - input_data = np.expand_dims(img_resized, 0) - outputs = rknn.inference(inputs=[input_data]) - has_bag = post_process(outputs, scale, dx, dy) - if not has_bag: + # STEP3 RKNN 推理(只判断 cls) + img_r, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) + outputs = rknn.inference([np.expand_dims(img_r, 0)]) + + if not has_bag_from_outputs(outputs): print("跳过:未检测到 bag") continue - # STEP4: 磁盘检查 + # STEP4 磁盘检查 _, _, free = shutil.disk_usage(OUTPUT_DIR) - if free < 5*1024**3: - print("❌ 磁盘空间不足") - raise SystemExit(1) + if free < 5 * 1024**3: + raise SystemExit("❌ 磁盘空间不足") - # STEP5: 保存原图 + # STEP5 保存 ts = time.strftime("%Y%m%d_%H%M%S") - ms = int((time.time()%1)*1000) - filename = f"bag_{ts}_{ms:03d}.png" - path = os.path.join(OUTPUT_DIR, filename) - cv2.imwrite(path, frame) # 保存原图 + ms = int((time.time() % 1) * 1000) + path = os.path.join(OUTPUT_DIR, f"bag_{ts}_{ms:03d}.png") + cv2.imwrite(path, frame) print(f"✅ 已保存: {path}") except KeyboardInterrupt: - print("\n🛑 用户中断") + print("🛑 用户中断") break - finally: cap.release() cv2.destroyAllWindows() - print(f"视频流关闭,共处理 {frame_count} 帧") rknn.release() print("程序结束") diff --git a/ailai_pc/image_capture_detect/image_02_3588.py b/ailai_pc/image_capture_detect/image_02_3588.py new file mode 100644 index 0000000..83a3b2e --- /dev/null +++ b/ailai_pc/image_capture_detect/image_02_3588.py @@ -0,0 +1,209 @@ +import cv2 +import time +import os +import numpy as np +from PIL import Image +from skimage.metrics import structural_similarity as ssim +from rknnlite.api import RKNNLite + +# ================== 配置 ================== +RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101" +RKNN_MODEL = "bag3588.rknn" +OUTPUT_DIR = "camera_event_capture" + +CONF_THRESHOLD = 0.25 # ← bag class prob 阈值(真实置信度) +SSIM_THRESHOLD = 0.9 + +END_MISS_FRAMES = 30 +SAVE_EVERY_N_FRAMES = 1 +SHOW_WINDOW = False + +IMG_SIZE = (640, 640) +CLASS_NAME = ["bag"] + +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# ===================================================== +# 灰度判断:≥90% 像素为灰色(R≈G≈B) +# ===================================================== +def is_large_gray(image, gray_ratio_thresh=0.9): + img = np.array(image).astype(np.float32) + if img.ndim != 3 or img.shape[2] != 3: + return True + + b, g, r = img[:, :, 0], img[:, :, 1], img[:, :, 2] + max_c = np.maximum(np.maximum(r, g), b) + min_c = np.minimum(np.minimum(r, g), b) + + gray_ratio = 1.0 - (max_c - min_c) / 255.0 + gray_pixels = np.sum(gray_ratio >= 0.9) + total_pixels = img.shape[0] * img.shape[1] + + return (gray_pixels / total_pixels) >= gray_ratio_thresh + + +# ===================================================== +# RKNN 推理工具 +# ===================================================== +def letterbox_resize(image, size, bg_color=114): + target_w, target_h = size + h, w = image.shape[:2] + scale = min(target_w / w, target_h / h) + new_w, new_h = int(w * scale), int(h * scale) + resized = cv2.resize(image, (new_w, new_h)) + canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) + dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 + canvas[dy:dy + new_h, dx:dx + new_w] = resized + return canvas, scale, dx, dy + + +def dfl_numpy(position): + n, c, h, w = position.shape + p_num = 4 + mc = c // p_num + y = position.reshape(n, p_num, mc, h, w) + y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) + acc = np.arange(mc).reshape(1, 1, mc, 1, 1) + return np.sum(y * acc, axis=2) + + +def box_process(position): + grid_h, grid_w = position.shape[2:4] + col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) + col = col.reshape(1, 1, grid_h, grid_w) + row = row.reshape(1, 1, grid_h, grid_w) + grid = np.concatenate((col, row), axis=1) + stride = np.array([IMG_SIZE[1] // grid_w, IMG_SIZE[0] // grid_h]).reshape(1, 2, 1, 1) + + position = dfl_numpy(position) + box_xy1 = grid + 0.5 - position[:, 0:2, :, :] + box_xy2 = grid + 0.5 + position[:, 2:4, :, :] + return np.concatenate((box_xy1 * stride, box_xy2 * stride), axis=1) + + +# ===================================================== +# ✅ 关键修改:只用 class prob 作为置信度 +# ===================================================== +def filter_boxes(box_class_probs): + """ + rknn_model_zoo 风格: + - 没有 obj_conf + - bag 置信度 = class_prob + """ + box_class_probs = np.array(box_class_probs) + bag_scores = box_class_probs[:, 0] # 只有一个类别 bag + return np.any(bag_scores >= CONF_THRESHOLD) + + +def post_process(outputs): + boxes_list, class_list = [], [] + + for i in range(3): + boxes_list.append(box_process(outputs[i * 3])) + class_list.append(outputs[i * 3 + 1]) + + def flatten(x): + x = x.transpose(0, 2, 3, 1) + return x.reshape(-1, x.shape[3]) + + class_probs = np.concatenate([flatten(c) for c in class_list]) + return filter_boxes(class_probs) + + +# ===================================================== +# RKNN 初始化 +# ===================================================== +rknn = RKNNLite() +assert rknn.load_rknn(RKNN_MODEL) == 0, "RKNN 模型加载失败" +assert rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) == 0, "RKNN Runtime 初始化失败" +print("✅ RKNN 初始化完成") + + +# ===================================================== +# 视频流 & 状态机 +# ===================================================== +cap = cv2.VideoCapture(RTSP_URL) +assert cap.isOpened(), "RTSP 连接失败" +print("🎥 视频流已连接") + +STATE_IDLE = 0 +STATE_CAPTURING = 1 + +state = STATE_IDLE +miss_count = 0 +save_idx = 0 +session_dir = None +session_id = 0 +last_gray = None +frame_count = 0 + + +try: + while True: + ret, frame = cap.read() + if not ret: + time.sleep(0.2) + continue + + frame_count += 1 + + if SHOW_WINDOW: + cv2.imshow("Camera", frame) + if cv2.waitKey(1) == ord('q'): + break + + # ---------- 灰度过滤 ---------- + pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + if is_large_gray(pil_img): + continue + + # ---------- SSIM ---------- + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + if last_gray is not None and state == STATE_IDLE: + if ssim(gray, last_gray) > SSIM_THRESHOLD: + continue + last_gray = gray.copy() + + # ---------- RKNN 推理 ---------- + img_resized, _, _, _ = letterbox_resize(frame, IMG_SIZE) + outputs = rknn.inference(inputs=[np.expand_dims(img_resized, 0)]) + has_bag = post_process(outputs) + + # ---------- 状态机 ---------- + if state == STATE_IDLE: + if has_bag: + session_id += 1 + ts = time.strftime("%Y%m%d_%H%M%S") + session_dir = os.path.join(OUTPUT_DIR, f"session_{session_id:04d}_{ts}") + os.makedirs(session_dir, exist_ok=True) + print("🚀 进入采集") + state = STATE_CAPTURING + miss_count = 0 + save_idx = 0 + + else: # STATE_CAPTURING + if has_bag: + miss_count = 0 + else: + miss_count += 1 + + if save_idx % SAVE_EVERY_N_FRAMES == 0: + fname = f"{save_idx:06d}.png" + cv2.imwrite(os.path.join(session_dir, fname), frame) + save_idx += 1 + + if miss_count >= END_MISS_FRAMES: + print(f"🛑 退出采集,保存 {save_idx} 帧") + state = STATE_IDLE + miss_count = 0 + session_dir = None + +except KeyboardInterrupt: + print("\n🛑 用户退出") + +finally: + cap.release() + cv2.destroyAllWindows() + rknn.release() + print("程序结束") + diff --git a/ailai_pc/output_pt.jpg b/ailai_pc/output_pt.jpg index d935dfa..5ac3bd1 100644 Binary files a/ailai_pc/output_pt.jpg and b/ailai_pc/output_pt.jpg differ diff --git a/detect_bagor35bag/README.md b/detect_bagor35bag/README.md new file mode 100644 index 0000000..8bbcff9 --- /dev/null +++ b/detect_bagor35bag/README.md @@ -0,0 +1,111 @@ +# RKNN 料袋(bag / bag35)检测与滚筒控制逻辑 + +本工程基于 **RKNN 模型** 对流水线上的料袋进行检测与分类(`bag` / `bag35`), +并根据检测结果与位置关系判断料袋状态(未到位 / 到位 / 掉出滚筒), +最终执行对应的 **滚筒控制逻辑** 或用于 **纯判断测试**。 + +--- + +## 一、目录结构 + +``` +detect_bagor35bag/ +├── bag3568.rknn +├── detect_bag.py +├── main_bag_judgment.py +├── test_bag_onlyjudgment_withou-motor-contral.py +├── test_image/ +└── README.md +``` + +--- + +## 二、功能说明 + +### 料袋检测 +- RKNN 推理 +- 支持 `bag` / `bag35` 目标检测 +- 输出 `cls / conf / min_x` 50kg料包为bag,35kg为bag35;conf是置信度;min_x是判断料包底部距离现在传感器物理位置的距离,未到位是负数,到位后是正数距离 + +### 状态判断 + +| 状态 | 条件 | +|----|----| +| 没有料袋 | min_x is None | +| 料袋未到位 | min_x < THRESHOLD_X | +| 料袋到位 | THRESHOLD_X ≤ min_x ≤ THRESHOLD_maxX | +| 料包掉出滚筒 | min_x > THRESHOLD_maxX | + +```python +THRESHOLD_X = 537 # 到位阈值 +THRESHOLD_maxX = 1430 # 掉出滚筒阈值 +``` + +### 滚筒控制规则 + +- 未检测 / 未到位 → 不动作 +- 掉出滚筒 → 停机报警 +- 到位: + - bag → 立即停止滚筒 + - bag35 → 延时2s → 反转2s → 停止 + +--- + +## 三、依赖安装(已安装) + +```bash +pip install opencv-python numpy rknnlite +``` + +--- + +## 四、使用方式 + +### 主程序(含电机控制) + +```bash +python main_bag_judgment.py +``` + +### 仅判断测试(无电机) + +```bash +python test_bag_onlyjudgment_withou-motor-contral.py +``` + +--- + +## 五、核心接口 + +### detect_bag + +```python +cls, conf, min_x = detect_bag(img) #不可视化图像 +cls, conf, min_x, vis_img = detect_bag(img, return_vis=True) #可视化图像 +``` + +### bag_judgment + +```python +status_bool, status_text, conf, min_x, vis_img = bag_judgment(img) #不可视化图像+滚筒控制 +``` + +--- + +## 六、状态文本规范 + +``` +没有料袋 +料袋未到位 +料袋到位 +料包掉出滚筒 +``` + +--- + +## 七、说明 + +- 检测与控制逻辑解耦 +- 易于扩展新料袋类型 +- 支持现场与离线测试 + diff --git a/detect_bagor35bag/bag3568.rknn b/detect_bagor35bag/bag3568.rknn new file mode 100644 index 0000000..2ffac1b Binary files /dev/null and b/detect_bagor35bag/bag3568.rknn differ diff --git a/detect_bagor35bag/detect_bag.py b/detect_bagor35bag/detect_bag.py new file mode 100644 index 0000000..7cd7919 --- /dev/null +++ b/detect_bagor35bag/detect_bag.py @@ -0,0 +1,202 @@ +import os +import cv2 +import numpy as np +from rknnlite.api import RKNNLite + +# ====================== 配置 ====================== +MODEL_PATH = "bag3568.rknn" +IMG_SIZE = (640, 640) + +OBJ_THRESH = 0.25 +NMS_THRESH = 0.45 + +CLASS_NAME = ["bag", "bag35"] + +# ====================== 工具函数 ====================== +def softmax(x, axis=-1): + x = x - np.max(x, axis=axis, keepdims=True) + exp_x = np.exp(x) + return exp_x / np.sum(exp_x, axis=axis, keepdims=True) + +def letterbox_resize(image, size, bg_color=114): + target_w, target_h = size + h, w = image.shape[:2] + scale = min(target_w / w, target_h / h) + + new_w, new_h = int(w * scale), int(h * scale) + resized = cv2.resize(image, (new_w, new_h)) + + canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) + dx = (target_w - new_w) // 2 + dy = (target_h - new_h) // 2 + canvas[dy:dy + new_h, dx:dx + new_w] = resized + + return canvas, scale, dx, dy + +# ====================== DFL 解码 ====================== +def dfl_decode(reg): + reg = reg.reshape(4, -1) + prob = softmax(reg, axis=1) + acc = np.arange(reg.shape[1]) + return np.sum(prob * acc, axis=1) + +# ====================== NMS ====================== +def nms(boxes, scores, thresh): + boxes = np.array(boxes) + scores = np.array(scores) + + x1, y1, x2, y2 = boxes.T + areas = (x2 - x1) * (y2 - y1) + order = scores.argsort()[::-1] + + keep = [] + while order.size > 0: + i = order[0] + keep.append(i) + + xx1 = np.maximum(x1[i], x1[order[1:]]) + yy1 = np.maximum(y1[i], y1[order[1:]]) + xx2 = np.minimum(x2[i], x2[order[1:]]) + yy2 = np.minimum(y2[i], y2[order[1:]]) + + inter = np.maximum(0, xx2 - xx1) * np.maximum(0, yy2 - yy1) + iou = inter / (areas[i] + areas[order[1:]] - inter) + + order = order[1:][iou <= thresh] + + return keep + +# ====================== 后处理 ====================== +def post_process(outputs, scale, dx, dy): + boxes_all, scores_all, classes_all = [], [], [] + + strides = [8, 16, 32] + + for i, stride in enumerate(strides): + reg = outputs[i * 3 + 0][0] + cls = outputs[i * 3 + 1][0] + obj = outputs[i * 3 + 2][0] + + num_classes, H, W = cls.shape + + for h in range(H): + for w in range(W): + class_prob = cls[:, h, w] + cls_id = int(np.argmax(class_prob)) + cls_score = class_prob[cls_id] + + obj_score = obj[0, h, w] + score = cls_score * obj_score + + if score < OBJ_THRESH: + continue + + l, t, r, b = dfl_decode(reg[:, h, w]) + + cx = (w + 0.5) * stride + cy = (h + 0.5) * stride + + x1 = cx - l * stride + y1 = cy - t * stride + x2 = cx + r * stride + y2 = cy + b * stride + + boxes_all.append([x1, y1, x2, y2]) + scores_all.append(score) + classes_all.append(cls_id) + + if not boxes_all: + return None, None, None + + keep = nms(boxes_all, scores_all, NMS_THRESH) + + boxes = np.array(boxes_all)[keep] + scores = np.array(scores_all)[keep] + classes = np.array(classes_all)[keep] + + boxes[:, [0, 2]] = (boxes[:, [0, 2]] - dx) / scale + boxes[:, [1, 3]] = (boxes[:, [1, 3]] - dy) / scale + + return boxes, classes, scores + +# ====================== RKNN 初始化(全局一次) ====================== +_rknn = RKNNLite() +_rknn.load_rknn(MODEL_PATH) +_rknn.init_runtime() + +# ====================== 统一接口函数 ====================== +def detect_bag(img, return_vis=False): + """ + Args: + img (np.ndarray): BGR 原图 + return_vis (bool) + + Returns: + cls (str | None) + conf (float | None) + min_x (int | None) + vis_img (np.ndarray) # optional + """ + + img_r, scale, dx, dy = letterbox_resize(img, IMG_SIZE) + outputs = _rknn.inference([np.expand_dims(img_r, 0)]) + + boxes, cls_ids, scores = post_process(outputs, scale, dx, dy) + + if boxes is None or len(scores) == 0: + if return_vis: + return None, None, None, img.copy() + return None, None, None + + best_idx = int(np.argmax(scores)) + + conf = float(scores[best_idx]) + cls_id = int(cls_ids[best_idx]) + cls = CLASS_NAME[cls_id] + + x1, y1, x2, y2 = boxes[best_idx].astype(int) + min_x = int(x1) + + if return_vis: + vis = img.copy() + cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0), 2) + cv2.putText( + vis, + f"{cls}:{conf:.3f}", + (x1, max(y1 - 5, 0)), + cv2.FONT_HERSHEY_SIMPLEX, + 0.6, + (0, 255, 0), + 2 + ) + return cls, conf, min_x, vis + + return cls, conf, min_x + + +# ====================== 测试 ====================== +# ====================== 测试 ====================== +if __name__ == "__main__": + IMG_PATH = "./test_image/4.jpg" + OUTPUT_DIR = "./result" + os.makedirs(OUTPUT_DIR, exist_ok=True) + + img = cv2.imread(IMG_PATH) + if img is None: + raise FileNotFoundError(IMG_PATH) + + cls, conf, min_x, vis = detect_bag(img, return_vis=True) + + if cls is None: + print("未检测到目标") + else: + print(f"类别: {cls}") + print(f"置信度: {conf:.4f}") + print(f"最左 x: {min_x}") + + if vis is not None: + save_path = os.path.join(OUTPUT_DIR, "vis_result.jpg") + cv2.imwrite(save_path, vis) + print("可视化结果已保存:", save_path) + + diff --git a/detect_bagor35bag/main_bag_judgment.py b/detect_bagor35bag/main_bag_judgment.py new file mode 100644 index 0000000..c28529c --- /dev/null +++ b/detect_bagor35bag/main_bag_judgment.py @@ -0,0 +1,128 @@ +import cv2 +import time +from detect_bag import detect_bag +#这个要注意放在Feeding同一目录下是这样调用EMV的 +from EMV.EMV import RelayController + +THRESHOLD_X = 537 # 到位阈值 +THRESHOLD_maxX = 1430 # 掉出滚筒阈值 + +relay_controller = RelayController() + +# ================================================== +# 不同料包的滚筒控制逻辑 +# ================================================== +def handle_bag_motor(cls, status_bool, status_text): + """ + 滚筒控制总逻辑: + - 没检测到料包 → 不发信号 + - 未到位 → 不发信号 + - 掉出滚筒 → 报警(不再操作滚筒) + - 到位: + bag → 立刻停止滚筒 + bag35 → 持续正转1.5s反转1.5秒 → 停止 + """ + + # 没检测到料包 + if cls is None: + return + + # 掉出滚筒(最高优先级) + if status_text == "料包掉出滚筒": + print("料包掉出滚筒 → 报警 / 停机") + relay_controller.close(conveyor2=True) + relay_controller.close(conveyor2_reverse=True) + return + + # 未到位 → 什么都不做 + if status_bool is not True: + return + + # ================== 到位 + 分类 ================== + if cls == "bag": + print("[bag] 到位 → 立刻停止滚筒") + relay_controller.close(conveyor2=True) + + elif cls == "bag35": + print("[bag35] 到位 → 持续正转滚筒1.5秒 后,反转滚筒 1.5 秒 到原位置→ 停止滚筒") + time.sleep(1.5) + relay_controller.open(conveyor2_reverse=True) + time.sleep(1.5) + relay_controller.close(conveyor2_reverse=True) + + else: + # 预留扩展 + return + + +# ================================================== +# 料袋状态判断 +# ================================================== +def bag_judgment(img, return_conf=True, return_vis=False): + """ + 判断图片中的料袋状态 + """ + cls = None + conf = None + min_x = None + vis_img = None + + # ================== 唯一检测调用 ================== + if return_vis: + cls, conf, min_x, vis_img = detect_bag(img, return_vis=True) + else: + cls, conf, min_x = detect_bag(img, return_vis=False) + + # ================== 状态判断 ================== + if min_x is None: + status_bool = None + status_text = "没有料袋" + + elif min_x > THRESHOLD_maxX: + status_bool = False + status_text = "料包掉出滚筒" + + elif THRESHOLD_X <= min_x <= THRESHOLD_maxX: + status_bool = True + status_text = "料袋到位" + + else: + status_bool = False + status_text = "料袋未到位" + + # ================== 滚筒控制 ================== + handle_bag_motor(cls, status_bool, status_text) + + # ================== 返回 ================== + if not return_conf: + conf = None + if not return_vis: + vis_img = None + + return status_bool, status_text, conf, min_x, vis_img + + +# ====================== 测试 ====================== +if __name__ == "__main__": + IMG_PATH = "./test_image/3.jpg" + img = cv2.imread(IMG_PATH) + if img is None: + raise FileNotFoundError(f"图片无法读取: {IMG_PATH}") + #这里面包含 handle_bag_motor滚筒控制,只要你记得后面机械臂抓完包之后要打开滚筒,Feeding里self.relay_controller.open(conveyor2=True) + status_bool, status_text, conf, min_x, vis_img = bag_judgment( + img, + return_conf = True, + return_vis = False + ) + + print( + f"判断结果: {status_bool}, " + f"中文状态: {status_text}, " + f"conf={conf}, min_x={min_x}" + ) + + if vis_img is not None: + cv2.imshow("Vis", vis_img) + cv2.waitKey(0) + cv2.destroyAllWindows() + diff --git a/detect_bagor35bag/test_bag_onlyjudgment_withou-motor-contral.py b/detect_bagor35bag/test_bag_onlyjudgment_withou-motor-contral.py new file mode 100644 index 0000000..885a792 --- /dev/null +++ b/detect_bagor35bag/test_bag_onlyjudgment_withou-motor-contral.py @@ -0,0 +1,72 @@ +import cv2 +from detect_bag import detect_bag + +THRESHOLD_X = 537 # 到位阈值 +THRESHOLD_maxX = 1430 # 掉出滚筒阈值 + + +def bag_judgment(img, return_conf=True, return_vis=False): + """ + 判断图片中的料袋状态(测试版,不控制电机) + """ + cls = None + conf = None + min_x = None + vis_img = None + + # ================== 唯一调用 ================== + if return_vis: + cls, conf, min_x, vis_img = detect_bag(img, return_vis=True) + else: + cls, conf, min_x = detect_bag(img, return_vis=False) + + # ================== 状态判断 ================== + if min_x is None: + status_bool = None + status_text = "没有料袋" + + elif min_x > THRESHOLD_maxX: + status_bool = False + status_text = "料包掉出滚筒" + + elif THRESHOLD_X <= min_x <= THRESHOLD_maxX: + status_bool = True + status_text = "料袋到位" + + else: + status_bool = False + status_text = "料袋未到位" + + # ================== 返回 ================== + if not return_conf: + conf = None + if not return_vis: + vis_img = None + + return status_bool, status_text, conf, min_x, vis_img + + +# ====================== 测试 ====================== +if __name__ == "__main__": + IMG_PATH = "./test_image/3.jpg" + img = cv2.imread(IMG_PATH) + if img is None: + raise FileNotFoundError(f"图片无法读取: {IMG_PATH}") + + status_bool, status_text, conf, min_x, vis_img = bag_judgment( + img, + return_conf=True, + return_vis=True + ) + + print( + f"判断结果: {status_bool}, " + f"中文状态: {status_text}, " + f"conf={conf}, min_x={min_x}" + ) + + if vis_img is not None: + cv2.imshow("Vis", vis_img) + cv2.waitKey(0) + cv2.destroyAllWindows() + diff --git a/detect_image/1.jpg b/detect_bagor35bag/test_image/1.jpg similarity index 100% rename from detect_image/1.jpg rename to detect_bagor35bag/test_image/1.jpg diff --git a/detect_image/2.jpg b/detect_bagor35bag/test_image/2.jpg similarity index 100% rename from detect_image/2.jpg rename to detect_bagor35bag/test_image/2.jpg diff --git a/detect_image/3.jpg b/detect_bagor35bag/test_image/3.jpg similarity index 100% rename from detect_image/3.jpg rename to detect_bagor35bag/test_image/3.jpg diff --git a/detect_bagor35bag/test_image/4.jpg b/detect_bagor35bag/test_image/4.jpg new file mode 100644 index 0000000..cdadc33 Binary files /dev/null and b/detect_bagor35bag/test_image/4.jpg differ diff --git a/detect_image/bag3588.rknn b/detect_image/bag3588.rknn deleted file mode 100644 index 2278721..0000000 Binary files a/detect_image/bag3588.rknn and /dev/null differ diff --git a/detect_image/bag_detect.py b/detect_image/bag_detect.py deleted file mode 100644 index eb767c0..0000000 --- a/detect_image/bag_detect.py +++ /dev/null @@ -1,166 +0,0 @@ -import os -import cv2 -import numpy as np -from rknnlite.api import RKNNLite - -# ====================== 配置 ====================== -MODEL_PATH = "bag3588.rknn" # RKNN 模型路径 -IMG_PATH = "2.jpg" # 待推理图片路径 -IMG_SIZE = (640, 640) # 模型输入尺寸 (w,h) -OBJ_THRESH = 0.001 # 目标置信度阈值 -NMS_THRESH = 0.45 # NMS 阈值 -CLASS_NAME = ["bag"] -OUTPUT_DIR = "./result" -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# ====================== 工具函数 ====================== -def letterbox_resize(image, size, bg_color=114): - target_w, target_h = size - h, w = image.shape[:2] - scale = min(target_w / w, target_h / h) - new_w, new_h = int(w * scale), int(h * scale) - resized = cv2.resize(image, (new_w, new_h)) - canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) - dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 - canvas[dy:dy + new_h, dx:dx + new_w] = resized - return canvas, scale, dx, dy - -def dfl_numpy(position): - n, c, h, w = position.shape - p_num = 4 - mc = c // p_num - y = position.reshape(n, p_num, mc, h, w) - y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) - acc = np.arange(mc).reshape(1,1,mc,1,1) - y = np.sum(y * acc, axis=2) - return y - -def box_process(position): - grid_h, grid_w = position.shape[2:4] - col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) - col = col.reshape(1,1,grid_h,grid_w) - row = row.reshape(1,1,grid_h,grid_w) - grid = np.concatenate((col,row), axis=1) - stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1,2,1,1) - position = dfl_numpy(position) - box_xy = grid + 0.5 - position[:,0:2,:,:] - box_xy2 = grid + 0.5 + position[:,2:4,:,:] - xyxy = np.concatenate((box_xy*stride, box_xy2*stride), axis=1) - return xyxy - -def filter_boxes(boxes, box_confidences, box_class_probs): - boxes = np.array(boxes).reshape(-1, 4) - box_confidences = np.array(box_confidences).reshape(-1) - box_class_probs = np.array(box_class_probs) - - class_ids = np.argmax(box_class_probs, axis=-1) - class_scores = box_class_probs[np.arange(len(class_ids)), class_ids] - scores = box_confidences * class_scores - - mask = scores >= OBJ_THRESH - if np.sum(mask) == 0: - return None, None, None, None - - boxes = boxes[mask] - classes = class_ids[mask] - scores = scores[mask] - conf_keep = box_confidences[mask] # 原始 objectness - - # NMS - x1, y1, x2, y2 = boxes[:,0], boxes[:,1], boxes[:,2], boxes[:,3] - areas = (x2 - x1 + 1) * (y2 - y1 + 1) - order = scores.argsort()[::-1] - keep = [] - while order.size > 0: - i = order[0] - keep.append(i) - xx1 = np.maximum(x1[i], x1[order[1:]]) - yy1 = np.maximum(y1[i], y1[order[1:]]) - xx2 = np.minimum(x2[i], x2[order[1:]]) - yy2 = np.minimum(y2[i], y2[order[1:]]) - w = np.maximum(0, xx2 - xx1 + 1) - h = np.maximum(0, yy2 - yy1 + 1) - inter = w * h - ovr = inter / (areas[i] + areas[order[1:]] - inter) - inds = np.where(ovr <= NMS_THRESH)[0] - order = order[inds + 1] - return boxes[keep], classes[keep], scores[keep], conf_keep[keep] - -def post_process(outputs, scale, dx, dy): - boxes_list, conf_list, class_list = [], [], [] - branch_num = 3 - for i in range(branch_num): - boxes_list.append(box_process(outputs[i*3])) - conf_list.append(outputs[i*3+2]) - class_list.append(outputs[i*3+1]) - - def flatten(x): - ch = x.shape[1] - x = x.transpose(0,2,3,1) - return x.reshape(-1,ch) - - boxes = np.concatenate([flatten(b) for b in boxes_list]) - box_conf = np.concatenate([flatten(c) for c in conf_list]) - class_probs = np.concatenate([flatten(c) for c in class_list]) - - boxes, classes, scores, conf_keep = filter_boxes(boxes, box_conf, class_probs) - if boxes is None: - return None, None, None, None - - boxes[:, [0,2]] -= dx - boxes[:, [1,3]] -= dy - boxes /= scale - boxes = boxes.clip(min=0) - - # 将 objectness 置信度放大 255 - scores = 1-scores - conf_keep = conf_keep * 255 - return boxes, classes, scores, conf_keep - -# ====================== 单张图片推理 ====================== -def detect_single_image(img_path): - rknn = RKNNLite(verbose=False) - rknn.load_rknn(MODEL_PATH) - rknn.init_runtime() - - img_name = os.path.basename(img_path) - img = cv2.imread(img_path) - if img is None: - raise FileNotFoundError(f"图片无法读取: {img_path}") - - img_resized, scale, dx, dy = letterbox_resize(img, IMG_SIZE) - input_data = np.expand_dims(img_resized, 0) - outputs = rknn.inference(inputs=[input_data]) - boxes, classes, scores, conf_keep = post_process(outputs, scale, dx, dy) - - if boxes is not None: - for i, box in enumerate(boxes): - x1, y1, x2, y2 = box.astype(int) - cls_id = classes[i] - score = scores[i] - cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2) - cv2.putText(img, - f"{CLASS_NAME[cls_id]}:{score:.1f}", - (x1, max(y1-5,0)), - cv2.FONT_HERSHEY_SIMPLEX, - 0.6, - (0, 255, 0), - 2) - - # 保存图像 - if conf_keep is not None and len(conf_keep) > 0: - score_strs = ["{:.0f}".format(s) for s in conf_keep] - name_root, ext = os.path.splitext(img_name) - new_name = name_root + "_conf_" + "_".join(score_strs) + ext - else: - new_name = img_name - - save_path = os.path.join(OUTPUT_DIR, new_name) - cv2.imwrite(save_path, img) - print(f"{img_name} 推理完成,结果保存到: {save_path}") - - rknn.release() - -# ====================== 调用 ====================== -detect_single_image(IMG_PATH) - diff --git a/detect_image/bag_judgment.py b/detect_image/bag_judgment.py deleted file mode 100644 index 87a5b11..0000000 --- a/detect_image/bag_judgment.py +++ /dev/null @@ -1,77 +0,0 @@ -import cv2 -from detect_bag import detect_bag - -THRESHOLD_X = 537 # min_x 阈值 - -def bag_judgment(img, return_conf=True, return_vis=False): - """ - 判断图片中的料袋状态,可动态返回置信度和可视化图像 - Args: - img (np.ndarray): 待检测图片 - return_conf (bool): 是否返回置信度 - return_vis (bool): 是否返回可视化图像 - Returns: - status_bool: True=到位, False=未到位, None=未检测到 - status_text: 中文状态 - conf: 最大置信度或 None - min_x: 最左边 x 坐标或 None - vis_img: 可视化图像或 None - """ - # 调用 detect_bag - outputs = detect_bag(img, return_conf=return_conf, return_vis=return_vis) - - # 初始化占位 - conf = None - min_x = None - vis_img = None - - # 根据返回值长度解析 - if return_conf and return_vis: - if len(outputs) == 3: - conf, min_x, vis_img = outputs - elif len(outputs) == 2: - conf, min_x = outputs - elif len(outputs) == 1: - min_x = outputs[0] - elif return_conf and not return_vis: - if len(outputs) >= 2: - conf, min_x = outputs[:2] - elif len(outputs) == 1: - min_x = outputs[0] - elif not return_conf and return_vis: - if len(outputs) == 2: - min_x, vis_img = outputs - elif len(outputs) == 1: - min_x = outputs[0] - else: - min_x = outputs if isinstance(outputs, (int, float, np.number)) else outputs[0] - - # 判断状态 - if min_x is None: - status_bool = None - status_text = "没有料袋" - elif min_x >= THRESHOLD_X: - status_bool = True - status_text = "料袋到位" - else: - status_bool = False - status_text = "料袋未到位" - - return status_bool, status_text, conf, min_x, vis_img - - -# ====================== 测试 ====================== -if __name__ == "__main__": - IMG_PATH = "3.jpg" - img = cv2.imread(IMG_PATH) - if img is None: - raise FileNotFoundError(f"图片无法读取: {IMG_PATH}") - - status_bool, status_text, conf, min_x, vis_img = bag_judgment(img, return_conf=True, return_vis=True) - print(f"判断结果: {status_bool}, 中文状态: {status_text}, conf={conf}, min_x={min_x}") - - if vis_img is not None: - cv2.imshow("Vis", vis_img) - cv2.waitKey(0) - cv2.destroyAllWindows() - diff --git a/detect_image/capture-image_1.py b/detect_image/capture-image_1.py deleted file mode 100644 index 426c88b..0000000 --- a/detect_image/capture-image_1.py +++ /dev/null @@ -1,202 +0,0 @@ -import cv2 -import time -import os -import numpy as np -from PIL import Image -from skimage.metrics import structural_similarity as ssim -import shutil -from rknnlite.api import RKNNLite - -# ================== 配置参数 ================== -RTSP_URL = "rtsp://admin:ailaimiye123@192.168.0.234:554/streaming/channels/101" -SAVE_INTERVAL = 15 -SSIM_THRESHOLD = 0.9 -OUTPUT_DIR = "camera_test" -RKNN_MODEL = "bag3568.rknn" -SHOW_WINDOW = False - -# 灰度判断参数 -GRAY_LOWER = 70 -GRAY_UPPER = 230 -GRAY_RATIO_THRESHOLD = 0.7 - -IMG_SIZE = (640, 640) -OBJ_THRESH = 0.001 -NMS_THRESH = 0.45 -CLASS_NAME = ["bag"] - -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# ================== 灰度判断 ================== -def is_large_gray(image): - img = np.array(image) - if img.ndim != 3 or img.shape[2] != 3: - return True - h, w, _ = img.shape - gray_mask = ( - (img[:, :, 0] >= GRAY_LOWER) & (img[:, :, 0] <= GRAY_UPPER) & - (img[:, :, 1] >= GRAY_LOWER) & (img[:, :, 1] <= GRAY_UPPER) & - (img[:, :, 2] >= GRAY_LOWER) & (img[:, :, 2] <= GRAY_UPPER) - ) - return gray_mask.sum() / (h * w) > GRAY_RATIO_THRESHOLD - -# ================== RKNN 工具函数 ================== -def letterbox_resize(image, size, bg_color=114): - target_w, target_h = size - h, w = image.shape[:2] - scale = min(target_w / w, target_h / h) - new_w, new_h = int(w * scale), int(h * scale) - resized = cv2.resize(image, (new_w, new_h)) - canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) - dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 - canvas[dy:dy + new_h, dx:dx + new_w] = resized - return canvas, scale, dx, dy - -def dfl_numpy(position): - n, c, h, w = position.shape - p_num = 4 - mc = c // p_num - y = position.reshape(n, p_num, mc, h, w) - y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) - acc = np.arange(mc).reshape(1,1,mc,1,1) - return np.sum(y * acc, axis=2) - -def box_process(position): - grid_h, grid_w = position.shape[2:4] - col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) - col = col.reshape(1,1,grid_h,grid_w) - row = row.reshape(1,1,grid_h,grid_w) - grid = np.concatenate((col,row), axis=1) - stride = np.array([IMG_SIZE[1]//grid_h, IMG_SIZE[0]//grid_w]).reshape(1,2,1,1) - position = dfl_numpy(position) - box_xy = grid + 0.5 - position[:,0:2,:,:] - box_xy2 = grid + 0.5 + position[:,2:4,:,:] - return np.concatenate((box_xy*stride, box_xy2*stride), axis=1) - -def filter_boxes(boxes, box_confidences, box_class_probs): - boxes = boxes.reshape(-1,4) - box_confidences = box_confidences.reshape(-1) - box_class_probs = np.array(box_class_probs) - - class_ids = np.argmax(box_class_probs, axis=-1) - class_scores = box_class_probs[np.arange(len(class_ids)), class_ids] - scores = box_confidences * class_scores - - mask = scores >= OBJ_THRESH - if np.sum(mask) == 0: - return None - return True # 只需要判断是否有目标 - -def post_process(outputs, scale, dx, dy): - boxes_list, conf_list, class_list = [], [], [] - for i in range(3): - boxes_list.append(box_process(outputs[i*3])) - conf_list.append(outputs[i*3+2]) - class_list.append(outputs[i*3+1]) - - def flatten(x): - x = x.transpose(0,2,3,1) - return x.reshape(-1,x.shape[3]) - - boxes = np.concatenate([flatten(b) for b in boxes_list]) - box_conf = np.concatenate([flatten(c) for c in conf_list]) - class_probs = np.concatenate([flatten(c) for c in class_list]) - return filter_boxes(boxes, box_conf, class_probs) - -# ================== RKNN 初始化 ================== -rknn = RKNNLite() -if rknn.load_rknn(RKNN_MODEL) != 0: - raise RuntimeError("❌ RKNN 模型加载失败") -if rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) != 0: - raise RuntimeError("❌ RKNN Runtime 初始化失败") -print("✅ RKNN 初始化完成") - -# ================== 视频流处理 ================== -max_retry_seconds = 10 -retry_interval_seconds = 1 - -last_gray = None -frame_count = 0 - -while True: - cap = cv2.VideoCapture(RTSP_URL) - start_time = time.time() - - while not cap.isOpened(): - if time.time() - start_time >= max_retry_seconds: - print("❌ 无法连接 RTSP") - exit(1) - time.sleep(retry_interval_seconds) - cap = cv2.VideoCapture(RTSP_URL) - - print("✅ 开始读取视频流") - - try: - while True: - ret, frame = cap.read() - if not ret: - print("❌ 读取失败") - break - - frame_count += 1 - - if SHOW_WINDOW: - cv2.imshow("Camera", frame) - if cv2.waitKey(1) == ord('q'): - raise KeyboardInterrupt - - if frame_count % SAVE_INTERVAL != 0: - continue - - print(f"处理帧 {frame_count}") - - # STEP1: 灰度过滤 - pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - if is_large_gray(pil_image): - print("跳过:大面积灰色") - continue - - # STEP2: SSIM 去重 - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - if last_gray is not None: - sim = ssim(gray, last_gray) - if sim > SSIM_THRESHOLD: - print(f"跳过:SSIM={sim:.3f}") - continue - last_gray = gray.copy() - - # STEP3: RKNN 推理,只判断是否有 bag - img_resized, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) - input_data = np.expand_dims(img_resized, 0) - outputs = rknn.inference(inputs=[input_data]) - has_bag = post_process(outputs, scale, dx, dy) - if not has_bag: - print("跳过:未检测到 bag") - continue - - # STEP4: 磁盘检查 - _, _, free = shutil.disk_usage(OUTPUT_DIR) - if free < 5*1024**3: - print("❌ 磁盘空间不足") - raise SystemExit(1) - - # STEP5: 保存原图 - ts = time.strftime("%Y%m%d_%H%M%S") - ms = int((time.time()%1)*1000) - filename = f"bag_{ts}_{ms:03d}.png" - path = os.path.join(OUTPUT_DIR, filename) - cv2.imwrite(path, frame) # 保存原图 - print(f"✅ 已保存: {path}") - - except KeyboardInterrupt: - print("\n🛑 用户中断") - break - - finally: - cap.release() - cv2.destroyAllWindows() - print(f"视频流关闭,共处理 {frame_count} 帧") - -rknn.release() -print("程序结束") - diff --git a/detect_image/capture-image_2.py b/detect_image/capture-image_2.py deleted file mode 100644 index a216e1b..0000000 --- a/detect_image/capture-image_2.py +++ /dev/null @@ -1,205 +0,0 @@ -import cv2 -import time -import os -import numpy as np -from PIL import Image -from skimage.metrics import structural_similarity as ssim -from rknnlite.api import RKNNLite - -# ================== 配置 ================== -RTSP_URL = "rtsp://admin:ailaimiye123@192.168.0.234:554/streaming/channels/101" -RKNN_MODEL = "bag3568.rknn" -OUTPUT_DIR = "camera_event_capture" - -CONF_THRESHOLD = 0.5 -SSIM_THRESHOLD = 0.9 - -END_MISS_FRAMES = 30 # 连续多少帧未检测到 → 结束采集 -SAVE_EVERY_N_FRAMES = 1 # 采集中每 N 帧保存一次 -SHOW_WINDOW = False - -# 灰度判断参数 -GRAY_LOWER = 70 -GRAY_UPPER = 230 -GRAY_RATIO_THRESHOLD = 0.7 - -IMG_SIZE = (640, 640) -OBJ_THRESH = 0.001 -NMS_THRESH = 0.45 -CLASS_NAME = ["bag"] - -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# ================== 灰度判断 ================== -def is_large_gray(image): - img = np.array(image) - if img.ndim != 3 or img.shape[2] != 3: - return True - h, w, _ = img.shape - gray_mask = ( - (img[:, :, 0] >= GRAY_LOWER) & (img[:, :, 0] <= GRAY_UPPER) & - (img[:, :, 1] >= GRAY_LOWER) & (img[:, :, 1] <= GRAY_UPPER) & - (img[:, :, 2] >= GRAY_LOWER) & (img[:, :, 2] <= GRAY_UPPER) - ) - return gray_mask.sum() / (h * w) > GRAY_RATIO_THRESHOLD - -# ================== RKNN 推理工具 ================== -def letterbox_resize(image, size, bg_color=114): - target_w, target_h = size - h, w = image.shape[:2] - scale = min(target_w / w, target_h / h) - new_w, new_h = int(w * scale), int(h * scale) - resized = cv2.resize(image, (new_w, new_h)) - canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) - dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 - canvas[dy:dy + new_h, dx:dx + new_w] = resized - return canvas, scale, dx, dy - -def dfl_numpy(position): - n, c, h, w = position.shape - p_num = 4 - mc = c // p_num - y = position.reshape(n, p_num, mc, h, w) - y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) - acc = np.arange(mc).reshape(1,1,mc,1,1) - return np.sum(y * acc, axis=2) - -def box_process(position): - grid_h, grid_w = position.shape[2:4] - col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) - col = col.reshape(1,1,grid_h,grid_w) - row = row.reshape(1,1,grid_h,grid_w) - grid = np.concatenate((col,row), axis=1) - stride = np.array([IMG_SIZE[1]//grid_h, IMG_SIZE[0]//grid_w]).reshape(1,2,1,1) - position = dfl_numpy(position) - box_xy = grid + 0.5 - position[:,0:2,:,:] - box_xy2 = grid + 0.5 + position[:,2:4,:,:] - return np.concatenate((box_xy*stride, box_xy2*stride), axis=1) - -def filter_boxes(boxes, box_confidences, box_class_probs): - boxes = boxes.reshape(-1,4) - box_confidences = box_confidences.reshape(-1) - box_class_probs = np.array(box_class_probs) - - class_ids = np.argmax(box_class_probs, axis=-1) - class_scores = box_class_probs[np.arange(len(class_ids)), class_ids] - scores = box_confidences * class_scores - - mask = scores >= OBJ_THRESH - return np.sum(mask) > 0 # True: 有 bag, False: 无 bag - -def post_process(outputs, scale, dx, dy): - boxes_list, conf_list, class_list = [], [], [] - for i in range(3): - boxes_list.append(box_process(outputs[i*3])) - conf_list.append(outputs[i*3+2]) - class_list.append(outputs[i*3+1]) - - def flatten(x): - x = x.transpose(0,2,3,1) - return x.reshape(-1,x.shape[3]) - - boxes = np.concatenate([flatten(b) for b in boxes_list]) - box_conf = np.concatenate([flatten(c) for c in conf_list]) - class_probs = np.concatenate([flatten(c) for c in class_list]) - return filter_boxes(boxes, box_conf, class_probs) - -# ================== RKNN 初始化 ================== -rknn = RKNNLite() -if rknn.load_rknn(RKNN_MODEL) != 0: - raise RuntimeError("RKNN 模型加载失败") -if rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) != 0: - raise RuntimeError("RKNN Runtime 初始化失败") -print("✅ RKNN 初始化完成") - -# ================== 视频流 ================== -cap = cv2.VideoCapture(RTSP_URL) -if not cap.isOpened(): - raise RuntimeError("RTSP 连接失败") -print("🎥 视频流已连接") - -# ================== 状态机 ================== -STATE_IDLE = 0 -STATE_CAPTURING = 1 - -state = STATE_IDLE -miss_count = 0 -save_idx = 0 -session_dir = None -session_id = 0 -last_gray = None -frame_count = 0 - -try: - while True: - ret, frame = cap.read() - if not ret: - time.sleep(0.5) - continue - - frame_count += 1 - - if SHOW_WINDOW: - cv2.imshow("Camera", frame) - if cv2.waitKey(1) == ord('q'): - break - - # ---------- 灰度过滤 ---------- - pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - if is_large_gray(pil_image): - continue - - # ---------- SSIM 去重 ---------- - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - if last_gray is not None and state == STATE_IDLE: - sim = ssim(gray, last_gray) - if sim > SSIM_THRESHOLD: - continue - last_gray = gray.copy() - - # ---------- RKNN 推理判断是否有 bag ---------- - img_resized, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) - input_data = np.expand_dims(img_resized, 0) - outputs = rknn.inference(inputs=[input_data]) - has_bag = post_process(outputs, scale, dx, dy) - - # ---------- 状态机 ---------- - if state == STATE_IDLE: - if has_bag: - session_id += 1 - ts = time.strftime("%Y%m%d_%H%M%S") - session_dir = os.path.join(OUTPUT_DIR, f"session_{session_id:04d}_{ts}") - os.makedirs(session_dir, exist_ok=True) - print(f"\n🚀 进入采集") - state = STATE_CAPTURING - miss_count = 0 - save_idx = 0 - - elif state == STATE_CAPTURING: - if has_bag: - miss_count = 0 - else: - miss_count += 1 - - if save_idx % SAVE_EVERY_N_FRAMES == 0: - ts = time.strftime("%Y%m%d_%H%M%S") - ms = int((time.time()%1)*1000) - fname = f"{save_idx:06d}_{ts}_{ms:03d}.png" - cv2.imwrite(os.path.join(session_dir, fname), frame) # 保存原图 - save_idx += 1 - - if miss_count >= END_MISS_FRAMES: - print(f"🛑 退出采集,本次保存 {save_idx} 帧") - state = STATE_IDLE - miss_count = 0 - session_dir = None - -except KeyboardInterrupt: - print("\n🛑 用户退出") - -finally: - cap.release() - cv2.destroyAllWindows() - rknn.release() - print("程序结束") - diff --git a/detect_image/detect_bag.py b/detect_image/detect_bag.py deleted file mode 100644 index 6a60034..0000000 --- a/detect_image/detect_bag.py +++ /dev/null @@ -1,181 +0,0 @@ -import os -import cv2 -import numpy as np -from rknnlite.api import RKNNLite - -# ====================== 配置 ====================== -MODEL_PATH = "bag3588.rknn" -IMG_PATH = "2.jpg" -IMG_SIZE = (640, 640) -OBJ_THRESH = 0.001 -NMS_THRESH = 0.45 -CLASS_NAME = ["bag"] -OUTPUT_DIR = "./result" -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# ====================== 全局 RKNN ====================== -_global_rknn = None - -def init_rknn(model_path): - global _global_rknn - if _global_rknn is None: - rknn = RKNNLite(verbose=False) - rknn.load_rknn(model_path) - rknn.init_runtime() - _global_rknn = rknn - return _global_rknn - -# ====================== 工具函数 ====================== -def letterbox_resize(image, size, bg_color=114): - target_w, target_h = size - h, w = image.shape[:2] - scale = min(target_w / w, target_h / h) - new_w, new_h = int(w * scale), int(h * scale) - resized = cv2.resize(image, (new_w, new_h)) - canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) - dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 - canvas[dy:dy + new_h, dx:dx + new_w] = resized - return canvas, scale, dx, dy - -def dfl_numpy(position): - n, c, h, w = position.shape - p_num = 4 - mc = c // p_num - y = position.reshape(n, p_num, mc, h, w) - y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) - acc = np.arange(mc).reshape(1,1,mc,1,1) - y = np.sum(y * acc, axis=2) - return y - -def box_process(position): - grid_h, grid_w = position.shape[2:4] - col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) - col = col.reshape(1,1,grid_h,grid_w) - row = row.reshape(1,1,grid_h,grid_w) - grid = np.concatenate((col,row), axis=1) - stride = np.array([IMG_SIZE[1] // grid_h, IMG_SIZE[0] // grid_w]).reshape(1,2,1,1) - position = dfl_numpy(position) - box_xy = grid + 0.5 - position[:,0:2,:,:] - box_xy2 = grid + 0.5 + position[:,2:4,:,:] - xyxy = np.concatenate((box_xy*stride, box_xy2*stride), axis=1) - return xyxy - -def filter_boxes(boxes, box_confidences, box_class_probs): - boxes = np.array(boxes).reshape(-1, 4) - box_confidences = np.array(box_confidences).reshape(-1) - box_class_probs = np.array(box_class_probs) - - class_ids = np.argmax(box_class_probs, axis=-1) - class_scores = box_class_probs[np.arange(len(class_ids)), class_ids] - scores = box_confidences * class_scores - - mask = scores >= OBJ_THRESH - if np.sum(mask) == 0: - return None, None, None, None - - boxes = boxes[mask] - classes = class_ids[mask] - scores = scores[mask] - conf_keep = box_confidences[mask] - - x1, y1, x2, y2 = boxes[:,0], boxes[:,1], boxes[:,2], boxes[:,3] - areas = (x2 - x1 + 1) * (y2 - y1 + 1) - order = scores.argsort()[::-1] - keep = [] - while order.size > 0: - i = order[0] - keep.append(i) - xx1 = np.maximum(x1[i], x1[order[1:]]) - yy1 = np.maximum(y1[i], y1[order[1:]]) - xx2 = np.minimum(x2[i], x2[order[1:]]) - yy2 = np.minimum(y2[i], y2[order[1:]]) - w = np.maximum(0, xx2 - xx1 + 1) - h = np.maximum(0, yy2 - yy1 + 1) - inter = w * h - ovr = inter / (areas[i] + areas[order[1:]] - inter) - inds = np.where(ovr <= NMS_THRESH)[0] - order = order[inds + 1] - return boxes[keep], classes[keep], scores[keep], conf_keep[keep] - -def post_process(outputs, scale, dx, dy): - boxes_list, conf_list, class_list = [], [], [] - branch_num = 3 - for i in range(branch_num): - boxes_list.append(box_process(outputs[i*3])) - conf_list.append(outputs[i*3+2]) - class_list.append(outputs[i*3+1]) - - def flatten(x): - ch = x.shape[1] - x = x.transpose(0,2,3,1) - return x.reshape(-1,ch) - - boxes = np.concatenate([flatten(b) for b in boxes_list]) - box_conf = np.concatenate([flatten(c) for c in conf_list]) - class_probs = np.concatenate([flatten(c) for c in class_list]) - - boxes, classes, scores, conf_keep = filter_boxes(boxes, box_conf, class_probs) - if boxes is None: - return None, None, None, None - - boxes[:, [0,2]] -= dx - boxes[:, [1,3]] -= dy - boxes /= scale - boxes = boxes.clip(min=0) - - scores = 1-scores - conf_keep = conf_keep * 255 - return boxes, classes, scores, conf_keep - -# ====================== detect_bag ====================== -def detect_bag(img, return_conf=True, return_vis=False): - rknn = init_rknn(MODEL_PATH) - - img_resized, scale, dx, dy = letterbox_resize(img, IMG_SIZE) - input_data = np.expand_dims(img_resized, 0) - outputs = rknn.inference(inputs=[input_data]) - boxes, classes, scores, conf_keep = post_process(outputs, scale, dx, dy) - - if boxes is None or len(boxes) == 0: - return (None, None) if return_conf else (None,) - - min_x = float(boxes[:,0].min()) - conf_val = float(scores.max()) if return_conf else None - vis_img = None - - if return_vis: - vis_img = img.copy() - for i, box in enumerate(boxes): - x1, y1, x2, y2 = box.astype(int) - cls_id = classes[i] - score = scores[i] - cv2.rectangle(vis_img, (x1, y1), (x2, y2), (0, 255, 0), 2) - cv2.putText(vis_img, - f"{CLASS_NAME[cls_id]}:{score:.1f}", - (x1, max(y1-5,0)), - cv2.FONT_HERSHEY_SIMPLEX, - 0.6, - (0, 255, 0), - 2) - save_path = os.path.join(OUTPUT_DIR, "vis_" + "result.jpg") - cv2.imwrite(save_path, vis_img) - - if return_conf: - return conf_val, min_x - else: - return min_x, vis_img - -# ====================== 测试 ====================== -if __name__ == "__main__": - img = cv2.imread(IMG_PATH) - if img is None: - raise FileNotFoundError(f"图片无法读取: {IMG_PATH}") - - # 可控制输出:conf, vis - conf, min_x = detect_bag(img, return_conf=True, return_vis=True) - - if conf is None: - print("❌ 未检测到 bag") - else: - print(f"✅ 最大置信度: {conf:.4f}, 最左 x: {min_x:.1f}") - diff --git a/detect_image/image_02_3588.py b/detect_image/image_02_3588.py deleted file mode 100644 index 8c0a9a7..0000000 --- a/detect_image/image_02_3588.py +++ /dev/null @@ -1,205 +0,0 @@ -import cv2 -import time -import os -import numpy as np -from PIL import Image -from skimage.metrics import structural_similarity as ssim -from rknnlite.api import RKNNLite - -# ================== 配置 ================== -RTSP_URL = "rtsp://admin:XJ123456@192.168.250.60:554/streaming/channels/101" -RKNN_MODEL = "bag3588.rknn" -OUTPUT_DIR = "camera_event_capture" - -CONF_THRESHOLD = 0.5 -SSIM_THRESHOLD = 0.9 - -END_MISS_FRAMES = 30 # 连续多少帧未检测到 → 结束采集 -SAVE_EVERY_N_FRAMES = 1 # 采集中每 N 帧保存一次 -SHOW_WINDOW = False - -# 灰度判断参数 -GRAY_LOWER = 70 -GRAY_UPPER = 230 -GRAY_RATIO_THRESHOLD = 0.7 - -IMG_SIZE = (640, 640) -OBJ_THRESH = 0.001 -NMS_THRESH = 0.45 -CLASS_NAME = ["bag"] - -os.makedirs(OUTPUT_DIR, exist_ok=True) - -# ================== 灰度判断 ================== -def is_large_gray(image): - img = np.array(image) - if img.ndim != 3 or img.shape[2] != 3: - return True - h, w, _ = img.shape - gray_mask = ( - (img[:, :, 0] >= GRAY_LOWER) & (img[:, :, 0] <= GRAY_UPPER) & - (img[:, :, 1] >= GRAY_LOWER) & (img[:, :, 1] <= GRAY_UPPER) & - (img[:, :, 2] >= GRAY_LOWER) & (img[:, :, 2] <= GRAY_UPPER) - ) - return gray_mask.sum() / (h * w) > GRAY_RATIO_THRESHOLD - -# ================== RKNN 推理工具 ================== -def letterbox_resize(image, size, bg_color=114): - target_w, target_h = size - h, w = image.shape[:2] - scale = min(target_w / w, target_h / h) - new_w, new_h = int(w * scale), int(h * scale) - resized = cv2.resize(image, (new_w, new_h)) - canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8) - dx, dy = (target_w - new_w) // 2, (target_h - new_h) // 2 - canvas[dy:dy + new_h, dx:dx + new_w] = resized - return canvas, scale, dx, dy - -def dfl_numpy(position): - n, c, h, w = position.shape - p_num = 4 - mc = c // p_num - y = position.reshape(n, p_num, mc, h, w) - y = np.exp(y) / np.sum(np.exp(y), axis=2, keepdims=True) - acc = np.arange(mc).reshape(1,1,mc,1,1) - return np.sum(y * acc, axis=2) - -def box_process(position): - grid_h, grid_w = position.shape[2:4] - col, row = np.meshgrid(np.arange(grid_w), np.arange(grid_h)) - col = col.reshape(1,1,grid_h,grid_w) - row = row.reshape(1,1,grid_h,grid_w) - grid = np.concatenate((col,row), axis=1) - stride = np.array([IMG_SIZE[1]//grid_h, IMG_SIZE[0]//grid_w]).reshape(1,2,1,1) - position = dfl_numpy(position) - box_xy = grid + 0.5 - position[:,0:2,:,:] - box_xy2 = grid + 0.5 + position[:,2:4,:,:] - return np.concatenate((box_xy*stride, box_xy2*stride), axis=1) - -def filter_boxes(boxes, box_confidences, box_class_probs): - boxes = boxes.reshape(-1,4) - box_confidences = box_confidences.reshape(-1) - box_class_probs = np.array(box_class_probs) - - class_ids = np.argmax(box_class_probs, axis=-1) - class_scores = box_class_probs[np.arange(len(class_ids)), class_ids] - scores = box_confidences * class_scores - - mask = scores >= OBJ_THRESH - return np.sum(mask) > 0 # True: 有 bag, False: 无 bag - -def post_process(outputs, scale, dx, dy): - boxes_list, conf_list, class_list = [], [], [] - for i in range(3): - boxes_list.append(box_process(outputs[i*3])) - conf_list.append(outputs[i*3+2]) - class_list.append(outputs[i*3+1]) - - def flatten(x): - x = x.transpose(0,2,3,1) - return x.reshape(-1,x.shape[3]) - - boxes = np.concatenate([flatten(b) for b in boxes_list]) - box_conf = np.concatenate([flatten(c) for c in conf_list]) - class_probs = np.concatenate([flatten(c) for c in class_list]) - return filter_boxes(boxes, box_conf, class_probs) - -# ================== RKNN 初始化 ================== -rknn = RKNNLite() -if rknn.load_rknn(RKNN_MODEL) != 0: - raise RuntimeError("RKNN 模型加载失败") -if rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) != 0: - raise RuntimeError("RKNN Runtime 初始化失败") -print("✅ RKNN 初始化完成") - -# ================== 视频流 ================== -cap = cv2.VideoCapture(RTSP_URL) -if not cap.isOpened(): - raise RuntimeError("RTSP 连接失败") -print("🎥 视频流已连接") - -# ================== 状态机 ================== -STATE_IDLE = 0 -STATE_CAPTURING = 1 - -state = STATE_IDLE -miss_count = 0 -save_idx = 0 -session_dir = None -session_id = 0 -last_gray = None -frame_count = 0 - -try: - while True: - ret, frame = cap.read() - if not ret: - time.sleep(0.5) - continue - - frame_count += 1 - - if SHOW_WINDOW: - cv2.imshow("Camera", frame) - if cv2.waitKey(1) == ord('q'): - break - - # ---------- 灰度过滤 ---------- - pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - if is_large_gray(pil_image): - continue - - # ---------- SSIM 去重 ---------- - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - if last_gray is not None and state == STATE_IDLE: - sim = ssim(gray, last_gray) - if sim > SSIM_THRESHOLD: - continue - last_gray = gray.copy() - - # ---------- RKNN 推理判断是否有 bag ---------- - img_resized, scale, dx, dy = letterbox_resize(frame, IMG_SIZE) - input_data = np.expand_dims(img_resized, 0) - outputs = rknn.inference(inputs=[input_data]) - has_bag = post_process(outputs, scale, dx, dy) - - # ---------- 状态机 ---------- - if state == STATE_IDLE: - if has_bag: - session_id += 1 - ts = time.strftime("%Y%m%d_%H%M%S") - session_dir = os.path.join(OUTPUT_DIR, f"session_{session_id:04d}_{ts}") - os.makedirs(session_dir, exist_ok=True) - print(f"\n🚀 进入采集") - state = STATE_CAPTURING - miss_count = 0 - save_idx = 0 - - elif state == STATE_CAPTURING: - if has_bag: - miss_count = 0 - else: - miss_count += 1 - - if save_idx % SAVE_EVERY_N_FRAMES == 0: - ts = time.strftime("%Y%m%d_%H%M%S") - ms = int((time.time()%1)*1000) - fname = f"{save_idx:06d}_{ts}_{ms:03d}.png" - cv2.imwrite(os.path.join(session_dir, fname), frame) # 保存原图 - save_idx += 1 - - if miss_count >= END_MISS_FRAMES: - print(f"🛑 退出采集,本次保存 {save_idx} 帧") - state = STATE_IDLE - miss_count = 0 - session_dir = None - -except KeyboardInterrupt: - print("\n🛑 用户退出") - -finally: - cap.release() - cv2.destroyAllWindows() - rknn.release() - print("程序结束") -