Files
zjsh_yolov11/yemian/yemian_bushu.py

273 lines
10 KiB
Python
Raw Permalink Normal View History

2025-09-11 20:44:35 +08:00
import os
import cv2
import numpy as np
from rknnlite.api import RKNNLite
from pathlib import Path
# ====================== Configuration ======================
MODEL_PATH = "seg.rknn" # path to the converted RKNN model
SOURCE_IMG_DIR = "/home/hx/yolo/output_masks" # input image directory
OUTPUT_DIR = "/home/hx/yolo/output_masks_rknn" # output directory
ROI_COORDS_FILE = "./roi_coordinates/1_rois.txt" # ROI file, one "x,y,w,h" per line
TARGET_SIZE = 640 # model input size (square, pixels)
CONF_THRESHOLD = 0.25 # confidence threshold for candidate detections
IOU_THRESHOLD = 0.45 # NOTE: NMS may already run inside the model; also applied in post-processing here
DEVICE = RKNNLite.NPU_CORE_0 # NPU core the runtime is bound to
SAVE_TXT = True # write YOLO-seg style label .txt files
SAVE_MASKS = True # write per-ROI binary mask images
VIEW_IMG = False # live-preview toggle (not referenced anywhere in this script)
LINE_WIDTH = 2 # contour drawing thickness for visualization
# YOLO output head layout (indices into rknn.inference() outputs —
# adjust to match the exported model's output order)
MASK_PROTO_IDX = 12 # index of the mask prototype output
MASK_COEFF_IDXS = [3, 7, 11] # mask coefficient outputs, one per scale
CONF_IDXS = [1, 5, 9] # objectness / confidence outputs, one per scale
BOX_IDXS = [0, 4, 8] # bbox outputs, one per scale
def load_roi_coords(txt_path):
    """Load ROI definitions from a text file.

    Each non-empty, non-comment line must be four comma-separated
    integers: ``x,y,w,h``. Unparseable lines are reported and skipped.

    Args:
        txt_path: path to the ROI text file.

    Returns:
        List of (x, y, w, h) integer tuples.

    Raises:
        FileNotFoundError: if the file does not exist.
    """
    if not os.path.exists(txt_path):
        raise FileNotFoundError(f"❌ ROI 文件未找到: {txt_path}")
    rois = []
    with open(txt_path, 'r') as f:
        for raw in f:
            entry = raw.strip()
            # Skip blanks and '#' comment lines.
            if not entry or entry.startswith('#'):
                continue
            try:
                x, y, w, h = (int(v) for v in entry.split(','))
            except Exception as e:
                print(f"⚠️ 无法解析 ROI 行: '{entry}' | 错误: {e}")
            else:
                rois.append((x, y, w, h))
                print(f"📌 加载 ROI: (x={x}, y={y}, w={w}, h={h})")
    return rois
def sigmoid(x):
    """Logistic function mapping x into (0, 1): 1 / (1 + e^-x)."""
    return np.reciprocal(1 + np.exp(-x))
def letterbox_resize(image, size, bg_color=114):
    """Scale `image` to fit `size` keeping aspect ratio, pad the rest.

    Args:
        image: H x W x 3 BGR image.
        size: (target_w, target_h) of the output canvas.
        bg_color: gray value used for the padding border.

    Returns:
        (canvas, scale, dx, dy) where `scale` is the resize ratio and
        (dx, dy) is the top-left offset of the image inside the canvas.
    """
    out_w, out_h = size
    src_h, src_w, _ = image.shape
    ratio = min(out_w / src_w, out_h / src_h)
    scaled_w = int(src_w * ratio)
    scaled_h = int(src_h * ratio)
    # Center the resized content on a solid-color canvas.
    board = np.full((out_h, out_w, 3), bg_color, dtype=np.uint8)
    scaled = cv2.resize(image, (scaled_w, scaled_h), interpolation=cv2.INTER_LINEAR)
    pad_x = (out_w - scaled_w) // 2
    pad_y = (out_h - scaled_h) // 2
    board[pad_y:pad_y + scaled_h, pad_x:pad_x + scaled_w] = scaled
    return board, ratio, pad_x, pad_y
