11import logging
22import threading
33import typing
4- from abc import ABC , abstractmethod
54from datetime import datetime
65from pathlib import Path
76from time import sleep
87
98import cv2
109import numpy as np
11- from pydantic import Field
1210from rcs .camera .interface import (
1311 BaseCameraConfig ,
14- BaseCameraSetConfig ,
12+ BaseCameraSet ,
1513 Frame ,
1614 FrameSet ,
1715 SimpleFrameRate ,
1816)
1917
2018
21- class HWCameraSetConfig (BaseCameraSetConfig ):
22- cameras : dict [str , BaseCameraConfig ] = Field (default = {})
23- warm_up_disposal_frames : int = 30 # frames
24- record_path : str = "camera_frames"
25- max_buffer_frames : int = 1000
19+ class HardwareCamera (typing .Protocol ):
20+ """Implementation of a hardware camera potentially a set of cameras of the same kind."""
2621
22+ def open (self ):
23+ """Should open the camera and prepare it for polling."""
2724
28- # TODO(juelg): refactor camera thread into their own class, to avoid a base hardware camera set class
29- # TODO(juelg): add video recording
30- class BaseHardwareCameraSet (ABC ):
31- """This base class should have the ability to poll in a separate thread for all cameras and store them in a buffer.
32- Implements BaseCameraSet
25+ def close (self ):
26+ """Should close the camera and release all resources."""
27+
28+ def config (self , camera_name : str ) -> BaseCameraConfig :
29+ """Should return the configuration object of the cameras."""
30+
31+ def poll_frame (self , camera_name : str ) -> Frame :
32+ """Should return the latest frame from the camera with the given name.
33+
34+ This method should be thread safe.
35+ """
36+
37+ @property
38+ def camera_names (self ) -> list [str ]:
39+ """Should return a list of the activated human readable names of the cameras."""
40+
41+
42+ class HardwareCameraSet (BaseCameraSet ):
43+ """This base class polls in a separate thread for all cameras and stores them in a buffer.
44+
45+ Cameras can consist of multiple cameras, e.g. RealSense cameras.
3346 """
3447
35- def __init__ (self ):
36- self ._buffer : list [FrameSet | None ] = [None for _ in range (self .config .max_buffer_frames )]
48+ def __init__ (self , cameras : list [HardwareCamera ], warm_up_disposal_frames : int = 30 , max_buffer_frames : int = 1000 ):
49+ self .cameras = cameras
50+ self .camera_dict , self .camera_names = self ._cameras_util ()
51+ self .name_to_identifier = self ._name_to_identifier ()
52+ self .frame_rate = self ._frames_rate ()
53+ self .rate_limiter = SimpleFrameRate (self .frame_rate )
54+
55+ self .warm_up_disposal_frames = warm_up_disposal_frames
56+ self .max_buffer_frames = max_buffer_frames
57+ self ._buffer : list [FrameSet | None ] = [None for _ in range (self .max_buffer_frames )]
3758 self ._buffer_lock = threading .Lock ()
3859 self .running = False
3960 self ._thread : threading .Thread | None = None
4061 self ._logger = logging .getLogger (__name__ )
4162 self ._next_ring_index = 0
4263 self ._buffer_len = 0
4364 self .writer : dict [str , cv2 .VideoWriter ] = {}
44- self .rate = SimpleFrameRate ()
65+
66+
67+ def _name_to_identifier (self ) -> dict [str , str ]:
68+ """Returns a dictionary mapping the camera names to their identifiers."""
69+ name_to_id : dict [str , str ] = {}
70+ for camera in self .cameras :
71+ for name in camera .camera_names :
72+ name_to_id [name ] = camera .config (name ).identifier
73+ return name_to_id
74+
75+ def _frames_rate (self ) -> int :
76+ """Checks if all cameras have the same frame rate."""
77+ frame_rates = set (camera .config (name ).frame_rate for camera in self .cameras for name in camera .camera_names )
78+ if len (frame_rates ) > 1 :
79+ raise ValueError ("All cameras must have the same frame rate. Different frames rates are not supported." )
80+ if len (frame_rates ) == 0 :
81+ self ._logger .warning ("No camera found, empty polling with 1 fps." )
82+ return 1
83+ return frame_rates [0 ]
84+
85+ def _cameras_util (self ) -> tuple [dict [str , HardwareCamera ], list [str ]]:
86+ """Utility function to create a dictionary of cameras and a list of camera names."""
87+ camera_dict : dict [str , HardwareCamera ] = {}
88+ camera_names : list [str ] = []
89+ for camera in self .cameras :
90+ camera_names .extend (camera .camera_names )
91+ for name in camera .camera_names :
92+ assert name not in camera_dict , f"Camera name { name } not unique."
93+ camera_dict [name ] = camera
94+ return camera_dict , camera_names
4595
4696 def buffer_size (self ) -> int :
4797 return len (self ._buffer ) - self ._buffer .count (None )
@@ -64,7 +114,7 @@ def get_timestamp_frames(self, ts: datetime) -> FrameSet | None:
64114 # iterate through the buffer and find the closest timestamp
65115 with self ._buffer_lock :
66116 for i in range (self ._buffer_len ):
67- idx = (self ._next_ring_index - i - 1 ) % self .config . max_buffer_frames # iterate backwards
117+ idx = (self ._next_ring_index - i - 1 ) % self .max_buffer_frames # iterate backwards
68118 assert self ._buffer [idx ] is not None
69119 item : FrameSet = typing .cast (FrameSet , self ._buffer [idx ])
70120 assert item .avg_timestamp is not None
@@ -82,6 +132,8 @@ def stop(self):
82132 def close (self ):
83133 if self .running and self ._thread is not None :
84134 self .stop ()
135+ for camera in self .cameras :
136+ camera .close ()
85137 self .stop_video ()
86138
87139 def start (self , warm_up : bool = True ):
@@ -101,8 +153,8 @@ def record_video(self, path: Path, str_id: str):
101153 str (path / f"episode_{ str_id } _{ camera } .mp4" ),
102154 # migh require to install ffmpeg
103155 cv2 .VideoWriter_fourcc (* "mp4v" ), # type: ignore
104- self .config . frame_rate ,
105- (self .config .resolution_width , self .config .resolution_height ),
156+ self .frame_rate ,
157+ (self .config ( camera ) .resolution_width , self .config ( camera ) .resolution_height ),
106158 )
107159
108160 def recording_ongoing (self ) -> bool :
@@ -117,31 +169,34 @@ def stop_video(self):
117169 self .writer = {}
118170
119171 def warm_up (self ):
120- for _ in range (self .config . warm_up_disposal_frames ):
172+ for _ in range (self .warm_up_disposal_frames ):
121173 for camera_name in self .camera_names :
122174 self ._poll_frame (camera_name )
123- self .rate ( self . config . frame_rate )
175+ self .rate_limiter ( )
124176
125177 def polling_thread (self , warm_up : bool = True ):
178+ for camera in self .cameras :
179+ camera .open ()
126180 if warm_up :
127181 self .warm_up ()
128182 while self .running :
129183 frame_set = self .poll_frame_set ()
130184 # buffering
131185 with self ._buffer_lock :
132186 self ._buffer [self ._next_ring_index ] = frame_set
133- self ._next_ring_index = (self ._next_ring_index + 1 ) % self .config . max_buffer_frames
134- self ._buffer_len = max (self ._buffer_len + 1 , self .config . max_buffer_frames )
187+ self ._next_ring_index = (self ._next_ring_index + 1 ) % self .max_buffer_frames
188+ self ._buffer_len = max (self ._buffer_len + 1 , self .max_buffer_frames )
135189 # video recording
136190 for camera_key , writer in self .writer .items ():
137191 if frame_set is not None :
138192 writer .write (frame_set .frames [camera_key ].camera .color .data [:, :, ::- 1 ])
139- self .rate ( self . config . frame_rate )
193+ self .rate_limiter ( )
140194
141195 def poll_frame_set (self ) -> FrameSet :
142196 """Gather frames over all available cameras."""
143197 frames : dict [str , Frame ] = {}
144198 for camera_name in self .camera_names :
199+ # callback
145200 frame = self ._poll_frame (camera_name )
146201 frames [camera_name ] = frame
147202 # filter none
@@ -151,29 +206,16 @@ def poll_frame_set(self) -> FrameSet:
151206 def clear_buffer (self ):
152207 """Deletes all frames from the buffer."""
153208 with self ._buffer_lock :
154- self ._buffer = [None for _ in range (self .config . max_buffer_frames )]
209+ self ._buffer = [None for _ in range (self .max_buffer_frames )]
155210 self ._next_ring_index = 0
156211 self ._buffer_len = 0
157212 self .wait_for_frames ()
158213
159- @property
160- @abstractmethod
161- def config (self ) -> HWCameraSetConfig :
162- """Should return the configuration object of the cameras."""
163-
164- @abstractmethod
165- def _poll_frame (self , camera_name : str ) -> Frame :
166- """Should return the latest frame from the camera with the given name.
214+ def config (self , camera_name : str ) -> BaseCameraConfig :
215+ """Returns the configuration object of the cameras."""
216+ return self .camera_dict [camera_name ].config (camera_name )
167217
168- This method should be thread safe.
169- """
218+ def poll_frame ( self , camera_name : str ) -> Frame :
219+ return self . camera_dict [ camera_name ]. poll_frame ( camera_name )
170220
171- @property
172- def camera_names (self ) -> list [str ]:
173- """Should return a list of the activated human readable names of the cameras."""
174- return list (self .config .cameras )
175221
176- @property
177- def name_to_identifier (self ) -> dict [str , str ]:
178- # return {key: camera.identifier for key, camera in self._cfg.cameras.items()}
179- return self .config .name_to_identifier
0 commit comments