Deepsort源码详解及个人理解 您所在的位置:网站首页 img是啥意思 Deepsort源码详解及个人理解

Deepsort源码详解及个人理解

2023-07-31 22:02| 来源: 网络整理| 查看: 265

self.tracker.update(detections)

对跟踪对象进行更新。

首先会进行级联特征匹配,先理解一下级联特征匹配的过程。

matches, unmatched_tracks, unmatched_detections = \ self._match(detections)#级联特征匹配

这里是函数的入口,通过级联匹配找到该帧中所有匹配上的目标,以及未匹配上的跟踪对象,未匹配上的检测对象

在看_match函数之前先理解一下其中的方法:

def gated_metric(tracks, dets, track_indices, detection_indices): features = np.array([dets[i].feature for i in detection_indices])#所有检测对象的特征 targets = np.array([tracks[i].track_id for i in track_indices])#跟踪对象的id cost_matrix = self.metric.distance(features, targets)#这里得到的代价矩阵就是每个track对象和现在det对象的代价值 #self.metric 是NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)对象 # 这里返回的是使用余弦距离计算出来的代价矩阵 cost_matrix = linear_assignment.gate_cost_matrix( self.kf, cost_matrix, tracks, dets, track_indices, detection_indices)#判断距离关系,使用马氏距离,大于阈值的都把代价变成无穷。 return cost_matrix

该方法的作用就是返回一个代价矩阵,每一个状态为confirmed的跟踪对象和该帧中的检测对象的一个代价

其中的self.metric.distance():

def distance(self, features, targets): #features是所有检测框的特征,每个元素是一个对象 cost_matrix = np.zeros((len(targets), len(features))) for i, target in enumerate(targets): cost_matrix[i, :] = self._metric(self.samples[target], features)#算余弦距离,这里的samples就是目前所有confirmed对象的特征。这里其实是找到了每个confirmed对象和相似度最高的那个检测对象的代价,比如有5个跟踪对象,4个检测对象,那么这里返回的矩阵就是5*4,其中元素i,j表示第i个跟踪对象和第j个检测对象之间的代价。所以这里每次多一行,一行就表示该track对象和每个det的最小代价 return cost_matrix def _nn_cosine_distance(x, y): distances = _cosine_distance(x, y) return distances.min(axis=0)#取每列的最小值,一列就是一个det对象和各个track对象的代价,这样相当于计算出了每个det对象跟这个track对象所有特征计算出来的最小代价值,也就是表示每个det对象和track对象的最小代价。 def _cosine_distance(a, b, data_is_normalized=False): if not data_is_normalized: a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)#分母表示每个行向量求二范数 b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True) #其实画个图也很好理解,这里上面的步骤是把行向量化为单位向量,然后再进行np.dot求出来的就是行向量两两之间的一个余弦距离。 return 1. - np.dot(a, b.T)#因为这里是代价矩阵,所以代价越大,相似性越低。

如上三块代码就计算出了每个跟踪对象和检测对象的相似度(取相似度最高的)简单解释一下。如下图,是我运行代码时的一个结果,这就表示传入_nn_cosine_distance的track对象一共有3个特征,一共有9个检测对象,这里每个特征都是1*512。由此可以得到一个3*9的矩阵,然后再取每一列的最小值,返回的就是一个向量了。在这里插入图片描述

在得到余弦代价矩阵之后,要经过一个方法来对距离进行一个度量,跟踪对象的bbox和检测对象的bbox距离超过阈值的话,就把代价设为无穷大。

cost_matrix = linear_assignment.gate_cost_matrix( self.kf, cost_matrix, tracks, dets, track_indices, detection_indices)#判断距离关系,使用马氏距离,大于阈值的都把代价变成无穷。 def gate_cost_matrix( kf, cost_matrix, tracks, detections, track_indices, detection_indices, gated_cost=INFTY_COST, only_position=False):#INFTY_COST=1e+5 gating_dim = 2 if only_position else 4 gating_threshold = kalman_filter.chi2inv95[gating_dim] measurements = np.asarray( [detections[i].to_xyah() for i in detection_indices]) for row, track_idx in enumerate(track_indices): track = tracks[track_idx] gating_distance = kf.gating_distance( track.mean, track.covariance, measurements, only_position)#算出的马氏距离 cost_matrix[row, gating_distance > gating_threshold] = gated_cost return cost_matrix

搞懂gated_metric方法之后(简单说来就是这个方法可以让我们返回一个状态为confirmed的跟踪对象和检测对象的一个代价矩阵)就可以继续往下看。

