-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
316 lines (263 loc) · 10.2 KB
/
util.py
File metadata and controls
316 lines (263 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""
Simple util implementation for video conference
Including data capture, image compression and image overlap
Note that you can use your own implementation as well :)
"""
from io import BytesIO
import pyaudio
import cv2
import pyautogui
import numpy as np
import mss
import time
import math
from PIL import Image, ImageGrab
from config import *
from datetime import datetime
import struct
# Audio settings shared by the helpers in this module.
FORMAT = pyaudio.paInt16  # PyAudio sample-format constant (16-bit integer samples)
CHANNELS = 1  # channel count; presumably mono capture — TODO confirm against the stream setup
RATE = 44100  # presumably the sampling rate in Hz — TODO confirm against the stream setup
CHUNK = 1024  # block size passed to streamin.read() in capture_voice; also added into the WAV ChunkSize field
BYTES_PER_SAMPLE = 2  # presumably the byte width of one paInt16 sample — TODO confirm against users of this constant
def generate_wav_header(sample_rate, bits_per_sample, channels):
    """Build a 44-byte little-endian RIFF/WAVE header for PCM audio.

    Args:
        sample_rate: Value written to the SampleRate field.
        bits_per_sample: Value written to the BitsPerSample field.
        channels: Value written to the NumChannels field.

    Returns:
        bytes: The packed header.
    """
    bytes_per_second = sample_rate * channels * bits_per_sample // 8
    bytes_per_frame = channels * bits_per_sample // 8

    # NOTE(review): ChunkSize is fixed at 36 + CHUNK while the data size is 0;
    # this looks like a placeholder header for streamed audio — confirm with
    # whatever consumes the header before changing either field.
    riff_fields = (b'RIFF', 36 + CHUNK, b'WAVE')
    fmt_fields = (
        b'fmt ',           # Subchunk1ID
        16,                # Subchunk1Size
        1,                 # AudioFormat (PCM)
        channels,          # NumChannels
        sample_rate,       # SampleRate
        bytes_per_second,  # ByteRate
        bytes_per_frame,   # BlockAlign
        bits_per_sample,   # BitsPerSample
    )
    data_fields = (b'data', 0)

    return struct.pack(
        '<4sI4s4sIHHIIHH4sI', *riff_fields, *fmt_fields, *data_fields
    )
# my_screen_size = pyautogui.size()
# # 初始化摄像头
# cap = cv2.VideoCapture(0)
# cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # 设置摄像头宽度
# cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # 设置摄像头高度
# # 获取屏幕尺寸
# screen_width, screen_height = pyautogui.size()
# while True:
# # --- 捕获屏幕录像 ---
# screen = ImageGrab.grab() # 获取屏幕截图 (PIL Image)
# screen_np = np.array(screen) # 转换为 NumPy 数组
# screen_bgr = cv2.cvtColor(screen_np, cv2.COLOR_RGB2BGR) # 转换为 BGR 格式供 OpenCV 显示
# # --- 捕获摄像头画面 ---
# ret, camera_frame = cap.read()
# if not ret:
# print("摄像头捕获失败")
# break
# # 调整屏幕录像大小以便拼接
# screen_resized = cv2.resize(screen_bgr, (640, 480))
# # --- 拼接屏幕录像和摄像头画面 ---
# combined_frame = np.hstack((screen_resized, camera_frame)) # 水平拼接
# # 如果需要垂直拼接,使用 np.vstack()
# # --- 显示拼接画面 ---
# cv2.imshow("Screen and Camera", combined_frame)
# # 按下 'q' 键退出
# if cv2.waitKey(1) & 0xFF == ord("q"):
# break
# # 释放资源
# cap.release()
# cv2.destroyAllWindows()
def getCurrentTime():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    timestamp_layout = "%Y-%m-%d %H:%M:%S"
    current = datetime.now()
    return current.strftime(timestamp_layout)
def resize_image_to_fit_screen(image, my_screen_size):
    """Scale an image to fit inside the given area while keeping its aspect ratio.

    Args:
        image: PIL.Image to scale.
        my_screen_size: ``(width, height)`` of the area to fit into.

    Returns:
        The image returned by ``image.resize`` using LANCZOS resampling.
    """
    max_w, max_h = my_screen_size
    src_w, src_h = image.size
    ratio = src_w / src_h

    if max_w / max_h > ratio:
        # The area is relatively wider than the image: height is the limit.
        target = (int(max_h * ratio), max_h)
    else:
        # Otherwise the width is the limiting side.
        target = (max_w, int(max_w / ratio))

    return image.resize(target, Image.LANCZOS)
def overlay_camera_images(screen_image, camera_images):
    """Compose the shared screen and the camera frames into one image.

    The screen image (if any) is resized with resize_image_to_fit_screen and
    the camera frames are pasted over it in a row starting at the top-left
    corner. When there are more cameras than fit side by side, every camera
    frame is shrunk so that all of them share the available width.

    Args:
        screen_image: PIL.Image of the shared screen, or None.
        camera_images: list[PIL.Image] of camera frames, or None. An empty
            list is treated the same as None.

    Returns:
        PIL.Image with the cameras drawn over the (resized) screen; the
        resized screen alone when there are no cameras; a black canvas
        holding only the cameras when there is no screen; None when there
        is nothing to show.

    Raises:
        ValueError: If the camera images do not all have the same size.
    """
    if screen_image is None and not camera_images:
        print("[Warn]: cannot display when screen and camera are both None")
        return None

    if screen_image is not None:
        screen_image = resize_image_to_fit_screen(screen_image, my_screen_size)

    # No cameras (None or empty list): nothing to overlay.
    if not camera_images:
        return screen_image

    # All camera frames must share one size so they can be tiled on a grid.
    first_size = camera_images[0].size
    if any(img.size != first_size for img in camera_images):
        raise ValueError("All camera images must have the same size")

    screen_width, _ = my_screen_size if screen_image is None else screen_image.size
    camera_width, camera_height = first_size

    num_cameras_per_row = screen_width // camera_width

    # Too many cameras for one row at native size: shrink them to share the width.
    if len(camera_images) > num_cameras_per_row:
        # Clamp to 1 pixel so resize never receives a zero dimension.
        adjusted_camera_width = max(1, screen_width // len(camera_images))
        adjusted_camera_height = max(
            1, (adjusted_camera_width * camera_height) // camera_width
        )
        camera_images = [
            img.resize(
                (adjusted_camera_width, adjusted_camera_height), Image.LANCZOS
            )
            for img in camera_images
        ]
        camera_width, camera_height = adjusted_camera_width, adjusted_camera_height
        num_cameras_per_row = len(camera_images)

    if screen_image is None:
        # Black canvas as wide as the screen and one camera row tall.
        # Image.new takes (width, height), which avoids the (rows, cols)
        # ordering mistake that is easy to make with a NumPy array.
        display_image = Image.new("RGB", (screen_width, camera_height))
    else:
        display_image = screen_image

    # Paste each camera frame into its grid cell.
    for i, camera_image in enumerate(camera_images):
        row, col = divmod(i, num_cameras_per_row)
        display_image.paste(camera_image, (col * camera_width, row * camera_height))

    return display_image
def capture_screen(quality=30, width=1280, height=720, period = 2):
    """Capture JPEG-compressed screenshots for roughly ``period`` seconds.

    Frames are grabbed from ``sct.monitors[1]`` as fast as the loop allows,
    resized to ``(width, height)`` and JPEG-encoded with OpenCV.

    Args:
        quality (int, optional): JPEG quality passed to OpenCV. Defaults to 30.
        width (int, optional): Width each frame is resized to. Defaults to 1280.
        height (int, optional): Height each frame is resized to. Defaults to 720.
        period (float, optional): Capture duration in seconds. Defaults to 2.

    Returns:
        tuple: ``(screen_shots, tot, period)`` where ``screen_shots`` is the
        list of JPEG byte strings, ``tot`` is the number of frames captured
        and ``period`` is the value that was passed in.
    """
    # Slack subtracted from the period; presumably compensates for per-frame
    # overhead — TODO confirm.
    constant = 0.03
    encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    screen_shots = []
    st = time.time()

    # Open one mss session for the whole capture instead of one per frame.
    with mss.mss() as sct:
        monitor = sct.monitors[1]
        while True:
            img_np = np.array(sct.grab(monitor))
            img_np = cv2.resize(img_np, (width, height))
            _, img_encode = cv2.imencode(".jpg", img_np, encode_params)
            screen_shots.append(img_encode.tobytes())
            if time.time() - st > period - constant:
                break

    tot = len(screen_shots)
    size = sum(len(frame) for frame in screen_shots)
    print(f"Total size of {tot} frames: {size} bytes")
    return screen_shots, tot, period
def bytes_to_video(screen_shots ,tot, period):
    """Encode a list of JPEG frames into an MP4 file and return its bytes.

    The video is written to ``temp_bytes_to_video.mp4`` in the current
    working directory (the file is left in place) and then read back.
    The conversion time is printed, not returned.

    Args:
        screen_shots (list): list of bytes of captured images
        tot (int): total number of captured images
        period (float): period of capturing images, in seconds

    Returns:
        video_bytes: bytes of the encoded video file
    """
    st = time.time()
    # Fewer frames than seconds would floor to 0 fps, which is not a usable
    # frame rate for the writer, so clamp to at least 1.
    fps = max(1, math.floor(tot / period)) if period > 0 else 1
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_name = "temp_bytes_to_video.mp4"

    # The writer drops frames whose size differs from the declared size, so
    # take the size from the first frame and keep 1280x720 only as fallback.
    frame_size = (1280, 720)
    if screen_shots:
        first = cv2.imdecode(
            np.frombuffer(screen_shots[0], np.uint8), cv2.IMREAD_COLOR
        )
        if first is not None:
            frame_height, frame_width = first.shape[:2]
            frame_size = (frame_width, frame_height)

    video = cv2.VideoWriter(video_name, fourcc, fps, frame_size)
    try:
        for shot in screen_shots:
            frame = cv2.imdecode(np.frombuffer(shot, np.uint8), cv2.IMREAD_COLOR)
            video.write(frame)
    finally:
        # Always finalize the file, even if decoding or writing fails.
        video.release()

    # video to bytes
    with open(video_name, "rb") as f:
        video_bytes = f.read()
    en = time.time()
    print(f"Convert time: {en - st}")
    return video_bytes
def initialize_camera(camera_index=0, resolution=(1280, 720), fps=60):
    """Open a camera and request the given resolution and frame rate.

    Args:
        camera_index (int, optional): Index handed to cv2.VideoCapture. Defaults to 0.
        resolution (tuple, optional): Requested ``(width, height)``. Defaults to (1280, 720).
        fps (int, optional): Requested frames per second. Defaults to 60.

    Returns:
        The opened cv2.VideoCapture object.

    Raises:
        Exception: If the capture object reports that it is not opened.
    """
    cap = cv2.VideoCapture(camera_index)
    requested_properties = (
        (cv2.CAP_PROP_FRAME_WIDTH, resolution[0]),
        (cv2.CAP_PROP_FRAME_HEIGHT, resolution[1]),
        (cv2.CAP_PROP_FPS, fps),
    )
    for prop, value in requested_properties:
        cap.set(prop, value)

    if cap.isOpened():
        return cap
    raise Exception(f"无法打开摄像头(索引 {camera_index})。请检查设备。")
def capture_camera(cap, period = 2, fps=30, quality=30):
    """Read JPEG-compressed frames from a camera for roughly ``period`` seconds.

    Args:
        cap: Capture object providing ``read()`` and ``release()``.
        period (float, optional): Capture duration in seconds. Defaults to 2.
        fps (int, optional): Not used by this function. Defaults to 30.
        quality (int, optional): JPEG quality passed to OpenCV. Defaults to 30.

    Returns:
        tuple: ``(frames, tot, first_time, last_time)`` where ``frames`` is the
        list of JPEG byte strings, ``tot`` is the number of frames, and the two
        timestamps are ``time.time()`` values taken right after the first and
        the last ``cap.read()``. ``cap`` is released before returning.
        Returns None (without releasing ``cap``) if a frame cannot be read.
    """
    constant = 0.03
    print(f"开始捕获摄像头帧,目标时间: {period}")
    encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    frames = []
    first_stamp = None

    while True:
        ok, frame = cap.read()
        last_stamp = time.time()
        if first_stamp is None:
            first_stamp = last_stamp

        if not ok:
            print("帧捕获失败。")
            return None

        _, encoded = cv2.imencode(".jpg", frame, encode_params)
        frames.append(encoded.tobytes())

        if time.time() - first_stamp > period - constant:
            tot = len(frames)
            size = sum(len(jpeg) for jpeg in frames)
            print(f"Total size of {tot} frames: {size} bytes")
            cap.release()
            return frames, tot, first_stamp, last_stamp
def capture_voice():
    """Read one CHUNK-sized block from the module-level ``streamin`` stream.

    ``streamin`` is not defined in the visible part of this file; presumably
    it is an audio input stream provided by ``config`` — TODO confirm.

    Returns:
        The value returned by ``streamin.read(CHUNK)``.
    """
    audio_block = streamin.read(CHUNK)
    return audio_block
def compress_image(image, format="JPEG", quality=85):
    """
    Compress an image and return the encoded bytes.

    JPEG cannot store an alpha channel or a palette, so images in RGBA, LA,
    PA or P mode are converted to RGB before being saved as JPEG; other
    formats receive the image unchanged.

    :param image: PIL.Image, input image
    :param format: str, output format ('JPEG', 'PNG', 'WEBP', ...)
    :param quality: int, compress quality (0-100), 85 default
    :return: bytes, compressed image data
    """
    # Saving these modes as JPEG makes Pillow raise OSError, so drop the
    # alpha/palette information first.
    if (
        isinstance(format, str)
        and format.upper() == "JPEG"
        and image.mode in ("RGBA", "LA", "PA", "P")
    ):
        image = image.convert("RGB")

    with BytesIO() as buffer:
        image.save(buffer, format=format, quality=quality)
        return buffer.getvalue()
def decompress_image(image_bytes):
    """
    Open compressed image data as a PIL.Image.

    :param image_bytes: bytes, compressed image data
    :return: PIL.Image opened from an in-memory buffer over ``image_bytes``
    """
    return Image.open(BytesIO(image_bytes))