273 lines
10 KiB
Python
273 lines
10 KiB
Python
|
|
import os
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
from rknnlite.api import RKNNLite
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
# ====================== Configuration ======================
|
|||
|
|
MODEL_PATH = "seg.rknn"  # Path to the converted RKNN model
|
|||
|
|
SOURCE_IMG_DIR = "/home/hx/yolo/output_masks"  # Input image directory
|
|||
|
|
OUTPUT_DIR = "/home/hx/yolo/output_masks_rknn"  # Output directory
|
|||
|
|
ROI_COORDS_FILE = "./roi_coordinates/1_rois.txt"  # ROI file path; one "x,y,w,h" entry per line
|
|||
|
|
TARGET_SIZE = 640  # Model input size (side of the square letterbox canvas, in pixels)
|
|||
|
|
CONF_THRESHOLD = 0.25  # Candidates whose confidence is not above this value are dropped
|
|||
|
|
IOU_THRESHOLD = 0.45  # Note: NMS may already run inside the model; it can also be done in post-processing
|
|||
|
|
DEVICE = RKNNLite.NPU_CORE_0  # Passed to init_runtime() as core_mask
|
|||
|
|
SAVE_TXT = True  # Write one polygon label file per image under <OUTPUT_DIR>/labels
|
|||
|
|
SAVE_MASKS = True  # Write one binary mask PNG per ROI under <OUTPUT_DIR>/masks
|
|||
|
|
VIEW_IMG = False  # NOTE(review): not referenced anywhere in the visible code
|
|||
|
|
LINE_WIDTH = 2  # Contour thickness used when drawing the visualisation overlay
|
|||
|
|
|
|||
|
|
# YOLO output layout (adjust to match your model)
|
|||
|
|
MASK_PROTO_IDX = 12  # Index of the mask prototype output
|
|||
|
|
MASK_COEFF_IDXS = [3, 7, 11]  # Indices of the mask coefficient outputs
|
|||
|
|
CONF_IDXS = [1, 5, 9]  # Indices of the objectness / confidence outputs
|
|||
|
|
BOX_IDXS = [0, 4, 8]  # Indices of the bounding-box outputs
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_roi_coords(txt_path):
    """Load rectangular ROIs from a text file.

    Every non-empty line that does not start with ``#`` must hold four
    comma-separated integers ``x,y,w,h``. Lines that cannot be parsed are
    reported on stdout and skipped, so one bad entry does not abort the run.

    Args:
        txt_path: Path of the ROI file.

    Returns:
        A list of ``(x, y, w, h)`` integer tuples, in file order.

    Raises:
        FileNotFoundError: If ``txt_path`` does not exist.
    """
    if not os.path.exists(txt_path):
        raise FileNotFoundError(f"❌ ROI 文件未找到: {txt_path}")

    rois = []
    # "utf-8-sig" reads plain UTF-8 and also strips a leading BOM, which would
    # otherwise make the first ROI line unparseable.
    with open(txt_path, 'r', encoding='utf-8-sig') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            try:
                # A wrong field count and a non-integer field both raise
                # ValueError, so nothing broader needs to be caught.
                x, y, w, h = map(int, line.split(','))
            except ValueError as e:
                print(f"⚠️ 无法解析 ROI 行: '{line}' | 错误: {e}")
                continue
            rois.append((x, y, w, h))
            print(f"📌 加载 ROI: (x={x}, y={y}, w={w}, h={h})")
    return rois
