-
Notifications
You must be signed in to change notification settings - Fork 102
/
Copy pathcentroid_kf_tracker.py
159 lines (130 loc) · 6.68 KB
/
centroid_kf_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from collections import OrderedDict
import numpy as np
from scipy.spatial import distance
from scipy.optimize import linear_sum_assignment
from motrackers.tracker import Tracker
from motrackers.track import KFTrackCentroid
from motrackers.utils.misc import get_centroid
def assign_tracks2detection_centroid_distances(bbox_tracks, bbox_detections, distance_threshold=10.):
    """
    Assigns detected bounding boxes to tracked bounding boxes by minimizing the total
    centroid-to-centroid Euclidean distance (Hungarian algorithm), then rejecting any
    assignment whose distance exceeds ``distance_threshold``.

    Args:
        bbox_tracks (numpy.ndarray): Tracked bounding boxes with shape `(n, 4)`
            and each row as `(xmin, ymin, width, height)`.
        bbox_detections (numpy.ndarray): Detection bounding boxes with shape `(m, 4)` and
            each row as `(xmin, ymin, width, height)`.
        distance_threshold (float): Maximum centroid distance between a tracked object
            and a new detection for the pair to count as a match.

    Returns:
        tuple: Tuple containing the following elements:
            - matches (numpy.ndarray): Array of shape `(p, 2)` where `p` is the number of
              pairs formed after matching tracks to detections, each row a matched pair of
              indices `(track_index, detection_index)`.
            - unmatched_detections (numpy.ndarray): Indices of detections not matched to any track.
            - unmatched_tracks (numpy.ndarray): Indices of tracks not matched to any detection.
    """
    # Degenerate cases: with no tracks every detection is unmatched; with no
    # detections every track is unmatched. (BUGFIX: the original returned an empty
    # unmatched_tracks array even when tracks existed but detections did not.)
    if (bbox_tracks.size == 0) or (bbox_detections.size == 0):
        return (
            np.empty((0, 2), dtype=int),
            np.arange(len(bbox_detections), dtype=int),
            np.arange(len(bbox_tracks), dtype=int),
        )

    # Promote single boxes to 2-d so cdist always receives (k, 4) inputs.
    if bbox_tracks.ndim == 1:
        bbox_tracks = bbox_tracks[None, :]
    if bbox_detections.ndim == 1:
        bbox_detections = bbox_detections[None, :]

    estimated_track_centroids = get_centroid(bbox_tracks)
    detection_centroids = get_centroid(bbox_detections)
    centroid_distances = distance.cdist(estimated_track_centroids, detection_centroids)

    # Optimal one-to-one assignment minimizing total centroid distance.
    assigned_tracks, assigned_detections = linear_sum_assignment(centroid_distances)

    # Sets give O(1) membership tests instead of scanning numpy arrays per index.
    assigned_track_set = set(assigned_tracks.tolist())
    assigned_detection_set = set(assigned_detections.tolist())
    unmatched_detections = [d for d in range(bbox_detections.shape[0]) if d not in assigned_detection_set]
    unmatched_tracks = [t for t in range(bbox_tracks.shape[0]) if t not in assigned_track_set]

    # Filter out assignments whose centroids are farther apart than the threshold.
    matches = []
    for t, d in zip(assigned_tracks, assigned_detections):
        if centroid_distances[t, d] > distance_threshold:
            unmatched_detections.append(d)
            unmatched_tracks.append(t)
        else:
            matches.append((t, d))

    if len(matches):
        matches = np.array(matches)
    else:
        matches = np.empty((0, 2), dtype=int)

    return matches, np.array(unmatched_detections), np.array(unmatched_tracks)