def non_max_suppression(boxes, scores, iou_thresh=0.45):
    """Greedy non-maximum suppression over axis-aligned boxes.

    Args:
        boxes: (N, 4) array-like of [x1, y1, x2, y2] corner coordinates.
        scores: (N,) array-like of confidences.
        iou_thresh: IoU above which a lower-scored box is suppressed.

    Returns:
        List of kept indices into `boxes`, ordered by descending score.
    """
    if len(boxes) == 0:
        return []
    boxes = np.asarray(boxes, dtype=np.float64)
    scores = np.asarray(scores, dtype=np.float64)
    # Bug fix: `areas` was referenced below but never computed inside this
    # function (it only existed as a local of the caller), raising NameError
    # on any non-empty input. Compute it here, matching the +1 pixel
    # convention used for the intersection width/height.
    areas = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
    order = scores.argsort()[::-1]
    keep = []
    while len(order) > 0:
        i = order[0]
        keep.append(i)
        # Intersection of the current best box with every remaining box.
        xx1 = np.maximum(boxes[i, 0], boxes[order[1:], 0])
        yy1 = np.maximum(boxes[i, 1], boxes[order[1:], 1])
        xx2 = np.minimum(boxes[i, 2], boxes[order[1:], 2])
        yy2 = np.minimum(boxes[i, 3], boxes[order[1:], 3])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        # Keep only boxes whose overlap with the winner is acceptable.
        inds = np.where(ovr <= iou_thresh)[0]
        order = order[inds + 1]
    return keep
