前言

前段时间刷短视频看到过别人用摄像头自动化监控员工上班状态,比如标注员工是不是离开了工位,在位置上是不是摸鱼。虽然是段子,但是这个是可以用识别技术实现一下,于是我在网上找,知道发现了 SlowFast,那么下面就用 SlowFast 简单测试一下视频的行为识别。
图片

工具简介

YOLO

YOLO 是一个基于深度学习神经网络的对象识别和定位算法,前面我也用 v5s 训练了标注的扑克牌,实现了图片或视频中的点数识别,这里就跳过了。

DeepSORT

DeepSORT 是一个实现目标跟踪的算法,其使用卡尔曼滤波器预测所检测对象的运动轨迹。也就是当视频中有多个目标,算法能知道上一帧与下一帧各目标对象的匹配,从而完成平滑锁定,而不是在视频播放或记录时,检测框一闪一闪的。

SlowFast

SlowFast 是一个行为分类模型 (pytorchvideo 内置),可以通过输入视频序列和检测框信息,输出每个检测框的行为类别。所以需要借助类似 YOLO 的多目标检测模型,当然 SlowFast 也可以自行标注数据集训练,来完成自定义的行为识别。
图片

流程

  • 读取视频或者摄像头中的图片

  • 通过 yolo 检测出画面的目标

  • 通过 deep_sort 对目标进行跟踪

  • 通过 slowfast 识别出目标的动作

  • 根据识别的动作进行业务处理等
    图片

    编码

    整个流程下来,除了安装 slowfast 依赖 (pytorchvideo) 外,deep_sort 可以下载 https://github.com/wufan-tb/yolo_slowfast/tree/master/deep_sort 然后 import 到项目中。如果要实时处理摄像头的视频,可以通过采用多线程,单独开一个线程读摄像头并一秒保存一张图,再开一个线程用于处理保存的图片,最后将处理后的结果保存为视频,或者只是做一些业务操作,以下只是一个例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import torch
import numpy as np
import os,cv2,time,torch,random,pytorchvideo,warnings,argparse,math
warnings.filterwarnings("ignore",category=UserWarning)

from pytorchvideo.transforms.functional import (
uniform_temporal_subsample,
short_side_scale_with_boxes,
clip_boxes_to_image,)
from torchvision.transforms._functional_video import normalize
from pytorchvideo.data.ava import AvaLabeledVideoFramePaths
from pytorchvideo.models.hub import slowfast_r50_detection
from deep_sort.deep_sort import DeepSort

class MyVideoCapture:

def __init__(self, source):
self.cap = cv2.VideoCapture(source)
self.idx = -1
self.end = False
self.stack = []

def read(self):
self.idx += 1
ret, img = self.cap.read()
if ret:
self.stack.append(img)
else:
self.end = True
return ret, img

