-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopencv_stream.py
More file actions
93 lines (70 loc) · 2.65 KB
/
opencv_stream.py
File metadata and controls
93 lines (70 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import cv2
import asyncio
import socketio
import base64
import numpy as np
async def main():
    """Stream undistorted webcam frames to a socket.io server.

    Connects a synchronous ``socketio.Client`` to ``socketServerURL``. Each
    time the connection is established, a background task opens camera 0,
    removes lens distortion using the calibration stored in
    ``./camera_matrix.txt`` and ``./distortion.txt``, resizes every frame to
    640x480, JPEG-encodes it (quality 50), and emits it base64-encoded as an
    ``imageSend`` event with payload ``{"data": <str>}``. Frames are also
    shown in a local preview window.

    The coroutine keeps trying to reconnect while the client is disconnected
    and returns once ``q`` is pressed in the preview window.

    NOTE: the socket.io client used here is the blocking one, so
    ``sio.connect`` blocks the event loop for up to ``wait_timeout`` seconds.
    """
    import threading  # local import: only this entry point needs it

    socketServerURL = "https://ea1b-76-78-137-157.ngrok-free.app/"
    reconnect_poll_seconds = 1.0
    preview_size = (640, 480)  # (width, height) of the transmitted frame
    jpeg_quality = 50  # adjust as needed; lower means smaller payloads

    # create socket.io connection
    sio = socketio.Client()

    # Shared between the handlers, the streaming task and the loop below.
    # The nested functions rebind these, hence the `nonlocal` declarations.
    connected = False
    quit_requested = False

    # Only one streaming task may own the camera at a time; a task started by
    # a quick reconnect waits here until the previous one has released it.
    camera_lock = threading.Lock()

    def stream_frames():
        """Capture, undistort, show and emit frames until told to stop."""
        nonlocal quit_requested

        with camera_lock:
            if not connected or quit_requested:
                return

            cap = cv2.VideoCapture(0)
            try:
                # Shapes video for fisheye fix adjustment. The named
                # constants are the property ids 6, 5, 3 and 4 respectively.
                codec = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
                cap.set(cv2.CAP_PROP_FOURCC, codec)
                cap.set(cv2.CAP_PROP_FPS, 30)
                cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
                cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)

                # Camera, fixing fisheye
                cameraMatrix = np.genfromtxt("./camera_matrix.txt")
                dist = np.genfromtxt("./distortion.txt")

                # One probe frame is needed to learn the real frame size.
                ret, frame = cap.read()
                if not ret:
                    print("Could not read a frame from the camera")
                    return
                h, w = frame.shape[:2]
                newCameraMatrix, roi = cv2.getOptimalNewCameraMatrix(
                    cameraMatrix, dist, (w, h), 1, (w, h)
                )
                roi_x, roi_y, roi_w, roi_h = roi
                # An empty ROI would yield an empty crop and make resize fail.
                crop_to_roi = roi_w > 0 and roi_h > 0
                encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality]

                while connected and not quit_requested:
                    ret, frame = cap.read()
                    if not ret:
                        print("Could not read a frame from the camera")
                        break

                    # undistorts the fisheye frame
                    frame = cv2.undistort(
                        frame, cameraMatrix, dist, None, newCameraMatrix
                    )
                    if crop_to_roi:
                        frame = frame[roi_y:roi_y + roi_h, roi_x:roi_x + roi_w]

                    # Resize the frame
                    frame = cv2.resize(frame, preview_size)

                    # Encode the frame with the configured JPEG quality
                    encoded_ok, buffer = cv2.imencode('.jpg', frame, encode_param)
                    if encoded_ok:
                        jpg_as_text = base64.b64encode(buffer)
                        try:
                            sio.emit(
                                "imageSend",
                                {"data": jpg_as_text.decode('utf-8')},
                            )
                        except socketio.exceptions.SocketIOError as e:
                            # Connection dropped mid-stream; the reconnect
                            # loop in main() takes over from here.
                            print(e)
                            break

                    cv2.imshow('frame', frame)

                    # A single waitKey call provides the ~30 ms frame delay
                    # and the key check; a second call would usually miss
                    # the key press consumed by the first.
                    if cv2.waitKey(30) & 0xFF == ord('q'):
                        # press q to quit
                        quit_requested = True
            finally:
                # Free the device for the next connection and destroy the
                # preview window from the thread that created it.
                cap.release()
                cv2.destroyAllWindows()

    @sio.event
    def connect():
        nonlocal connected
        print("CONNECTED")
        connected = True
        # The handler must return promptly: a capture loop running inline
        # here would hold up the client's connection handling.
        sio.start_background_task(stream_frames)

    @sio.event
    def disconnect():
        nonlocal connected
        # Ends the streaming loop, which then releases the camera and
        # destroys the preview window.
        connected = False

    # actually connect now
    try:
        print("connecting to socket.io server")
        sio.connect(socketServerURL, wait_timeout=10)
    except Exception as e:
        # Best effort: the loop below keeps retrying.
        print(e)

    while not quit_requested:
        if not sio.connected:
            print("Disconnected. Attempting to reconnect...")
            try:
                sio.connect(socketServerURL, wait_timeout=10)
            except Exception as e:
                print(e)
        # Yield instead of busy-spinning between connection checks.
        await asyncio.sleep(reconnect_poll_seconds)

    if sio.connected:
        sio.disconnect()
# Start the streaming client only when this file is executed as a script,
# so importing the module has no side effects.
if __name__ == "__main__":
    asyncio.run(main())