|
|||
|
|
|
|||
|
|
|
|||
|
|
def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` element-wise.

    The value is computed as ``exp(-logaddexp(0, -x))``, which is
    mathematically identical but never evaluates ``exp`` on a large positive
    number, so strongly negative inputs no longer trigger overflow warnings.

    Args:
        x: A scalar or NumPy array.

    Returns:
        The sigmoid of ``x``, with the same shape as ``x``.
    """
    # logaddexp(0, -x) == log(1 + exp(-x)), evaluated without overflow.
    return np.exp(-np.logaddexp(0, -x))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def letterbox_resize(image, size, bg_color=114):
    """Resize an image to fit ``size`` while keeping its aspect ratio.

    The image is scaled by a single factor so that it fits inside the target
    canvas, then centred on a canvas filled with ``bg_color``.

    Args:
        image: Input array; its shape is unpacked as (height, width, channels).
        size: Target ``(width, height)`` of the canvas.
        bg_color: Fill value for the padded border.

    Returns:
        A tuple ``(canvas, scale, dx, dy)``: the padded ``uint8`` canvas of
        shape ``(target_h, target_w, 3)``, the scale factor applied to the
        image, and the left/top offsets of the resized image on the canvas.
    """
    target_w, target_h = size
    h, w, _ = image.shape
    scale = min(target_w / w, target_h / h)
    # Truncation can produce 0 for very elongated inputs, which cv2.resize
    # rejects; keep at least one pixel along each axis.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8)
    dx = (target_w - new_w) // 2
    dy = (target_h - new_h) // 2
    canvas[dy:dy + new_h, dx:dx + new_w] = resized
    return canvas, scale, dx, dy
|
|||
|
|
|
|||
|
|
|
|||
|
|
def non_max_suppression(boxes, scores, iou_thresh=0.45):
    """Greedy non-maximum suppression over axis-aligned boxes.

    Boxes are visited from highest to lowest score; a box is kept only when
    its IoU with every previously kept box is at most ``iou_thresh``.

    Box extents follow the inclusive-pixel convention (``x2 - x1 + 1``) for
    both the per-box areas and the intersection, so the two stay consistent.

    Args:
        boxes: Sequence or array of ``[x1, y1, x2, y2]`` rows.
        scores: One score per box.
        iou_thresh: Boxes overlapping a kept box by more than this IoU are
            suppressed.

    Returns:
        A list of integer indices into ``boxes`` of the kept boxes, ordered by
        descending score. Empty input yields an empty list.
    """
    if len(boxes) == 0:
        return []

    boxes = np.asarray(boxes, dtype=np.float64).reshape(-1, 4)
    scores = np.asarray(scores, dtype=np.float64)
    x1, y1, x2, y2 = boxes.T
    # Areas are computed here; the function must not rely on a global.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)

    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(int(i))
        rest = order[1:]

        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h

        union = areas[i] + areas[rest] - inter
        # Guard against a zero union produced by degenerate boxes.
        iou = inter / np.maximum(union, np.finfo(np.float64).eps)

        order = rest[iou <= iou_thresh]
    return keep
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run_rknn_inference_with_roi():
    """Run RKNN segmentation on every ROI of every image in SOURCE_IMG_DIR.

    Loads the model at MODEL_PATH, initialises the NPU runtime on DEVICE,
    reads the ROI list from ROI_COORDS_FILE and processes every ``*.jpg`` and
    ``*.png`` file in SOURCE_IMG_DIR. Under OUTPUT_DIR it writes:

    * ``visualize/vis_<image name>``: the image with contours and scores;
    * ``labels/<stem>.txt``: normalised polygons (when SAVE_TXT is set and at
      least one polygon was found);
    * ``masks/<stem>_roi<idx>.png``: a full-size binary mask per ROI (when
      SAVE_MASKS is set and the ROI produced candidates).

    Returns:
        None. Model or runtime failures and an empty ROI list are reported on
        stdout and end the run early.

    Raises:
        FileNotFoundError: Propagated from ``load_roi_coords`` when the ROI
            file is missing.
    """
    rknn = RKNNLite(verbose=True)
    if rknn.load_rknn(MODEL_PATH) != 0:
        print("❌ 加载 RKNN 模型失败")
        return
    if rknn.init_runtime(core_mask=DEVICE) != 0:
        print("❌ 初始化 NPU 运行时失败")
        return
    print(f"✅ 模型 {MODEL_PATH} 加载成功")

    # From here on the runtime is live: release it on every exit path,
    # including the early return below and any exception.
    try:
        output_dir = Path(OUTPUT_DIR)
        txt_dir = output_dir / "labels"
        mask_dir = output_dir / "masks"
        vis_dir = output_dir / "visualize"
        for d in (output_dir, txt_dir, mask_dir, vis_dir):
            d.mkdir(parents=True, exist_ok=True)

        rois = load_roi_coords(ROI_COORDS_FILE)
        if not rois:
            print("❌ 没有有效 ROI,退出。")
            return

        source_dir = Path(SOURCE_IMG_DIR)
        img_files = list(source_dir.glob("*.jpg")) + list(source_dir.glob("*.png"))
        for img_path in img_files:
            _process_image(rknn, img_path, rois, txt_dir, mask_dir, vis_dir)
    finally:
        rknn.release()

    print(f"\n🎉 全部完成!输出位于: {OUTPUT_DIR}")


def _process_image(rknn, img_path, rois, txt_dir, mask_dir, vis_dir):
    """Segment every ROI of one image and write its visualisation, masks and labels."""
    print(f"\n🔍 处理图像: {img_path.name}")
    orig_img = cv2.imread(str(img_path))
    if orig_img is None:
        print(f"❌ 无法读取图像: {img_path}")
        return
    h_orig, w_orig = orig_img.shape[:2]

    full_vis_img = orig_img.copy()
    all_segments = []  # (cls_id, normalised polygon points, confidence)

    for roi_idx, (x, y, w, h) in enumerate(rois):
        if x < 0 or y < 0 or x + w > w_orig or y + h > h_orig:
            print(f"⚠️ ROI 越界,跳过: ({x},{y},{w},{h})")
            continue
        # A non-positive size gives an empty crop, or, through negative
        # slicing, a crop of the wrong region; reject both.
        if w <= 0 or h <= 0:
            print(f"⚠️ 空 ROI: {roi_idx}")
            continue

        segments, combined = _segment_roi(rknn, orig_img, full_vis_img, (x, y, w, h))
        all_segments.extend(segments)

        # combined is None when the ROI produced no candidate at all.
        if SAVE_MASKS and combined is not None:
            mask_canvas = np.zeros((h_orig, w_orig), dtype=np.uint8)
            mask_canvas[y:y + h, x:x + w] = (combined * 255).astype(np.uint8)
            cv2.imwrite(str(mask_dir / f"{img_path.stem}_roi{roi_idx}.png"), mask_canvas)

    cv2.imwrite(str(vis_dir / f"vis_{img_path.name}"), full_vis_img)

    if SAVE_TXT and all_segments:
        with open(txt_dir / f"{img_path.stem}.txt", 'w') as f:
            for cls_id, seg, _conf in all_segments:
                coords = ' '.join(f'{v:.6f}' for v in seg.flatten())
                f.write(f"{cls_id} {coords}\n")

    print(f"✅ 已保存结果: {vis_dir / f'vis_{img_path.name}'}")


def _segment_roi(rknn, orig_img, vis_img, roi):
    """Run the model on one ROI, draw the results on ``vis_img`` and return them.

    Returns ``(segments, combined)``: the list of
    ``(cls_id, normalised polygon, confidence)`` tuples and the union of the
    kept binary masks at ROI size, or ``([], None)`` when no candidate passes
    the confidence threshold.
    """
    x, y, w, h = roi
    h_orig, w_orig = orig_img.shape[:2]

    roi_img = orig_img[y:y + h, x:x + w]
    preprocessed, scale, dx, dy = letterbox_resize(roi_img, (TARGET_SIZE, TARGET_SIZE))
    infer_input = preprocessed[..., ::-1].astype(np.float32)  # BGR -> RGB
    infer_input = np.expand_dims(infer_input, axis=0)

    outputs = rknn.inference(inputs=[infer_input])

    found = _collect_candidates(outputs)
    if found is None:
        return [], None
    scores, boxes, coeffs = found

    xyxy = _boxes_to_roi(boxes, scale, dx, dy, w, h)
    keep_indices = non_max_suppression(xyxy, scores, IOU_THRESHOLD)

    # Assumes the proto output is laid out as [channels, height, width]
    # after dropping the batch axis. TODO confirm against the exported model.
    proto = outputs[MASK_PROTO_IDX][0]

    # Size of the image content inside the letterbox canvas; this mirrors the
    # computation done by letterbox_resize.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))

    combined = np.zeros((h, w), dtype=np.uint8)
    segments = []
    for i in keep_indices:
        conf = scores[i]
        mask_roi = _mask_in_roi(coeffs[i], proto, dx, dy, new_w, new_h, w, h)
        mask_bin = (mask_roi > 0.5).astype(np.uint8)
        combined |= mask_bin

        # [-2] selects the contour list in both the 2-value (OpenCV 4) and
        # 3-value (OpenCV 3) return conventions.
        contours = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
        if len(contours) == 0:
            continue
        largest_contour = max(contours, key=cv2.contourArea)
        if len(largest_contour) < 3:
            # Fewer than three points do not form a polygon.
            continue
        # reshape, unlike squeeze, keeps the (N, 2) layout for any N.
        segment = largest_contour.reshape(-1, 2).astype(float)

        # ROI pixels -> full-image pixels -> normalised [0, 1].
        segment[:, 0] = (segment[:, 0] + x) / w_orig
        segment[:, 1] = (segment[:, 1] + y) / h_orig
        segments.append((0, segment, conf))  # cls_id=0

        cv2.drawContours(vis_img[y:y + h, x:x + w], [largest_contour], -1, (0, 255, 0), LINE_WIDTH)
        label_x = int((xyxy[i, 0] + xyxy[i, 2]) / 2) + x
        label_y = int(xyxy[i, 1]) + y - 10
        cv2.putText(vis_img, f'0 {conf:.2f}', (label_x, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)

    return segments, combined


def _collect_candidates(outputs):
    """Gather above-threshold detections from every output scale.

    Returns ``(scores, boxes, coeffs)`` sorted by descending score and capped
    at the 100 best entries, or ``None`` when nothing exceeds CONF_THRESHOLD.
    """
    scores, boxes, coeffs = [], [], []
    for conf_idx, box_idx, coeff_idx in zip(CONF_IDXS, BOX_IDXS, MASK_COEFF_IDXS):
        conf_map = outputs[conf_idx].flatten()
        box_map = outputs[box_idx].reshape(4, -1).T
        coeff_map = outputs[coeff_idx].reshape(-1, conf_map.shape[0])
        hits = np.flatnonzero(conf_map > CONF_THRESHOLD)
        scores.append(conf_map[hits])
        boxes.append(box_map[hits])
        coeffs.append(coeff_map[:, hits].T)

    if not scores:
        return None
    scores = np.concatenate(scores)
    if scores.size == 0:
        return None

    # A stable sort keeps the scale/cell order among equal scores.
    order = np.argsort(-scores, kind="stable")[:100]
    return scores[order], np.concatenate(boxes)[order], np.concatenate(coeffs)[order]


def _boxes_to_roi(boxes, scale, dx, dy, roi_w, roi_h):
    """Convert model boxes to clipped ``[x1, y1, x2, y2]`` rows in ROI pixels.

    Assumes the rows are ``cx, cy, w, h`` normalised to [0, 1]; adjust this
    to match the actual output layout of the model.
    """
    cx = boxes[:, 0] * TARGET_SIZE
    cy = boxes[:, 1] * TARGET_SIZE
    bw = boxes[:, 2] * TARGET_SIZE
    bh = boxes[:, 3] * TARGET_SIZE

    # Undo the letterbox (offset, then scale) and clip to the ROI.
    x1 = np.clip((cx - bw / 2 - dx) / scale, 0, roi_w)
    y1 = np.clip((cy - bh / 2 - dy) / scale, 0, roi_h)
    x2 = np.clip((cx + bw / 2 - dx) / scale, 0, roi_w)
    y2 = np.clip((cy + bh / 2 - dy) / scale, 0, roi_h)
    return np.stack([x1, y1, x2, y2], axis=1)


def _mask_in_roi(coeff, proto, dx, dy, new_w, new_h, roi_w, roi_h):
    """Build one detection's probability mask at ROI resolution."""
    channels, proto_h, proto_w = proto.shape
    mask = sigmoid(np.matmul(coeff, proto.reshape(channels, -1))).reshape(proto_h, proto_w)
    mask = cv2.resize(mask, (TARGET_SIZE, TARGET_SIZE), interpolation=cv2.INTER_LINEAR)
    # Remove the letterbox padding before scaling back; resizing the padded
    # square directly would distort masks of non-square ROIs.
    content = np.ascontiguousarray(mask[dy:dy + new_h, dx:dx + new_w])
    return cv2.resize(content, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Run the batch job only when this file is executed as a script.
if __name__ == "__main__":
    run_rknn_inference_with_roi()
|