import os
from pathlib import Path

import cv2
import numpy as np
from rknnlite.api import RKNNLite

# ====================== Configuration ======================
MODEL_PATH = "seg.rknn"                           # converted RKNN model path
SOURCE_IMG_DIR = "/home/hx/yolo/output_masks"     # input image directory
OUTPUT_DIR = "/home/hx/yolo/output_masks_rknn"    # output directory
ROI_COORDS_FILE = "./roi_coordinates/1_rois.txt"  # ROI file, one "x,y,w,h" per line
TARGET_SIZE = 640                                 # model input size (square)
CONF_THRESHOLD = 0.25
IOU_THRESHOLD = 0.45                              # NOTE: NMS is done here in post-processing
DEVICE = RKNNLite.NPU_CORE_0
SAVE_TXT = True
SAVE_MASKS = True
VIEW_IMG = False
LINE_WIDTH = 2

# YOLO output-head layout (adjust to match your exported model)
MASK_PROTO_IDX = 12          # mask prototype output index
MASK_COEFF_IDXS = [3, 7, 11] # mask-coefficient outputs, one per detection scale
CONF_IDXS = [1, 5, 9]        # objectness / confidence outputs, one per scale
BOX_IDXS = [0, 4, 8]         # bbox outputs, one per scale


def load_roi_coords(txt_path):
    """Load ROIs from a text file; each non-comment line is "x,y,w,h" (ints).

    Returns a list of (x, y, w, h) tuples. Unparseable lines are reported
    and skipped. Raises FileNotFoundError if the file does not exist.
    """
    rois = []
    if not os.path.exists(txt_path):
        raise FileNotFoundError(f"❌ ROI 文件未找到: {txt_path}")
    with open(txt_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            try:
                x, y, w, h = map(int, line.split(','))
            except Exception as e:
                print(f"⚠️ 无法解析 ROI 行: '{line}' | 错误: {e}")
                continue
            rois.append((x, y, w, h))
            print(f"📌 加载 ROI: (x={x}, y={y}, w={w}, h={h})")
    return rois


def sigmoid(x):
    """Numerically standard logistic function."""
    return 1 / (1 + np.exp(-x))


def letterbox_resize(image, size, bg_color=114):
    """Resize keeping aspect ratio, padding the rest with bg_color.

    Returns (canvas, scale, dx, dy) where scale is the applied resize factor
    and (dx, dy) is the top-left offset of the resized image in the canvas.
    """
    target_w, target_h = size
    h, w, _ = image.shape
    scale = min(target_w / w, target_h / h)
    new_w, new_h = int(w * scale), int(h * scale)
    resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
    canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8)
    dx = (target_w - new_w) // 2
    dy = (target_h - new_h) // 2
    canvas[dy:dy + new_h, dx:dx + new_w] = resized
    return canvas, scale, dx, dy


def non_max_suppression(boxes, scores, iou_thresh=0.45):
    """Greedy NMS over (N, 4) boxes in x1,y1,x2,y2 form; returns kept indices.

    BUGFIX: `areas` used to be a free (undefined) name inside this function,
    raising NameError on first call — it is now computed here, with the same
    "+1" convention the overlap widths/heights already used.
    """
    if len(boxes) == 0:
        return []
    boxes = np.asarray(boxes, dtype=float)
    scores = np.asarray(scores)
    areas = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
    order = scores.argsort()[::-1]
    keep = []
    while len(order) > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(boxes[i, 0], boxes[order[1:], 0])
        yy1 = np.maximum(boxes[i, 1], boxes[order[1:], 1])
        xx2 = np.minimum(boxes[i, 2], boxes[order[1:], 2])
        yy2 = np.minimum(boxes[i, 3], boxes[order[1:], 3])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        # Keep only boxes that do not overlap the current pick too much.
        order = order[np.where(ovr <= iou_thresh)[0] + 1]
    return keep


