PROJECT 5  Pedestrian Detection, Tracking, and Counting

This project takes pedestrian videos as its subject and counts people by combining the YOLO V3 object-detection algorithm with the Deep SORT tracking algorithm.

5.1 Overall Design
This part covers the overall system structure and the system flow.

5.1.1 Overall System Structure
The overall system structure is shown in Figure 5-1.
Figure 5-1  Overall system structure

5.1.2 System Flow
The system flow is shown in Figure 5-2.
Figure 5-2  System flow

5.2 Runtime Environment
This part covers the Python environment, the TensorFlow environment, installing the required packages, and the hardware environment.

5.2.1 Python Environment
Python 3.7 or later is required. Under Windows, downloading Anaconda is the recommended way to complete the Python setup; the download address is https://www.anaconda.com/.

5.2.2 TensorFlow Environment
Download CUDA from https://developer.nvidia.com/cuda-downloads and install it. Download cuDNN from https://developer.nvidia.com/rdp/cudnn-archive and install it (the version must match the CUDA version); after downloading, extract it into the installed CUDA folder.
Open CMD and enter the command:

    nvcc -V

to check the installed version and confirm that the installation succeeded.
Open Anaconda Prompt and add the Tsinghua mirror with the commands:

    conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
    conda config --set show_channel_urls yes

Create a Python 3.7 environment named tensorflow. The Python version must be compatible with the TensorFlow version installed later, so choose Python 3.x here. Enter the command:

    conda create -n tensorflow python=3.7

Enter y wherever confirmation is requested.
Activate the tensorflow environment in Anaconda Prompt with the command:

    activate tensorflow

The environment setup is complete.

5.2.3 Installing the Required Packages
The package versions the system runs on are as follows:

    Keras==2.3.1
    tensorflow-gpu==2.0.0
    opencv-python==4.2.0.32
    scikit-learn==0.19.2
    scipy==1.3.1
    Pillow==7.0.0

Write the lines above into a .txt file named Requirements, then install the packages in the tensorflow environment with the command:

    pip install -r Requirements.txt

The installation is complete.

5.2.4 Hardware Environment
The hardware environment the system runs on is as follows:
(1) Processor: Intel Core™ i7-6700HQ @ 2.60GHz (8 CPUs), ~2.6GHz.
(2) Graphics card: NVIDIA GeForce GTX 960M.
(3) Video memory: 4055MB.
(4) RAM: 16384MB.
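With the software environment above in place, it is worth verifying that TensorFlow can actually see the GPU before moving on. The following is a minimal check, written as a sketch assuming the tensorflow-gpu==2.0.0 installation from Section 5.2.3, where device listing still lives under tf.config.experimental:

    # sanity-check the GPU setup (run inside the tensorflow environment)
    import tensorflow as tf

    print(tf.__version__)   # expected: 2.0.0
    # a non-empty list means CUDA and cuDNN were found; an empty list means CPU-only
    print(tf.config.experimental.list_physical_devices('GPU'))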
5.3 Module Implementation
This project consists of five modules: data preparation, data preprocessing, object detection, object tracking, and the main function. The function of each module and the related code are described below.

5.3.1 Data Preparation
This part covers the data-preparation work and the related code.
The pretrained weights published on the YOLO website are converted into an .h5 file through Keras, which requires downloading the .weights file and the keras-yolo3 source code. The download addresses are https://pjreddie.com/media/files/yolov3.weights and https://github.com/qqwweee/keras-yolo3.
Run the following command in the tensorflow environment:

    $ python convert.py model_data/yolov3.cfg model_data/yolov3.weights model_data/yolo.h5

and place the required data files in the model_data folder. The related code is as follows:

    import argparse      # argparse wraps a .py file with selectable command-line parameters
    import configparser  # the configparser module reads configuration files
    import io
    import os
    from collections import defaultdict

    import numpy as np
    from keras import backend as K
    from keras.layers import (Conv2D, Input, ZeroPadding2D, Add, UpSampling2D,
                              MaxPooling2D, Concatenate)
    from keras.layers.advanced_activations import LeakyReLU
    from keras.layers.normalization import BatchNormalization
    from keras.models import Model
    from keras.regularizers import l2
    from keras.utils.vis_utils import plot_model as plot

    parser = argparse.ArgumentParser(description='Darknet To Keras Converter.')  # create a parser object
    # add the command-line arguments and options of interest; each call defines one argument or option
    parser.add_argument('config_path', help='Path to Darknet cfg file.')
    parser.add_argument('weights_path', help='Path to Darknet weights file.')
    parser.add_argument('output_path', help='Path to output Keras model file.')
    parser.add_argument(
        '-p',
        '--plot_model',
        help='Plot generated Keras model and save as image.',
        action='store_true')
    parser.add_argument(
        '-w',
        '--weights_only',
        help='Save as Keras weights file instead of model file.',
        action='store_true')

    def unique_config_sections(config_file):
        # Convert all config sections so that they have unique names; unique
        # suffixes are added for compatibility with configparser.
        section_counters = defaultdict(int)  # defaultdict returns a default value when a missing key is looked up
        output_stream = io.StringIO()        # read/write a string as an in-memory I/O stream
        with open(config_file) as fin:
            for line in fin:
                if line.startswith('['):
                    section = line.strip().strip('[]')
                    _section = section + '_' + str(section_counters[section])
                    section_counters[section] += 1
                    line = line.replace(section, _section)
                output_stream.write(line)
        output_stream.seek(0)  # move the file pointer back to the start
        return output_stream

    def _main(args):
        config_path = os.path.expanduser(args.config_path)
        # expanduser replaces a leading "~" with the user's home directory;
        # if expansion fails or the path does not start with "~", it is returned unchanged
        weights_path = os.path.expanduser(args.weights_path)
        assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(
            config_path)
        # an assertion declares that its boolean expression must be true;
        # an exception here means the expression was false
        assert weights_path.endswith(
            '.weights'), '{} is not a .weights file'.format(weights_path)

        output_path = os.path.expanduser(args.output_path)
        assert output_path.endswith(
            '.h5'), 'output path {} is not a .h5 file'.format(output_path)
        output_root = os.path.splitext(output_path)[0]

        # Load weights and config.
        print('Loading weights.')
        weights_file = open(weights_path, 'rb')
        major, minor, revision = np.ndarray(
            shape=(3, ), dtype='int32', buffer=weights_file.read(12))
        # ndarray is a multidimensional array object; it is homogeneous,
        # i.e. all of its elements must share one type
        if (major*10+minor) >= 2 and major < 1000 and minor < 1000:
            seen = np.ndarray(shape=(1,), dtype='int64', buffer=weights_file.read(8))
        else:
            seen = np.ndarray(shape=(1,), dtype='int32', buffer=weights_file.read(4))
        print('Weights Header: ', major, minor, revision, seen)  # print the header fields

        print('Parsing Darknet config.')
        unique_config_file = unique_config_sections(config_path)  # config file
        cfg_parser = configparser.ConfigParser()
        cfg_parser.read_file(unique_config_file)

        print('Creating Keras model.')
        input_layer = Input(shape=(None, None, 3))
        prev_layer = input_layer
        all_layers = []

        weight_decay = float(cfg_parser['net_0']['decay']  # weight decay
                             ) if 'net_0' in cfg_parser.sections() else 5e-4
        count = 0
        out_index = []
        for section in cfg_parser.sections():  # parse
            print('Parsing section {}'.format(section))
            if section.startswith('convolutional'):
                filters = int(cfg_parser[section]['filters'])
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                pad = int(cfg_parser[section]['pad'])
                activation = cfg_parser[section]['activation']
                batch_normalize = 'batch_normalize' in cfg_parser[section]

                padding = 'same' if pad == 1 and stride == 1 else 'valid'

                # Setting weights.
                # Darknet serializes convolutional weights as
                # [bias/beta, [gamma, mean, variance], conv_weights]
                prev_layer_shape = K.int_shape(prev_layer)

                weights_shape = (size, size, prev_layer_shape[-1], filters)
                darknet_w_shape = (filters, weights_shape[2], size, size)
                weights_size = np.product(weights_shape)  # weight dimensions

                print('conv2d', 'bn' if batch_normalize else '  ', activation,
                      weights_shape)

                conv_bias = np.ndarray(  # convolution bias
                    shape=(filters, ),
                    dtype='float32',
                    buffer=weights_file.read(filters * 4))
                count += filters

                if batch_normalize:  # batch normalization
                    bn_weights = np.ndarray(  # weight data
                        shape=(3, filters),
                        dtype='float32',
                        buffer=weights_file.read(filters * 12))
                    count += 3 * filters

                    bn_weight_list = [  # weight list
                        bn_weights[0], conv_bias, bn_weights[1], bn_weights[2]
                    ]

                conv_weights = np.ndarray(  # convolution weights
                    shape=darknet_w_shape,
                    dtype='float32',
                    buffer=weights_file.read(weights_size * 4))
                count += weights_size

                # Darknet conv_weights use Caffe serialization: (out_dim, in_dim, height, width).
                # Rearrange them into TensorFlow order: (height, width, in_dim, out_dim).
                conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
                conv_weights = [conv_weights] if batch_normalize else [
                    conv_weights, conv_bias
                ]

                # Handle activation.
                act_fn = None
                if activation == 'leaky':
                    pass  # added as an advanced activation layer later
                elif activation != 'linear':
                    raise ValueError(
                        'Unknown activation function `{}` in section {}'.format(
                            activation, section))

                # Create the Conv2D layer.
                if stride > 1:
                    # Darknet uses left and top padding instead of 'same' mode
                    prev_layer = ZeroPadding2D(((1, 0), (1, 0)))(prev_layer)
                conv_layer = (Conv2D(  # convolutional layer
                    filters, (size, size),
                    strides=(stride, stride),
                    kernel_regularizer=l2(weight_decay),
                    use_bias=not batch_normalize,
                    weights=conv_weights,
                    activation=act_fn,
                    padding=padding))(prev_layer)

                if batch_normalize:  # batch normalization
                    conv_layer = (BatchNormalization(
                        weights=bn_weight_list))(conv_layer)
                prev_layer = conv_layer

                if activation == 'linear':  # linear activation
                    all_layers.append(prev_layer)
                elif activation == 'leaky':  # use Leaky ReLU
                    act_layer = LeakyReLU(alpha=0.1)(prev_layer)
                    prev_layer = act_layer
                    all_layers.append(act_layer)
            elif section.startswith('route'):  # route section
                ids = [int(i) for i in cfg_parser[section]['layers'].split(',')]
                layers = [all_layers[i] for i in ids]
                if len(layers) > 1:
                    print('Concatenating route layers:', layers)
                    concatenate_layer = Concatenate()(layers)
                    all_layers.append(concatenate_layer)
                    prev_layer = concatenate_layer
                else:
                    skip_layer = layers[0]  # only one layer to route
                    all_layers.append(skip_layer)
                    prev_layer = skip_layer

            elif section.startswith('maxpool'):  # max-pooling section
                size = int(cfg_parser[section]['size'])
                stride = int(cfg_parser[section]['stride'])
                all_layers.append(
                    MaxPooling2D(  # 2-D max pooling
                        pool_size=(size, size),
                        strides=(stride, stride),
                        padding='same')(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('shortcut'):  # shortcut (residual) section
                index = int(cfg_parser[section]['from'])
                activation = cfg_parser[section]['activation']
                assert activation == 'linear', 'Only linear activation supported.'
                all_layers.append(Add()([all_layers[index], prev_layer]))
                prev_layer = all_layers[-1]

            elif section.startswith('upsample'):  # upsample section
                stride = int(cfg_parser[section]['stride'])
                assert stride == 2, 'Only stride=2 supported.'
                all_layers.append(UpSampling2D(stride)(prev_layer))
                prev_layer = all_layers[-1]

            elif section.startswith('yolo'):  # yolo section
                out_index.append(len(all_layers)-1)
                all_layers.append(None)
                prev_layer = all_layers[-1]

            elif section.startswith('net'):  # net section
                pass

            else:
                raise ValueError(
                    'Unsupported section header type: {}'.format(section))

        # Create and save the model.
        if len(out_index) == 0:
            out_index.append(len(all_layers)-1)
        model = Model(inputs=input_layer,
                      outputs=[all_layers[i] for i in out_index])
        print(model.summary())
        if args.weights_only:
            model.save_weights('{}'.format(output_path))
            print('Saved Keras weights to {}'.format(output_path))
        else:
            model.save('{}'.format(output_path))
            print('Saved Keras model to {}'.format(output_path))

        # Check whether all weights have been read.
        remaining_weights = len(weights_file.read()) / 4
        weights_file.close()
        print('Read {} of {} from Darknet weights.'.format(count, count +
                                                           remaining_weights))
        if remaining_weights > 0:
            print('Warning: {} unused weights'.format(remaining_weights))

        if args.plot_model:
            plot(model, to_file='{}.png'.format(output_root), show_shapes=True)
            print('Saved model plot to {}.png'.format(output_root))

    if __name__ == '__main__':
        _main(parser.parse_args())  # finally call the parse method; once it succeeds the arguments can be used
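To confirm that the conversion produced a usable model, the .h5 file can be loaded back with Keras. This is a minimal verification sketch, assuming the conversion above has already written model_data/yolo.h5:

    # verify the converted model; compile=False because only the
    # architecture and weights are needed, not a training configuration
    from keras.models import load_model

    model = load_model('model_data/yolo.h5', compile=False)
    model.summary()            # the reconstructed Darknet backbone plus the YOLO heads
    print(model.output_shape)  # a list of three output shapes, one per YOLO V3 detection scale

Deep_Sort download and feature training: download the Deep_Sort code from https://github.com/nwojke/deep_sort. The MARS training set is used here; because the amount of data is very large, the pretrained .pb file can be used directly.
MARS is an extension of the Market-1501 dataset. During video collection, six synchronized cameras were placed on the Tsinghua University campus: five 1080×1920 HD cameras and one 640×480 SD camera. The videos contain 1261 different pedestrians, each captured by at least two cameras. The code for loading the trained models is as follows:

    self.model_path = './model_data/yolo.h5'
    self.anchors_path = 'model_data/yolo_anchors.txt'
    self.classes_path = 'model_data/coco_classes.txt'
    model_filename = 'model_data/market1501.pb'

5.3.2 Data Preprocessing
After the target video is input, it needs to be preprocessed.
1. Reading the video
The code for reading the video is as follows:

    video_capture = cv2.VideoCapture(args["input"])  # open the video at the given path
    ret, frame = video_capture.read()  # ret (bool) reports whether a frame was read; frame is the captured image
    if ret != True:
        break

2. Getting the width and height of the video frames
The code for getting the width and height of the video frames is as follows:

    w = int(video_capture.get(3))  # width of the video frames
    h = int(video_capture.get(4))  # height of the video frames

3. Setting the output video codec to MJPG
The related code is as follows:

    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    # convert the OpenCV BGR frame into PIL.Image (RGB) format
    image = Image.fromarray(frame[..., ::-1])

These fragments all come from the main loop in Section 5.3.5. Stitched together, a minimal read-and-write loop looks roughly as follows; this is a sketch, with test.mp4 and out.avi as hypothetical file names and the detection and tracking steps omitted:

    import cv2

    video_capture = cv2.VideoCapture('test.mp4')          # hypothetical input path
    w = int(video_capture.get(3))                         # frame width  (CAP_PROP_FRAME_WIDTH)
    h = int(video_capture.get(4))                         # frame height (CAP_PROP_FRAME_HEIGHT)
    fourcc = cv2.VideoWriter_fourcc(*'MJPG')
    out = cv2.VideoWriter('out.avi', fourcc, 15, (w, h))  # 15 fps output, as in the project

    while True:
        ret, frame = video_capture.read()
        if ret != True:        # no frame returned: end of the video
            break
        out.write(frame)       # detection and tracking would process `frame` here

    video_capture.release()
    out.release()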
5.3.3 Object Detection
Once the model preparation and data preprocessing are complete, YOLO V3 performs object detection to find the relevant classes in the frame.
1. Object recognition
The code for object recognition is as follows:

    boxs, class_names = yolo.detect_image(image)
    features = encoder(frame, boxs)
    # the confidence score here is 1.0
    detections = [Detection(bbox, 1.0, feature) for bbox, feature in
                  zip(boxs, features)]
    # run non-maximum suppression
    boxes = np.array([d.tlwh for d in detections])
    scores = np.array([d.confidence for d in detections])
    indices = preprocessing.non_max_suppression(boxes, nms_max_overlap, scores)
    # suppress duplicate detections
    detections = [detections[i] for i in indices]

preprocessing.non_max_suppression suppresses duplicate detections of the same object, which makes the count more accurate (a toy usage example is given at the end of this subsection). The function code is as follows:

    def non_max_suppression(boxes, max_bbox_overlap, scores=None):  # non-maximum suppression
        if len(boxes) == 0:
            return []
        boxes = boxes.astype(np.float)  # detection boxes
        pick = []
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2] + boxes[:, 0]
        y2 = boxes[:, 3] + boxes[:, 1]
        area = (x2 - x1 + 1) * (y2 - y1 + 1)
        if scores is not None:
            idxs = np.argsort(scores)  # indices
        else:
            idxs = np.argsort(y2)
        while len(idxs) > 0:
            last = len(idxs) - 1
            i = idxs[last]
            pick.append(i)
            xx1 = np.maximum(x1[i], x1[idxs[:last]])
            yy1 = np.maximum(y1[i], y1[idxs[:last]])
            xx2 = np.minimum(x2[i], x2[idxs[:last]])
            yy2 = np.minimum(y2[i], y2[idxs[:last]])
            w = np.maximum(0, xx2 - xx1 + 1)
            h = np.maximum(0, yy2 - yy1 + 1)
            overlap = (w * h) / area[idxs[:last]]  # compute the overlapping area
            idxs = np.delete(
                idxs, np.concatenate(
                    ([last], np.where(overlap > max_bbox_overlap)[0])))
        return pick

2. Drawing bounding boxes
So that detected objects are clearly visible, bounding boxes are drawn to mark their positions.

    bbox = track.to_tlbr()  # get the current position in bounding-box format
    color = [int(c) for c in COLORS[indexIDs[i] % len(COLORS)]]
    cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])),
                  (int(bbox[2]), int(bbox[3])), (color), 3)
    cv2.putText(frame, str(track.track_id), (int(bbox[0]), int(bbox[1] - 50)),
                0, 5e-3 * 150, (color), 2)  # add text
    if len(class_names) > 0:
        class_name = class_names[0]
        cv2.putText(frame, str(class_names[0]), (int(bbox[0]), int(bbox[1] - 20)),
                    0, 5e-3 * 150, (color), 2)

3. Object counting
The code for object counting is as follows:

    indexIDs.append(int(track.track_id))
    counter.append(int(track.track_id))
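The behavior of non_max_suppression is easiest to see on toy data. A minimal sketch, assuming the function above is in scope, with three made-up boxes in (x, y, w, h) format, two of which overlap almost completely, and the project's threshold of 0.3:

    import numpy as np

    boxes = np.array([[10., 10., 50., 50.],     # box 0
                      [12., 12., 50., 50.],     # box 1: nearly the same region as box 0
                      [200., 200., 40., 40.]])  # box 2: far away from the others
    scores = np.array([0.9, 0.8, 0.7])

    keep = non_max_suppression(boxes, 0.3, scores)
    print(keep)  # [0, 2]: box 1 is suppressed in favor of the higher-scoring box 0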
5.3.4 Object Tracking
Object tracking consists of two parts: first, prediction for the detected objects, which propagates the track state distributions one step forward; second, the measurement update and track management.
1. predict()
Prediction for the detected objects; the function is defined as follows:

    def predict(self):
        # Propagate the track state distributions one time step forward.
        # This function should be called once before every update.
        for track in self.tracks:
            track.predict(self.kf)
            # the Kalman filter predicts the state distribution; each track's
            # mean and covariance are recorded as the filter's input

2. update()
The related code of this part is as follows:

    def update(self, detections):
        # Perform the measurement update and track management.
        # Call _match to run the matching cascade.
        matches, unmatched_tracks, unmatched_detections = \
            self._match(detections)
        # Update the track set according to the matching results.
        for track_idx, detection_idx in matches:
            self.tracks[track_idx].update(
                self.kf, detections[detection_idx])
        for track_idx in unmatched_tracks:
            self.tracks[track_idx].mark_missed()
        for detection_idx in unmatched_detections:
            self._initiate_track(detections[detection_idx])
        self.tracks = [t for t in self.tracks if not t.is_deleted()]

        # Pass in the feature list and its corresponding IDs to build a
        # feature dictionary of active targets.
        active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
        features, targets = [], []
        for track in self.tracks:
            if not track.is_confirmed():
                continue
            features += track.features
            targets += [track.track_id for _ in track.features]
            track.features = []
        self.metric.partial_fit(
            np.asarray(features), np.asarray(targets), active_targets)

3. Drawing object motion paths
The code for drawing an object's motion path is as follows (pts is a buffer of per-track deques; a short sketch of its behavior follows this subsection):

    for j in range(1, len(pts[track.track_id])):
        if pts[track.track_id][j - 1] is None or pts[track.track_id][j] is None:
            continue
        thickness = int(np.sqrt(64 / float(j + 1)) * 2)  # line thickness
        cv2.line(frame, (pts[track.track_id][j - 1]),
                 (pts[track.track_id][j]), (color), thickness)  # draw the line
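The pts buffer read by this drawing loop is a list of collections.deque objects created with maxlen=30 in the main function below, so each track keeps only its 30 most recent center points and old trail segments drop off automatically. A minimal sketch of that behavior, with made-up center coordinates:

    from collections import deque

    trail = deque(maxlen=30)    # one such deque per track id in the project
    for t in range(40):
        trail.append((t, t))    # pretend center points
    print(len(trail))           # 30: the 10 oldest points were dropped automatically
    print(trail[0], trail[-1])  # (10, 10) (39, 39)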
5.3.5 Main Function
The code of the main function is as follows:

    from __future__ import division, print_function, absolute_import  # import modules

    import os
    import datetime
    from timeit import time
    import warnings
    import cv2
    import numpy as np
    import argparse
    from PIL import Image
    from yolo import YOLO

    from deep_sort import preprocessing
    from deep_sort import nn_matching
    from deep_sort.detection import Detection
    from deep_sort.tracker import Tracker
    from tools import generate_detections as gdet
    from deep_sort.detection import Detection as ddet
    from collections import deque
    from keras import backend

    backend.clear_session()

    # parse the command-line arguments
    ap = argparse.ArgumentParser()
    ap.add_argument("-i", "--input", help="path to input video",
                    default="E:\项目实战目标追踪\multiobjecttracking\videos\soccer_01.mp4")
    ap.add_argument("-c", "--class", help="name of class", default="person")
    args = vars(ap.parse_args())

    pts = [deque(maxlen=30) for _ in range(9999)]
    warnings.filterwarnings('ignore')

    # initialize a list of colors to represent each possible class label
    np.random.seed(100)
    COLORS = np.random.randint(0, 255, size=(200, 3),  # generate 200x3 random integers in 0~255
                               dtype="uint8")

    def main(yolo):
        start = time.time()
        # parameter settings
        max_cosine_distance = 0.5  # threshold on the cosine distance
        nn_budget = None
        nms_max_overlap = 0.3      # non-maximum suppression threshold
        counter = []
        # deep_sort
        model_filename = 'model_data/market1501.pb'
        encoder = gdet.create_box_encoder(model_filename, batch_size=1)
        # nearest-neighbor distance metric: for each target, return the smallest
        # distance to all samples observed so far; the Tracker is built from this metric
        metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
        tracker = Tracker(metric)

        writeVideo_flag = True
        video_capture = cv2.VideoCapture(args["input"])  # open the video at the given path

        if writeVideo_flag:
            # define the codec and create a VideoWriter object
            w = int(video_capture.get(3))  # width of the video frames
            h = int(video_capture.get(4))  # height of the video frames
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            out = cv2.VideoWriter('./output/' + args["input"][43:57] + "_" + args["class"]
                                  + '_output.avi', fourcc, 15, (w, h))  # video output
            list_file = open('detection.txt', 'w')
            frame_index = -1

        fps = 0.0
        while True:
            ret, frame = video_capture.read()  # ret (bool) reports whether a frame
                                               # was read; frame is the captured image
            if ret != True:
                break
            t1 = time.time()

            image = Image.fromarray(frame[..., ::-1])  # convert the OpenCV frame into PIL.Image format
            boxs, class_names = yolo.detect_image(image)
            features = encoder(frame, boxs)
            # the confidence score here is 1.0
            detections = [Detection(bbox, 1.0, feature) for bbox, feature in
                          zip(boxs, features)]
            # run non-maximum suppression
            boxes = np.array([d.tlwh for d in detections])
            scores = np.array([d.confidence for d in detections])
            indices = preprocessing.non_max_suppression(boxes, nms_max_overlap, scores)
            # suppress duplicate detections
            detections = [detections[i] for i in indices]

            tracker.predict()           # propagate the track state distributions one step forward
            tracker.update(detections)  # perform the measurement update and track management

            i = int(0)
            indexIDs = []
            c = []
            boxes = []
            for det in detections:
                bbox = det.to_tlbr()  # get the current position in bounding-box format
                cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])),
                              (int(bbox[2]), int(bbox[3])), (255, 255, 255), 2)  # draw a rectangle

            for track in tracker.tracks:
                if not track.is_confirmed() or track.time_since_update > 1:
                    continue
                indexIDs.append(int(track.track_id))
                counter.append(int(track.track_id))
                bbox = track.to_tlbr()  # get the current position in bounding-box format
                color = [int(c) for c in COLORS[indexIDs[i] % len(COLORS)]]
                cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])),
                              (int(bbox[2]), int(bbox[3])), (color), 3)
                cv2.putText(frame, str(track.track_id), (int(bbox[0]), int(bbox[1] - 50)),
                            0, 5e-3 * 150, (color), 2)  # add text
                if len(class_names) > 0:
                    class_name = class_names[0]
                    cv2.putText(frame, str(class_names[0]), (int(bbox[0]), int(bbox[1] - 20)),
                                0, 5e-3 * 150, (color), 2)

                i += 1
                # bbox_center_point(x, y)
                center = (int(((bbox[0]) + (bbox[2])) / 2), int(((bbox[1]) + (bbox[3])) / 2))
                pts[track.track_id].append(center)
                thickness = 5
                # center point
                cv2.circle(frame, (center), 1, color, thickness)

                # draw the motion path
                for j in range(1, len(pts[track.track_id])):
                    if pts[track.track_id][j - 1] is None or pts[track.track_id][j] is None:
                        continue
                    thickness = int(np.sqrt(64 / float(j + 1)) * 2)  # line thickness
                    cv2.line(frame, (pts[track.track_id][j - 1]),
                             (pts[track.track_id][j]), (color), thickness)  # draw the line

            count = len(set(counter))
            cv2.putText(frame, "Total Object Counter: " + str(count), (int(20), int(120)),
                        0, 5e-3 * 200, (0, 255, 0), 2)
            cv2.putText(frame, "Current Object Counter: " + str(i), (int(20), int(80)),
                        0, 5e-3 * 200, (0, 255, 0), 2)
            cv2.putText(frame, "FPS: %f" % (fps), (int(20), int(40)),
                        0, 5e-3 * 200, (0, 255, 0), 3)
            cv2.namedWindow("YOLO3_Deep_SORT", 0)
            cv2.resizeWindow('YOLO3_Deep_SORT', 1024, 768)
            cv2.imshow('YOLO3_Deep_SORT', frame)

            if writeVideo_flag:
                # save a frame
                out.write(frame)
                frame_index = frame_index + 1
                list_file.write(str(frame_index) + ' ')
                if len(boxs) != 0:
                    for i in range(0, len(boxs)):
                        list_file.write(str(boxs[i][0]) + ' ' + str(boxs[i][1]) + ' '
                                        + str(boxs[i][2]) + ' ' + str(boxs[i][3]) + ' ')
                list_file.write('\n')

            fps = (fps + (1. / (time.time() - t1))) / 2
            #print(set(counter))

            # press Q to stop
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        print(" ")
        print("[Finish]")
        end = time.time()

        if len(pts[track.track_id]) != None:
            print(args["input"][43:57] + ": " + str(count) + " "
                  + str(class_name) + ' Found')
        else:
            print("[No Found]")

        video_capture.release()
        if writeVideo_flag:
            out.release()
            list_file.close()
        cv2.destroyAllWindows()

    if __name__ == '__main__':
        main(YOLO())

5.4 System Testing
The running results are shown in Figure 5-3 to Figure 5-5.
Figure 5-3  Test image 1
Figure 5-4  Test image 2
Figure 5-5  Test image 3