confirmed_tracks = [ i for i, t in enumerate(self.tracks) if t.is_confirmed()]#获取状态位confirmed跟踪对象 unconfirmed_tracks = [ i for i, t in enumerate(self.tracks) if not t.is_confirmed()]#获取状态非confirmed的对象,例如Tentative,Deleted # Associate confirmed tracks using appearance features. matches_a, unmatched_tracks_a, unmatched_detections = \ linear_assignment.matching_cascade( gated_metric, self.metric.matching_threshold, self.max_age, self.tracks, detections, confirmed_tracks)#对检测出来的对象和状态位confirmed的跟踪对象进行特征匹配 def matching_cascade( distance_metric, max_distance, cascade_depth, tracks, detections, track_indices=None, detection_indices=None): #track_indices传入的是状态为confirmed跟踪对象,整体来看这个方法就是对检测结果和状态位confirmed跟踪对象进行匹配。 #detection_indices传过来的时候就是None if track_indices is None: track_indices = list(range(len(tracks)))#跟踪对象索引 if detection_indices is None: detection_indices = list(range(len(detections)))#检测对象索引 unmatched_detections = detection_indices#初始化为匹配上的检测框 matches = [] for level in range(cascade_depth): if len(unmatched_detections) == 0: # 当所有的检测框都匹配上之后结束匹配 break track_indices_l = [ k for k in track_indices if tracks[k].time_since_update == 1 + level ]#比较好理解,从后往前匹配,首先找到time_since_update为1的也就是上一帧刚更新过的那些跟踪对象,可以理解为先匹配连续轨迹然后匹配有间断但为达到max_age的对象。 if len(track_indices_l) == 0: # Nothing to match at this level continue matches_l, _, unmatched_detections = \ min_cost_matching( distance_metric, max_distance, tracks, detections, track_indices_l, unmatched_detections) matches += matches_l unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches)) return matches, unmatched_tracks, unmatched_detections

主要是上面这个matching_cascade()方法。这里的匹配过程是根据更新的先后顺序来进行特征匹配,越近更新的就更早的进行匹配,也就是可以更容易得到匹配对象。

def min_cost_matching( distance_metric, max_distance, tracks, detections, track_indices=None, detection_indices=None): if track_indices is None: track_indices = np.arange(len(tracks)) if detection_indices is None: detection_indices = np.arange(len(detections)) if len(detection_indices) == 0 or len(track_indices) == 0: return [], track_indices, detection_indices # Nothing to match. cost_matrix = distance_metric( tracks, detections, track_indices, detection_indices)#这个方法是traker.py中_match方法下定义的一个方法。这个方法返回代价矩阵,其中的内容就是每个跟踪对象和每个检测对象的代价 cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5#余弦距离大于阈值的也都变成,代价越大,相似度越小,代价大于0.2的,也就是相似度小于0.8的,下面71行用到了 row_indices, col_indices = linear_assignment(cost_matrix) matches, unmatched_tracks, unmatched_detections = [], [], [] for col, detection_idx in enumerate(detection_indices): if col not in col_indices: unmatched_detections.append(detection_idx) for row, track_idx in enumerate(track_indices): if row not in row_indices: unmatched_tracks.append(track_idx) for row, col in zip(row_indices, col_indices):#这个循环是把那些虽然在最优解中,但是超过阈值的去掉 track_idx = track_indices[row] detection_idx = detection_indices[col] if cost_matrix[row, col] > max_distance: unmatched_tracks.append(track_idx) unmatched_detections.append(detection_idx) else: matches.append((track_idx, detection_idx)) return matches, unmatched_tracks, unmatched_detections

其中的min_cost_matching方法可以得到KM匹配算法之后的匹配结果(这个结果需要进行修正,代价超过0.2的都算做未匹配)

下面是我运行时查看KM之前的代价矩阵情况在这里插入图片描述

然后得到了confirmed的匹配结果

iou_track_candidates = unconfirmed_tracks + [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update == 1]#不是confirmed加上未匹配上的track中刚更新过的。 unmatched_tracks_a = [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update != 1]#未匹配上的且不是刚更新的 matches_b, unmatched_tracks_b, unmatched_detections = \ linear_assignment.min_cost_matching( iou_matching.iou_cost, self.max_iou_distance, self.tracks, detections, iou_track_candidates, unmatched_detections)

之后再把非confirmed的跟踪对象和上一帧刚更新过但是未匹配上的那些跟踪对象加入同一个iou_track_candidates列表,与未匹配上的检测对象进行IOU匹配。

def iou_cost(tracks, detections, track_indices=None, detection_indices=None): """An intersection over union distance metric. Parameters ---------- tracks : List[deep_sort.track.Track] A list of tracks. detections : List[deep_sort.detection.Detection] A list of detections. track_indices : Optional[List[int]] A list of indices to tracks that should be matched. Defaults to all `tracks`. detection_indices : Optional[List[int]] A list of indices to detections that should be matched. Defaults to all `detections`. Returns ------- ndarray Returns a cost matrix of shape len(track_indices), len(detection_indices) where entry (i, j) is `1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`. """ if track_indices is None: track_indices = np.arange(len(tracks)) if detection_indices is None: detection_indices = np.arange(len(detections)) cost_matrix = np.zeros((len(track_indices), len(detection_indices))) for row, track_idx in enumerate(track_indices): if tracks[track_idx].time_since_update > 1:#把不是刚更新的都设为无穷大,这对删除了的直接没戏了,除非所有的都匹配完了才会有结果。 cost_matrix[row, :] = linear_assignment.INFTY_COST continue bbox = tracks[track_idx].to_tlwh() candidates = np.asarray([detections[i].tlwh for i in detection_indices]) cost_matrix[row, :] = 1. - iou(bbox, candidates) return cost_matrix

