Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dimos/hardware/sensors/camera/webcam.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass, field
from functools import cache
import threading
import time
from typing import Literal

import cv2
from pydantic import Field
from reactivex import create
from reactivex.observable import Observable

Expand All @@ -28,13 +28,12 @@
from dimos.utils.reactive import backpressure


@dataclass
class WebcamConfig(CameraConfig):
camera_index: int = 0 # /dev/videoN
width: int = 640
height: int = 480
fps: float = 15.0
camera_info: CameraInfo = field(default_factory=CameraInfo)
camera_info: CameraInfo = Field(default_factory=CameraInfo)
frame_id_prefix: str | None = None
stereo_slice: Literal["left", "right"] | None = None # For stereo cameras

Expand Down
142 changes: 0 additions & 142 deletions dimos/perception/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
"Vector",
"Vector3",
"bbox2d_to_corners",
"colorize_depth",
"combine_object_data",
"cp",
"cv2",
Expand Down Expand Up @@ -351,147 +350,6 @@ def project_2d_points_to_3d(
)


def colorize_depth(
    depth_img: Union[np.ndarray, "cp.ndarray"],  # type: ignore[type-arg]
    max_depth: float = 5.0,
    overlay_stats: bool = True,
) -> Union[np.ndarray, "cp.ndarray"] | None:  # type: ignore[type-arg]
    """
    Normalize and colorize depth image using COLORMAP_JET with optional statistics overlay.

    Depth values are clipped to [0, max_depth], mapped through cv2.COLORMAP_JET,
    and returned in RGB order. When ``overlay_stats`` is True, the min/max of the
    valid depths and the depth at the image center (with a crosshair) are drawn
    on top of the colorized image.

    Args:
        depth_img: Depth image (H, W) in meters; NumPy or CuPy array.
        max_depth: Maximum depth value for normalization (values above are clipped).
        overlay_stats: Whether to overlay depth statistics on the image.

    Returns:
        Colorized depth image (H, W, 3) in RGB format — same array flavor
        (NumPy/CuPy) as the input — or None if input is None.
    """
    if depth_img is None:
        return None

    # Pick the array module matching the input so normalization stays on-device.
    was_cu = _is_cu_array(depth_img)
    xp = cp if was_cu else np
    depth = depth_img if was_cu else np.asarray(depth_img)

    # A pixel is "valid" if finite and strictly positive; invalid pixels stay 0
    # after normalization (rendered as the colormap's low end).
    valid_mask = xp.isfinite(depth) & (depth > 0)
    depth_norm = xp.zeros_like(depth, dtype=xp.float32)
    if bool(valid_mask.any() if not was_cu else xp.any(valid_mask)):
        depth_norm = xp.where(valid_mask, xp.clip(depth / max_depth, 0, 1), depth_norm)

    # Use CPU for colormap/text; convert back to GPU if needed.
    depth_norm_np = _to_numpy(depth_norm)  # type: ignore[no-untyped-call]
    depth_colored = cv2.applyColorMap((depth_norm_np * 255).astype(np.uint8), cv2.COLORMAP_JET)
    # applyColorMap yields BGR; convert to RGB for the return contract.
    depth_rgb_np = cv2.cvtColor(depth_colored, cv2.COLOR_BGR2RGB)
    # Dim the colormap (x0.6) — presumably so the white overlay text/crosshair
    # stays readable against it.
    depth_rgb_np = (depth_rgb_np * 0.6).astype(np.uint8)

    if overlay_stats and (np.any(_to_numpy(valid_mask))):  # type: ignore[no-untyped-call]
        valid_depths = _to_numpy(depth)[_to_numpy(valid_mask)]  # type: ignore[no-untyped-call]
        min_depth = float(np.min(valid_depths))
        # Note: shadows nothing — the *parameter* max_depth is the clip limit;
        # this is the actual maximum observed in the frame.
        max_depth_actual = float(np.max(valid_depths))
        h, w = depth_rgb_np.shape[:2]
        center_y, center_x = h // 2, w // 2
        # Sample a 5x5 window around the image center (clamped at the borders)
        # and report its median as the "center depth" for noise robustness.
        center_region = _to_numpy(  # type: ignore[no-untyped-call]
            depth
        )[max(0, center_y - 2) : min(h, center_y + 3), max(0, center_x - 2) : min(w, center_x + 3)]
        center_mask = np.isfinite(center_region) & (center_region > 0)
        if center_mask.any():
            center_depth = float(np.median(center_region[center_mask]))
        else:
            # Fall back to the single center pixel; 0.0 means "no reading"
            # and suppresses the crosshair/label below.
            depth_np = _to_numpy(depth)  # type: ignore[no-untyped-call]
            vm_np = _to_numpy(valid_mask)  # type: ignore[no-untyped-call]
            center_depth = float(depth_np[center_y, center_x]) if vm_np[center_y, center_x] else 0.0

        # Shared text style: white text on a filled black background box.
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.6
        thickness = 1
        line_type = cv2.LINE_AA
        text_color = (255, 255, 255)
        bg_color = (0, 0, 0)
        padding = 5

        # Min depth label, top-left corner.
        min_text = f"Min: {min_depth:.2f}m"
        (text_w, text_h), _ = cv2.getTextSize(min_text, font, font_scale, thickness)
        cv2.rectangle(
            depth_rgb_np,
            (padding, padding),
            (padding + text_w + 4, padding + text_h + 6),
            bg_color,
            -1,
        )
        cv2.putText(
            depth_rgb_np,
            min_text,
            (padding + 2, padding + text_h + 2),
            font,
            font_scale,
            text_color,
            thickness,
            line_type,
        )

        # Max depth label, top-right corner (right-aligned via measured width).
        max_text = f"Max: {max_depth_actual:.2f}m"
        (text_w, text_h), _ = cv2.getTextSize(max_text, font, font_scale, thickness)
        cv2.rectangle(
            depth_rgb_np,
            (w - padding - text_w - 4, padding),
            (w - padding, padding + text_h + 6),
            bg_color,
            -1,
        )
        cv2.putText(
            depth_rgb_np,
            max_text,
            (w - padding - text_w - 2, padding + text_h + 2),
            font,
            font_scale,
            text_color,
            thickness,
            line_type,
        )

        # Center crosshair + depth label, only when a valid center reading exists.
        if center_depth > 0:
            center_text = f"{center_depth:.2f}m"
            (text_w, text_h), _ = cv2.getTextSize(center_text, font, font_scale, thickness)
            center_text_x = center_x - text_w // 2
            center_text_y = center_y + text_h // 2
            cross_size = 10
            cross_color = (255, 255, 255)
            cv2.line(
                depth_rgb_np,
                (center_x - cross_size, center_y),
                (center_x + cross_size, center_y),
                cross_color,
                1,
            )
            cv2.line(
                depth_rgb_np,
                (center_x, center_y - cross_size),
                (center_x, center_y + cross_size),
                cross_color,
                1,
            )
            cv2.rectangle(
                depth_rgb_np,
                (center_text_x - 2, center_text_y - text_h - 2),
                (center_text_x + text_w + 2, center_text_y + 2),
                bg_color,
                -1,
            )
            cv2.putText(
                depth_rgb_np,
                center_text,
                (center_text_x, center_text_y),
                font,
                font_scale,
                text_color,
                thickness,
                line_type,
            )

    # Mirror the input array flavor: move back to CuPy only if the input was CuPy.
    return _to_cupy(depth_rgb_np) if was_cu else depth_rgb_np  # type: ignore[no-untyped-call]