def _collect_candidates(outputs):
    """Gather above-threshold detections from all scales.

    Returns a list of dicts with 'conf' (float), 'box' (4,) and 'coeff' (C,)
    — box coordinates are still in the model's normalized output space.
    """
    candidates = []
    for s_idx in range(len(CONF_IDXS)):
        conf_map = outputs[CONF_IDXS[s_idx]].flatten()
        box_map = outputs[BOX_IDXS[s_idx]].reshape(4, -1).T
        coeff_map = outputs[MASK_COEFF_IDXS[s_idx]].reshape(-1, conf_map.shape[0])
        # Vectorized thresholding instead of a per-anchor Python loop.
        for j in np.where(conf_map > CONF_THRESHOLD)[0]:
            candidates.append({
                'conf': conf_map[j],
                'box': box_map[j],
                'coeff': coeff_map[:, j],
            })
    return candidates


def _mask_in_roi(coeff, proto, scale, dx, dy, roi_w, roi_h):
    """Decode one instance mask from coefficients + prototypes into ROI pixels.

    BUGFIX: the letterbox padding is cropped off BEFORE resizing to the ROI
    size; the old code resized the full padded square straight to (w, h),
    distorting the mask whenever the ROI was not square (boxes, by contrast,
    were correctly un-letterboxed, so masks and boxes disagreed).
    Returns a binary uint8 mask of shape (roi_h, roi_w).
    """
    n_proto, ph, pw = proto.shape  # e.g. (32, 160, 160)
    mask = sigmoid(coeff @ proto.reshape(n_proto, -1)).reshape(ph, pw)
    mask = cv2.resize(mask, (TARGET_SIZE, TARGET_SIZE), interpolation=cv2.INTER_LINEAR)
    new_w, new_h = int(roi_w * scale), int(roi_h * scale)  # matches letterbox_resize
    mask = mask[dy:dy + new_h, dx:dx + new_w]
    mask = cv2.resize(mask, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
    return (mask > 0.5).astype(np.uint8)


def _process_roi(rknn, orig_img, roi, roi_idx, img_stem, vis_img, mask_dir):
    """Run inference on one ROI of orig_img.

    Side effects: draws contours/labels onto vis_img in place and, when
    SAVE_MASKS is set, writes the per-ROI combined mask PNG into mask_dir.
    Returns a list of (cls_id, polygon, conf) tuples where polygon is an
    (N, 2) float array normalized to the FULL image size.
    """
    x, y, w, h = roi
    h_orig, w_orig = orig_img.shape[:2]
    if x < 0 or y < 0 or x + w > w_orig or y + h > h_orig:
        print(f"⚠️ ROI 越界,跳过: ({x},{y},{w},{h})")
        return []
    roi_img = orig_img[y:y + h, x:x + w]
    if roi_img.size == 0:
        print(f"⚠️ 空 ROI: {roi_idx}")
        return []

    preprocessed, scale, dx, dy = letterbox_resize(roi_img, (TARGET_SIZE, TARGET_SIZE))
    # BGR -> RGB, add batch dim (NHWC float32).
    infer_input = np.expand_dims(preprocessed[..., ::-1].astype(np.float32), axis=0)

    outputs = rknn.inference(inputs=[infer_input])
    proto = outputs[MASK_PROTO_IDX][0]  # prototype masks, e.g. (32, 160, 160)

    candidates = _collect_candidates(outputs)
    if not candidates:
        return []
    candidates.sort(key=lambda c: c['conf'], reverse=True)
    top_dets = candidates[:100]  # cap the NMS workload

    boxes = np.array([d['box'] for d in top_dets])
    scores = np.array([d['conf'] for d in top_dets])

    # Boxes are assumed cx,cy,w,h normalized to [0,1] of the model input —
    # TODO confirm against the exported model's head definition.
    cx = boxes[:, 0] * TARGET_SIZE
    cy = boxes[:, 1] * TARGET_SIZE
    bw = boxes[:, 2] * TARGET_SIZE
    bh = boxes[:, 3] * TARGET_SIZE
    # Undo letterbox (shift by pad offset, divide by scale), clip to the ROI.
    x1 = np.clip((cx - bw / 2 - dx) / scale, 0, w)
    y1 = np.clip((cy - bh / 2 - dy) / scale, 0, h)
    x2 = np.clip((cx + bw / 2 - dx) / scale, 0, w)
    y2 = np.clip((cy + bh / 2 - dy) / scale, 0, h)

    keep = non_max_suppression(np.stack([x1, y1, x2, y2], axis=1), scores, IOU_THRESHOLD)

    segments = []
    combined = np.zeros((h, w), dtype=np.uint8)  # union of all kept masks
    for i in keep:
        det = top_dets[i]
        mask_bin = _mask_in_roi(det['coeff'], proto, scale, dx, dy, w, h)
        combined |= mask_bin  # reuse the properly decoded mask for SAVE_MASKS

        contours, _ = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            continue
        largest = max(contours, key=cv2.contourArea)
        # reshape(-1, 2) instead of squeeze(): robust to 1-point contours.
        seg = largest.reshape(-1, 2).astype(float)
        seg[:, 0] = (seg[:, 0] + x) / w_orig  # ROI -> full image, normalized
        seg[:, 1] = (seg[:, 1] + y) / h_orig
        segments.append((0, seg, det['conf']))  # single-class model: cls_id=0

        cv2.drawContours(vis_img[y:y + h, x:x + w], [largest], -1, (0, 255, 0), LINE_WIDTH)
        cx_int = int((x1[i] + x2[i]) / 2) + x
        cy_int = int(y1[i]) + y - 10
        cv2.putText(vis_img, f'0 {det["conf"]:.2f}', (cx_int, cy_int),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)

    if SAVE_MASKS:
        mask_canvas = np.zeros((h_orig, w_orig), dtype=np.uint8)
        mask_canvas[y:y + h, x:x + w] = combined * 255
        cv2.imwrite(str(mask_dir / f"{img_stem}_roi{roi_idx}.png"), mask_canvas)

    return segments


def run_rknn_inference_with_roi():
    """Run RKNN segmentation on every ROI of every image in SOURCE_IMG_DIR.

    Writes visualizations, optional per-ROI masks, and optional YOLO-style
    polygon label files under OUTPUT_DIR.
    """
    rknn = RKNNLite(verbose=True)
    if rknn.load_rknn(MODEL_PATH) != 0:
        print("❌ 加载 RKNN 模型失败")
        return
    if rknn.init_runtime(core_mask=DEVICE) != 0:
        print("❌ 初始化 NPU 运行时失败")
        return
    print(f"✅ 模型 {MODEL_PATH} 加载成功")

    output_dir = Path(OUTPUT_DIR)
    txt_dir = output_dir / "labels"
    mask_dir = output_dir / "masks"
    vis_dir = output_dir / "visualize"
    for d in (output_dir, txt_dir, mask_dir, vis_dir):
        d.mkdir(parents=True, exist_ok=True)

    rois = load_roi_coords(ROI_COORDS_FILE)
    if not rois:
        print("❌ 没有有效 ROI,退出。")
        return

    img_files = (list(Path(SOURCE_IMG_DIR).glob("*.jpg"))
                 + list(Path(SOURCE_IMG_DIR).glob("*.png")))
    for img_path in img_files:
        print(f"\n🔍 处理图像: {img_path.name}")
        orig_img = cv2.imread(str(img_path))
        if orig_img is None:
            print(f"❌ 无法读取图像: {img_path}")
            continue

        full_vis_img = orig_img.copy()
        all_segments = []  # (cls_id, normalized polygon, conf) across all ROIs
        for roi_idx, roi in enumerate(rois):
            all_segments.extend(
                _process_roi(rknn, orig_img, roi, roi_idx,
                             img_path.stem, full_vis_img, mask_dir))

        cv2.imwrite(str(vis_dir / f"vis_{img_path.name}"), full_vis_img)

        if SAVE_TXT and all_segments:
            with open(txt_dir / f"{img_path.stem}.txt", 'w') as f:
                for cls_id, seg, conf in all_segments:
                    coords = ' '.join(f'{v:.6f}' for v in seg.flatten())
                    f.write(f"{cls_id} {coords}\n")

        print(f"✅ 已保存结果: {vis_dir / f'vis_{img_path.name}'}")

    rknn.release()
    print(f"\n🎉 全部完成!输出位于: {OUTPUT_DIR}")


if __name__ == "__main__":
    run_rknn_inference_with_roi()