#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2024/9/10 16:33 # @Author : hjw # @File : yolov8_openvino.py ''' import math import time import cv2 import numpy as np from openvino.runtime import Core from Vision.tool.utils import xywh2xyxy, nms, draw_detections, sigmoid class yolov8_segment_openvino: def __init__(self, path, device_name="CPU", conf_thres=0.7, iou_thres=0.5, num_masks=32): self.conf_threshold = conf_thres self.iou_threshold = iou_thres self.num_masks = num_masks # Initialize model self.initialize_model(path, device_name) def __call__(self, image): return self.segment_objects(image) def initialize_model(self, path, device_name): self.core = Core() self.net = self.core.compile_model(path, device_name=device_name) self.ir = self.net.create_infer_request() input_shape = self.net.inputs[0].shape self.input_height = input_shape[2] self.input_width = input_shape[3] def segment_objects(self, image): input_tensor = self.prepare_input(image) # Perform inference on the image outputs = self.ir.infer(input_tensor) self.boxes, self.scores, self.class_ids, mask_pred = self.process_box_output(outputs[0]) self.mask_maps = self.process_mask_output(mask_pred, outputs[1]) if type(self.boxes) == list: if len(self.boxes)==0: return 0, None, None, None, None self.boxes = self.boxes.tolist() self.scores = self.scores.tolist() for t in range(len(self.scores)): self.boxes[t].append(self.scores[t]) self.boxes = np.array(self.boxes) if len(self.boxes) == 0: return 0, None, None, None, None else: return 1, self.boxes, self.scores, self.mask_maps, self.class_ids def prepare_input(self, image): self.img_height, self.img_width = image.shape[:2] input_img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Resize input image input_img = cv2.resize(input_img, (self.input_width, self.input_height)) # Scale input pixel values to 0 to 1 input_img = input_img / 255.0 input_img = input_img.transpose(2, 0, 1) input_tensor = input_img[np.newaxis, :, :, :].astype(np.float32) return input_tensor def process_box_output(self, box_output): predictions = np.squeeze(box_output).T num_classes = box_output.shape[1] - self.num_masks - 4 # Filter out object confidence scores below threshold scores = np.max(predictions[:, 4:4+num_classes], axis=1) predictions = predictions[scores > self.conf_threshold, :] scores = scores[scores > self.conf_threshold] if len(scores) == 0: return [], [], [], np.array([]) box_predictions = predictions[..., :num_classes+4] mask_predictions = predictions[..., num_classes+4:] # Get the class with the highest confidence class_ids = np.argmax(box_predictions[:, 4:], axis=1) # Get bounding boxes for each object boxes = self.extract_boxes(box_predictions) # Apply non-maxima suppression to suppress weak, overlapping bounding boxes indices = nms(boxes, scores, self.iou_threshold) return boxes[indices], scores[indices], class_ids[indices], mask_predictions[indices] def process_mask_output(self, mask_predictions, mask_output): if mask_predictions.shape[0] == 0: return [] mask_output = np.squeeze(mask_output) # Calculate the mask maps for each box num_mask, mask_height, mask_width = mask_output.shape # CHW masks = sigmoid(mask_predictions @ mask_output.reshape((num_mask, -1))) masks = masks.reshape((-1, mask_height, mask_width)) # Downscale the boxes to match the mask size scale_boxes = self.rescale_boxes(self.boxes, (self.img_height, self.img_width), (mask_height, mask_width)) # For every box/mask pair, get the mask map mask_maps = np.zeros((len(scale_boxes), self.img_height, self.img_width)) blur_size = (int(self.img_width / mask_width), int(self.img_height / mask_height)) for i in range(len(scale_boxes)): scale_x1 = int(math.floor(scale_boxes[i][0])) scale_y1 = int(math.floor(scale_boxes[i][1])) scale_x2 = int(math.ceil(scale_boxes[i][2])) scale_y2 = int(math.ceil(scale_boxes[i][3])) x1 = int(math.floor(self.boxes[i][0])) y1 = int(math.floor(self.boxes[i][1])) x2 = int(math.ceil(self.boxes[i][2])) y2 = int(math.ceil(self.boxes[i][3])) scale_crop_mask = masks[i][scale_y1:scale_y2, scale_x1:scale_x2] crop_mask = cv2.resize(scale_crop_mask, (x2 - x1, y2 - y1), interpolation=cv2.INTER_CUBIC) crop_mask = cv2.blur(crop_mask, blur_size) crop_mask = (crop_mask > 0.5).astype(np.uint8) mask_maps[i, y1:y2, x1:x2] = crop_mask return mask_maps def extract_boxes(self, box_predictions): # Extract boxes from predictions boxes = box_predictions[:, :4] # Scale boxes to original image dimensions boxes = self.rescale_boxes(boxes, (self.input_height, self.input_width), (self.img_height, self.img_width)) # Convert boxes to xyxy format boxes = xywh2xyxy(boxes) # Check the boxes are within the image boxes[:, 0] = np.clip(boxes[:, 0], 0, self.img_width) boxes[:, 1] = np.clip(boxes[:, 1], 0, self.img_height) boxes[:, 2] = np.clip(boxes[:, 2], 0, self.img_width) boxes[:, 3] = np.clip(boxes[:, 3], 0, self.img_height) return boxes def draw_detections(self, image, draw_scores=True, mask_alpha=0.4): return draw_detections(image, self.boxes, self.scores, self.class_ids, mask_alpha) def draw_masks(self, image, draw_scores=True, mask_alpha=0.5): return draw_detections(image, self.boxes, self.scores, self.class_ids, mask_alpha, mask_maps=self.mask_maps) @staticmethod def rescale_boxes(boxes, input_shape, image_shape): # Rescale boxes to original image dimensions input_shape = np.array([input_shape[1], input_shape[0], input_shape[1], input_shape[0]]) boxes = np.divide(boxes, input_shape, dtype=np.float32) boxes *= np.array([image_shape[1], image_shape[0], image_shape[1], image_shape[0]]) return boxes # frame = cv2.imread('../1.png') # # frame = cv2.resize(frame,(640,640)) # model_path = ("../runs/segment/train2/weights/last-0903_openvino_model/last-0903.xml") # #model_path = ("../yolov8n-seg_openvino_model/yolov8n-seg.xml") # device_name = "GPU" # yoloseg = yolov8_segment_openvino(model_path, device_name, conf_thres=0.3, iou_thres=0.3) # # start = time.time() # boxes, scores, class_ids, masks = yoloseg(frame) # # postprocess and draw masks # combined_img = yoloseg.draw_masks(frame) # end = time.time() # # show FPS # print(end- start) # fps = (1 / (end - start)) # fps_label = "Throughput: %.2f FPS" % fps # cv2.putText(combined_img, fps_label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) # cv2.imwrite('rs2.jpg', combined_img) # # show ALL # cv2.imshow("YOLOv8 Segmentation OpenVINO inference Demo", combined_img) # # cv2.waitKey(0) # # Initialize the VideoCapture # cap = cv2.VideoCapture("store-aisle-detection.mp4") # img = cv2.imread('../1.png') # # Initialize YOLOv5 Instance Segmentator # model_path = "yolov8n-seg.xml" # device_name = "GPU" # yoloseg = YOLOSeg(model_path, device_name, conf_thres=0.3, iou_thres=0.3) # while cap.isOpened(): # # Read frame from the video # ret, frame = cap.read() # if not ret: # break # # Update object localizer # start = time.time() # boxes, scores, class_ids, masks = yoloseg(frame) # # postprocess and draw masks # combined_img = yoloseg.draw_masks(frame) # end = time.time() # # show FPS # fps = (1 / (end - start)) # fps_label = "Throughput: %.2f FPS" % fps # cv2.putText(combined_img, fps_label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) # # show ALL # cv2.imshow("YOLOv8 Segmentation OpenVINO inference Demo", combined_img) # # # Press Any key stop # if cv2.waitKey(1) > -1: # print("finished by user") # break # # cap.release() # cv2.destroyAllWindows()