-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcamera_driver.py
More file actions
129 lines (82 loc) · 4.39 KB
/
camera_driver.py
File metadata and controls
129 lines (82 loc) · 4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import time
import cv2
from cv2 import VideoCapture, imwrite
import os
import datetime
import io
import json
import sys
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from msrest.authentication import CognitiveServicesCredentials
# Azure Computer Vision configuration is read from the environment so the
# API key never lives in source control.
subscription_key = os.getenv("VISION_KEY")
endpoint = os.getenv("VISION_ENDPOINT")
credentials = CognitiveServicesCredentials(subscription_key)
computervision_client = ComputerVisionClient(endpoint, credentials)

# Minimum detection confidence for an object to count as a supported match.
SUPPORT_CONFIDENCE = 0.75
if __name__ == "__main__":
    # Motion-detection loop: grab frames from camera 0, compare each frame's
    # colour histogram with the previous frame's, and send sufficiently
    # different frames to Azure Computer Vision for object detection.
    # Frames containing a supported object are archived under motions/
    # together with a JSON description of what was detected.
    sleep_tm = 0.5            # seconds to wait between frame captures
    motion_timeout = 7        # seconds of quiet after which a new event dir starts
    # Object classes that qualify a frame as a real motion event.
    SUPPORTED_OBJECTS = ["mammal", "cat", "animal", "person"]

    while True:  # outer loop: reopen the camera after any failure and retry
        cam = VideoCapture(0)
        prev_hist = None      # histogram of the previous frame; None on first pass
        motion_dir = None     # directory of the event currently being recorded
        last_event_ts = None  # timestamp of the last frame with a supported object
        try:
            while True:
                event_ts = datetime.datetime.now()
                ok, image = cam.read()
                if not ok:
                    # Original code never checked the read flag and crashed on
                    # calcHist(None); raise so the outer loop reopens the device.
                    raise RuntimeError("camera returned no frame")
                ts_now = event_ts.strftime('%Y-%m-%d_%H-%M-%S')

                # 3D colour histogram (8 bins/channel); correlation against the
                # previous frame's histogram measures similarity (1.0 == identical).
                hist = cv2.calcHist([image], [0, 1, 2], None, [8, 8, 8],
                                    [0, 256, 0, 256, 0, 256])
                hist = cv2.normalize(hist, hist).flatten()
                d = (cv2.compareHist(hist, prev_hist, cv2.HISTCMP_CORREL)
                     if prev_hist is not None else 0)

                if d < 0.997:  # frame differs enough from the previous one
                    is_success, buffer = cv2.imencode(".jpg", image)
                    io_buf = io.BytesIO(buffer)
                    vision_response = computervision_client.detect_objects_in_stream(io_buf)
                    print(f"\nObjects analysis [{ts_now}]:", vision_response)

                    supported_found = False
                    found_objects = []
                    # NOTE: renamed loop variable from `object`, which shadowed
                    # the builtin of the same name.
                    for detected in vision_response.objects:
                        print(detected.object_property, detected.confidence)
                        obj = {
                            "object_property": detected.object_property,
                            "confidence": detected.confidence,
                        }
                        found_objects.append(obj)
                        if (detected.object_property in SUPPORTED_OBJECTS
                                and not supported_found
                                and detected.confidence > SUPPORT_CONFIDENCE):
                            supported_found = True
                            print(f"Found supported object [{detected.object_property}]: ", obj)

                    info_obj = {
                        "event_ts": ts_now,
                        "distance": d,
                        "objects": found_objects,
                    }
                    # Always publish the latest analysis, supported or not.
                    with open("motions/latest_info.json", "w") as latest_info_json:
                        latest_info_json.write(json.dumps(info_obj))

                    if supported_found:
                        event_duration = (event_ts - last_event_ts).total_seconds() if last_event_ts else 0
                        # Start a fresh event directory on the first detection or
                        # when the previous detection was long enough ago.
                        if motion_dir is None or event_duration > motion_timeout:
                            motion_dir = f"motions/motion_{ts_now}"
                            info_dir = f"infos/info_{ts_now}"
                            print(f"Starting new event[{d}]: {motion_dir}. Time since last motion: {event_duration}")
                            # exist_ok guards against two events in the same second.
                            os.makedirs(motion_dir, exist_ok=True)
                            os.makedirs(info_dir, exist_ok=True)
                        # Refresh on every supported frame so continuous motion
                        # stays in one event directory.
                        last_event_ts = event_ts
                        motion_path = f"{motion_dir}/event_{ts_now}.png"
                        imwrite(motion_path, image)
                        info_path = f"{info_dir}/info_{ts_now}.json"
                        with open(info_path, "w") as info_json:
                            info_json.write(json.dumps(info_obj))
                        imwrite("motions/event_current.png", image)
                        print(f"Motion recorded to: {motion_path}")
                    else:
                        print(f"Ignoring [{ts_now}] -- no supported objects found")

                # Keep a live snapshot of the most recent frame regardless of
                # whether it triggered analysis.
                imwrite("motions/current_frame.png", image)
                prev_hist = hist
                time.sleep(sleep_tm)
        except Exception:
            # Catch Exception, not bare except: a bare except also swallowed
            # KeyboardInterrupt/SystemExit, making Ctrl-C unable to stop the loop.
            print("ERROR: Camera loop failed", sys.exc_info()[0])
            time.sleep(sleep_tm)  # brief back-off so a dead camera doesn't spin hot
        finally:
            # Always release the device so the retry can reopen it; the original
            # release call was unreachable and leaked one handle per failure.
            cam.release()