def draw_bounding_box(
image: Union[np.ndarray, "cp.ndarray"], # type: ignore[type-arg]
bbox: list[float],
Expand Down
32 changes: 32 additions & 0 deletions dimos/perception/depth/depth_anything3/blueprints/webcam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dimos.core.blueprints import autoconnect
from dimos.hardware.sensors.camera.module import CameraModule
from dimos.perception.depth.depth_anything3.module import DA3Mode, DA3Model, DepthAnything3Module
from dimos.visualization.rerun.bridge import rerun_bridge

# Single-frame Depth Anything 3 blueprint: webcam frames -> DA3 (small model)
# -> Rerun visualization, with module streams wired together by autoconnect.
depth_anything3_webcam = autoconnect(
    CameraModule.blueprint(),
    DepthAnything3Module.blueprint(model=DA3Model.SMALL),
    rerun_bridge(),
)

# Temporal variant: same pipeline, but DA3 runs in TEMPORAL mode over a sliding
# window of 5 frames (presumably for temporally smoother depth — see DA3Mode).
depth_anything3_webcam_temporal = autoconnect(
    CameraModule.blueprint(),
    DepthAnything3Module.blueprint(model=DA3Model.SMALL, mode=DA3Mode.TEMPORAL, window_frames=5),
    rerun_bridge(),
)

# Public API of this blueprint module.
__all__ = ["depth_anything3_webcam", "depth_anything3_webcam_temporal"]
Loading