后面的步骤跟上面都是一样的。上面的搞懂之后,就可以看_match方法了:直接顺一遍应该很顺

def _match(self, detections): ''' Args: detections:传入的参数就是经过一系列筛选得到的bbox Returns: ''' def gated_metric(tracks, dets, track_indices, detection_indices): features = np.array([dets[i].feature for i in detection_indices])#所有检测对象的特征 targets = np.array([tracks[i].track_id for i in track_indices])#跟踪对象的id cost_matrix = self.metric.distance(features, targets)#这里得到的代价矩阵就是每个track对象和现在det对象的代价值 #self.metric 是NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)对象 # 这里返回的是使用余弦距离计算出来的代价矩阵 cost_matrix = linear_assignment.gate_cost_matrix( self.kf, cost_matrix, tracks, dets, track_indices, detection_indices)#判断距离关系,使用马氏距离,大于阈值的都把代价变成无穷。 return cost_matrix # Split track set into confirmed and unconfirmed tracks. confirmed_tracks = [ i for i, t in enumerate(self.tracks) if t.is_confirmed()]#获取状态位confirmed跟踪对象 unconfirmed_tracks = [ i for i, t in enumerate(self.tracks) if not t.is_confirmed()]#获取状态非confirmed的对象,例如Tentative,Deleted # Associate confirmed tracks using appearance features. matches_a, unmatched_tracks_a, unmatched_detections = \ linear_assignment.matching_cascade( gated_metric, self.metric.matching_threshold, self.max_age, self.tracks, detections, confirmed_tracks)#对检测出来的对象和状态位confirmed的跟踪对象进行特征匹配 # Associate remaining tracks together with unconfirmed tracks using IOU.剩下的用IOU进行匹配 iou_track_candidates = unconfirmed_tracks + [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update == 1]#不是confirmed加上未匹配上的track中刚更新过的。 unmatched_tracks_a = [ k for k in unmatched_tracks_a if self.tracks[k].time_since_update != 1]#未匹配上的且不是刚更新的 matches_b, unmatched_tracks_b, unmatched_detections = \ linear_assignment.min_cost_matching( iou_matching.iou_cost, self.max_iou_distance, self.tracks, detections, iou_track_candidates, unmatched_detections) matches = matches_a + matches_b unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b)) return matches, unmatched_tracks, unmatched_detections

得到了最终的matches,unmatched_tracks,unmatched_detections

对其分别进行处理:

for track_idx, detection_idx in matches: self.tracks[track_idx].update( self.kf, detections[detection_idx]) for track_idx in unmatched_tracks: self.tracks[track_idx].mark_missed()#如果是Tentative状态下未匹配上就直接删了,或者大于最大值了 for detection_idx in unmatched_detections: self._initiate_track(detections[detection_idx]) self.tracks = [t for t in self.tracks if not t.is_deleted()]

这些操作也不难理解,主要是这个未匹配上的跟踪对象,

def mark_missed(self): """Mark this track as missed (no association at the current time step). """ if self.state == TrackState.Tentative:# self.state = TrackState.Deleted elif self.time_since_update > self._max_age: self.state = TrackState.Deleted

如果状态为Tentative,则直接转为Deleted,如果为Confirmed且超过max_age也变为confirmed

且self.track更新为非deleted对象。

# Update distance metric. active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]#状态为confirmed的trackid features, targets = [], [] for track in self.tracks: if not track.is_confirmed(): continue features += track.features targets += [track.track_id for _ in track.features] track.features = [] self.metric.partial_fit( np.asarray(features), np.asarray(targets), active_targets)

下面这个metric.partial_fit()

def partial_fit(self, features, targets, active_targets): """Update the distance metric with new data. Parameters ---------- features : ndarray An NxM matrix of N features of dimensionality M. targets : ndarray An integer array of associated target identities. active_targets : List[int] A list of targets that are currently present in the scene. """ #该方法的作用就是调整存储特征的字典使其中存储的特征都是状态为confirmed的对象的 #activate_targets:状态为confirmed的跟踪对象。 #targets和features是目前所有状态为confirmed的跟踪对象的id和特征 for feature, target in zip(features, targets): self.samples.setdefault(target, []).append(feature)#setdefalut方法就是如果这个字典中有这个target这个键就返回其键值,没有的话就新建这个键并且返回逗号后面的值最为键值 if self.budget is not None: self.samples[target] = self.samples[target][-self.budget:]#只取最新的特征,比如一个跟踪对象出现在1-200帧,只有100-200会被保存 self.samples = {k: self.samples[k] for k in active_targets}

只加入状态为confirmed那些对象的特征,Tentative不加



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有