Files
zjsh_yolov11/yemian/yemian_bushu.py
2025-09-11 20:44:35 +08:00

273 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import cv2
import numpy as np
from rknnlite.api import RKNNLite
from pathlib import Path
# ====================== Configuration ======================
MODEL_PATH = "seg.rknn" # Path to the converted RKNN model
SOURCE_IMG_DIR = "/home/hx/yolo/output_masks" # Input image directory (*.jpg / *.png are processed)
OUTPUT_DIR = "/home/hx/yolo/output_masks_rknn" # Output directory (labels/, masks/, visualize/ are created inside)
ROI_COORDS_FILE = "./roi_coordinates/1_rois.txt" # ROI file path, one "x,y,w,h" entry per line
TARGET_SIZE = 640 # Model input size (ROIs are letterboxed to TARGET_SIZE x TARGET_SIZE)
CONF_THRESHOLD = 0.25 # Detections are kept only when their confidence is strictly greater than this
IOU_THRESHOLD = 0.45 # Note: NMS may already be done inside the model; it can also be done in post-processing
DEVICE = RKNNLite.NPU_CORE_0 # Passed as core_mask to RKNNLite.init_runtime
SAVE_TXT = True # Write normalized polygon labels to labels/<image stem>.txt
SAVE_MASKS = True # Write one full-frame binary mask PNG per ROI to masks/
VIEW_IMG = False # Not referenced by the code visible in this file
LINE_WIDTH = 2 # Contour line thickness used for the visualization
# YOLO output layout (adjust to match your model)
MASK_PROTO_IDX = 12 # Index of the mask prototype output
MASK_COEFF_IDXS = [3, 7, 11] # Indices of the mask coefficient outputs (one per scale)
CONF_IDXS = [1, 5, 9] # Indices of the objectness / confidence outputs (one per scale)
BOX_IDXS = [0, 4, 8] # Indices of the bounding-box outputs (one per scale)
def load_roi_coords(txt_path):
    """Read ROI rectangles from a text file with one ``x,y,w,h`` entry per line.

    Blank lines and lines starting with ``#`` are ignored. A line that cannot
    be parsed as exactly four comma-separated integers is reported on stdout
    and skipped; it does not abort loading.

    Args:
        txt_path: Path of the ROI text file.

    Returns:
        A list of ``(x, y, w, h)`` integer tuples in file order.

    Raises:
        FileNotFoundError: If ``txt_path`` does not exist.
    """
    if not os.path.exists(txt_path):
        raise FileNotFoundError(f"❌ ROI 文件未找到: {txt_path}")

    parsed = []
    with open(txt_path, 'r') as handle:
        for raw in handle:
            entry = raw.strip()
            # Skip empty lines and comment lines up front.
            if not entry or entry.startswith('#'):
                continue
            try:
                x, y, w, h = map(int, entry.split(','))
                parsed.append((x, y, w, h))
                print(f"📌 加载 ROI: (x={x}, y={y}, w={w}, h={h})")
            except Exception as e:
                print(f"⚠️ 无法解析 ROI 行: '{entry}' | 错误: {e}")
    return parsed
def sigmoid(x):
    """Return the element-wise logistic sigmoid ``1 / (1 + exp(-x))`` of *x*.

    The naive formula overflows in ``exp(-x)`` for large negative inputs,
    which triggers NumPy overflow warnings (or errors under
    ``np.errstate(over="raise")``). This version only ever exponentiates a
    non-positive number, so it cannot overflow.

    Args:
        x: A scalar or array-like of real numbers.

    Returns:
        A NumPy scalar for scalar input, otherwise an array with the same
        shape as ``x``.
    """
    x = np.asarray(x)
    # exp(-|x|) always lies in (0, 1], so it is safe for any magnitude of x.
    z = np.exp(-np.abs(x))
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- algebraically equal.
    out = np.where(x >= 0, 1 / (1 + z), z / (1 + z))
    # Indexing with () unwraps a 0-d result to a scalar and is a no-op otherwise.
    return out[()]
