273 lines
10 KiB
Python
273 lines
10 KiB
Python
import os
|
||
import cv2
|
||
import numpy as np
|
||
from rknnlite.api import RKNNLite
|
||
from pathlib import Path
|
||
|
||
# ====================== Configuration ======================
MODEL_PATH = "seg.rknn"  # path to the converted RKNN model
SOURCE_IMG_DIR = "/home/hx/yolo/output_masks"  # input image directory
OUTPUT_DIR = "/home/hx/yolo/output_masks_rknn"  # output directory
ROI_COORDS_FILE = "./roi_coordinates/1_rois.txt"  # ROI file path, one "x,y,w,h" per line
TARGET_SIZE = 640  # model input size (square letterbox side, in pixels)
CONF_THRESHOLD = 0.25  # detections must score strictly above this to be kept
IOU_THRESHOLD = 0.45  # note: NMS may already happen inside the model; it can also be done in post-processing
DEVICE = RKNNLite.NPU_CORE_0  # passed as core_mask when the runtime is initialised
SAVE_TXT = True  # write one polygon label file per image
SAVE_MASKS = True  # write one binary mask image per ROI
VIEW_IMG = False  # NOTE(review): not referenced by the code visible in this file
LINE_WIDTH = 2  # contour thickness used for the visualisation

# YOLO output layout (adjust to match your model).
# Each value is an index into the list returned by the RKNN inference call.
MASK_PROTO_IDX = 12  # index of the mask prototype output
MASK_COEFF_IDXS = [3, 7, 11]  # mask coefficient outputs, one per scale
CONF_IDXS = [1, 5, 9]  # objectness / confidence outputs, one per scale
BOX_IDXS = [0, 4, 8]  # bounding-box outputs, one per scale
|
||
|
||
|
||
def load_roi_coords(txt_path):
    """Load ROI rectangles from a text file.

    Every non-empty line that does not start with ``#`` is expected to hold
    four comma-separated integers ``x,y,w,h``. Lines that cannot be parsed
    are reported on stdout and skipped; they never abort the load.

    Args:
        txt_path: Path of the ROI file.

    Returns:
        A list of ``(x, y, w, h)`` integer tuples, in file order.

    Raises:
        FileNotFoundError: If ``txt_path`` does not exist.
    """
    if not os.path.exists(txt_path):
        raise FileNotFoundError(f"❌ ROI 文件未找到: {txt_path}")

    rois = []
    with open(txt_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # Both a non-integer field and a wrong field count surface as
            # ValueError, so that is the only exception worth catching here.
            try:
                x, y, w, h = map(int, line.split(','))
            except ValueError as e:
                print(f"⚠️ 无法解析 ROI 行: '{line}' | 错误: {e}")
                continue
            rois.append((x, y, w, h))
            print(f"📌 加载 ROI: (x={x}, y={y}, w={w}, h={h})")
    return rois
|
||
|
||
|
||
def sigmoid(x):
    """Return the logistic sigmoid ``1 / (1 + exp(-x))`` element-wise.

    The value is computed as ``exp(-log(1 + exp(-x)))`` through
    ``np.logaddexp`` so that large negative inputs do not overflow inside
    ``exp`` (the naive formula emits a RuntimeWarning there).

    Args:
        x: A scalar or array-like of real numbers.

    Returns:
        A NumPy scalar or array with the same shape as ``x`` and values in
        ``[0, 1]``.
    """
    return np.exp(-np.logaddexp(0, -x))
|
||
|
||
|
||
def letterbox_resize(image, size, bg_color=114):
    """Resize an image keeping its aspect ratio and pad it to ``size``.

    The image is scaled by a single factor so that it fits inside the
    target, then centred on a canvas filled with ``bg_color``.

    Args:
        image: Three-dimensional array indexed as ``(height, width, channel)``;
            it is pasted onto a 3-channel ``uint8`` canvas.
        size: ``(target_w, target_h)`` of the output canvas.
        bg_color: Fill value for the padding.

    Returns:
        A tuple ``(canvas, scale, dx, dy)``: the padded image, the scale
        factor applied to the input, and the left/top offsets at which the
        resized image was pasted.
    """
    target_w, target_h = size
    h, w, _ = image.shape
    scale = min(target_w / w, target_h / h)

    # int() truncates, so an extremely thin input could end up with a zero
    # dimension, which is not a valid resize target; keep at least one pixel.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8)
    dx = (target_w - new_w) // 2
    dy = (target_h - new_h) // 2
    canvas[dy:dy + new_h, dx:dx + new_w] = resized
    return canvas, scale, dx, dy
|
||
|
||
|
||
def non_max_suppression(boxes, scores, iou_thresh=0.45):
    """Greedy non-maximum suppression over axis-aligned boxes.

    Boxes are visited from highest to lowest score; a box is kept only if
    its IoU with every previously kept box is at most ``iou_thresh``.
    Widths and heights use the pixel-inclusive ``+ 1`` convention for both
    the box areas and the intersection, so the two stay consistent.

    Args:
        boxes: Sequence or array of ``(x1, y1, x2, y2)`` rows.
        scores: One score per box, in the same order.
        iou_thresh: Boxes overlapping a kept box by more than this IoU are
            dropped.

    Returns:
        A list of integer indices into ``boxes``, ordered by descending
        score. Empty when ``boxes`` is empty.
    """
    if len(boxes) == 0:
        return []

    boxes = np.asarray(boxes, dtype=np.float64).reshape(-1, 4)
    scores = np.asarray(scores)
    x1, y1, x2, y2 = boxes.T
    # Per-box areas must be computed here: the loop below needs them and
    # nothing outside this function provides them.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)

    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        rest = order[1:]
        keep.append(int(i))

        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])
        inter = np.maximum(0.0, xx2 - xx1 + 1) * np.maximum(0.0, yy2 - yy1 + 1)
        iou = inter / (areas[i] + areas[rest] - inter)

        # Only boxes that do not overlap the current one too much survive.
        order = rest[iou <= iou_thresh]
    return keep
