添加状态分类和液面分割

This commit is contained in:
琉璃月光
2025-09-01 14:14:18 +08:00
parent 6e553f6a20
commit ad52ab9125
2379 changed files with 102501 additions and 1465 deletions

View File

@ -0,0 +1 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license

View File

@ -0,0 +1,377 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license
import copy
import cv2
import numpy as np
from ultralytics.utils import LOGGER
class GMC:
"""
Generalized Motion Compensation (GMC) class for tracking and object detection in video frames.
This class provides methods for tracking and detecting objects based on several tracking algorithms including ORB,
SIFT, ECC, and Sparse Optical Flow. It also supports downscaling of frames for computational efficiency.
Attributes:
method (str): The method used for tracking. Options include 'orb', 'sift', 'ecc', 'sparseOptFlow', 'none'.
downscale (int): Factor by which to downscale the frames for processing.
prevFrame (np.ndarray): Stores the previous frame for tracking.
prevKeyPoints (List): Stores the keypoints from the previous frame.
prevDescriptors (np.ndarray): Stores the descriptors from the previous frame.
initializedFirstFrame (bool): Flag to indicate if the first frame has been processed.
Methods:
__init__: Initializes a GMC object with the specified method and downscale factor.
apply: Applies the chosen method to a raw frame and optionally uses provided detections.
applyEcc: Applies the ECC algorithm to a raw frame.
applyFeatures: Applies feature-based methods like ORB or SIFT to a raw frame.
applySparseOptFlow: Applies the Sparse Optical Flow method to a raw frame.
reset_params: Resets the internal parameters of the GMC object.
Examples:
Create a GMC object and apply it to a frame
>>> gmc = GMC(method="sparseOptFlow", downscale=2)
>>> frame = np.array([[1, 2, 3], [4, 5, 6]])
>>> processed_frame = gmc.apply(frame)
>>> print(processed_frame)
array([[1, 2, 3],
[4, 5, 6]])
"""
def __init__(self, method: str = "sparseOptFlow", downscale: int = 2) -> None:
"""
Initialize a Generalized Motion Compensation (GMC) object with tracking method and downscale factor.
Args:
method (str): The method used for tracking. Options include 'orb', 'sift', 'ecc', 'sparseOptFlow', 'none'.
downscale (int): Downscale factor for processing frames.
Examples:
Initialize a GMC object with the 'sparseOptFlow' method and a downscale factor of 2
>>> gmc = GMC(method="sparseOptFlow", downscale=2)
"""
super().__init__()
self.method = method
self.downscale = max(1, downscale)
if self.method == "orb":
self.detector = cv2.FastFeatureDetector_create(20)
self.extractor = cv2.ORB_create()
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
elif self.method == "sift":
self.detector = cv2.SIFT_create(nOctaveLayers=3, contrastThreshold=0.02, edgeThreshold=20)
self.extractor = cv2.SIFT_create(nOctaveLayers=3, contrastThreshold=0.02, edgeThreshold=20)
self.matcher = cv2.BFMatcher(cv2.NORM_L2)
elif self.method == "ecc":
number_of_iterations = 5000
termination_eps = 1e-6
self.warp_mode = cv2.MOTION_EUCLIDEAN
self.criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, number_of_iterations, termination_eps)
elif self.method == "sparseOptFlow":
self.feature_params = dict(
maxCorners=1000, qualityLevel=0.01, minDistance=1, blockSize=3, useHarrisDetector=False, k=0.04
)
elif self.method in {"none", "None", None}:
self.method = None
else:
raise ValueError(f"Error: Unknown GMC method:{method}")
self.prevFrame = None
self.prevKeyPoints = None
self.prevDescriptors = None
self.initializedFirstFrame = False
def apply(self, raw_frame: np.array, detections: list = None) -> np.array:
"""
Apply object detection on a raw frame using the specified method.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
detections (List | None): List of detections to be used in the processing.
Returns:
(np.ndarray): Processed frame with applied object detection.
Examples:
>>> gmc = GMC(method="sparseOptFlow")
>>> raw_frame = np.random.rand(480, 640, 3)
>>> processed_frame = gmc.apply(raw_frame)
>>> print(processed_frame.shape)
(480, 640, 3)
"""
if self.method in {"orb", "sift"}:
return self.applyFeatures(raw_frame, detections)
elif self.method == "ecc":
return self.applyEcc(raw_frame)
elif self.method == "sparseOptFlow":
return self.applySparseOptFlow(raw_frame)
else:
return np.eye(2, 3)
def applyEcc(self, raw_frame: np.array) -> np.array:
"""
Apply the ECC (Enhanced Correlation Coefficient) algorithm to a raw frame for motion compensation.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
Returns:
(np.ndarray): The processed frame with the applied ECC transformation.
Examples:
>>> gmc = GMC(method="ecc")
>>> processed_frame = gmc.applyEcc(np.array([[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]))
>>> print(processed_frame)
[[1. 0. 0.]
[0. 1. 0.]]
"""
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3, dtype=np.float32)
# Downscale image
if self.downscale > 1.0:
frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
# Handle first frame
if not self.initializedFirstFrame:
# Initialize data
self.prevFrame = frame.copy()
# Initialization done
self.initializedFirstFrame = True
return H
# Run the ECC algorithm. The results are stored in warp_matrix.
# (cc, H) = cv2.findTransformECC(self.prevFrame, frame, H, self.warp_mode, self.criteria)
try:
(_, H) = cv2.findTransformECC(self.prevFrame, frame, H, self.warp_mode, self.criteria, None, 1)
except Exception as e:
LOGGER.warning(f"WARNING: find transform failed. Set warp as identity {e}")
return H
def applyFeatures(self, raw_frame: np.array, detections: list = None) -> np.array:
"""
Apply feature-based methods like ORB or SIFT to a raw frame.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
detections (List | None): List of detections to be used in the processing.
Returns:
(np.ndarray): Processed frame.
Examples:
>>> gmc = GMC(method="orb")
>>> raw_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
>>> processed_frame = gmc.applyFeatures(raw_frame)
>>> print(processed_frame.shape)
(2, 3)
"""
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3)
# Downscale image
if self.downscale > 1.0:
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
width = width // self.downscale
height = height // self.downscale
# Find the keypoints
mask = np.zeros_like(frame)
mask[int(0.02 * height) : int(0.98 * height), int(0.02 * width) : int(0.98 * width)] = 255
if detections is not None:
for det in detections:
tlbr = (det[:4] / self.downscale).astype(np.int_)
mask[tlbr[1] : tlbr[3], tlbr[0] : tlbr[2]] = 0
keypoints = self.detector.detect(frame, mask)
# Compute the descriptors
keypoints, descriptors = self.extractor.compute(frame, keypoints)
# Handle first frame
if not self.initializedFirstFrame:
# Initialize data
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)
# Initialization done
self.initializedFirstFrame = True
return H
# Match descriptors
knnMatches = self.matcher.knnMatch(self.prevDescriptors, descriptors, 2)
# Filter matches based on smallest spatial distance
matches = []
spatialDistances = []
maxSpatialDistance = 0.25 * np.array([width, height])
# Handle empty matches case
if len(knnMatches) == 0:
# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)
return H
for m, n in knnMatches:
if m.distance < 0.9 * n.distance:
prevKeyPointLocation = self.prevKeyPoints[m.queryIdx].pt
currKeyPointLocation = keypoints[m.trainIdx].pt
spatialDistance = (
prevKeyPointLocation[0] - currKeyPointLocation[0],
prevKeyPointLocation[1] - currKeyPointLocation[1],
)
if (np.abs(spatialDistance[0]) < maxSpatialDistance[0]) and (
np.abs(spatialDistance[1]) < maxSpatialDistance[1]
):
spatialDistances.append(spatialDistance)
matches.append(m)
meanSpatialDistances = np.mean(spatialDistances, 0)
stdSpatialDistances = np.std(spatialDistances, 0)
inliers = (spatialDistances - meanSpatialDistances) < 2.5 * stdSpatialDistances
goodMatches = []
prevPoints = []
currPoints = []
for i in range(len(matches)):
if inliers[i, 0] and inliers[i, 1]:
goodMatches.append(matches[i])
prevPoints.append(self.prevKeyPoints[matches[i].queryIdx].pt)
currPoints.append(keypoints[matches[i].trainIdx].pt)
prevPoints = np.array(prevPoints)
currPoints = np.array(currPoints)
# Draw the keypoint matches on the output image
# if False:
# import matplotlib.pyplot as plt
# matches_img = np.hstack((self.prevFrame, frame))
# matches_img = cv2.cvtColor(matches_img, cv2.COLOR_GRAY2BGR)
# W = self.prevFrame.shape[1]
# for m in goodMatches:
# prev_pt = np.array(self.prevKeyPoints[m.queryIdx].pt, dtype=np.int_)
# curr_pt = np.array(keypoints[m.trainIdx].pt, dtype=np.int_)
# curr_pt[0] += W
# color = np.random.randint(0, 255, 3)
# color = (int(color[0]), int(color[1]), int(color[2]))
#
# matches_img = cv2.line(matches_img, prev_pt, curr_pt, tuple(color), 1, cv2.LINE_AA)
# matches_img = cv2.circle(matches_img, prev_pt, 2, tuple(color), -1)
# matches_img = cv2.circle(matches_img, curr_pt, 2, tuple(color), -1)
#
# plt.figure()
# plt.imshow(matches_img)
# plt.show()
# Find rigid matrix
if prevPoints.shape[0] > 4:
H, inliers = cv2.estimateAffinePartial2D(prevPoints, currPoints, cv2.RANSAC)
# Handle downscale
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
LOGGER.warning("WARNING: not enough matching points")
# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.prevDescriptors = copy.copy(descriptors)
return H
def applySparseOptFlow(self, raw_frame: np.array) -> np.array:
"""
Apply Sparse Optical Flow method to a raw frame.
Args:
raw_frame (np.ndarray): The raw frame to be processed, with shape (H, W, C).
Returns:
(np.ndarray): Processed frame with shape (2, 3).
Examples:
>>> gmc = GMC()
>>> result = gmc.applySparseOptFlow(np.array([[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]))
>>> print(result)
[[1. 0. 0.]
[0. 1. 0.]]
"""
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3)
# Downscale image
if self.downscale > 1.0:
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
# Find the keypoints
keypoints = cv2.goodFeaturesToTrack(frame, mask=None, **self.feature_params)
# Handle first frame
if not self.initializedFirstFrame or self.prevKeyPoints is None:
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
self.initializedFirstFrame = True
return H
# Find correspondences
matchedKeypoints, status, _ = cv2.calcOpticalFlowPyrLK(self.prevFrame, frame, self.prevKeyPoints, None)
# Leave good correspondences only
prevPoints = []
currPoints = []
for i in range(len(status)):
if status[i]:
prevPoints.append(self.prevKeyPoints[i])
currPoints.append(matchedKeypoints[i])
prevPoints = np.array(prevPoints)
currPoints = np.array(currPoints)
# Find rigid matrix
if (prevPoints.shape[0] > 4) and (prevPoints.shape[0] == prevPoints.shape[0]):
H, _ = cv2.estimateAffinePartial2D(prevPoints, currPoints, cv2.RANSAC)
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
LOGGER.warning("WARNING: not enough matching points")
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
return H
def reset_params(self) -> None:
"""Reset the internal parameters including previous frame, keypoints, and descriptors."""
self.prevFrame = None
self.prevKeyPoints = None
self.prevDescriptors = None
self.initializedFirstFrame = False