def letterbox_resize(image, size, bg_color=114):
    """Resize *image* to fit inside *size* keeping its aspect ratio, then pad.

    The image is scaled by a single factor so that it fits entirely inside the
    target, and is centered on a canvas filled with ``bg_color``.

    Args:
        image: Three-dimensional ``(height, width, channels)`` array; the
            canvas it is pasted onto has 3 channels.
        size: ``(target_width, target_height)`` of the output canvas.
        bg_color: Fill value for the padding area.

    Returns:
        ``(canvas, scale, dx, dy)`` where ``canvas`` is the padded uint8
        image, ``scale`` is the resize factor applied to the input, and
        ``dx`` / ``dy`` are the left / top padding offsets in pixels.
    """
    target_w, target_h = size
    src_h, src_w, _ = image.shape
    scale = min(target_w / src_w, target_h / src_h)
    # Truncation can yield 0 for extremely elongated inputs, which cv2.resize
    # rejects; keep at least one pixel in each dimension.
    new_w = max(1, int(src_w * scale))
    new_h = max(1, int(src_h * scale))
    resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    canvas = np.full((target_h, target_w, 3), bg_color, dtype=np.uint8)
    dx = (target_w - new_w) // 2
    dy = (target_h - new_h) // 2
    canvas[dy:dy + new_h, dx:dx + new_w] = resized
    return canvas, scale, dx, dy
def non_max_suppression(boxes, scores, iou_thresh=0.45):
    """Greedy non-maximum suppression over axis-aligned boxes.

    Boxes are visited from highest to lowest score; a box is kept when its
    IoU with every previously kept box is ``<= iou_thresh``.

    Args:
        boxes: Sequence or array of ``[x1, y1, x2, y2]`` rows.
        scores: Sequence or array with one score per box.
        iou_thresh: Boxes overlapping a kept box by more than this IoU are
            discarded.

    Returns:
        List of integer indices into ``boxes`` of the kept boxes, in
        descending score order. Empty list when ``boxes`` is empty.
    """
    if len(boxes) == 0:
        return []

    boxes = np.asarray(boxes, dtype=np.float64)
    scores = np.asarray(scores)

    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    # Box areas must be computed here (previously an undefined name was
    # used). The +1 matches the inclusive-pixel convention of the
    # intersection below, so identical boxes have an IoU of exactly 1.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)

    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        best, rest = order[0], order[1:]
        keep.append(int(best))

        inter_w = np.maximum(0.0, np.minimum(x2[best], x2[rest]) - np.maximum(x1[best], x1[rest]) + 1)
        inter_h = np.maximum(0.0, np.minimum(y2[best], y2[rest]) - np.maximum(y1[best], y1[rest]) + 1)
        inter = inter_w * inter_h
        iou = inter / (areas[best] + areas[rest] - inter)

        # Only boxes that do not overlap the current best too much survive.
        order = rest[iou <= iou_thresh]
    return keep
def _collect_candidates(outputs, max_candidates=100):
    """Gather above-threshold detections from every output scale, best first.

    Returns ``(boxes, scores, coeffs)`` arrays limited to the
    ``max_candidates`` highest-confidence entries, or ``None`` when nothing
    exceeds ``CONF_THRESHOLD``.
    """
    confs, boxes, coeffs = [], [], []
    for conf_idx, box_idx, coeff_idx in zip(CONF_IDXS, BOX_IDXS, MASK_COEFF_IDXS):
        conf_map = outputs[conf_idx].flatten()
        # NOTE(review): the reshapes assume channel-first outputs (4 box
        # channels / C coefficient channels per location) -- confirm against
        # the exported model.
        box_map = outputs[box_idx].reshape(4, -1).T
        coeff_map = outputs[coeff_idx].reshape(-1, conf_map.shape[0])
        hits = np.flatnonzero(conf_map > CONF_THRESHOLD)
        confs.append(conf_map[hits])
        boxes.append(box_map[hits])
        coeffs.append(coeff_map[:, hits].T)

    scores = np.concatenate(confs) if confs else np.empty(0)
    if scores.size == 0:
        return None
    # Stable sort keeps the original scale/location order among equal scores.
    order = np.argsort(-scores.astype(np.float64), kind="stable")[:max_candidates]
    return np.concatenate(boxes)[order], scores[order], np.concatenate(coeffs)[order]