def to_tensor(self, img):
img = torch.from_numpy(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
return img.unsqueeze(0)

def get_video_clip(self):
assert len(self.stack) > 0, "clip length must large than 0 !"
self.stack = [self.to_tensor(img) for img in self.stack]
clip = torch.cat(self.stack).permute(-1, 0, 1, 2)
del self.stack
self.stack = []
return clip

def release(self):
self.cap.release()

def tensor_to_numpy(tensor):
img = tensor.cpu().numpy().transpose((1, 2, 0))
return img

def ava_inference_transform(
clip,
boxes,
num_frames = 32, #if using slowfast_r50_detection, change this to 32, 4 for slow
crop_size = 640,
data_mean = [0.45, 0.45, 0.45],
data_std = [0.225, 0.225, 0.225],
slow_fast_alpha = 4, #if using slowfast_r50_detection, change this to 4, None for slow
):
boxes = np.array(boxes)
roi_boxes = boxes.copy()
clip = uniform_temporal_subsample(clip, num_frames)
clip = clip.float()
clip = clip / 255.0
height, width = clip.shape[2], clip.shape[3]
boxes = clip_boxes_to_image(boxes, height, width)
clip, boxes = short_side_scale_with_boxes(clip,size=crop_size,boxes=boxes,)
clip = normalize(clip,
np.array(data_mean, dtype=np.float32),
np.array(data_std, dtype=np.float32),)
boxes = clip_boxes_to_image(boxes, clip.shape[2], clip.shape[3])
if slow_fast_alpha is not None:
fast_pathway = clip
slow_pathway = torch.index_select(clip,1,
torch.linspace(0, clip.shape[1] - 1, clip.shape[1] // slow_fast_alpha).long())
clip = [slow_pathway, fast_pathway]

return clip, torch.from_numpy(boxes), roi_boxes

def plot_one_box(x, img, color=[100,100,100], text_info="None",
velocity=None, thickness=1, fontsize=0.5, fontthickness=1):
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
cv2.rectangle(img, c1, c2, color, thickness, lineType=cv2.LINE_AA)
t_size = cv2.getTextSize(text_info, cv2.FONT_HERSHEY_TRIPLEX, fontsize , fontthickness+2)[0]
cv2.rectangle(img, c1, (c1[0] + int(t_size[0]), c1[1] + int(t_size[1]*1.45)), color, -1)
cv2.putText(img, text_info, (c1[0], c1[1]+t_size[1]+2),
cv2.FONT_HERSHEY_TRIPLEX, fontsize, [255,255,255], fontthickness)
return img

def deepsort_update(Tracker, pred, xywh, np_img):
outputs = Tracker.update(xywh, pred[:,4:5],pred[:,5].tolist(),cv2.cvtColor(np_img,cv2.COLOR_BGR2RGB))
return outputs

def save_yolopreds_tovideo(yolo_preds, id_to_ava_labels, color_map, output_video, vis=False):
for i, (im, pred) in enumerate(zip(yolo_preds.ims, yolo_preds.pred)):
if pred.shape[0]:
for j, (*box, cls, trackid, vx, vy) in enumerate(pred):
if int(cls) != 0:
ava_label = ''
elif trackid in id_to_ava_labels.keys():
ava_label = id_to_ava_labels[trackid].split(' ')[0]
else:
ava_label = 'Unknow'
text = '{} {} {}'.format(int(trackid),yolo_preds.names[int(cls)],ava_label)
color = color_map[int(cls)]
im = plot_one_box(box,im,color,text)
im = im.astype(np.uint8)
output_video.write(im)
if vis:
cv2.imshow("demo", im)

def main(config):
device = config.device
imsize = config.imsize

# model = torch.hub.load('D:/3code/6pytorch/opencv_demo/05_yolo_v5.6', 'yolov5s', source='local', pretrained=True).to(device)
model = torch.hub.load('ultralytics/yolov5', 'yolov5l6').to(device)
model.conf = config.conf
model.iou = config.iou
model.max_det = 100
if config.classes:
model.classes = config.classes

video_model = slowfast_r50_detection(True).eval().to(device)

deepsort_tracker = DeepSort("deep_sort/deep_sort/deep/checkpoint/ckpt.t7")
ava_labelnames,_ = AvaLabeledVideoFramePaths.read_label_map("selfutils/temp.pbtxt")
coco_color_map = [[random.randint(0, 255) for _ in range(3)] for _ in range(80)]

vide_save_path = config.output
video=cv2.VideoCapture(config.input)
width,height = int(video.get(3)),int(video.get(4))
video.release()
outputvideo = cv2.VideoWriter(vide_save_path,cv2.VideoWriter_fourcc(*'mp4v'), 25, (width,height))
print("processing...")

cap = MyVideoCapture(config.input)
id_to_ava_labels = {}
a=time.time()
while not cap.end:
ret, img = cap.read()
if not ret:
continue
yolo_preds=model([img], size=imsize)
yolo_preds.files=["img.jpg"]

deepsort_outputs=[]
for j in range(len(yolo_preds.pred)):
temp=deepsort_update(deepsort_tracker,yolo_preds.pred[j].cpu(),yolo_preds.xywh[j][:,0:4].cpu(),yolo_preds.ims[j])
if len(temp)==0:
temp=np.ones((0,8))
deepsort_outputs.append(temp.astype(np.float32))

yolo_preds.pred=deepsort_outputs

if len(cap.stack) == 25:
print(f"processing {cap.idx // 25}th second clips")
clip = cap.get_video_clip()
if yolo_preds.pred[0].shape[0]:
inputs, inp_boxes, _=ava_inference_transform(clip, yolo_preds.pred[0][:,0:4], crop_size=imsize)
inp_boxes = torch.cat([torch.zeros(inp_boxes.shape[0],1), inp_boxes], dim=1)
if isinstance(inputs, list):
inputs = [inp.unsqueeze(0).to(device) for inp in inputs]
else:
inputs = inputs.unsqueeze(0).to(device)

with torch.no_grad():
slowfaster_preds = video_model(inputs, inp_boxes.to(device))
slowfaster_preds = slowfaster_preds.cpu()
for tid,avalabel in zip(yolo_preds.pred[0][:,5].tolist(), np.argmax(slowfaster_preds, axis=1).tolist()):
id_to_ava_labels[tid] = ava_labelnames[avalabel+1]

save_yolopreds_tovideo(yolo_preds, id_to_ava_labels, coco_color_map, outputvideo, config.show)
print("total cost: {:.3f} s, video length: {} s".format(time.time()-a, cap.idx / 25))

cap.release()
outputvideo.release()
print('saved video to:', vide_save_path)


if __name__=="__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--input', type=str, default="/home/wufan/images/video/vad.mp4", help='test imgs folder or video or camera')
parser.add_argument('--output', type=str, default="output.mp4", help='folder to save result imgs, can not use input folder')
parser.add_argument('--imsize', type=int, default=640, help='inference size (pixels)')
parser.add_argument('--conf', type=float, default=0.4, help='object confidence threshold')
parser.add_argument('--iou', type=float, default=0.4, help='IOU threshold for NMS')
parser.add_argument('--device', default='cuda', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
parser.add_argument('--classes', nargs='+', type=int, help='filter by class: --class 0, or --class 0 2 3')
parser.add_argument('--show', action='store_true', help='show img')
config = parser.parse_args()

if config.input.isdigit():
print("using local camera.")
config.input = int(config.input)

print(config)
main(config)

其他

demo 中用的是网络 yolo,默认下载位置 C:\Users\Administrator/.cache\torch\hub\ultralytics_yolov5_master,而 slowfast 权重文件位置是 C:\Users\Administrator.cache\torch\hub\checkpoints\SLOWFAST_8x8_R50_DETECTION.pyth。

报错

运行执行命令,出现 AttributeError: ‘Upsample’ object has no attribute ‘recompute_scale_factor’错误

根据提示,找到 torch 下的 upsampling.py,将 return F.interpolate (input, self.size, self.scale_factor, self.mode, self.align_corners 修改为return F.interpolate(input, self.size, self.scale_factor, self.mode, self.align_corners)。