-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathinterface.py
More file actions
117 lines (85 loc) · 3.06 KB
/
interface.py
File metadata and controls
117 lines (85 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
from dataclasses import dataclass
from datetime import datetime
from time import sleep, time
from typing import Any, Protocol
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class SimpleFrameRate:
def __init__(self):
self.t = None
self._last_print = None
def reset(self):
self.t = None
def __call__(self, frame_rate: int | float):
if self.t is None:
self.t = time()
self._last_print = self.t
sleep(1 / frame_rate if isinstance(frame_rate, int) else frame_rate)
return
sleep_time = (
1 / frame_rate - (time() - self.t) if isinstance(frame_rate, int) else frame_rate - (time() - self.t)
)
if sleep_time > 0:
sleep(sleep_time)
if self._last_print is None or time() - self._last_print > 10:
self._last_print = time()
logger.info(f"FPS: {1 / (time() - self.t)}")
self.t = time()
class BaseCameraConfig(BaseModel):
identifier: str
class BaseCameraSetConfig(BaseModel):
cameras: dict = Field(default={})
resolution_width: int = 1280 # pixels
resolution_height: int = 720 # pixels
frame_rate: int = 15 # Hz
@property
def name_to_identifier(self):
return {key: camera.identifier for key, camera in self.cameras.items()}
@dataclass(kw_only=True)
class DataFrame:
data: Any
# timestamp in posix time
timestamp: float | None = None
@dataclass(kw_only=True)
class CameraFrame:
color: DataFrame
ir: DataFrame | None = None
depth: DataFrame | None = None
temperature: float | None = None
@dataclass(kw_only=True)
class IMUFrame:
accel: DataFrame | None = None
gyro: DataFrame | None = None
temperature: float | None = None
@dataclass(kw_only=True)
class Frame:
camera: CameraFrame
imu: IMUFrame | None = None
avg_timestamp: float | None = None
@dataclass(kw_only=True)
class FrameSet:
frames: dict[str, Frame]
avg_timestamp: float | None
class BaseCameraSet(Protocol):
"""Interface for a set of cameras for sim and hardware"""
def buffer_size(self) -> int:
"""Returns size of the internal buffer."""
def get_latest_frames(self) -> FrameSet | None:
"""Returns the latest frame from the camera with the given name."""
def get_timestamp_frames(self, ts: datetime) -> FrameSet | None:
"""Returns the frame from the camera with the given name and closest to the given timestamp."""
def clear_buffer(self):
"""Deletes all frames from the buffer."""
def close(self):
"""Stops any running threads e.g. for exitting."""
@property
def config(self) -> BaseCameraSetConfig:
"""Return the configuration object of the cameras."""
@property
def camera_names(self) -> list[str]:
"""Returns a list of the activated human readable names of the cameras."""
@property
def name_to_identifier(self) -> dict[str, str]:
"""Dict mapping from human readable name to identifier."""