class CentroidKF_Tracker(Tracker):
    """
    Kalman filter based tracking of multiple detected objects.

    Args:
        max_lost (int): Maximum number of consecutive frames object was not detected.
        centroid_distance_threshold (float): Maximum centroid distance for a
            track-detection pair to be considered a match.
        tracker_output_format (str): Output format of the tracker.
        process_noise_scale (float or numpy.ndarray): Process noise covariance matrix of shape (3, 3) or
            covariance magnitude as scalar value.
        measurement_noise_scale (float or numpy.ndarray): Measurement noise covariance matrix of shape (1,)
            or covariance magnitude as scalar value.
        time_step (int or float): Time step for Kalman Filter.
    """

    def __init__(
            self,
            max_lost=1,
            centroid_distance_threshold=30.,
            tracker_output_format='mot_challenge',
            process_noise_scale=1.0,
            measurement_noise_scale=1.0,
            time_step=1
    ):
        self.time_step = time_step
        self.process_noise_scale = process_noise_scale
        self.measurement_noise_scale = measurement_noise_scale
        self.centroid_distance_threshold = centroid_distance_threshold
        # NOTE(review): never read or written elsewhere in this file; kept for API compatibility.
        self.kalman_trackers = OrderedDict()
        super().__init__(max_lost, tracker_output_format)

    def _add_track(self, frame_id, bbox, detection_confidence, class_id, **kwargs):
        """Create a new `KFTrackCentroid` registered under a fresh track id."""
        self.tracks[self.next_track_id] = KFTrackCentroid(
            self.next_track_id, frame_id, bbox, detection_confidence, class_id=class_id,
            data_output_format=self.tracker_output_format, process_noise_scale=self.process_noise_scale,
            measurement_noise_scale=self.measurement_noise_scale, **kwargs
        )
        self.next_track_id += 1

    def update(self, bboxes, detection_scores, class_ids):
        """
        Update tracks with the detections of the current frame.

        Args:
            bboxes (numpy.ndarray or list): Detection boxes, each row `(xmin, ymin, width, height)`.
            detection_scores (numpy.ndarray or list): Per-detection confidence scores.
            class_ids (numpy.ndarray or list): Per-detection class labels.

        Returns:
            list: Tracks for the current frame in the configured output format.
        """
        self.frame_count += 1
        bbox_detections = np.array(bboxes, dtype='int')

        track_ids = list(self.tracks.keys())
        # One Kalman prediction step per track; predictions are the boxes we match against.
        bbox_tracks = np.array([self.tracks[track_id].predict() for track_id in track_ids])

        if len(bboxes) == 0:
            # No detections this frame: every track ages by one lost frame.
            for i in range(len(bbox_tracks)):
                track_id = track_ids[i]
                bbox = bbox_tracks[i, :]
                confidence = self.tracks[track_id].detection_confidence
                cid = self.tracks[track_id].class_id
                self._update_track(track_id, self.frame_count, bbox, detection_confidence=confidence, class_id=cid, lost=1)
                if self.tracks[track_id].lost > self.max_lost:
                    self._remove_track(track_id)
        else:
            matches, unmatched_detections, unmatched_tracks = assign_tracks2detection_centroid_distances(
                bbox_tracks, bbox_detections, distance_threshold=self.centroid_distance_threshold
            )

            for i in range(matches.shape[0]):
                t, d = matches[i, :]
                track_id = track_ids[t]
                # BUGFIX: index the numpy copy, not `bboxes`, so plain-list input works too.
                bbox = bbox_detections[d, :]
                cid = class_ids[d]
                confidence = detection_scores[d]
                self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=0)

            for d in unmatched_detections:
                bbox = bbox_detections[d, :]
                cid = class_ids[d]
                confidence = detection_scores[d]
                self._add_track(self.frame_count, bbox, confidence, cid)

            for t in unmatched_tracks:
                track_id = track_ids[t]
                bbox = bbox_tracks[t, :]
                confidence = self.tracks[track_id].detection_confidence
                cid = self.tracks[track_id].class_id
                self._update_track(track_id, self.frame_count, bbox, confidence, cid, lost=1)
                if self.tracks[track_id].lost > self.max_lost:
                    self._remove_track(track_id)

        outputs = self._get_tracks(self.tracks)
        return outputs