|
||
|
||
|
||
def _collect_detections(outputs):
    """Gather above-threshold detections from all scales, best first (at most 100)."""
    boxes, scores, coeffs = [], [], []
    for conf_idx, box_idx, coeff_idx in zip(CONF_IDXS, BOX_IDXS, MASK_COEFF_IDXS):
        conf_map = outputs[conf_idx].flatten()
        box_map = outputs[box_idx].reshape(4, -1).T
        coeff_map = outputs[coeff_idx].reshape(-1, conf_map.shape[0])
        hits = np.flatnonzero(conf_map > CONF_THRESHOLD)
        if hits.size:
            scores.append(conf_map[hits])
            boxes.append(box_map[hits])
            coeffs.append(coeff_map[:, hits].T)

    if not scores:
        return None

    scores = np.concatenate(scores)
    # Stable sort keeps the scale/position order among equal scores.
    order = np.argsort(-scores, kind="stable")[:100]
    return np.concatenate(boxes)[order], scores[order], np.concatenate(coeffs)[order]


def _decode_boxes(boxes, scale, dx, dy, roi_w, roi_h):
    """Turn raw (cx, cy, w, h) boxes into clipped (x1, y1, x2, y2) in ROI pixels."""
    # NOTE(review): assumes boxes are cx,cy,w,h normalised to [0, 1];
    # adjust this to the actual output layout of your model.
    cx = boxes[:, 0] * TARGET_SIZE
    cy = boxes[:, 1] * TARGET_SIZE
    half_w = boxes[:, 2] * TARGET_SIZE / 2
    half_h = boxes[:, 3] * TARGET_SIZE / 2

    # Undo the letterbox (offset, then scale) and clip to the ROI.
    x1 = np.clip((cx - half_w - dx) / scale, 0, roi_w)
    y1 = np.clip((cy - half_h - dy) / scale, 0, roi_h)
    x2 = np.clip((cx + half_w - dx) / scale, 0, roi_w)
    y2 = np.clip((cy + half_h - dy) / scale, 0, roi_h)
    return np.stack([x1, y1, x2, y2], axis=1)