View File

@ -0,0 +1,491 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license
import numpy as np
import scipy.linalg
class KalmanFilterXYAH:
"""
A KalmanFilterXYAH class for tracking bounding boxes in image space using a Kalman filter.
Implements a simple Kalman filter for tracking bounding boxes in image space. The 8-dimensional state space
(x, y, a, h, vx, vy, va, vh) contains the bounding box center position (x, y), aspect ratio a, height h, and their
respective velocities. Object motion follows a constant velocity model, and bounding box location (x, y, a, h) is
taken as a direct observation of the state space (linear observation model).
Attributes:
_motion_mat (np.ndarray): The motion matrix for the Kalman filter.
_update_mat (np.ndarray): The update matrix for the Kalman filter.
_std_weight_position (float): Standard deviation weight for position.
_std_weight_velocity (float): Standard deviation weight for velocity.
Methods:
initiate: Creates a track from an unassociated measurement.
predict: Runs the Kalman filter prediction step.
project: Projects the state distribution to measurement space.
multi_predict: Runs the Kalman filter prediction step (vectorized version).
update: Runs the Kalman filter correction step.
gating_distance: Computes the gating distance between state distribution and measurements.
Examples:
Initialize the Kalman filter and create a track from a measurement
>>> kf = KalmanFilterXYAH()
>>> measurement = np.array([100, 200, 1.5, 50])
>>> mean, covariance = kf.initiate(measurement)
>>> print(mean)
>>> print(covariance)
"""
def __init__(self):
"""
Initialize Kalman filter model matrices with motion and observation uncertainty weights.
The Kalman filter is initialized with an 8-dimensional state space (x, y, a, h, vx, vy, va, vh), where (x, y)
represents the bounding box center position, 'a' is the aspect ratio, 'h' is the height, and their respective
velocities are (vx, vy, va, vh). The filter uses a constant velocity model for object motion and a linear
observation model for bounding box location.
Examples:
Initialize a Kalman filter for tracking:
>>> kf = KalmanFilterXYAH()
"""
ndim, dt = 4, 1.0
# Create Kalman filter model matrices
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current state estimate. These weights control
# the amount of uncertainty in the model.
self._std_weight_position = 1.0 / 20
self._std_weight_velocity = 1.0 / 160
def initiate(self, measurement: np.ndarray) -> tuple:
"""
Create a track from an unassociated measurement.
Args:
measurement (ndarray): Bounding box coordinates (x, y, a, h) with center position (x, y), aspect ratio a,
and height h.
Returns:
(tuple[ndarray, ndarray]): Returns the mean vector (8-dimensional) and covariance matrix (8x8 dimensional)
of the new track. Unobserved velocities are initialized to 0 mean.
Examples:
>>> kf = KalmanFilterXYAH()
>>> measurement = np.array([100, 50, 1.5, 200])
>>> mean, covariance = kf.initiate(measurement)
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[3],
1e-2,
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[3],
1e-5,
10 * self._std_weight_velocity * measurement[3],
]
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean: np.ndarray, covariance: np.ndarray) -> tuple:
"""
Run Kalman filter prediction step.
Args:
mean (ndarray): The 8-dimensional mean vector of the object state at the previous time step.
covariance (ndarray): The 8x8-dimensional covariance matrix of the object state at the previous time step.
Returns:
(tuple[ndarray, ndarray]): Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
Examples:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> predicted_mean, predicted_covariance = kf.predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-2,
self._std_weight_position * mean[3],
]
std_vel = [
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[3],
1e-5,
self._std_weight_velocity * mean[3],
]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(mean, self._motion_mat.T)
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
def project(self, mean: np.ndarray, covariance: np.ndarray) -> tuple:
"""
Project state distribution to measurement space.
Args:
mean (ndarray): The state's mean vector (8 dimensional array).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
Returns:
(tuple[ndarray, ndarray]): Returns the projected mean and covariance matrix of the given state estimate.
Examples:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> projected_mean, projected_covariance = kf.project(mean, covariance)
"""
std = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-1,
self._std_weight_position * mean[3],
]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def multi_predict(self, mean: np.ndarray, covariance: np.ndarray) -> tuple:
"""
Run Kalman filter prediction step for multiple object states (Vectorized version).
Args:
mean (ndarray): The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance (ndarray): The Nx8x8 covariance matrix of the object states at the previous time step.
Returns:
(tuple[ndarray, ndarray]): Returns the mean matrix and covariance matrix of the predicted states.
The mean matrix has shape (N, 8) and the covariance matrix has shape (N, 8, 8). Unobserved velocities
are initialized to 0 mean.
Examples:
>>> mean = np.random.rand(10, 8) # 10 object states
>>> covariance = np.random.rand(10, 8, 8) # Covariance matrices for 10 object states
>>> predicted_mean, predicted_covariance = kalman_filter.multi_predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[:, 3],
self._std_weight_position * mean[:, 3],
1e-2 * np.ones_like(mean[:, 3]),
self._std_weight_position * mean[:, 3],
]
std_vel = [
self._std_weight_velocity * mean[:, 3],
self._std_weight_velocity * mean[:, 3],
1e-5 * np.ones_like(mean[:, 3]),
self._std_weight_velocity * mean[:, 3],
]
sqr = np.square(np.r_[std_pos, std_vel]).T
motion_cov = [np.diag(sqr[i]) for i in range(len(mean))]
motion_cov = np.asarray(motion_cov)
mean = np.dot(mean, self._motion_mat.T)
left = np.dot(self._motion_mat, covariance).transpose((1, 0, 2))
covariance = np.dot(left, self._motion_mat.T) + motion_cov
return mean, covariance
def update(self, mean: np.ndarray, covariance: np.ndarray, measurement: np.ndarray) -> tuple:
"""
Run Kalman filter correction step.
Args:
mean (ndarray): The predicted state's mean vector (8 dimensional).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
measurement (ndarray): The 4 dimensional measurement vector (x, y, a, h), where (x, y) is the center
position, a the aspect ratio, and h the height of the bounding box.
Returns:
(tuple[ndarray, ndarray]): Returns the measurement-corrected state distribution.
Examples:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> measurement = np.array([1, 1, 1, 1])
>>> new_mean, new_covariance = kf.update(mean, covariance, measurement)
"""
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor(projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T, check_finite=False
).T
innovation = measurement - projected_mean
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
def gating_distance(
self,
mean: np.ndarray,
covariance: np.ndarray,
measurements: np.ndarray,
only_position: bool = False,
metric: str = "maha",
) -> np.ndarray:
"""
Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If `only_position` is False, the chi-square
distribution has 4 degrees of freedom, otherwise 2.
Args:
mean (ndarray): Mean vector over the state distribution (8 dimensional).
covariance (ndarray): Covariance of the state distribution (8x8 dimensional).
measurements (ndarray): An (N, 4) matrix of N measurements, each in format (x, y, a, h) where (x, y) is the
bounding box center position, a the aspect ratio, and h the height.
only_position (bool): If True, distance computation is done with respect to box center position only.
metric (str): The metric to use for calculating the distance. Options are 'gaussian' for the squared
Euclidean distance and 'maha' for the squared Mahalanobis distance.
Returns:
(np.ndarray): Returns an array of length N, where the i-th element contains the squared distance between
(mean, covariance) and `measurements[i]`.
Examples:
Compute gating distance using Mahalanobis metric:
>>> kf = KalmanFilterXYAH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> measurements = np.array([[1, 1, 1, 1], [2, 2, 1, 1]])
>>> distances = kf.gating_distance(mean, covariance, measurements, only_position=False, metric="maha")
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
d = measurements - mean
if metric == "gaussian":
return np.sum(d * d, axis=1)
elif metric == "maha":
cholesky_factor = np.linalg.cholesky(covariance)
z = scipy.linalg.solve_triangular(cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True)
return np.sum(z * z, axis=0) # square maha
else:
raise ValueError("Invalid distance metric")
class KalmanFilterXYWH(KalmanFilterXYAH):
"""
A KalmanFilterXYWH class for tracking bounding boxes in image space using a Kalman filter.
Implements a Kalman filter for tracking bounding boxes with state space (x, y, w, h, vx, vy, vw, vh), where
(x, y) is the center position, w is the width, h is the height, and vx, vy, vw, vh are their respective velocities.
The object motion follows a constant velocity model, and the bounding box location (x, y, w, h) is taken as a direct
observation of the state space (linear observation model).
Attributes:
_motion_mat (np.ndarray): The motion matrix for the Kalman filter.
_update_mat (np.ndarray): The update matrix for the Kalman filter.
_std_weight_position (float): Standard deviation weight for position.
_std_weight_velocity (float): Standard deviation weight for velocity.
Methods:
initiate: Creates a track from an unassociated measurement.
predict: Runs the Kalman filter prediction step.
project: Projects the state distribution to measurement space.
multi_predict: Runs the Kalman filter prediction step in a vectorized manner.
update: Runs the Kalman filter correction step.
Examples:
Create a Kalman filter and initialize a track
>>> kf = KalmanFilterXYWH()
>>> measurement = np.array([100, 50, 20, 40])
>>> mean, covariance = kf.initiate(measurement)
>>> print(mean)
>>> print(covariance)
"""
def initiate(self, measurement: np.ndarray) -> tuple:
"""
Create track from unassociated measurement.
Args:
measurement (ndarray): Bounding box coordinates (x, y, w, h) with center position (x, y), width, and height.
Returns:
(tuple[ndarray, ndarray]): Returns the mean vector (8 dimensional) and covariance matrix (8x8 dimensional)
of the new track. Unobserved velocities are initialized to 0 mean.
Examples:
>>> kf = KalmanFilterXYWH()
>>> measurement = np.array([100, 50, 20, 40])
>>> mean, covariance = kf.initiate(measurement)
>>> print(mean)
[100. 50. 20. 40. 0. 0. 0. 0.]
>>> print(covariance)
[[ 4. 0. 0. 0. 0. 0. 0. 0.]
[ 0. 4. 0. 0. 0. 0. 0. 0.]
[ 0. 0. 4. 0. 0. 0. 0. 0.]
[ 0. 0. 0. 4. 0. 0. 0. 0.]
[ 0. 0. 0. 0. 0.25 0. 0. 0.]
[ 0. 0. 0. 0. 0. 0.25 0. 0.]
[ 0. 0. 0. 0. 0. 0. 0.25 0.]
[ 0. 0. 0. 0. 0. 0. 0. 0.25]]
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
]
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean, covariance) -> tuple:
"""
Run Kalman filter prediction step.
Args:
mean (ndarray): The 8-dimensional mean vector of the object state at the previous time step.
covariance (ndarray): The 8x8-dimensional covariance matrix of the object state at the previous time step.
Returns:
(tuple[ndarray, ndarray]): Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
Examples:
>>> kf = KalmanFilterXYWH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> predicted_mean, predicted_covariance = kf.predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
]
std_vel = [
self._std_weight_velocity * mean[2],
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[2],
self._std_weight_velocity * mean[3],
]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(mean, self._motion_mat.T)
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
def project(self, mean, covariance) -> tuple:
"""
Project state distribution to measurement space.
Args:
mean (ndarray): The state's mean vector (8 dimensional array).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
Returns:
(tuple[ndarray, ndarray]): Returns the projected mean and covariance matrix of the given state estimate.
Examples:
>>> kf = KalmanFilterXYWH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> projected_mean, projected_cov = kf.project(mean, covariance)
"""
std = [
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
self._std_weight_position * mean[2],
self._std_weight_position * mean[3],
]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def multi_predict(self, mean, covariance) -> tuple:
"""
Run Kalman filter prediction step (Vectorized version).
Args:
mean (ndarray): The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance (ndarray): The Nx8x8 covariance matrix of the object states at the previous time step.
Returns:
(tuple[ndarray, ndarray]): Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
Examples:
>>> mean = np.random.rand(5, 8) # 5 objects with 8-dimensional state vectors
>>> covariance = np.random.rand(5, 8, 8) # 5 objects with 8x8 covariance matrices
>>> kf = KalmanFilterXYWH()
>>> predicted_mean, predicted_covariance = kf.multi_predict(mean, covariance)
"""
std_pos = [
self._std_weight_position * mean[:, 2],
self._std_weight_position * mean[:, 3],
self._std_weight_position * mean[:, 2],
self._std_weight_position * mean[:, 3],
]
std_vel = [
self._std_weight_velocity * mean[:, 2],
self._std_weight_velocity * mean[:, 3],
self._std_weight_velocity * mean[:, 2],
self._std_weight_velocity * mean[:, 3],
]
sqr = np.square(np.r_[std_pos, std_vel]).T
motion_cov = [np.diag(sqr[i]) for i in range(len(mean))]
motion_cov = np.asarray(motion_cov)
mean = np.dot(mean, self._motion_mat.T)
left = np.dot(self._motion_mat, covariance).transpose((1, 0, 2))
covariance = np.dot(left, self._motion_mat.T) + motion_cov
return mean, covariance
def update(self, mean, covariance, measurement) -> tuple:
"""
Run Kalman filter correction step.
Args:
mean (ndarray): The predicted state's mean vector (8 dimensional).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
measurement (ndarray): The 4 dimensional measurement vector (x, y, w, h), where (x, y) is the center
position, w the width, and h the height of the bounding box.
Returns:
(tuple[ndarray, ndarray]): Returns the measurement-corrected state distribution.
Examples:
>>> kf = KalmanFilterXYWH()
>>> mean = np.array([0, 0, 1, 1, 0, 0, 0, 0])
>>> covariance = np.eye(8)
>>> measurement = np.array([0.5, 0.5, 1.2, 1.2])
>>> new_mean, new_covariance = kf.update(mean, covariance, measurement)
"""
return super().update(mean, covariance, measurement)