def _boxes_to_roi_xyxy(boxes, scale, dx, dy, roi_w, roi_h):
    """Convert model boxes to ``[x1, y1, x2, y2]`` pixel boxes clipped to the ROI."""
    # NOTE(review): assumes the model emits cx, cy, w, h normalized to [0, 1];
    # adjust here if the exported model uses a different box encoding.
    cx = boxes[:, 0] * TARGET_SIZE
    cy = boxes[:, 1] * TARGET_SIZE
    half_w = boxes[:, 2] * TARGET_SIZE / 2
    half_h = boxes[:, 3] * TARGET_SIZE / 2
    # Undo the letterbox: remove the padding offset, then the scale factor.
    x1 = np.clip((cx - half_w - dx) / scale, 0, roi_w)
    y1 = np.clip((cy - half_h - dy) / scale, 0, roi_h)
    x2 = np.clip((cx + half_w - dx) / scale, 0, roi_w)
    y2 = np.clip((cy + half_h - dy) / scale, 0, roi_h)
    return np.stack([x1, y1, x2, y2], axis=1)


def _roi_mask(coeff, proto, scale, dx, dy, roi_w, roi_h):
    """Build the binary (uint8 0/1) instance mask of one detection at ROI size."""
    # NOTE(review): assumes proto is laid out as [channels, height, width].
    proto_h, proto_w = proto.shape[-2:]
    mask = sigmoid(np.matmul(coeff, proto.reshape(-1, proto_h * proto_w)))
    mask = mask.reshape(proto_h, proto_w)
    mask = cv2.resize(mask, (TARGET_SIZE, TARGET_SIZE), interpolation=cv2.INTER_LINEAR)
    # The mask covers the whole letterboxed input. Cut away the padding so it
    # lines up with the ROI, the same way the boxes are un-letterboxed.
    new_w = max(1, int(roi_w * scale))
    new_h = max(1, int(roi_h * scale))
    mask = np.ascontiguousarray(mask[dy:dy + new_h, dx:dx + new_w])
    mask = cv2.resize(mask, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
    return (mask > 0.5).astype(np.uint8)


def _detect_in_roi(rknn, roi_img):
    """Run the model on one ROI crop; return ``[(xyxy_box, conf, mask), ...]`` after NMS."""
    roi_h, roi_w = roi_img.shape[:2]
    preprocessed, scale, dx, dy = letterbox_resize(roi_img, (TARGET_SIZE, TARGET_SIZE))
    infer_input = preprocessed[..., ::-1].astype(np.float32)  # BGR -> RGB
    infer_input = np.expand_dims(infer_input, axis=0)

    outputs = rknn.inference(inputs=[infer_input])
    proto = outputs[MASK_PROTO_IDX][0]

    candidates = _collect_candidates(outputs)
    if candidates is None:
        return []
    boxes, scores, coeffs = candidates

    boxes_xyxy = _boxes_to_roi_xyxy(boxes, scale, dx, dy, roi_w, roi_h)
    keep = non_max_suppression(boxes_xyxy, scores, IOU_THRESHOLD)
    return [
        (boxes_xyxy[i], scores[i], _roi_mask(coeffs[i], proto, scale, dx, dy, roi_w, roi_h))
        for i in keep
    ]


def _process_image(rknn, img_path, rois, txt_dir, mask_dir, vis_dir):
    """Segment every ROI of one image and write its masks, labels and visualization."""
    print(f"\n🔍 处理图像: {img_path.name}")
    orig_img = cv2.imread(str(img_path))
    if orig_img is None:
        print(f"❌ 无法读取图像: {img_path}")
        return

    h_orig, w_orig = orig_img.shape[:2]
    full_vis_img = orig_img.copy()
    all_segments = []  # (class id, polygon normalized to the full image, confidence)

    for roi_idx, (x, y, w, h) in enumerate(rois):
        if x < 0 or y < 0 or x + w > w_orig or y + h > h_orig:
            print(f"⚠️ ROI 越界,跳过: ({x},{y},{w},{h})")
            continue
        roi_img = orig_img[y:y + h, x:x + w]
        if roi_img.size == 0:
            print(f"⚠️ 空 ROI: {roi_idx}")
            continue

        detections = _detect_in_roi(rknn, roi_img)
        if not detections:
            continue

        roi_vis = full_vis_img[y:y + h, x:x + w]  # view: drawing lands on full_vis_img
        combined = np.zeros((h, w), dtype=np.uint8)
        for box, conf, mask_bin in detections:
            # Accumulate first so the saved mask includes every kept
            # detection, even one that yields no contour.
            combined |= mask_bin

            contours, _ = cv2.findContours(mask_bin, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if len(contours) == 0:
                continue
            largest_contour = max(contours, key=cv2.contourArea)
            # reshape (not squeeze) so a single-point contour stays 2-D.
            segment = largest_contour.reshape(-1, 2).astype(float)
            # ROI coordinates -> full-image coordinates -> normalized [0, 1].
            segment[:, 0] = (segment[:, 0] + x) / w_orig
            segment[:, 1] = (segment[:, 1] + y) / h_orig
            all_segments.append((0, segment, conf))  # cls_id=0

            cv2.drawContours(roi_vis, [largest_contour], -1, (0, 255, 0), LINE_WIDTH)
            label_x = int((box[0] + box[2]) / 2) + x
            label_y = int(box[1]) + y - 10
            cv2.putText(full_vis_img, f'0 {conf:.2f}', (label_x, label_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)

        if SAVE_MASKS:
            mask_canvas = np.zeros((h_orig, w_orig), dtype=np.uint8)
            mask_canvas[y:y + h, x:x + w] = (combined * 255).astype(np.uint8)
            cv2.imwrite(str(mask_dir / f"{img_path.stem}_roi{roi_idx}.png"), mask_canvas)

    vis_path = vis_dir / f"vis_{img_path.name}"
    cv2.imwrite(str(vis_path), full_vis_img)

    if SAVE_TXT and all_segments:
        with open(txt_dir / f"{img_path.stem}.txt", 'w') as f:
            for cls_id, seg, _conf in all_segments:
                coords = ' '.join(f'{v:.6f}' for v in seg.flatten())
                f.write(f"{cls_id} {coords}\n")
    print(f"✅ 已保存结果: {vis_path}")


def run_rknn_inference_with_roi():
    """Run RKNN segmentation on every ROI of every image in ``SOURCE_IMG_DIR``.

    Loads the model at ``MODEL_PATH``, reads ROIs from ``ROI_COORDS_FILE`` and,
    for each ``*.jpg`` / ``*.png`` image, writes a visualization, optional
    per-ROI mask PNGs (``SAVE_MASKS``) and optional polygon labels
    (``SAVE_TXT``) under ``OUTPUT_DIR``. Returns ``None``; failures to load
    the model or an empty ROI list are reported on stdout.

    Raises:
        FileNotFoundError: Propagated from ``load_roi_coords`` when the ROI
            file is missing.
    """
    rknn = RKNNLite(verbose=True)
    if rknn.load_rknn(MODEL_PATH) != 0:
        print("❌ 加载 RKNN 模型失败")
        return
    if rknn.init_runtime(core_mask=DEVICE) != 0:
        print("❌ 初始化 NPU 运行时失败")
        return
    print(f"✅ 模型 {MODEL_PATH} 加载成功")

    # From here on the runtime is live: release it on every exit path,
    # including the early return below and any exception.
    try:
        output_dir = Path(OUTPUT_DIR)
        txt_dir = output_dir / "labels"
        mask_dir = output_dir / "masks"
        vis_dir = output_dir / "visualize"
        for d in (output_dir, txt_dir, mask_dir, vis_dir):
            d.mkdir(parents=True, exist_ok=True)

        rois = load_roi_coords(ROI_COORDS_FILE)
        if len(rois) == 0:
            print("❌ 没有有效 ROI退出。")
            return

        source_dir = Path(SOURCE_IMG_DIR)
        img_files = list(source_dir.glob("*.jpg")) + list(source_dir.glob("*.png"))
        for img_path in img_files:
            _process_image(rknn, img_path, rois, txt_dir, mask_dir, vis_dir)
    finally:
        rknn.release()
    print(f"\n🎉 全部完成!输出位于: {OUTPUT_DIR}")
# Script entry point: run the full ROI inference pipeline when executed directly.
if __name__ == "__main__":
    run_rknn_inference_with_roi()