def run_rknn_inference_with_roi():
    """Run RKNN instance-segmentation inference on fixed ROIs of each image.

    Pipeline per image in SOURCE_IMG_DIR:
      1. crop each ROI listed in ROI_COORDS_FILE,
      2. letterbox the crop to TARGET_SIZE and run the RKNN model,
      3. decode boxes and mask coefficients from the multi-scale outputs,
      4. apply NMS, build masks from the prototype output, keep the largest
         contour per detection, and map it back to full-image coordinates,
      5. write a visualization image, optional per-ROI binary masks, and
         optional YOLO-seg style label TXT files under OUTPUT_DIR.
    """
    # Load the model and bind the runtime to the configured NPU core.
    rknn = RKNNLite(verbose=True)
    ret = rknn.load_rknn(MODEL_PATH)
    if ret != 0:
        print("❌ 加载 RKNN 模型失败")
        return
    ret = rknn.init_runtime(core_mask=DEVICE)
    if ret != 0:
        print("❌ 初始化 NPU 运行时失败")
        return
    print(f"✅ 模型 {MODEL_PATH} 加载成功")
    # Create the output directory tree.
    output_dir = Path(OUTPUT_DIR)
    txt_dir = output_dir / "labels"
    mask_dir = output_dir / "masks"
    vis_dir = output_dir / "visualize"
    for d in [output_dir, txt_dir, mask_dir, vis_dir]:
        d.mkdir(parents=True, exist_ok=True)
    # Load the ROI list (x, y, w, h in full-image pixel coordinates).
    rois = load_roi_coords(ROI_COORDS_FILE)
    if len(rois) == 0:
        print("❌ 没有有效 ROI退出。")
        return
    # Gather the input images.
    img_files = list(Path(SOURCE_IMG_DIR).glob("*.jpg")) + \
        list(Path(SOURCE_IMG_DIR).glob("*.png"))
    for img_path in img_files:
        print(f"\n🔍 处理图像: {img_path.name}")
        orig_img = cv2.imread(str(img_path))
        if orig_img is None:
            print(f"❌ 无法读取图像: {img_path}")
            continue
        h_orig, w_orig = orig_img.shape[:2]
        full_vis_img = orig_img.copy()
        all_segments = []  # normalized polygons collected across all ROIs of this image
        for roi_idx, (x, y, w, h) in enumerate(rois):
            if x < 0 or y < 0 or x + w > w_orig or y + h > h_orig:
                print(f"⚠️ ROI 越界,跳过: ({x},{y},{w},{h})")
                continue
            # Crop and preprocess the ROI.
            roi_img = orig_img[y:y+h, x:x+w]
            if roi_img.size == 0:
                print(f"⚠️ 空 ROI: {roi_idx}")
                continue
            preprocessed, scale, dx, dy = letterbox_resize(roi_img, (TARGET_SIZE, TARGET_SIZE))
            infer_input = preprocessed[..., ::-1].astype(np.float32)  # BGR -> RGB
            infer_input = np.expand_dims(infer_input, axis=0)
            # Inference.
            outputs = rknn.inference(inputs=[infer_input])
            # Split the raw outputs; indices depend on the exported model.
            proto = outputs[MASK_PROTO_IDX][0]  # assumed [32, 160, 160] — confirm against the model
            mask_coeffs_list = [outputs[i] for i in MASK_COEFF_IDXS]  # list of [1, C, H, W]
            conf_list = [outputs[i] for i in CONF_IDXS]
            box_list = [outputs[i] for i in BOX_IDXS]
            # Merge detections from all scales above the confidence threshold.
            candidates = []
            for s_idx in range(len(conf_list)):
                conf_map = conf_list[s_idx].flatten()
                box_map = box_list[s_idx].reshape(4, -1).T
                coeff_map = mask_coeffs_list[s_idx].reshape(-1, conf_map.shape[0])
                for j in range(conf_map.shape[0]):
                    if conf_map[j] > CONF_THRESHOLD:
                        candidates.append({
                            'conf': conf_map[j],
                            'box': box_map[j],  # coordinates relative to the 640 input
                            'coeff': coeff_map[:, j]
                        })
            if not candidates:
                continue
            # Sort by confidence and keep the top 100 for NMS.
            candidates.sort(key=lambda x: x['conf'], reverse=True)
            top_dets = candidates[:100]
            boxes = np.array([d['box'] for d in top_dets])
            scores = np.array([d['conf'] for d in top_dets])
            coeffs = np.array([d['coeff'] for d in top_dets])
            # Decode boxes into letterboxed-input pixel space.
            # NOTE(review): boxes are assumed to be cx,cy,w,h normalized to
            # [0,1] — adjust the decode below if the exported model differs.
            cx = boxes[:, 0] * TARGET_SIZE
            cy = boxes[:, 1] * TARGET_SIZE
            bw = boxes[:, 2] * TARGET_SIZE
            bh = boxes[:, 3] * TARGET_SIZE
            x1 = cx - bw / 2
            y1 = cy - bh / 2
            x2 = cx + bw / 2
            y2 = cy + bh / 2
            # Undo the letterbox: remove padding offset, then unscale.
            x1 = (x1 - dx) / scale
            y1 = (y1 - dy) / scale
            x2 = (x2 - dx) / scale
            y2 = (y2 - dy) / scale
            # Clip to the ROI bounds.
            x1 = np.clip(x1, 0, w)
            y1 = np.clip(y1, 0, h)
            x2 = np.clip(x2, 0, w)
            y2 = np.clip(y2, 0, h)
            # NMS
            areas = (x2 - x1) * (y2 - y1)
            keep_indices = non_max_suppression(np.stack([x1, y1, x2, y2], axis=1), scores, IOU_THRESHOLD)
            for i in keep_indices:
                det = top_dets[i]
                conf = det['conf']
                mask_coeff = det['coeff']  # [32]
                # Build the mask: coeff . proto, sigmoid, then upsample.
                mask_flat = sigmoid(np.matmul(mask_coeff, proto.reshape(32, -1)))
                mask = mask_flat.reshape(160, 160)
                mask_resized = cv2.resize(mask, (TARGET_SIZE, TARGET_SIZE), interpolation=cv2.INTER_LINEAR)
                # Resize to the ROI's original size and binarize at 0.5.
                mask_roi = cv2.resize(mask_resized, (w, h), interpolation=cv2.INTER_LINEAR)
                mask_bin = (mask_roi > 0.5).astype(np.uint8)
                # Keep only the largest external contour as the segment.
                contours, _ = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                if len(contours) == 0:
                    continue
                largest_contour = max(contours, key=cv2.contourArea)
                segment = largest_contour.squeeze().astype(float)
                # Shift back into full-image coordinates.
                segment[:, 0] += x
                segment[:, 1] += y
                # Normalize to [0,1] for YOLO-style labels.
                segment[:, 0] /= w_orig
                segment[:, 1] /= h_orig
                all_segments.append((0, segment, conf))  # cls_id=0 (single class assumed)
                # Draw the contour and a "cls conf" label on the visualization.
                cv2.drawContours(full_vis_img[y:y+h, x:x+w], [largest_contour], -1, (0, 255, 0), LINE_WIDTH)
                cx_int = int((x1[i] + x2[i]) / 2) + x
                cy_int = int(y1[i]) + y - 10
                cv2.putText(full_vis_img, f'0 {conf:.2f}', (cx_int, cy_int),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
            # Optionally save the union of all kept masks for this ROI.
            if SAVE_MASKS:
                mask_canvas = np.zeros((h_orig, w_orig), dtype=np.uint8)
                combined = np.zeros_like(mask_bin)
                for i in keep_indices:
                    mask_flat = sigmoid(np.matmul(top_dets[i]['coeff'], proto.reshape(32, -1)))
                    mask = cv2.resize(mask_flat.reshape(160, 160), (w, h)) > 0.5
                    combined |= mask
                mask_canvas[y:y+h, x:x+w] = (combined * 255).astype(np.uint8)
                cv2.imwrite(str(mask_dir / f"{img_path.stem}_roi{roi_idx}.png"), mask_canvas)
        # Save the full-image visualization.
        cv2.imwrite(str(vis_dir / f"vis_{img_path.name}"), full_vis_img)
        # Save label TXT: class id followed by normalized polygon coordinates.
        if SAVE_TXT and all_segments:
            with open(txt_dir / f"{img_path.stem}.txt", 'w') as f:
                for cls_id, seg, conf in all_segments:
                    seg_flat = seg.flatten()
                    f.write(f"{cls_id} {' '.join(f'{x:.6f}' for x in seg_flat)}\n")
        print(f"✅ 已保存结果: {vis_dir / f'vis_{img_path.name}'}")
    rknn.release()
    print(f"\n🎉 全部完成!输出位于: {OUTPUT_DIR}")
# Script entry point: run the full ROI inference pipeline.
if __name__ == "__main__":
    run_rknn_inference_with_roi()