"""preprocess.py

Preprocessing pipeline for the gaze dataset: dumps the Tobii point-of-gaze
(PoG) labels to JSON, extracts frames from the camera and screen recordings,
and crops face/eye regions of interest from each frame.
"""
import json
import os

import cv2
import h5py
import numpy as np
from PIL import Image as PILImage

from config_default import DefaultConfig
# Assumed to provide image_resize, preparePath, logError, find_face_dlib,
# rc_landmarksToRects, cropImage and generate_grid2, all used below.
from utility_functions.face_utilities import *

config = DefaultConfig()
# Nominal capture rates (frames per second) for each recording source.
source_to_fps = {
    'screen': 30,
    'basler': 60,
    'webcam_l': 30,
    'webcam_c': 30,
    'webcam_r': 30,
}
# Milliseconds between consecutive frames for each source.
source_to_interval_ms = {
    source: 1e3 / fps for source, fps in source_to_fps.items()
}

max_sequence_len = 30
assumed_frame_rate = 10
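# For example, source_to_interval_ms['basler'] works out to 1000 / 60,
# i.e. roughly 16.7 ms between frames.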
def get_frames_camera(input_path, output_path):
    """Extract every frame of <input_path>.mp4 as a JPEG in output_path,
    downscaling 1080p camera frames to 720p on the way."""
    vidcap = cv2.VideoCapture(input_path + '.mp4')
    fps = vidcap.get(cv2.CAP_PROP_FPS)
    frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / fps  # seconds; kept for debugging
    success, image = vidcap.read()
    count = 0
    os.makedirs(output_path, exist_ok=True)
    while success:
        if image.shape == (1080, 1920, 3) or image.shape == (1920, 1080, 3):
            # Downscale to 720p, since most webcams record at 720p.
            image = image_resize(image, 1280, 720)
        # Save the frame as a JPEG named after its index.
        cv2.imwrite(os.path.join(output_path, str(count) + '.jpg'), image)
        success, image = vidcap.read()
        count += 1
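# Example call (hypothetical paths, mirroring the layout used in preprocess()):
#   get_frames_camera('./dataset/p01/stimulus/webcam_c',
#                     './processedFrames/p01/stimulus/webcam_c')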
def get_frames_screen(input_path, output_path):
    """Extract every frame of the screen recording <input_path>.mp4 as a JPEG
    in output_path, with no resizing."""
    vidcap = cv2.VideoCapture(input_path + '.mp4')
    fps = vidcap.get(cv2.CAP_PROP_FPS)
    frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / fps  # seconds; kept for debugging
    success, image = vidcap.read()
    count = 0
    os.makedirs(output_path, exist_ok=True)
    while success:
        # Save the frame as a JPEG named after its index.
        cv2.imwrite(os.path.join(output_path, str(count) + '.jpg'), image)
        success, image = vidcap.read()
        count += 1
def save_pog(input_path, output_path):
    """Read the Tobii point-of-gaze stream from the HDF5 file at input_path
    and dump it (coordinates plus validity flags) to output_path/POG.json."""
    def traverse_datasets(hdf_file):
        # Helper for inspecting the HDF5 layout; not called in the pipeline.
        def h5py_dataset_iterator(g, prefix=''):
            for key in g.keys():
                item = g[key]
                path = f'{prefix}/{key}'
                if isinstance(item, h5py.Dataset):  # leaf: a dataset
                    yield (path, item)
                elif isinstance(item, h5py.Group):  # group: recurse into it
                    yield from h5py_dataset_iterator(item, path)
        for path, _ in h5py_dataset_iterator(hdf_file):
            yield path

    with h5py.File(input_path, 'r') as f:
        # Point-of-gaze coordinates, one (x, y) pair per frame.
        pog_dict = {'pog': [], 'valid': []}
        pog = np.array(f.get('face_PoG_tobii').get('data'))
        pog_dict['number_of_frames'] = pog.shape[0]
        for point in pog:
            pog_dict['pog'].append((str(point[0]), str(point[1])))
        # Per-frame validity flags from the tracker.
        validity = np.array(f.get('face_PoG_tobii').get('validity'))
        for isValid in validity:
            pog_dict['valid'].append(str(isValid))

    os.makedirs(output_path, exist_ok=True)
    with open(os.path.join(output_path, 'POG.json'), 'w') as out_file:
        json.dump(pog_dict, out_file)
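# The resulting POG.json has the shape (a sketch; value formatting follows the
# HDF5 datatypes):
#   {"number_of_frames": N, "pog": [["x0", "y0"], ...], "valid": ["v0", ...]}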
def ROIExtractionTask(input_path, output_path):
    """For every extracted webcam frame, detect the face with dlib, crop the
    face, eye and face-grid regions, resize them to 256x256 and save them."""
    camera_type = 'webcam_c'  # ROIs are extracted from the centre webcam only
    with open(os.path.join(input_path, 'POG.json')) as pog_file:
        data = json.load(pog_file)
    number_of_frames = int(data['number_of_frames'])
    # Prepare output paths.
    facePath = preparePath(os.path.join(output_path, 'face'))
    leftEyePath = preparePath(os.path.join(output_path, 'leftEye'))
    rightEyePath = preparePath(os.path.join(output_path, 'rightEye'))
    faceGridPath = preparePath(os.path.join(output_path, 'faceGrid'))
    for i in range(number_of_frames):
        image_path = os.path.join(input_path, camera_type, str(i) + '.jpg')
        if not os.path.isfile(image_path):
            logError('Warning: Could not read image file %s!' % image_path)
            continue
        try:
            image = PILImage.open(image_path)
        except OSError:
            logError('Warning: Could not read image file %s!' % image_path)
            continue
        img = np.array(image.convert('RGB'))
        # Detect facial landmarks and derive face and eye rectangles.
        shape_np, isValid = find_face_dlib(img)
        face_rect, left_eye_rect, right_eye_rect, isValid = rc_landmarksToRects(shape_np, isValid)
        if not isValid:
            continue
        # Crop the regions of interest.
        imFace = cropImage(img, face_rect)
        imEyeL = cropImage(img, left_eye_rect)
        imEyeR = cropImage(img, right_eye_rect)
        # Rotation-corrected face grid.
        imFaceGrid = generate_grid2(face_rect, img)
        # Resize everything to 256x256. Note that interpolation must be passed
        # by keyword: the third positional argument of cv2.resize is dst.
        imFace = cv2.resize(imFace, (256, 256), interpolation=cv2.INTER_AREA)
        imEyeL = cv2.resize(imEyeL, (256, 256), interpolation=cv2.INTER_AREA)
        imEyeR = cv2.resize(imEyeR, (256, 256), interpolation=cv2.INTER_AREA)
        imFaceGrid = cv2.resize(imFaceGrid, (256, 256), interpolation=cv2.INTER_AREA)
        # Save the crops, named after the frame index.
        PILImage.fromarray(imFace).save(os.path.join(facePath, '%d.jpg' % i), quality=95)
        PILImage.fromarray(imEyeL).save(os.path.join(leftEyePath, '%d.jpg' % i), quality=95)
        PILImage.fromarray(imEyeR).save(os.path.join(rightEyePath, '%d.jpg' % i), quality=95)
        PILImage.fromarray(imFaceGrid).save(os.path.join(faceGridPath, '%d.jpg' % i), quality=95)
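# After this task, output_path holds face/, leftEye/, rightEye/ and faceGrid/
# directories with one 256x256 JPEG per frame in which a face was detected.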
def preprocess(person_id, camera_type):
    """Run the full pipeline for one participant: dump PoG labels, extract
    camera and screen frames, then crop the regions of interest."""
    directory = os.path.join('./dataset', person_id)
    stimuli = [name for name in os.listdir(directory)
               if os.path.isdir(os.path.join(directory, name))]
    for stimulus in stimuli:
        input_path = os.path.join('./dataset', person_id, stimulus)
        output_path = os.path.join('./processedFrames', person_id, stimulus)
        # Save the point of gaze to a JSON file.
        save_pog(os.path.join(input_path, camera_type + '.h5'), output_path)
        # Extract frames from the camera and screen recordings.
        get_frames_camera(os.path.join(input_path, camera_type),
                          os.path.join(output_path, camera_type))
        get_frames_screen(os.path.join(input_path, 'screen.128x72'),
                          os.path.join(output_path, 'screen.128x72'))
        # Extract the face and eye regions of interest.
        ROIExtractionTask(output_path, output_path)
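

# Minimal usage sketch; 'p01' is a hypothetical participant ID, not a value
# taken from the original script:
if __name__ == '__main__':
    preprocess('p01', 'webcam_c')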