View File

@ -0,0 +1,158 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license
import numpy as np
import scipy
from scipy.spatial.distance import cdist
from ultralytics.utils.metrics import batch_probiou, bbox_ioa
try:
import lap # for linear_assignment
assert lap.__version__ # verify package is not directory
except (ImportError, AssertionError, AttributeError):
from ultralytics.utils.checks import check_requirements
check_requirements("lapx>=0.5.2") # update to lap package from https://github.com/rathaROG/lapx
import lap
def linear_assignment(cost_matrix: np.ndarray, thresh: float, use_lap: bool = True) -> tuple:
"""
Perform linear assignment using either the scipy or lap.lapjv method.
Args:
cost_matrix (np.ndarray): The matrix containing cost values for assignments, with shape (N, M).
thresh (float): Threshold for considering an assignment valid.
use_lap (bool): Use lap.lapjv for the assignment. If False, scipy.optimize.linear_sum_assignment is used.
Returns:
(tuple): A tuple containing:
- matched_indices (np.ndarray): Array of matched indices of shape (K, 2), where K is the number of matches.
- unmatched_a (np.ndarray): Array of unmatched indices from the first set, with shape (L,).
- unmatched_b (np.ndarray): Array of unmatched indices from the second set, with shape (M,).
Examples:
>>> cost_matrix = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
>>> thresh = 5.0
>>> matched_indices, unmatched_a, unmatched_b = linear_assignment(cost_matrix, thresh, use_lap=True)
"""
if cost_matrix.size == 0:
return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
if use_lap:
# Use lap.lapjv
# https://github.com/gatagat/lap
_, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]
unmatched_a = np.where(x < 0)[0]
unmatched_b = np.where(y < 0)[0]
else:
# Use scipy.optimize.linear_sum_assignment
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html
x, y = scipy.optimize.linear_sum_assignment(cost_matrix) # row x, col y
matches = np.asarray([[x[i], y[i]] for i in range(len(x)) if cost_matrix[x[i], y[i]] <= thresh])
if len(matches) == 0:
unmatched_a = list(np.arange(cost_matrix.shape[0]))
unmatched_b = list(np.arange(cost_matrix.shape[1]))
else:
unmatched_a = list(set(np.arange(cost_matrix.shape[0])) - set(matches[:, 0]))
unmatched_b = list(set(np.arange(cost_matrix.shape[1])) - set(matches[:, 1]))
return matches, unmatched_a, unmatched_b
def iou_distance(atracks: list, btracks: list) -> np.ndarray:
"""
Compute cost based on Intersection over Union (IoU) between tracks.
Args:
atracks (list[STrack] | list[np.ndarray]): List of tracks 'a' or bounding boxes.
btracks (list[STrack] | list[np.ndarray]): List of tracks 'b' or bounding boxes.
Returns:
(np.ndarray): Cost matrix computed based on IoU.
Examples:
Compute IoU distance between two sets of tracks
>>> atracks = [np.array([0, 0, 10, 10]), np.array([20, 20, 30, 30])]
>>> btracks = [np.array([5, 5, 15, 15]), np.array([25, 25, 35, 35])]
>>> cost_matrix = iou_distance(atracks, btracks)
"""
if atracks and isinstance(atracks[0], np.ndarray) or btracks and isinstance(btracks[0], np.ndarray):
atlbrs = atracks
btlbrs = btracks
else:
atlbrs = [track.xywha if track.angle is not None else track.xyxy for track in atracks]
btlbrs = [track.xywha if track.angle is not None else track.xyxy for track in btracks]
ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=np.float32)
if len(atlbrs) and len(btlbrs):
if len(atlbrs[0]) == 5 and len(btlbrs[0]) == 5:
ious = batch_probiou(
np.ascontiguousarray(atlbrs, dtype=np.float32),
np.ascontiguousarray(btlbrs, dtype=np.float32),
).numpy()
else:
ious = bbox_ioa(
np.ascontiguousarray(atlbrs, dtype=np.float32),
np.ascontiguousarray(btlbrs, dtype=np.float32),
iou=True,
)
return 1 - ious # cost matrix
def embedding_distance(tracks: list, detections: list, metric: str = "cosine") -> np.ndarray:
"""
Compute distance between tracks and detections based on embeddings.
Args:
tracks (list[STrack]): List of tracks, where each track contains embedding features.
detections (list[BaseTrack]): List of detections, where each detection contains embedding features.
metric (str): Metric for distance computation. Supported metrics include 'cosine', 'euclidean', etc.
Returns:
(np.ndarray): Cost matrix computed based on embeddings with shape (N, M), where N is the number of tracks
and M is the number of detections.
Examples:
Compute the embedding distance between tracks and detections using cosine metric
>>> tracks = [STrack(...), STrack(...)] # List of track objects with embedding features
>>> detections = [BaseTrack(...), BaseTrack(...)] # List of detection objects with embedding features
>>> cost_matrix = embedding_distance(tracks, detections, metric="cosine")
"""
cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float32)
if cost_matrix.size == 0:
return cost_matrix
det_features = np.asarray([track.curr_feat for track in detections], dtype=np.float32)
# for i, track in enumerate(tracks):
# cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float32)
cost_matrix = np.maximum(0.0, cdist(track_features, det_features, metric)) # Normalized features
return cost_matrix
def fuse_score(cost_matrix: np.ndarray, detections: list) -> np.ndarray:
"""
Fuses cost matrix with detection scores to produce a single similarity matrix.
Args:
cost_matrix (np.ndarray): The matrix containing cost values for assignments, with shape (N, M).
detections (list[BaseTrack]): List of detections, each containing a score attribute.
Returns:
(np.ndarray): Fused similarity matrix with shape (N, M).
Examples:
Fuse a cost matrix with detection scores
>>> cost_matrix = np.random.rand(5, 10) # 5 tracks and 10 detections
>>> detections = [BaseTrack(score=np.random.rand()) for _ in range(10)]
>>> fused_matrix = fuse_score(cost_matrix, detections)
"""
if cost_matrix.size == 0:
return cost_matrix
iou_sim = 1 - cost_matrix
det_scores = np.array([det.score for det in detections])
det_scores = np.expand_dims(det_scores, axis=0).repeat(cost_matrix.shape[0], axis=0)
fuse_sim = iou_sim * det_scores
return 1 - fuse_sim # fuse_cost