def _roi_mask(coeff, proto, scale, dx, dy, roi_w, roi_h):
    """Build the binary (0/1, uint8) mask of one detection at ROI resolution."""
    proto_h, proto_w = proto.shape[-2:]
    mask = sigmoid(np.matmul(coeff, proto.reshape(proto.shape[0], -1)))
    mask = mask.reshape(proto_h, proto_w)
    mask = cv2.resize(mask, (TARGET_SIZE, TARGET_SIZE), interpolation=cv2.INTER_LINEAR)

    # The mask lives in the letterboxed frame: cut away the padding before
    # scaling back, otherwise non-square ROIs get a squeezed, shifted mask.
    new_w = max(1, int(roi_w * scale))
    new_h = max(1, int(roi_h * scale))
    mask = mask[dy:dy + new_h, dx:dx + new_w]
    mask = cv2.resize(mask, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
    return (mask > 0.5).astype(np.uint8)


def _segment_roi(rknn, roi_img):
    """Run the model on one ROI crop; return [(conf, box_xyxy, mask_bin), ...] kept by NMS."""
    roi_h, roi_w = roi_img.shape[:2]
    preprocessed, scale, dx, dy = letterbox_resize(roi_img, (TARGET_SIZE, TARGET_SIZE))
    infer_input = preprocessed[..., ::-1].astype(np.float32)  # BGR -> RGB
    infer_input = np.expand_dims(infer_input, axis=0)

    outputs = rknn.inference(inputs=[infer_input])
    proto = outputs[MASK_PROTO_IDX][0]  # expected layout: [32, 160, 160]

    detections = _collect_detections(outputs)
    if detections is None:
        return []
    boxes, scores, coeffs = detections

    xyxy = _decode_boxes(boxes, scale, dx, dy, roi_w, roi_h)
    keep = non_max_suppression(xyxy, scores, IOU_THRESHOLD)
    return [
        (scores[i], xyxy[i], _roi_mask(coeffs[i], proto, scale, dx, dy, roi_w, roi_h))
        for i in keep
    ]


def _process_image(rknn, img_path, rois, txt_dir, mask_dir, vis_dir):
    """Segment every ROI of one image and write its visualisation, masks and labels."""
    print(f"\n🔍 处理图像: {img_path.name}")
    orig_img = cv2.imread(str(img_path))
    if orig_img is None:
        print(f"❌ 无法读取图像: {img_path}")
        return
    h_orig, w_orig = orig_img.shape[:2]

    full_vis_img = orig_img.copy()
    all_segments = []  # (cls_id, normalised polygon points, conf)

    for roi_idx, (x, y, w, h) in enumerate(rois):
        if x < 0 or y < 0 or x + w > w_orig or y + h > h_orig:
            print(f"⚠️ ROI 越界,跳过: ({x},{y},{w},{h})")
            continue

        roi_img = orig_img[y:y+h, x:x+w]
        if roi_img.size == 0:
            print(f"⚠️ 空 ROI: {roi_idx}")
            continue

        detections = _segment_roi(rknn, roi_img)
        if not detections:
            continue

        combined = np.zeros((h, w), dtype=np.uint8)
        for conf, box, mask_bin in detections:
            combined |= mask_bin

            # [-2] picks the contour list whether findContours returns two
            # values or three (the count differs between OpenCV versions).
            contours = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
            if len(contours) == 0:
                continue
            largest_contour = max(contours, key=cv2.contourArea)
            # reshape (not squeeze) so a single-point contour stays 2-D.
            segment = largest_contour.reshape(-1, 2).astype(float)

            # ROI coordinates -> full-image coordinates -> normalised [0, 1].
            segment[:, 0] = (segment[:, 0] + x) / w_orig
            segment[:, 1] = (segment[:, 1] + y) / h_orig
            all_segments.append((0, segment, conf))  # cls_id=0

            cv2.drawContours(full_vis_img[y:y+h, x:x+w], [largest_contour], -1, (0, 255, 0), LINE_WIDTH)
            cx_int = int((box[0] + box[2]) / 2) + x
            cy_int = int(box[1]) + y - 10
            cv2.putText(full_vis_img, f'0 {conf:.2f}', (cx_int, cy_int),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)

        if SAVE_MASKS:
            mask_canvas = np.zeros((h_orig, w_orig), dtype=np.uint8)
            mask_canvas[y:y+h, x:x+w] = (combined * 255).astype(np.uint8)
            cv2.imwrite(str(mask_dir / f"{img_path.stem}_roi{roi_idx}.png"), mask_canvas)

    vis_path = vis_dir / f"vis_{img_path.name}"
    cv2.imwrite(str(vis_path), full_vis_img)

    if SAVE_TXT and all_segments:
        with open(txt_dir / f"{img_path.stem}.txt", 'w') as f:
            for cls_id, seg, _conf in all_segments:
                coords = ' '.join(f'{v:.6f}' for v in seg.flatten())
                f.write(f"{cls_id} {coords}\n")

    print(f"✅ 已保存结果: {vis_path}")


def run_rknn_inference_with_roi():
    """Run RKNN segmentation on every ROI of every image in SOURCE_IMG_DIR.

    The function is driven entirely by the module-level configuration
    constants. For each ``*.jpg`` / ``*.png`` image it crops every ROI from
    ROI_COORDS_FILE, letterboxes the crop to TARGET_SIZE, runs the model,
    filters detections by CONF_THRESHOLD and NMS, and writes under
    OUTPUT_DIR:

    * ``visualize/vis_<image name>`` - the image with contours and scores,
    * ``masks/<stem>_roi<idx>.png`` - one full-size binary mask per ROI
      (only when SAVE_MASKS is true),
    * ``labels/<stem>.txt`` - one line per polygon, class id followed by
      normalised ``x y`` pairs (only when SAVE_TXT is true and something
      was found).

    Failures to load the model, initialise the runtime, or find any valid
    ROI are reported on stdout and end the run early.

    Returns:
        None.

    Raises:
        FileNotFoundError: If ROI_COORDS_FILE does not exist.
    """
    rknn = RKNNLite(verbose=True)
    if rknn.load_rknn(MODEL_PATH) != 0:
        print("❌ 加载 RKNN 模型失败")
        return

    if rknn.init_runtime(core_mask=DEVICE) != 0:
        print("❌ 初始化 NPU 运行时失败")
        return

    print(f"✅ 模型 {MODEL_PATH} 加载成功")

    # From here on the runtime is live: release it on every exit path,
    # including early returns and exceptions.
    try:
        output_dir = Path(OUTPUT_DIR)
        txt_dir = output_dir / "labels"
        mask_dir = output_dir / "masks"
        vis_dir = output_dir / "visualize"
        for d in (output_dir, txt_dir, mask_dir, vis_dir):
            d.mkdir(parents=True, exist_ok=True)

        rois = load_roi_coords(ROI_COORDS_FILE)
        if len(rois) == 0:
            print("❌ 没有有效 ROI,退出。")
            return

        source_dir = Path(SOURCE_IMG_DIR)
        img_files = list(source_dir.glob("*.jpg")) + list(source_dir.glob("*.png"))
        for img_path in img_files:
            _process_image(rknn, img_path, rois, txt_dir, mask_dir, vis_dir)
    finally:
        rknn.release()

    print(f"\n🎉 全部完成!输出位于: {OUTPUT_DIR}")
|
||
|
||
|
||
# Script entry point: run the ROI inference pipeline only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    run_rknn_inference_with_roi()