我正在参加「启航方案」
前言
这儿先声明一下
本项目是基于github.com/wufan-tb/yo… 做的一个二次开发,也便是进行一个项目改造,一起针对原项目进行优化。
在原项目中,采用单线程流线操作,导致无法进行真正的实时的多方针在线检测。只能经过已有的视频文件进行检测。一起在运算进程中,核算资源耗费较大,在进行真正的在线推理时将导致卡顿。为此,为了可以更好地是完结任务。本文博主,在花费一天的时刻细心阅读其源码后,进行了新一轮的定制修改。
支撑了在线视频检测,也便是支撑:
cam = cv.VideoCapture(0)
一起,这儿我将前天做好的人脸检测模块一起放置在了这儿: GitHub: github.com/Huterox/Rea…
Gitee: gitee.com/Huterox/Rea… 里边包含了完整的权重文件,无需进行下载新的权重文件。
装备项
为了方便办理和统一,我们这边独自将装备文件给提取出来了。首要是项目傍边的这两个文件: 别离用于办理人脸辨认以及多方针的行为检测。 这儿需求声明的是,假如需求进行二次开发的话,那么这儿请将你的完结模块,放置在与config相同的目录下面,进行开发,原因的话,很简略,python目录的问题。
人脸辨认装备
import dlib
import os
"""
人脸辨认装备
"""
class FACE_FILE(object):
shape_predictor_path='alg/faceRec/data/data_dlib/shape_predictor_68_face_landmarks.dat'
recognition_model_path='alg/faceRec/data/data_dlib/dlib_face_recognition_resnet_model_v1.dat'
csv_base_path='alg/faceRec/data/csv/features.csv'
faceData_path='alg/faceRec/data/faceData/'
points_faceData_path='alg/faceRec/data/faceData_points/'
faceName_path='alg/faceRec/data/faceName.txt'
imgs_folder_path=os.listdir("alg/faceRec/data/faceData/")
font_path = "alg/fonts/MSYH.ttc"
FACE_CONFIG={
"max_collection_image": 50,
"get_points_faceData_flag": True,
"import_all_features_flag":True,
"face_needTo_update":[x for x in range(1, 2)], #选择更新脸部的编号,从0开端
"num_of_person_in_lib":len(FACE_FILE.imgs_folder_path),
"recognition_threshold":0.43,
"predictor": dlib.shape_predictor(FACE_FILE.shape_predictor_path),
"recognition_model": dlib.face_recognition_model_v1(FACE_FILE.recognition_model_path),
"detector":dlib.get_frontal_face_detector(),
}
人脸辨认的模块十分简略,首要的话,首要便是我们的装备途径,例如,我们存入人脸信息的文件地址,还有一些字体文件啥的。
多方针行为检测装备
之后是我们多方针的一个行为检测模块。
"""
方针检测装备
"""
import os
class DECTION_CONFIG():
#test imgs folder or video or camera
input_date = r"C:\Users\31395\Desktop\peoplePose\temp\yolo_slowfast\video\test_person.mp4"
#folder to save result imgs, can not use input folder,视频保存途径
output = "output/video/"
#inference size (pixels)
yolo_imsize = 640
#object confidence threshold
yolo_conf = 0.4
#IOU threshold for NMS
yolo_iou = 0.4
#cuda device, i.e. 0 or 0,1,2,3 or cpu
yolo_device = "cuda"
#默许现已设置好了是cooc数据集
yolo_classes = None
yolo_weight = ""
#10 ~ 30 should be fine, the bigger, the faster
solowfast_process_batch_size = 25
# set 0.8 or 1 or 1.2
solowfast_video_clip_length = 1.2
#usually set 25 or 30
solowfast_frames_per_second = 25
data_mean = [0.45, 0.45, 0.45]
data_std = [0.225, 0.225, 0.225]
deepsort_ckpt = "alg/poseRec/deep_sort/deep_sort/deep/checkpoint/ckpt.t7"
deepsort_pb = "alg/poseRec/selfutils/temp.pbtxt"
streamTempBaseChannel = "/alg/poseRec/data/tempChannel"
# 设置实时处理的FPS
realTimeFps = 20
# 最大行列长度
max_queue_size = 512
# 每2秒左右存储一次视频,用于实时视频检测
tempStream = 2
首要的话,由于我是在刚刚说到的那个项目的基础上进行自己整合开发的,所以的话,这儿保留了他们原先的项目装备,新增了自己的一些装备,首要是关于yolo的一些装备,然后是slowfast的一些装备,那么deepsort的装备的话,在deep sort 那个文件夹下面,这个的话我们不需求进行改动。
这儿首要说一下我们新增的装备。(有中文注释部分)。
由于我们这边是做了一个实时在线检测的,可是呢,由于这个原项目,一开端便是做视频读取进行辨认的,一起,slowfast ,deepsort都是一个需求时刻序列的算法,也便是说需求将一组图画输入到网络然后进行猜测的算法,而且处理的速度十分慢,因而为了放置摄像头画面被吞了,我们这边就只能去开多线程去保护一个实时的状态。
由于假如不开多线程的话,会导致什么问题呢,便是时刻从1~10,假设在第5秒的时候要进行处理,那么处理的时刻是4秒,等你处理完了现已到了第10秒了(第九秒末)中心4秒产生的工作就没了。所以假如做实时,那么处理的功率就必须上去,这个堆设备去,可是在设备才干不行的状况下,我们可以做的便是削减画面的缺失,也便是做一个异步操作。那么这儿我选择了python的多线程来完结,为什么不是多进程呢,多进程确实可以加快进度,由于GIL的存在。可是考虑到,我们期望是尽可能削减资源耗费的状况下,去不遗漏画面,所以的话,这儿我们就仅仅用线程去做。一核有难总比八核有难要好一点。
人脸辨认模块
,那么我们接下来先看一下我们的人脸辨认模块,这个的话其实从前是介绍了的。不过后边我们又做了点改动。那便是支撑中文了,由于opencv是不支撑中文的。然后的话,我们的模块如下:
收集模块
首要的话仍是我们的人脸收集模块,在collection傍边我们提供了一个办法:
cam = cv.VideoCapture(0)
Collection().collection_cramer(cam)
cam.release()
cv.destroyAllWindows()
print("收集结束,程序退出!!")
这个办法支撑直接从摄像头或许视频傍边进行收集。当然我们相同也有一个直接从图片傍边进行收集的办法
之后收集结束之后的话,在这个目录下面(可以在装备中设置) 看到收集到的“大头”照
人脸存储模块
之后的话,我们要做的便是把这些人脸进行一个特征辨认,然后呢,将这些特征进行保存。
"""
负责读取收集到的人脸图画,然后去构建人脸对应的信息
"""
import cv2 as cv
import os
import numpy as np
import csv
from tqdm import tqdm
import shutil
from client.server.configFace import FACE_FILE,FACE_CONFIG
class BuildFace():
def write2csv(self,data, mode):
"""
更新csv文件傍边的数据(这儿面存储的是我们人脸的特征)
:param data:
:param mode:
:return:
"""
with open(FACE_FILE.csv_base_path, mode, newline='') as wf:
csv_writer = csv.writer(wf)
csv_writer.writerow(data)
def get_features_from_csv(self):
features_in_csv = []
with open(FACE_FILE.csv_base_path, 'r') as rf:
csv_reader = csv.reader(rf)
for row in csv_reader:
for i in range(0, 128):
row[i] = float(row[i])
features_in_csv.append(row)
return features_in_csv
def save_select_in_csv(self,data):
"""
选择性更新人脸数据
:param data:
:return:
"""
features_in_csv = self.get_features_from_csv()
with open(FACE_FILE.csv_base_path, 'w', newline='') as wf:
csv_writer = csv.writer(wf)
for index, i in enumerate(FACE_CONFIG.get("face_needTo_update")):
features_in_csv[i] = data[index]
csv_writer.writerow(features_in_csv[0])
with open(FACE_FILE.csv_base_path, 'a+', newline='') as af:
csv_writer = csv.writer(af)
for j in range(1, len(features_in_csv)):
csv_writer.writerow(features_in_csv[j])
print("csv文件更新完结!!")
def get_128_features(self,person_index):
"""
:param person_index: person_index代表第几个人脸数据文件夹
:return:
"""
num = 0
features = []
imgs_folder = FACE_FILE.imgs_folder_path[person_index]
points_faceImage_path = FACE_FILE.points_faceData_path + imgs_folder
imgs_path = FACE_FILE.faceData_path + imgs_folder + '/'
list_imgs = os.listdir(imgs_path)
imgs_num = len(list_imgs)
if os.path.exists(FACE_FILE.points_faceData_path + imgs_folder):
shutil.rmtree(points_faceImage_path)
os.makedirs(points_faceImage_path)
print("人脸点图文件夹建立成功!!")
with tqdm(total=imgs_num) as pbar:
pbar.set_description(str(imgs_folder))
for j in range(imgs_num):
image = cv.imread(os.path.join(imgs_path, list_imgs[j]))
faces = FACE_CONFIG.get("detector")(image, 1)
if len(faces) != 0:
for z, face in enumerate(faces):
shape = FACE_CONFIG.get("predictor")(image, face)
w, h = (face.right() - face.left()), (face.bottom() - face.top())
left, right, top, bottom = face.left() - w // 4, face.right() + w // 4, face.top() - h // 2, face.bottom() + h // 4
im = image
cv.rectangle(im, (left, top), (right, bottom), (0, 0, 255))
cv.imwrite(points_faceImage_path + '/{}.png'.format(j), im)
if (FACE_CONFIG.get("get_points_faceData_flag") == True):
for p in range(0, 68):
cv.circle(image, (shape.part(p).x, shape.part(p).y), 2, (0,0,255))
cv.imwrite(points_faceImage_path + '/{}.png'.format(j), image)
the_features = list(FACE_CONFIG.get("recognition_model").compute_face_descriptor(image, shape)) # 获取128维特征向量
features.append(the_features)
num += 1
pbar.update(1)
np_f = np.array(features)
res = np.median(np_f, axis=0)
return res
def building_form_config(self):
if (FACE_CONFIG.get("import_all_features_flag") == True):
self.building_all()
else:
peoples = FACE_CONFIG.get("face_needTo_update")
self.building_select(peoples)
def building_all(self):
res = self.get_128_features(person_index=0)
self.write2csv(res, 'w')
for i in range(1, FACE_CONFIG.get("num_of_person_in_lib")):
res = self.get_128_features(person_index=i)
self.write2csv(res, 'a+')
def building_select(self,peoples):
"""
更新某几个人脸,传入对应的下标编号,例如:[0,2,4]
:param peoples:
:return:
"""
select_res = []
for i in peoples:
res = self.get_128_features(person_index=i)
select_res.append(res)
self.save_select_in_csv(select_res)
这儿相同也是提供了多个办法,局部更新和全局更新都有。
那么相同的,我们将人脸的信息保存在这儿: 一起你还需求把人脸对应的名字写在这儿: 记住这儿的特征和人名是一一对应的。当然你也可以考虑优化一下用别的东西存储,或许直接写入到装备傍边去。又或许存入数据库傍边。
辨认模块
之后便是我们做人脸辨认的做法。
人脸辨认的进程其实和我们的人脸收集进程十分相似,不同的是,当新的人脸过来之后,我们核算他的特征向量与我们已有的特征向量进行一个比对。然后得到相似度最高的那个。
在这儿我首要说明两个比较重要的办法, 第一个便是这个:
def detect_from_cam(self,camera):
"""
这儿的话,和我们收集是相同的,便是传入这个camera目标就好了
:return:
"""
while camera.isOpened() and not self.quit_flag:
val, self.image = camera.read()
if val == False: continue
key = cv.waitKey(1)
res = self.face_detecting() # 0.038s
if res is not None:
face, self.all_face_location = res
for i in range(self.face_num):
[left, right, top, bottom] = self.all_face_location[i]
self.middle_point = [(left + right) / 2, (top + bottom) / 2]
self.face_img = self.image[top:bottom, left:right]
cv.rectangle(self.image, (left, top), (right, bottom), (0, 0, 255))
shape = FACE_CONFIG.get("predictor")(self.image, face[i]) # 0.002s
if self.face_num_change_flag == True or self.check_times <= 5:
if self.face_num_change_flag == True: # 人脸数量有改变,从头进行五次检测
self.check_times = 0
self.last_now_middlePoint_eDistance = [66666 for _ in range(self.available_max_face_num)]
for z in range(self.available_max_face_num):
self.check_features_from_cam[z] = []
if self.check_times < 5:
the_features_from_cam = list(
FACE_CONFIG.get("recognition_model").compute_face_descriptor(self.image, shape))
if self.check_times == 0: # 初始帧
self.check_features_from_cam[i].append(the_features_from_cam)
self.last_frame_middle_point[i] = self.middle_point
else:
this_face_index = self.track_link() # 后续帧需求与初始帧的人脸序号对应
self.check_features_from_cam[this_face_index].append(the_features_from_cam)
elif self.check_times == 5:
features_after_filter = self.middle_filter(self.check_features_from_cam[i])
self.check_features_from_cam[i] = []
for person in range(FACE_CONFIG.get("num_of_person_in_lib")):
e_distance = self.calculate_EuclideanDistance(self.all_features[person],
features_after_filter)
self.all_e_distance[i].append(e_distance)
if min(self.all_e_distance[i]) < FACE_CONFIG.get("recognition_threshold"):
self.person_name[i] = self.all_name[
self.all_e_distance[i].index(min(self.all_e_distance[i]))]
# cv.putText(self.image, self.person_name[i],
# (int((left + right) / 2) - 50, bottom + 20),
# cv.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)
self.image = self.cv2_add_chinese_text(self.image, self.person_name[i],
(int((left + right) / 2) - 50, bottom + 10),
(0, 0, 255), 25)
else:
self.person_name[i] = "Unknown"
else:
this_face_index = self.track_link()
self.image = self.cv2_add_chinese_text(self.image, self.person_name[this_face_index],
(int((left + right) / 2) - 50, bottom + 10),
(0, 0, 255), 25)
self.check_times += 1
for j in range(self.available_max_face_num):
self.all_e_distance[j] = []
"""
在这儿的话,n,s是不会触发的,这儿仅仅用一下这个q而已,也便是退出
"""
self.key_scan(key)
self.get_fps()
cv.namedWindow('camera', 0)
cv.imshow('camera', self.image)
camera.release()
cv.destroyAllWindows()
这个办法是从视频傍边进行辨认,这儿设置时5张图片进行进行核算头像特征,然后进行比照。
那么相同对应的还有这个,直接从图片傍边进行辨认,可是这儿时单张图片。
def detect_from_image(self,image):
"""
直接辨认一张图片傍边的人脸,这个开销是最小的,当然这个精确度嘛,没有直接读取视频好一点
由于那个的话确认了好几帧的状况,这个的话仅仅单张图画的。回来的是一个图画的人名列表
可是实际上的话,我们其实送入的图画其实只会有一个人头像,多方针检测,我们也是把一张图画
对多个方针进行截取,然后进行辨认,由于需求确认每个人物的序。
:param image:
:param show:
:return:
"""
self.image = image
# self.image = cv.imread('.test_1.jpg')
res = self.face_detecting()
names = []
if res is not None:
face, self.all_face_location = res
max_it = self.face_num if self.face_num < len(res) else len(res)
for i in range(max_it):
[left, right, top, bottom] = self.all_face_location[i]
self.middle_point = [(left + right) / 2, (top + bottom) / 2]
self.face_img = self.image[top:bottom, left:right]
cv.rectangle(self.image, (left, top), (right, bottom), (0, 0, 255))
shape = FACE_CONFIG.get("predictor")(self.image, face[i])
the_features_from_image = list(
FACE_CONFIG.get("recognition_model").compute_face_descriptor(self.image, shape))
e_distance = []
for features in self.all_features:
e_distance.append(self.calculate_EuclideanDistance(the_features_from_image,
features))
if(min(e_distance)<FACE_CONFIG.get("recognition_threshold")):
max_index = int(np.argmin(e_distance))
names.append(self.all_name[max_index])
return names
方针行为检测模块
之后是我们的方针检测模块。那么这儿的话,我们首要看到这个模块: 这个模块便是一个完结模块。
相同的这儿有两个办法。一个是原先的非实时在线检测,还有一个便是我们自己完结的在线检测。
非在线实时检测
这个的话便是一开端给到的办法,这个办法适合对现已录下来了的视频进行操作,对这个视频进行处理,最终得到一个成果。
这儿我进行过测验,我的设备是:
戴尔游匣G5 GTX 1650 4GB
对于一个14秒的视频,处理结束之后大约大约3,40秒左右。 所以这个几乎不能做在线的检测,也便是打开摄像头,你需求实时监控摄像头的画面的那种状况。
def detect_form_video(self,CameraName,show=False):
"""
这儿的代码履行流程是这姿态的:
1. 读取到视频傍边一切的图画
2. 先经过yolo算法,获取到多个方针
3. 经过deep_sort对这些方针进行跟踪
4. 经过slowfast辨认出对应的方针动作
5. 调用visualize_yolopreds处理出辨认出来的成果,一起绘制图画
6. 将处理结束之后的视频,再次进行合并为一个视频,并输出到指定文件中
这儿有个毛病便是,需求在处理完一个batch之后,你才干看到这个视频,而且每次都有卡顿
这儿完结的办法有延迟.
:param CameraName:
:return:
"""
# self.model = torch.hub.load('path/to/yolov5', 'custom', path='path/to/best.pt', source='local')
if DECTION_CONFIG.yolo_classes:
self.model.classes = DECTION_CONFIG.yolo_classes
# 读取视频
video_path = DECTION_CONFIG.input_date
# pytorch提供的动作辨认器
video = pytorchvideo.data.encoded_video.EncodedVideo.from_path(video_path)
img_path= DECTION_CONFIG.streamTempBaseChannel+"/"+CameraName+"/01"
self.clean_folder(img_path)
os.makedirs(img_path,exist_ok=True)
print("extracting video...")
# 对视频进行切分为图片
self.extract_video(video_path,img_path)
imgnames=natsort.natsorted(os.listdir(img_path))
save_path=DECTION_CONFIG.streamTempBaseChannel+"/"+CameraName+"/02"
self.clean_folder(save_path)
os.makedirs(save_path,exist_ok=True)
process_batch_size = DECTION_CONFIG.solowfast_process_batch_size # 10 ~ 30 should be fine, the bigger, the faster
video_clip_length = DECTION_CONFIG.solowfast_video_clip_length # set 0.8 or 1 or 1.2
frames_per_second = DECTION_CONFIG.solowfast_frames_per_second # usually set 25 or 30
print("processing...")
a=time.time()
for i in range(0,len(imgnames),process_batch_size):
imgs=[os.path.join(img_path,name) for name in imgnames[i:i+process_batch_size]]
yolo_preds=self.model(imgs, size=self.imsize)
mid=(i+process_batch_size/2)/frames_per_second
video_clips=video.get_clip(mid - video_clip_length/2, mid + video_clip_length/2 - 0.04)
video_clips=video_clips['video']
if video_clips is None:
continue
print(i/frames_per_second,video_clips.shape,len(imgs))
deepsort_outputs=[]
for i in range(len(yolo_preds.pred)):
temp=self.deepsort_update(self.deepsort_tracker,yolo_preds.pred[i].cpu(),yolo_preds.xywh[i][:,0:4].cpu(),yolo_preds.ims[i])
if len(temp)==0:
temp=np.ones((0,8))
deepsort_outputs.append(temp.astype(np.float32))
yolo_preds.pred=deepsort_outputs
id_to_ava_labels={}
if yolo_preds.pred[len(imgs)//2].shape[0]:
inputs,inp_boxes,_= self.ava_inference_transform(video_clips,yolo_preds.pred[len(imgs)//2][:,0:4],crop_size=self.imsize)
inp_boxes = torch.cat([torch.zeros(inp_boxes.shape[0],1), inp_boxes], dim=1)
if isinstance(inputs, list):
inputs = [inp.unsqueeze(0).to(self.device) for inp in inputs]
else:
inputs = inputs.unsqueeze(0).to(self.device)
with torch.no_grad():
slowfaster_preds = self.video_model(inputs, inp_boxes.to(self.device))
slowfaster_preds = slowfaster_preds.cpu()
for tid,avalabel in zip(yolo_preds.pred[len(imgs)//2][:,5].tolist(),np.argmax(slowfaster_preds,axis=1).tolist()):
id_to_ava_labels[tid]=self.ava_labelnames[avalabel+1]
self.visualize_yolopreds(yolo_preds,id_to_ava_labels,self.coco_color_map,save_path,show)
print("total cost: {:.3f}s, video clips length: {}s".format(time.time()-a,len(imgnames)/frames_per_second))
vide_save_path = DECTION_CONFIG.output+CameraName
if(not os.path.exists(vide_save_path)):
os.makedirs(vide_save_path)
vide_save_path = vide_save_path+"/"+CameraName+".mp4"
img_list=natsort.natsorted(os.listdir(save_path))
im=cv2.imread(os.path.join(save_path,img_list[0]))
height, width = im.shape[0], im.shape[1]
video = cv2.VideoWriter(vide_save_path,cv2.VideoWriter_fourcc(*'mp4v'), 25, (width,height))
for im_name in img_list:
img = cv2.imread(os.path.join(save_path,im_name))
video.write(img)
video.release()
self.clean_folder(img_path)
self.clean_folder(save_path)
print('saved video to:', vide_save_path)
在线实时检测
之后便是这个在线的检测。这个在线的意思不是说,处理速度十分快,这个毕竟硬件就摆在那里不行能有加速的。所以的话,我们仅有可以做的便是不遗漏,那么要想完结这个就只能做异步处理了。那么我在这儿做了一个简略的消费者生产者模型,来完结这个功用。
这儿也便是两个行列。 这样的话就可以完结这个在线实时的功用。
def readTimeCamera(self,video_path,CameraName):
"""
读取摄像头,而且按照帧数读取,而且把这读取的图画
先合成视频,由于这边会用到这个pytorchvideo优化
:param CameraName:
:return:
"""
def readCamera(video_path,CameraName):
camera = cv2.VideoCapture(video_path)
wait_time = int((1 / DECTION_CONFIG.realTimeFps) * 1000)
count = 0
imgs = []
split_count = 0
try:
while camera.isOpened() and self.going:
success,img = camera.read()
imgs.append(img)
count+=1
if(count==DECTION_CONFIG.realTimeFps*DECTION_CONFIG.tempStream):
# 此时读取了长度为tempStream秒的视频
vide_save_path = DECTION_CONFIG.output + CameraName
if (not os.path.exists(vide_save_path)):
os.makedirs(vide_save_path)
split_count+=1
# 这儿达到行列长度之后进行重复覆盖
if(split_count==DECTION_CONFIG.max_queue_size):
split_count = 1
vide_save_path = vide_save_path + "/" + CameraName +"-"+str(split_count)+ ".mp4"
height = len(img)
width = len(img[0])
video = cv2.VideoWriter(vide_save_path, cv2.VideoWriter_fourcc(*'mp4v'),
DECTION_CONFIG.realTimeFps, (width, height))
for img in imgs:
video.write(img)
video.release()
imgs = []
count=0
self.exitTempStreamVideo = True
# 假如没有耗费掉,这儿会进行一个等候,堵塞
self.read_camera_videos.put(vide_save_path)
key = cv2.waitKey(wait_time)
if key == ord('q'):
self.going = False
return
except Exception as e:
self.going = False
print(e)
return
finally:
camera.release()
print("摄像头停止并开释")
return
"""
敞开多线程进行调用
"""
t = threading.Thread(target=readCamera,args=(video_path,CameraName,))
t.start()
# readCamera(video_path,CameraName)
def readTimeProcessing(self,CameraName):
"""
在这儿完结算法处理
:return:
"""
def processing(CameraName):
while(1):
if(not self.going and self.read_camera_videos.empty()):
print("---核算结束---")
self.finish_process = True
return
# 加载辨认类型
if DECTION_CONFIG.yolo_classes:
self.model.classes = DECTION_CONFIG.yolo_classes
# 加载途径视频
if(self.read_camera_videos.empty() and not self.exitTempStreamVideo):
continue
vide_path = self.read_camera_videos.get()
# pytorch提供的动作辨认器,便是由于这个所以我们还需求存储一下视频文件
# 虽然会耗费IO的时刻,可是这玩意有对视频的优化
video = pytorchvideo.data.encoded_video.EncodedVideo.from_path(vide_path)
# 这儿仍是和从前相同
img_path = DECTION_CONFIG.streamTempBaseChannel + "/" + CameraName + "/01"
self.clean_folder(img_path)
os.makedirs(img_path, exist_ok=True)
# 对视频进行切分为图片
self.extract_video(vide_path, img_path)
imgnames = natsort.natsorted(os.listdir(img_path))
process_batch_size = DECTION_CONFIG.realTimeFps # 10 ~ 30 should be fine, the bigger, the faster
video_clip_length = DECTION_CONFIG.solowfast_video_clip_length # set 0.8 or 1 or 1.2
frames_per_second = DECTION_CONFIG.solowfast_frames_per_second # usually set 25 or 30
for i in range(0, len(imgnames), process_batch_size):
imgs = [os.path.join(img_path, name) for name in imgnames[i:i + process_batch_size]]
yolo_preds = self.model(imgs, size=self.imsize)
mid = (i + process_batch_size / 2) / frames_per_second
video_clips = video.get_clip(mid - video_clip_length / 2, mid + video_clip_length / 2 - 0.04)
video_clips = video_clips['video']
if video_clips is None:
continue
# print("*"*100)
# print(i / frames_per_second, video_clips.shape, len(imgs))
deepsort_outputs = []
for i in range(len(yolo_preds.pred)):
temp = self.deepsort_update(self.deepsort_tracker, yolo_preds.pred[i].cpu(),
yolo_preds.xywh[i][:, 0:4].cpu(), yolo_preds.ims[i])
if len(temp) == 0:
temp = np.ones((0, 8))
deepsort_outputs.append(temp.astype(np.float32))
yolo_preds.pred = deepsort_outputs
id_to_ava_labels = {}
if yolo_preds.pred[len(imgs) // 2].shape[0]:
inputs, inp_boxes, _ = self.ava_inference_transform(video_clips,
yolo_preds.pred[len(imgs) // 2][:, 0:4],
crop_size=self.imsize)
inp_boxes = torch.cat([torch.zeros(inp_boxes.shape[0], 1), inp_boxes], dim=1)
if isinstance(inputs, list):
inputs = [inp.unsqueeze(0).to(self.device) for inp in inputs]
else:
inputs = inputs.unsqueeze(0).to(self.device)
with torch.no_grad():
slowfaster_preds = self.video_model(inputs, inp_boxes.to(self.device))
slowfaster_preds = slowfaster_preds.cpu()
for tid, avalabel in zip(yolo_preds.pred[len(imgs) // 2][:, 5].tolist(),
np.argmax(slowfaster_preds, axis=1).tolist()):
id_to_ava_labels[tid] = self.ava_labelnames[avalabel + 1]
"""
在这儿,我们把这个成果放在行列傍边
"""
print("-" * 100)
print("已完结当时视频核算,剩下:",self.read_camera_videos.qsize())
print("-" * 100)
self.read_process_data.put((yolo_preds, id_to_ava_labels))
self.exitTempStreamVideo = False
"""
敞开多线程进行调用
"""
t = threading.Thread(target=processing, args=(CameraName,))
t.start()
def readTime_visualize_yolopreds(self,CameraName,show=False,process=None):
"""
显示yolo猜测的成果,绘制图画,我们在这儿进行逻辑处理
在这儿假如需求进行逻辑处理的话,在这儿传入一个process办法
这个办法需求接收三个参数,第一个参数是,当时的动作,另一个是当时目标的bounding box 现已image目标
:param yolo_preds:
:param id_to_ava_labels:
:param color_map:
:param save_folder:
:return:
"""
def visulize_readTime(CameraName,show=False,process=None):
while(1):
if(self.finish_process and self.read_process_data.empty()):
print("---核算处理结束---")
break
data = self.read_process_data.get()
print("*"*100)
print("正在处理实时成果,剩下:",self.read_process_data.qsize())
print("*" * 100)
yolo_preds = data[0]
id_to_ava_labels = data[1]
for i, (im, pred) in enumerate(zip(yolo_preds.ims, yolo_preds.pred)):
im=cv2.cvtColor(im,cv2.COLOR_BGR2RGB)
if pred.shape[0]:
for j, (*box, cls, trackid, vx, vy) in enumerate(pred):
if int(cls) != 0:
ava_label = ''
elif trackid in id_to_ava_labels.keys():
ava_label = id_to_ava_labels[trackid].split(' ')[0]
"""
在这儿得到动作信息和对应的bounding box
而且处理一些逻辑
"""
if(process!=None):
process(im,ava_label,box)
else:
ava_label = 'Unknow'
text = '{} {} {}'.format(int(trackid),yolo_preds.names[int(cls)],ava_label)
color = self.coco_color_map[int(cls)]
im = self.plot_one_box(box,im,color,text)
if(show):
cv2.namedWindow('camera', 0)
cv2.imshow('camera',im)
cv2.waitKey(1)
folder_name_temp = DECTION_CONFIG.output + CameraName
if (os.path.exists(folder_name_temp)):
shutil.rmtree(folder_name_temp)
print("!"*100)
print("视频处理结束!")
print("!" * 100)
t = threading.Thread(target=visulize_readTime, args=(CameraName,show,process,))
t.start()
def detect_from_video_realTime(self,camera,CameraName,show=False,process=None):
"""
这儿需求传入一个camera目标,然后完结实时读取
:param camera:
:param CameraName:
:param show:
:param save_path:
:return:
"""
# 实时读取摄像头
self.readTimeCamera(camera,CameraName)
# 进行处理运算
self.readTimeProcessing(CameraName)
# 对猜测成果进行处理
self.readTime_visualize_yolopreds(CameraName,show,process)
结合人脸辨认
之后的话,我们就可以结合人脸辨认了。 其实细心看一下这个上面的这个代码:
def readTime_visualize_yolopreds(self,CameraName,show=False,process=None):
"""
显示yolo猜测的成果,绘制图画,我们在这儿进行逻辑处理
在这儿假如需求进行逻辑处理的话,在这儿传入一个process办法
这个办法需求接收三个参数,第一个参数是,当时的动作,另一个是当时目标的bounding box 现已image目标
:param yolo_preds:
:param id_to_ava_labels:
:param color_map:
:param save_folder:
:return:
"""
def visulize_readTime(CameraName,show=False,process=None):
while(1):
if(self.finish_process and self.read_process_data.empty()):
print("---核算处理结束---")
break
data = self.read_process_data.get()
print("*"*100)
print("正在处理实时成果,剩下:",self.read_process_data.qsize())
print("*" * 100)
yolo_preds = data[0]
id_to_ava_labels = data[1]
for i, (im, pred) in enumerate(zip(yolo_preds.ims, yolo_preds.pred)):
im=cv2.cvtColor(im,cv2.COLOR_BGR2RGB)
if pred.shape[0]:
for j, (*box, cls, trackid, vx, vy) in enumerate(pred):
if int(cls) != 0:
ava_label = ''
elif trackid in id_to_ava_labels.keys():
ava_label = id_to_ava_labels[trackid].split(' ')[0]
"""
在这儿得到动作信息和对应的bounding box
而且处理一些逻辑
"""
if(process!=None):
process(im,ava_label,box)
else:
ava_label = 'Unknow'
text = '{} {} {}'.format(int(trackid),yolo_preds.names[int(cls)],ava_label)
color = self.coco_color_map[int(cls)]
im = self.plot_one_box(box,im,color,text)
if(show):
cv2.namedWindow('camera', 0)
cv2.imshow('camera',im)
cv2.waitKey(1)
folder_name_temp = DECTION_CONFIG.output + CameraName
if (os.path.exists(folder_name_temp)):
shutil.rmtree(folder_name_temp)
print("!"*100)
print("视频处理结束!")
print("!" * 100)
这儿有一个process,其实便是你写你的处理函数的当地。
def detect_pose():
pose = PoseRec()
source_path = 0
# pose.detect_form_video("Test01",True)
pose.detect_from_video_realTime(source_path,"Test02",True,process=None)
你直接调用我们人脸辨认模块的辨认代码,然后放在process里边就好了。
总结
以上便是我们悉数的内容,其实给到这儿就可以做很多工作了。