-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathswag_preprocess.py
More file actions
157 lines (132 loc) · 7.35 KB
/
swag_preprocess.py
File metadata and controls
157 lines (132 loc) · 7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import cv2
import random
import numpy as np
from tqdm import tqdm
from swag_logger import Logger
from swag_model import SwagModel
from swag_speed_detector import SpeedCalculator
from swag_tracker import SwagTracker
from swag_utils import calculate_centroids
from swag_visualizer import SwagVisualizer
from swag_yolo import YOLO
from sklearn.cluster import DBSCAN
class SWAGPreprocessor:
    """Interactive preprocessor for traffic videos.

    Samples a few scenes from the input video, runs YOLO detection and
    tracking over them, lets the user tag cars with real-world sizes, and
    finally clusters the points where tracked objects appear/disappear
    (plus their movement directions) as groundwork for a pixel-to-meter
    scale map.
    """

    def __init__(self, input_path, configPath, weightPath, metaPath):
        # --- video source ---
        self.cap = cv2.VideoCapture(input_path)
        self.totalFrames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        # BUGFIX: video duration is frames / fps, not frames * fps.
        self.total_sec = self.totalFrames / self.fps if self.fps else 0
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # --- collaborators ---
        self.logger = Logger("preprocess", None)  # mock logger, no output target
        self.yolo = YOLO(self.logger, configPath, weightPath, metaPath, self.width, self.height)
        self.tracker = SwagTracker(self.logger)
        self.tracker.min_visibility = 1
        self.tracker.max_disappeared = 1
        self.visualizer = SwagVisualizer(self.logger)
        # keep an effectively unlimited centroid history during preprocessing
        # NOTE(review): this mutates a class-level setting shared by every
        # SwagModel instance — remember every place that relies on it.
        SwagModel.centroid_history = 99999
        # --- preprocessor configuration ---
        self.prep_length_in_sec = min(30, self.total_sec)  # TODO set min up to 30 for long videos -> more accurate prep
        self.prep_speed_factor = 2   # process every 2nd frame; dont change!
        self.prep_scene_count = 3    # check 3 random scenario
        self.prep_scene_length = self.prep_length_in_sec / self.prep_scene_count
        # backward-compat alias for the old misspelled attribute name
        self.prep_scene_lenght = self.prep_scene_length
        # one object list per sampled scene
        self.objects = [[] for _ in range(self.prep_scene_count)]
        # evenly spaced scene start positions across the whole video
        self.start_frames = [i * int(self.totalFrames / self.prep_scene_count)
                             for i in range(self.prep_scene_count)]
        # frames actually processed per scene (every prep_speed_factor-th frame)
        self.frame_per_scene = int(self.fps / self.prep_speed_factor * self.prep_length_in_sec)
        _, self.test_img = self.cap.read()
        # maps a selected tracked object -> list of user-provided measurements
        self.selected_cars = {}

    def select_car_callback(self, event, x, y, flags, param):
        """Mouse callback: double-click on a tracked car to tag it.

        Prompts the user for the car's real-world width/height in meters and
        records them together with the car's current centroid and box.
        ``param`` carries the frame index the callback was registered with.
        Returns the selected object, or None when no box contains the click.
        """
        if event != cv2.EVENT_LBUTTONDBLCLK:
            return None
        cv2.waitKey(0)  # pause playback while the user types
        for o in list(self.tracker.current_objects.values()):
            if o.box[0] < x < o.box[2] and o.box[1] < y < o.box[3]:
                selected_car = o
                print("Type the real word distances in meter for the selected car and press enter to continue")
                # BUGFIX: convert the typed sizes to numbers instead of
                # storing raw strings.
                width = float(input("Width:"))
                height = float(input("Height:"))
                self.selected_cars.setdefault(selected_car, []).append({
                    "frame": param,  # BUGFIX: was hard-coded 0
                    "centroid": selected_car.centroids,
                    "box": selected_car.box,
                    "width": width,
                    "height": height
                })
                print("Car selected: {}".format(o.objectId))
                return selected_car
        # BUGFIX: no box matched the click — previously raised NameError
        return None

    def bounding_box(self, points):
        """Return [(min_x, min_y), (max_x, max_y)] enclosing all points."""
        x_coordinates, y_coordinates = zip(*points)
        return [(min(x_coordinates), min(y_coordinates)), (max(x_coordinates), max(y_coordinates))]

    def process(self):
        """Cluster appear/disappear points and movement directions.

        Builds the raw material for a pixel-meter scale map, then visualizes
        the disappear and appear clusters on the first video frame.
        Requires :meth:`run` to have populated ``self.objects`` first.
        """
        disappear_centers = []
        appear_centers = []
        move_vectors = []
        for scene in self.objects:
            prev_objects = []
            for frame in scene:
                # movement direction (sign only) for objects present in both
                # frames; near-stationary objects count as (0, 0)
                for obj in set(frame).intersection(prev_objects):
                    if obj.move_vector is None:
                        continue
                    if obj.move_vector[0] + obj.move_vector[1] < 2:
                        move_vectors.append((0, 0))
                    else:
                        move_vectors.append((np.sign(obj.move_vector[0]), np.sign(obj.move_vector[1])))
                # newly appeared objects: use the oldest known centroid
                for obj in np.setdiff1d(frame, prev_objects):
                    appear_centers.append(obj.centroid_history[-1] if len(obj.centroid_history) > 1 else obj.centroids)
                # objects that vanished since the previous frame
                for obj in np.setdiff1d(prev_objects, frame):
                    disappear_centers.append(obj.centroids)
                prev_objects = frame
        # BUGFIX: DBSCAN crashes on empty input — bail out when nothing was tracked
        if not disappear_centers or not appear_centers or not move_vectors:
            return
        disappear_clusters = DBSCAN(eps=min(self.width, self.height) * 0.1,
                                    min_samples=int(len(disappear_centers) / 9) + 1).fit_predict(np.array(disappear_centers))
        appear_clusters = DBSCAN(eps=min(self.width, self.height) * 0.1,
                                 min_samples=int(len(appear_centers) / 9) + 1).fit_predict(np.array(appear_centers))
        # BUGFIX: cluster the movement vectors, not the disappear centers
        vector_clusters = DBSCAN(eps=0.01,
                                 min_samples=int(len(move_vectors) / 9) + 1).fit_predict(np.array(move_vectors))
        # BUGFIX: collect every vector per cluster (was overwritten each
        # iteration), keyed by label so DBSCAN's noise label -1 is safe
        organized_vectors = {label: [] for label in np.unique(vector_clusters)}
        for vec, clust in zip(move_vectors, vector_clusters):
            organized_vectors[clust].append(vec)
        # --- visualize ---
        disappear_colors = [(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
                            for _ in range(len(np.unique(disappear_clusters)))]
        appear_colors = [(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
                         for _ in range(len(np.unique(appear_clusters)))]
        disappear_boxes = [(self.bounding_box([x for x, c in zip(disappear_centers, disappear_clusters) if c == i]), i)
                           for i in np.unique(disappear_clusters)]
        for (d, c) in zip(disappear_centers, disappear_clusters):
            self.test_img = cv2.circle(self.test_img, (d[0], d[1]), 4, disappear_colors[c], -1)
        for b in disappear_boxes:
            b = b[0]
            self.test_img = cv2.rectangle(self.test_img, (b[0][0], b[0][1]), (b[1][0], b[1][1]), (255, 0, 0), 7)
        cv2.imshow("disappear", self.test_img)
        cv2.waitKey(0)
        for (d, c) in zip(appear_centers, appear_clusters):
            self.test_img = cv2.circle(self.test_img, (d[0], d[1]), 4, appear_colors[c], -1)
        cv2.imshow("appear", self.test_img)
        cv2.waitKey(0)

    def run(self):
        """Play each sampled scene, detect + track objects, and let the user
        double-click cars to tag them. Press 'q' to abort a scene early."""
        for k in self.start_frames:
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, k)
            for i in tqdm(range(k, k + self.frame_per_scene)):
                ret, frame = self.cap.read()
                # BUGFIX: stop the scene when the video runs out of frames
                if not ret:
                    break
                if i % self.prep_speed_factor != 0:
                    continue
                curr_time = i / self.fps
                result = self.yolo.detect(frame)
                objects = self.tracker.update(result, curr_time, i)
                self.objects[self.start_frames.index(k)].append(list(objects.values()))
                normal_objects = [o for o in list(objects.values()) if o not in self.selected_cars]
                selected_objects = [o for o in list(objects.values()) if o in self.selected_cars]
                frame = self.visualizer.draw_swag_objects(normal_objects, frame, self.tracker.counter, self.tracker.current_objects_counter)
                for o in selected_objects:
                    frame = self.visualizer.draw_swag_object(o, frame, default_color=(0, 255, 255))
                cv2.imshow("Frame", frame)
                # BUGFIX: `i` already starts at k — passing i + k double-counted
                # the scene offset in the recorded frame index
                cv2.setMouseCallback('Frame', self.select_car_callback, i)
                key = cv2.waitKey(1) & 0xFF
                if key == ord("q"):
                    break
        self.cap.release()
        cv2.destroyAllWindows()
if __name__ == '__main__':
    # Paths for a quick manual run of the preprocessor.
    video_path = "test_data/Road traffic video for object recognition.mp4"
    yolo_config = "cfg/yolov4.cfg"
    yolo_weights = "bin/yolov4.weights"
    yolo_meta = "data/coco.data"
    output = "result3.avi"  # NOTE(review): not consumed by the preprocessor here
    preprocessor = SWAGPreprocessor(video_path, yolo_config, yolo_weights, yolo_meta)
    preprocessor.run()      # interactive detection/tracking pass over sampled scenes
    preprocessor.process()  # cluster appear/disappear points and visualize them