-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathhw.py
More file actions
286 lines (236 loc) · 10.7 KB
/
hw.py
File metadata and controls
286 lines (236 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import logging
import threading
import typing
from collections.abc import Sequence
from datetime import datetime
from pathlib import Path
from time import sleep
import cv2
import numpy as np
from rcs._core.common import BaseCameraConfig
from rcs.camera.interface import BaseCameraSet, Frame, FrameSet
from rcs.utils import SimpleFrameRate
class HardwareCamera(typing.Protocol):
"""Implementation of a hardware camera potentially a set of cameras of the same kind."""
def open(self):
"""Should open the camera and prepare it for polling."""
def close(self):
"""Should close the camera and release all resources."""
def config(self, camera_name: str) -> BaseCameraConfig:
"""Should return the configuration object of the cameras."""
def poll_frame(self, camera_name: str) -> Frame:
"""Should return the latest frame from the camera with the given name.
This method should be thread safe.
"""
@property
def camera_names(self) -> list[str]:
"""Returns the names of the cameras in this set."""
def calibrate(self) -> bool:
"""Returns camera intrinsics"""
N = typing.TypeVar("N", bound=int)
H = typing.TypeVar("H", bound=int)
W = typing.TypeVar("W", bound=int)
class CalibrationStrategy(typing.Protocol):
"""Implementation the hardware dependend calibration strategy."""
def calibrate(
self,
samples: list[Frame],
intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]],
lock: threading.Lock,
) -> bool:
"""Implements algorithm to calibrate the camera.
Args:
samples: List of frames to use for calibration.
intrinsics: Intrinsic camera parameters, e.g. from a previous calibration.
lock: A lock to ensure thread safety during calibration as the samples might refresh in parallel.
Returns:
bool: True if calibration was successful, False otherwise.
"""
def get_extrinsics(self) -> np.ndarray[tuple[typing.Literal[4], typing.Literal[4]], np.dtype[np.float64]] | None:
"""
Returns the calibrated extrinsic, can also be cached. If not calibrated then it returns None.
It is urged to perform efficient caching for this method as it is called in each step.
"""
class DummyCalibrationStrategy(CalibrationStrategy):
"""Always returns identity extrinsics."""
def calibrate(
self,
samples: list[Frame],
intrinsics: np.ndarray[tuple[typing.Literal[3], typing.Literal[4]], np.dtype[np.float64]],
lock: threading.Lock,
) -> bool:
return True
def get_extrinsics(self) -> np.ndarray[tuple[typing.Literal[4], typing.Literal[4]], np.dtype[np.float64]] | None:
return np.eye(4) # type: ignore[return-value]
class HardwareCameraSet(BaseCameraSet):
"""This base class polls in a separate thread for all cameras and stores them in a buffer.
Cameras can consist of multiple cameras, e.g. RealSense cameras.
"""
def __init__(
self, cameras: Sequence[HardwareCamera], warm_up_disposal_frames: int = 30, max_buffer_frames: int = 1000
):
self.cameras = cameras
self.camera_dict, self._camera_names = self._cameras_util()
self.frame_rate = self._frames_rate()
self.rate_limiter = SimpleFrameRate(self.frame_rate)
self.warm_up_disposal_frames = warm_up_disposal_frames
self.max_buffer_frames = max_buffer_frames
self._buffer: list[FrameSet | None] = [None for _ in range(self.max_buffer_frames)]
self._buffer_lock = threading.Lock()
self.running = False
self._thread: threading.Thread | None = None
self._logger = logging.getLogger(__name__)
self._next_ring_index = 0
self._buffer_len = 0
self.writer: dict[str, cv2.VideoWriter] = {}
@property
def camera_names(self) -> list[str]:
"""Returns the names of the cameras in this set."""
return self._camera_names
@property
def name_to_identifier(self) -> dict[str, str]:
"""Returns a dictionary mapping the camera names to their identifiers."""
name_to_id: dict[str, str] = {}
for camera in self.cameras:
for name in camera.camera_names:
name_to_id[name] = camera.config(name).identifier
return name_to_id
def _frames_rate(self) -> int:
"""Checks if all cameras have the same frame rate."""
frame_rates = {camera.config(name).frame_rate for camera in self.cameras for name in camera.camera_names}
if len(frame_rates) > 1:
msg = "All cameras must have the same frame rate. Different frame rates are not supported."
raise ValueError(msg)
if len(frame_rates) == 0:
self._logger.warning("No camera found, empty polling with 1 fps.")
return 1
return next(iter(frame_rates))
def _cameras_util(self) -> tuple[dict[str, HardwareCamera], list[str]]:
"""Utility function to create a dictionary of cameras and a list of camera names."""
camera_dict: dict[str, HardwareCamera] = {}
camera_names: list[str] = []
for camera in self.cameras:
camera_names.extend(camera.camera_names)
for name in camera.camera_names:
assert name not in camera_dict, f"Camera name {name} not unique."
camera_dict[name] = camera
return camera_dict, camera_names
def buffer_size(self) -> int:
return len(self._buffer) - self._buffer.count(None)
def wait_for_frames(self, timeout: float = 10.0):
while self.buffer_size() == 0:
sleep(0.1)
timeout -= 0.1
if timeout < 0:
self._logger.error("Timeout waiting for frames")
raise
def get_latest_frames(self) -> FrameSet | None:
"""Should return the latest frame from the camera with the given name."""
with self._buffer_lock:
return self._buffer[self._next_ring_index - 1] if self._buffer_len > 0 else None
def get_timestamp_frames(self, ts: datetime) -> FrameSet | None:
"""Should return the frame from the camera with the given name and closest to the given timestamp."""
# iterate through the buffer and find the closest timestamp
with self._buffer_lock:
for i in range(self._buffer_len):
idx = (self._next_ring_index - i - 1) % self.max_buffer_frames # iterate backwards
assert self._buffer[idx] is not None
item: FrameSet = typing.cast(FrameSet, self._buffer[idx])
assert item.avg_timestamp is not None
if item.avg_timestamp <= ts.timestamp():
return self._buffer[idx]
return None
def stop(self):
"""Stops the polling of the cameras."""
self.running = False
assert self._thread is not None
self._thread.join()
self._thread = None
def close(self):
if self.running and self._thread is not None:
self.stop()
for camera in self.cameras:
camera.close()
self.stop_video()
def start(self, warm_up: bool = True):
"""Should start the polling of the cameras."""
if self.running:
self._logger.warning("Camera thread already running!")
return
self.running = True
self._thread = threading.Thread(target=self.polling_thread, args=(warm_up,))
self._thread.start()
def record_video(self, path: Path, str_id: str):
if self.recording_ongoing():
return
for camera in self.camera_names:
self.writer[camera] = cv2.VideoWriter(
str(path / f"episode_{str_id}_{camera}.mp4"),
# migh require to install ffmpeg
cv2.VideoWriter_fourcc(*"mp4v"), # type: ignore
self.frame_rate,
(self.config(camera).resolution_width, self.config(camera).resolution_height),
)
def recording_ongoing(self) -> bool:
with self._buffer_lock:
return len(self.writer) > 0
def stop_video(self):
if len(self.writer) > 0:
with self._buffer_lock:
for camera in self.camera_names:
self.writer[camera].release()
self.writer = {}
def warm_up(self):
for _ in range(self.warm_up_disposal_frames):
for camera_name in self.camera_names:
self.poll_frame(camera_name)
self.rate_limiter()
def calibrate(self) -> bool:
for camera in self.cameras:
c = camera.calibrate()
if c is None:
return False
return True
def polling_thread(self, warm_up: bool = True):
for camera in self.cameras:
camera.open()
if warm_up:
self.warm_up()
while self.running:
frame_set = self.poll_frame_set()
# buffering
with self._buffer_lock:
self._buffer[self._next_ring_index] = frame_set
self._next_ring_index = (self._next_ring_index + 1) % self.max_buffer_frames
self._buffer_len = max(self._buffer_len + 1, self.max_buffer_frames)
# video recording
for camera_key, writer in self.writer.items():
if frame_set is not None:
writer.write(frame_set.frames[camera_key].camera.color.data[:, :, ::-1])
self.rate_limiter()
def poll_frame_set(self) -> FrameSet:
"""Gather frames over all available cameras."""
frames: dict[str, Frame] = {}
for camera_name in self.camera_names:
# callback
frame = self.poll_frame(camera_name)
frames[camera_name] = frame
# filter none
timestamps: list[float] = [frame.avg_timestamp for frame in frames.values() if frame.avg_timestamp is not None]
return FrameSet(frames=frames, avg_timestamp=float(np.mean(timestamps)) if len(timestamps) > 0 else None)
def clear_buffer(self):
"""Deletes all frames from the buffer."""
with self._buffer_lock:
self._buffer = [None for _ in range(self.max_buffer_frames)]
self._next_ring_index = 0
self._buffer_len = 0
self.wait_for_frames()
def config(self, camera_name: str) -> BaseCameraConfig:
"""Returns the configuration object of the cameras."""
return self.camera_dict[camera_name].config(camera_name)
def poll_frame(self, camera_name: str) -> Frame:
return self.camera_dict[camera_name].poll_frame(camera_name)
def __enter__(self):
pass
def __exit__(self, *args, **kwargs):
self.close()