diff --git a/.gitignore b/.gitignore index 1816510c08..89dfc92143 100644 --- a/.gitignore +++ b/.gitignore @@ -79,5 +79,8 @@ htmlcov/ .coverage .coverage.* +# Created from simulation +MUJOCO_LOG.TXT + # Memory2 autorecord recording*.db diff --git a/dimos/control/examples/twist_base_keyboard_teleop.py b/dimos/control/examples/twist_base_keyboard_teleop.py index 43b2270410..8c243c8124 100644 --- a/dimos/control/examples/twist_base_keyboard_teleop.py +++ b/dimos/control/examples/twist_base_keyboard_teleop.py @@ -33,12 +33,14 @@ from __future__ import annotations +import asyncio + from dimos.control.blueprints.mobile import coordinator_mock_twist_base from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.robot.unitree.keyboard_teleop import KeyboardTeleop -def main() -> None: +async def main() -> None: """Run mock twist base + keyboard teleop.""" coord = ModuleCoordinator.build(coordinator_mock_twist_base) teleop = ModuleCoordinator.build(KeyboardTeleop.blueprint()) @@ -52,9 +54,9 @@ def main() -> None: teleop.start() # Block until Ctrl+C — loop() handles KeyboardInterrupt and calls stop() - coord.loop() + await coord.loop() teleop.stop() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/dimos/core/coordination/blueprints.py b/dimos/core/coordination/blueprints.py index 7d94555de4..902d4e8cff 100644 --- a/dimos/core/coordination/blueprints.py +++ b/dimos/core/coordination/blueprints.py @@ -154,6 +154,7 @@ class Blueprint: ) requirement_checks: tuple[Callable[[], str | None], ...] = field(default_factory=tuple) configurator_checks: "tuple[SystemConfigurator, ...]" = field(default_factory=tuple) + record_modules: tuple[type[ModuleBase], ...] = field(default_factory=tuple) @classmethod def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint": @@ -163,6 +164,9 @@ def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint": def disabled_modules(self, *modules: type[ModuleBase]) -> "Blueprint": return replace(self, disabled_modules_tuple=self.disabled_modules_tuple + modules) + def default_record_modules(self, *modules: type[ModuleBase]) -> "Blueprint": + return replace(self, record_modules=self.record_modules + modules) + def config(self) -> type: configs = { b.module.name: (get_type_hints(b.module)["config"] | None, None) @@ -227,6 +231,7 @@ def autoconnect(*blueprints: Blueprint) -> Blueprint: remapping_map=MappingProxyType(all_remappings), requirement_checks=all_requirement_checks, configurator_checks=all_configurator_checks, + record_modules=tuple(module for bp in blueprints for module in bp.record_modules), ) diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index 9ac46e3c20..b9285e583d 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -14,13 +14,17 @@ from __future__ import annotations +import asyncio from collections import defaultdict from collections.abc import Mapping, MutableMapping +import dataclasses import importlib import inspect +from pathlib import Path import shutil import sys import threading +from types import MappingProxyType from typing import TYPE_CHECKING, Any, cast from dimos.core.coordination.rpyc_server import RpycServer @@ -31,6 +35,7 @@ from dimos.core.module import ModuleBase, ModuleSpec from dimos.core.resource import Resource from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport +from dimos.record.record_replay import RecordReplay from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance from dimos.utils.generic import short_id from dimos.utils.logging_config import setup_logger @@ -257,6 +262,25 @@ def build( if "g" in blueprint_args: global_config.update(**blueprint_args.pop("g")) + # Auto-replay if --replay-file is set + replay_file = global_config.replay_file + if replay_file: + logger.info("Auto-replay from %s", replay_file) + # Strip replay_file from all override sources so the nested build() + # inside replay() does not re-enter this branch. + global_config.replay_file = None + clean_bp = dataclasses.replace( + blueprint, + global_config_overrides=MappingProxyType( + { + k: v + for k, v in blueprint.global_config_overrides.items() + if k != "replay_file" + } + ), + ) + return cls.replay(clean_bp, replay_file, blueprint_args=dict(blueprint_args)) + _run_configurators(blueprint) _check_requirements(blueprint) _verify_no_name_conflicts(blueprint) @@ -272,10 +296,68 @@ def build( coordinator.build_all_modules() coordinator.start_all_modules() + if global_config.record_path: + # Delete existing file, don't append to it. + Path(global_config.record_path).unlink(missing_ok=True) + record_modules = blueprint.record_modules + for bp in blueprint.active_blueprints: + if bp.module in record_modules: + instance = coordinator.get_instance(bp.module) + if instance is not None: + instance.start_recording_outputs(global_config.record_path) + _log_blueprint_graph(blueprint, coordinator) return coordinator + @classmethod + def replay( + cls, + blueprint: Blueprint, + recording_path: str, + *, + speed: float = 1.0, + blueprint_args: MutableMapping[str, Any] | None = None, + ) -> ModuleCoordinator: + """Build with a recording replacing some module outputs.""" + recording = RecordReplay(recording_path) + recorded_streams = set(recording.store.list_streams()) + if not recorded_streams: + raise ValueError("Recording is empty — no streams to replay") + + modules_to_disable: list[type[ModuleBase]] = [] + for bp in blueprint.blueprints: + out_names = {c.name for c in bp.streams if c.direction == "out"} + if not out_names: + continue + covered = out_names & recorded_streams + if covered: + modules_to_disable.append(bp.module) + uncovered = out_names - covered + if uncovered: + logger.warning( + "Replay: disabling %s (partial: replaying %s, missing %s)", + bp.module.__name__, + covered, + uncovered, + ) + else: + logger.info("Replay: disabling %s (all OUTs covered)", bp.module.__name__) + + if not modules_to_disable: + logger.warning( + "Replay: no modules disabled - recording streams %s" + "don't match any module OUT names", + recorded_streams, + ) + + patched = blueprint.disabled_modules(*modules_to_disable) + coordinator = cls.build(patched, blueprint_args) + + recording.play(speed=speed) + + return coordinator + def load_blueprint( self, blueprint: Blueprint, @@ -512,11 +594,11 @@ def _restart_module( return new_proxy - def loop(self) -> None: - stop = threading.Event() + async def loop(self) -> None: + stop = asyncio.Event() try: - stop.wait() - except KeyboardInterrupt: + await stop.wait() + except (KeyboardInterrupt, asyncio.CancelledError): return finally: self.stop() diff --git a/dimos/core/global_config.py b/dimos/core/global_config.py index b7c5c1bf12..317189d491 100644 --- a/dimos/core/global_config.py +++ b/dimos/core/global_config.py @@ -35,6 +35,8 @@ class GlobalConfig(BaseSettings): simulation: bool = False replay: bool = False replay_db: str = "go2_short" + replay_file: str | None = None + record_path: str | None = None new_memory: bool = False viewer: ViewerBackend = "rerun" n_workers: int = 2 diff --git a/dimos/core/module.py b/dimos/core/module.py index 259118098f..8957d0fadb 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -19,6 +19,7 @@ import json import sys import threading +import time from typing import ( TYPE_CHECKING, Any, @@ -40,6 +41,7 @@ from dimos.core.resource import CompositeResource from dimos.core.rpc_client import RpcCall from dimos.core.stream import In, Out, RemoteOut, Transport +from dimos.memory2.store.sqlite import SqliteStore from dimos.protocol.rpc.pubsubrpc import LCMRPC from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT, DEFAULT_RPC_TIMEOUTS, RPCSpec from dimos.protocol.service.spec import BaseConfig, Configurable @@ -57,6 +59,7 @@ from dimos.core.coordination.blueprints import Blueprint from dimos.core.introspection.module.info import ModuleInfo from dimos.core.rpc_client import RPCClient + from dimos.memory2.stream import Stream if sys.version_info >= (3, 13): from typing import TypeVar @@ -120,12 +123,14 @@ class ModuleBase(Configurable, CompositeResource): _module_closed: bool = False _module_closed_lock: threading.Lock _loop_thread_timeout: float = 2.0 + _rec_store: SqliteStore | None = None _main_gen: AsyncGenerator[None, None] | None = None _tools: dict[str, Any] _tools_lock: threading.Lock def __init__(self, config_args: dict[str, Any]) -> None: super().__init__(**config_args) + self._rec_unsubs: list[Callable[[], None]] = [] self._module_closed_lock = threading.Lock() self._tools = {} self._tools_lock = threading.Lock() @@ -170,6 +175,7 @@ def start(self) -> None: def stop(self) -> None: self._stop_main() super().stop() + self.stop_recording_outputs() self._close_module() def _close_module(self) -> None: @@ -408,6 +414,33 @@ def blueprint(self) -> _BlueprintPartial: return partial(Blueprint.create, self) # type: ignore[arg-type] + @rpc + def start_recording_outputs(self, db_path: str) -> None: + from dimos.memory2.store.sqlite import SqliteStore + + if self._rec_store is not None: + raise RuntimeError("Recording already in progress") + self._rec_store = SqliteStore(path=db_path) + for name, out in self.outputs.items(): + stream = self._rec_store.stream(name, out.type) + + def cb(msg: Any, _stream: "Stream[object]" = stream) -> None: + # Storage ts is wall-clock at record time. Sensor / message + # timestamps live on the payload (e.g. ``msg.ts``) and are + # preserved there. Storage ts must be unique within a stream. + _stream.append(msg, ts=time.time()) + + self._rec_unsubs.append(out.subscribe(cb)) + + @rpc + def stop_recording_outputs(self) -> None: + for unsub in self._rec_unsubs: + unsub() + self._rec_unsubs = [] + if self._rec_store is not None: + self._rec_store.stop() + self._rec_store = None + @rpc def set_module_ref(self, name: str, module_ref: "RPCClient") -> None: setattr(self, name, module_ref) diff --git a/dimos/core/test_core.py b/dimos/core/test_core.py index bf62e526ee..e69cde2fef 100644 --- a/dimos/core/test_core.py +++ b/dimos/core/test_core.py @@ -77,7 +77,7 @@ def test_classmethods() -> None: # Check that we have the expected RPC methods assert "navigate_to" in class_rpcs, "navigate_to should be in rpcs" assert "start" in class_rpcs, "start should be in rpcs" - assert len(class_rpcs) == 7 + assert len(class_rpcs) == 9 # Check that the values are callable assert callable(class_rpcs["navigate_to"]), "navigate_to should be callable" diff --git a/dimos/experimental/security_demo/security_module.py b/dimos/experimental/security_demo/security_module.py index 9dffc5714c..d199623e5e 100644 --- a/dimos/experimental/security_demo/security_module.py +++ b/dimos/experimental/security_demo/security_module.py @@ -373,6 +373,8 @@ def _cancel_current_goal(self) -> None: with self._lock: pose = self._latest_pose if pose is not None: + # We want to update timestamps, otherwise recordings etc. would fail + pose.ts = time.time() self.goal_request.publish(pose) def _transition_to(self, new_state: State) -> None: diff --git a/dimos/memory2/backend.py b/dimos/memory2/backend.py index 3c3b89669e..532a53dc7c 100644 --- a/dimos/memory2/backend.py +++ b/dimos/memory2/backend.py @@ -16,6 +16,7 @@ from __future__ import annotations +from contextlib import suppress from dataclasses import replace from typing import TYPE_CHECKING, Any, Generic, TypeVar @@ -241,6 +242,17 @@ def _iterate_live( finally: sub.dispose() + def delete_range(self, t1: float, t2: float) -> int: + """Delete observations in [t1, t2] from all stores. Returns count deleted.""" + ids = self.metadata_store.delete_range(t1, t2) + for obs_id in ids: + if self.blob_store is not None: + with suppress(KeyError): + self.blob_store.delete(self.name, obs_id) + if self.vector_store is not None: + self.vector_store.delete(self.name, obs_id) + return len(ids) + def count(self, query: StreamQuery) -> int: if query.search_vec: return sum(1 for _ in self.iterate(query)) diff --git a/dimos/memory2/codecs/base.py b/dimos/memory2/codecs/base.py index 821b36b60f..01447de852 100644 --- a/dimos/memory2/codecs/base.py +++ b/dimos/memory2/codecs/base.py @@ -17,6 +17,8 @@ import importlib from typing import Any, Protocol, TypeVar, runtime_checkable +from dimos.msgs.sensor_msgs.Image import Image + T = TypeVar("T") @@ -33,8 +35,6 @@ def codec_for(payload_type: type[Any] | None = None) -> Codec[Any]: from dimos.memory2.codecs.pickle import PickleCodec if payload_type is not None: - from dimos.msgs.sensor_msgs.Image import Image - if issubclass(payload_type, Image): from dimos.memory2.codecs.jpeg import JpegCodec diff --git a/dimos/memory2/observationstore/base.py b/dimos/memory2/observationstore/base.py index c2643c4cc4..d33fb421c7 100644 --- a/dimos/memory2/observationstore/base.py +++ b/dimos/memory2/observationstore/base.py @@ -69,5 +69,10 @@ def fetch_by_ids(self, ids: list[int]) -> list[Observation[T]]: """Batch fetch by id (for vector search results).""" ... + @abstractmethod + def delete_range(self, t1: float, t2: float) -> list[int]: + """Delete observations with ts in [t1, t2]. Returns deleted IDs.""" + ... + def serialize(self) -> dict[str, Any]: return {"class": qual(type(self)), "config": self.config.model_dump()} diff --git a/dimos/memory2/observationstore/memory.py b/dimos/memory2/observationstore/memory.py index 199a83cfcb..820a20ab83 100644 --- a/dimos/memory2/observationstore/memory.py +++ b/dimos/memory2/observationstore/memory.py @@ -86,3 +86,13 @@ def fetch_by_ids(self, ids: list[int]) -> list[Observation[T]]: id_set = set(ids) with self._lock: return [obs for obs in self._observations if obs.id in id_set] + + def delete_range(self, t1: float, t2: float) -> list[int]: + """Delete observations with ts in [t1, t2]. Returns deleted IDs.""" + with self._lock: + to_delete = [obs for obs in self._observations if t1 <= obs.ts <= t2] + ids = [obs.id for obs in to_delete] + self._observations = deque( + obs for obs in self._observations if not (t1 <= obs.ts <= t2) + ) + return ids diff --git a/dimos/memory2/observationstore/sqlite.py b/dimos/memory2/observationstore/sqlite.py index 42f4193581..7189b61e06 100644 --- a/dimos/memory2/observationstore/sqlite.py +++ b/dimos/memory2/observationstore/sqlite.py @@ -445,5 +445,25 @@ def fetch_by_ids(self, ids: list[int]) -> list[Observation[T]]: rows = self._conn.execute(sql, ids).fetchall() return [self._row_to_obs(r, has_blob=join) for r in rows] + def delete_range(self, t1: float, t2: float) -> list[int]: + """Delete observations with ts in [t1, t2]. Returns deleted IDs.""" + with self._lock: + try: + rows = self._conn.execute( + f'DELETE FROM "{self._name}" WHERE ts >= ? AND ts <= ? RETURNING id', + (t1, t2), + ).fetchall() + ids = [r[0] for r in rows] + if ids: + placeholders = ",".join("?" * len(ids)) + self._conn.execute( + f'DELETE FROM "{self._name}_rtree" WHERE id IN ({placeholders})', ids + ) + self._conn.commit() + except Exception: + self._conn.rollback() + raise + return ids + def stop(self) -> None: super().stop() diff --git a/dimos/memory2/store/base.py b/dimos/memory2/store/base.py index f6096bdceb..6839365c8b 100644 --- a/dimos/memory2/store/base.py +++ b/dimos/memory2/store/base.py @@ -14,7 +14,7 @@ from __future__ import annotations -from typing import Any, TypeVar, cast +from typing import Any, TypeVar from dimos.core.resource import CompositeResource from dimos.memory2.backend import Backend @@ -96,7 +96,7 @@ def streams(self) -> StreamAccessor: @staticmethod def _resolve_codec( - payload_type: type[Any] | None, raw_codec: Codec[Any] | str | None + payload_type: type[object] | None, raw_codec: Codec[object] | str | None ) -> Codec[Any]: if isinstance(raw_codec, Codec): return raw_codec @@ -110,7 +110,7 @@ def _resolve_codec( return codec_for(payload_type) def _create_backend( - self, name: str, payload_type: type[Any] | None = None, **config: Any + self, name: str, payload_type: type[object] | None = None, **config: Any ) -> Backend[Any]: """Create a Backend for the named stream. Called once per stream name.""" codec = self._resolve_codec(payload_type, config.pop("codec", None)) @@ -153,7 +153,7 @@ def stream(self, name: str, payload_type: type[T] | None = None, **overrides: An backend = self._create_backend(name, payload_type, **resolved) backend.start() self._streams[name] = Stream(source=backend) - return cast("Stream[T]", self._streams[name]) + return self._streams[name] def list_streams(self) -> list[str]: """Return names of all streams in this store.""" diff --git a/dimos/memory2/stream.py b/dimos/memory2/stream.py index 3a92f7708f..5e40faf444 100644 --- a/dimos/memory2/stream.py +++ b/dimos/memory2/stream.py @@ -125,6 +125,16 @@ def is_live(self) -> bool: return self._source.is_live() return False + @property + def data_type(self) -> type: + """Payload type of the root backend, or ``object`` if unbound.""" + current: Any = self + while isinstance(current._source, Stream): + current = current._source + if current._source is None: + return object + return cast("type", current._source.data_type) + def __iter__(self) -> Iterator[O]: if self._source is None: raise TypeError( @@ -429,6 +439,33 @@ def subscribe( ) ) + def delete_range(self, t1: float, t2: float) -> int: + """Delete all observations with timestamps in [t1, t2]. Returns count deleted.""" + if isinstance(self._source, Stream): + raise TypeError("Cannot delete from a transform stream.") + if self._source is None: + raise TypeError("No source available.") + return self._source.delete_range(t1, t2) + + def publish(self, out: Any) -> DisposableBase: + """Publish each observation's data to a Module ``Out`` port. + + Iteration runs on the dimos thread pool (via :meth:`subscribe`). + Returns a ``DisposableBase`` suitable for ``register_disposable()``. + + Example:: + + lidar.live().transform(VoxelMapTransformer()).publish(self.global_map) + """ + + def _on_error(e: Exception) -> None: + logger.error("Stream.publish() pipeline error: %s", e, exc_info=True) + + return self.subscribe( + on_next=lambda obs: out.publish(obs.data), + on_error=_on_error, + ) + def chain(self, other: Stream[R, Any]) -> Stream[R]: """Append operations from an unbound stream to this stream. diff --git a/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py b/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py index b8dbe0dfc8..a1a62c76f0 100644 --- a/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py +++ b/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py @@ -741,7 +741,6 @@ def stop_exploration(self) -> bool: position=self.latest_odometry.position, orientation=self.latest_odometry.orientation, frame_id="world", - ts=self.latest_odometry.ts, ) self.goal_request.publish(goal) @@ -782,7 +781,6 @@ def _exploration_loop(self) -> None: goal_msg.position.z = 0.0 goal_msg.orientation.w = 1.0 # No rotation goal_msg.frame_id = "world" - goal_msg.ts = self.latest_costmap.ts self.goal_request.publish(goal_msg) logger.info(f"Published frontier goal: ({goal.x:.2f}, {goal.y:.2f})") diff --git a/dimos/navigation/patrolling/module.py b/dimos/navigation/patrolling/module.py index 533165683f..40db5a2825 100644 --- a/dimos/navigation/patrolling/module.py +++ b/dimos/navigation/patrolling/module.py @@ -15,6 +15,7 @@ import asyncio from collections.abc import AsyncGenerator +import time from dimos_lcm.std_msgs import Bool @@ -104,6 +105,8 @@ async def _stop_patrolling(self) -> None: self._planner_spec.set_replanning_enabled(True) self._planner_spec.reset_safe_goal_clearance() if self._latest_pose is not None: + # Update timestamp so recordings see distinct messages. + self._latest_pose.ts = time.time() self.goal_request.publish(self._latest_pose) async def _patrol_loop(self) -> None: diff --git a/dimos/protocol/pubsub/impl/lcmpubsub.py b/dimos/protocol/pubsub/impl/lcmpubsub.py index a8cfc8c7ab..fa90d47db3 100644 --- a/dimos/protocol/pubsub/impl/lcmpubsub.py +++ b/dimos/protocol/pubsub/impl/lcmpubsub.py @@ -18,7 +18,6 @@ from dataclasses import dataclass import re import threading -from typing import Any from dimos.msgs.protocol import DimosMsg from dimos.protocol.pubsub.encoders import ( @@ -72,7 +71,7 @@ def from_channel_str(channel: str, default_lcm_type: type[DimosMsg] | None = Non return Topic(topic=topic_str, lcm_type=lcm_type or default_lcm_type) -class LCMPubSubBase(LCMService, AllPubSub[Topic, Any]): +class LCMPubSubBase(LCMService, AllPubSub[Topic, bytes]): """LCM-based PubSub with native regex subscription support. LCM natively supports regex patterns in subscribe(), so we implement @@ -91,7 +90,7 @@ def publish(self, topic: Topic | str, message: bytes) -> None: topic_str = str(topic) if isinstance(topic, Topic) else topic self.l.publish(topic_str, message) - def subscribe_all(self, callback: Callable[[bytes, Topic], Any]) -> Callable[[], None]: + def subscribe_all(self, callback: Callable[[bytes, Topic], None]) -> Callable[[], None]: return self.subscribe(Topic(re.compile(".*")), callback) def subscribe( diff --git a/dimos/protocol/pubsub/spec.py b/dimos/protocol/pubsub/spec.py index fe979fce82..0e292cfcb7 100644 --- a/dimos/protocol/pubsub/spec.py +++ b/dimos/protocol/pubsub/spec.py @@ -186,6 +186,6 @@ class SubscribeAllCapable(Protocol[MsgT_co, TopicT_co]): Both AllPubSub (native) and DiscoveryPubSub (synthesized) satisfy this. """ - def subscribe_all(self, callback: Callable[[Any, Any], Any]) -> Callable[[], None]: + def subscribe_all(self, callback: Callable[[MsgT_co, TopicT_co], None]) -> Callable[[], None]: """Subscribe to all messages on all topics.""" ... diff --git a/dimos/record/record_replay.py b/dimos/record/record_replay.py new file mode 100644 index 0000000000..ae0e518cd9 --- /dev/null +++ b/dimos/record/record_replay.py @@ -0,0 +1,375 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""RecordReplay — record and replay pub/sub topics using memory2 stores. + +Usage:: + + from dimos.record import RecordReplay + from dimos.protocol.pubsub.impl.lcmpubsub import LCM + + # Record from live LCM traffic + rec = RecordReplay(Path("my_recording.db")) + rec.start_recording([LCM()]) + # ... robot runs ... + rec.stop_recording() + + # Replay into LCM (viewable via rerun-bridge) + rec.play(pubsub=LCM(), speed=1.0) + + # Timeline editing + rec.trim(start=2.0, end=30.0) + rec.delete_range(start=10.0, end=12.0) + + # Query/filter via memory2 streams + rec.stream("lidar").time_range(0, 5).count() +""" + +import asyncio +from collections.abc import Callable, Collection +from contextlib import suppress +import heapq +import math +import re +import sys +import time +from typing import Any, NotRequired, TypedDict + +import rerun as rr + +from dimos.memory2.store.sqlite import SqliteStore +from dimos.memory2.stream import Stream +from dimos.protocol.pubsub.impl.lcmpubsub import Topic +from dimos.protocol.pubsub.spec import PubSub +from dimos.protocol.service.spec import Service +from dimos.utils.logging_config import setup_logger +from dimos.visualization.rerun.bridge import RerunConvertible, is_rerun_multi + +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing import Any as Self + +logger = setup_logger() + +_SANITIZE_RE = re.compile(r"[^A-Za-z0-9_]") + + +def topic_to_stream_name(channel: str) -> str: + """Convert a raw LCM channel/topic pattern to a safe stream name.""" + name = channel.split("#")[0].lstrip("/") + name = _SANITIZE_RE.sub("_", name) + if name and name[0].isdigit(): + name = f"_{name}" + return name or "_unknown" + + +class StreamInfo(TypedDict): + name: str + count: int + start: NotRequired[float] + end: NotRequired[float] + duration: NotRequired[float] + type: NotRequired[str] + + +class RecordReplay: + """Record and replay pub/sub topics using memory2 stores. + + A recording is a single SQLite file containing one stream per topic. + Supports recording from any ``SubscribeAllCapable`` pubsub, playback + with speed control, seeking, and timeline editing (trim/delete). + + The underlying :class:`SqliteStore` is fully accessible for advanced + queries via :attr:`store`, :attr:`streams`, and :meth:`stream`. + """ + + def __init__(self, path: str) -> None: + self._store = SqliteStore(path=path) + + self._recording = False + self._unsubscribes: list[Callable[[], None]] = [] + + self._resume = asyncio.Event() + self._play_task: asyncio.Task[None] | None = None + self._play_speed = 1.0 + self._position = 0.0 + + @property + def store(self) -> SqliteStore: + """The underlying store.""" + return self._store + + @property + def path(self) -> str: + """Path to the recording file.""" + return self._store.config.path + + @property + def is_recording(self) -> bool: + return self._recording + + @property + def is_playing(self) -> bool: + return self._play_task is not None and not self._play_task.done() + + @property + def is_paused(self) -> bool: + return self.is_playing and not self._resume.is_set() + + def start_recording( + self, + pubsubs: Collection[PubSub[Topic, object]], + topics: Collection[Topic], + ) -> None: + """Start recording messages from the given pubsubs.""" + if self._recording: + raise RuntimeError("Already recording") + self._recording = True + + for pubsub in pubsubs: + if isinstance(pubsub, Service): + pubsub.start() + for topic in topics: + unsub = pubsub.subscribe(topic, self._on_message) + self._unsubscribes.append(unsub) + + logger.info("Recording started on %d pubsub(s)", len(pubsubs)) + + def stop_recording(self) -> None: + if not self._recording: + return + self._recording = False + for unsub in self._unsubscribes: + unsub() + self._unsubscribes.clear() + logger.info("Recording stopped") + + def _on_message(self, msg: object, topic: Topic) -> None: + # Storage ts = reception time (default in stream.append). Sensor stamp + # is preserved on the payload (msg.ts) — same split rosbag uses, and + # the only one that survives sender-side stamp bugs / retransmits / + # looped replay sources without UNIQUE collisions. + stream_name = topic_to_stream_name(topic.pattern) + self._store.stream(stream_name, type(msg)).append(msg) + + @property + def duration(self) -> float: + """Total duration of the recording in seconds.""" + t_min, t_max = self.time_range + return t_max - t_min + + @property + def time_range(self) -> tuple[float, float]: + """Absolute (min_ts, max_ts) across all streams.""" + streams = self._store.list_streams() + if not streams: + return (0.0, 0.0) + t_min = math.inf + t_max = -math.inf + for name in streams: + s: Stream[object] = self._store.stream(name) + if s.exists(): + t0, t1 = s.get_time_range() + t_min = min(t_min, t0) + t_max = max(t_max, t1) + if t_min is math.inf: + return (0.0, 0.0) + return (t_min, t_max) + + @property + def position(self) -> float: + """Current playback position in seconds from recording start.""" + return self._position + + def stream_info(self) -> tuple[StreamInfo, ...]: + """Return per-stream metadata: name, count, time range, type.""" + result = [] + for name in self._store.list_streams(): + s: Stream[object] = self._store.stream(name) + info: StreamInfo = {"name": name, "count": s.count()} + if info["count"] > 0: + t0, t1 = s.get_time_range() + info["start"] = t0 + info["end"] = t1 + info["duration"] = t1 - t0 + t = s.data_type + info["type"] = f"{t.__module__}.{t.__qualname__}" if t is not object else "unknown" + result.append(info) + return tuple(result) + + def play(self, speed: float = 1.0) -> None: + """Start playback as a separate Rerun recording. + + Connects to the running Rerun viewer and logs decoded messages + under a recording called ``"playback"``, so it appears alongside + (but separate from) any live data. + """ + if self.is_playing: + raise RuntimeError("Already playing") + + self._play_speed = speed + # Set resume so playback starts, this is cleared to pause playback. + self._resume.set() + self._play_task = asyncio.create_task(self._playback_loop()) + + async def _playback_loop(self) -> None: + t_min, t_max = self.time_range + if t_min >= t_max: + return + + # Separate Rerun recording so playback appears as its own source + rec = rr.RecordingStream("playback", make_default=False) + rec.connect_grpc() + + # Reconstruct topic map from each stream's payload type. Channel is + # /#; under the convention that recorded topic + # names are valid identifiers, the stream name is the channel name. + topic_map: dict[str, Topic] = {} + for name in self._store.list_streams(): + s: Stream[object] = self._store.stream(name) + data_type = s.data_type + if data_type is not object: + topic_map[name] = Topic(topic=f"/{name}", lcm_type=data_type) + + # Merge-sort all streams by timestamp + start_ts = t_min + self._position + heap: list[tuple[float, int, str, Any]] = [] + counter = 0 # tiebreaker for heapq + + for name in self._store.list_streams(): + it: Any = iter(self._store.stream(name).after(start_ts - 0.001)) + try: + obs = next(it) + except StopIteration: + continue + heapq.heappush(heap, (obs.ts, counter, name, (obs, it))) + counter += 1 + + if not heap: + return + + wall_start = time.monotonic() + rec_start = heap[0][0] # earliest observation timestamp + + while heap: + if not self._resume.is_set(): + pause_start = time.monotonic() + await self._resume.wait() + wall_start += time.monotonic() - pause_start + + ts, _, stream_name, (obs, it) = heapq.heappop(heap) + + # Wait for correct playback time + elapsed_rec = ts - rec_start + target_wall = wall_start + (elapsed_rec / self._play_speed) + await asyncio.sleep(target_wall - time.monotonic()) + + self._position = ts - t_min + + topic = topic_map.get(stream_name) + if topic is not None and topic.lcm_type is not None: + msg = ( + obs.data + if not isinstance(obs.data, bytes) + else topic.lcm_type.lcm_decode(obs.data) + ) + if isinstance(msg, RerunConvertible): + entity_path = f"world/{stream_name}" + rerun_data = msg.to_rerun() + if is_rerun_multi(rerun_data): + for path, archetype in rerun_data: + rec.log(path, archetype) + else: + rec.log(entity_path, rerun_data) # type: ignore[arg-type] + + try: + next_obs = next(it) + except StopIteration: + continue + heapq.heappush(heap, (next_obs.ts, counter, stream_name, (next_obs, it))) + counter += 1 + + def pause(self) -> None: + """Pause playback. Resume with :meth:`resume`.""" + self._resume.clear() + + def resume(self) -> None: + """Resume paused playback.""" + self._resume.set() + + async def stop_playback(self) -> None: + """Stop playback.""" + self._resume.set() + if self._play_task is not None: + self._play_task.cancel() + with suppress(asyncio.CancelledError): + await self._play_task + self._play_task = None + + async def seek(self, seconds: float) -> None: + """Set playback position in seconds from recording start. + + Takes effect immediately if playing (restarts playback loop). + If not playing, sets the position for the next :meth:`play`. + """ + self._position = max(0.0, min(seconds, self.duration)) + if self.is_playing: + await self.stop_playback() + self.play(speed=self._play_speed) + + def delete_range(self, start: float, end: float) -> int: + """Delete observations in [start, end] seconds from recording start. + + Returns total count of deleted observations across all streams. + """ + t_min, _ = self.time_range + abs_start = t_min + start + abs_end = t_min + end + + total = 0 + for name in self._store.list_streams(): + s: Stream[object] = self._store.stream(name) + total += s.delete_range(abs_start, abs_end) + return total + + def trim(self, start: float, end: float) -> int: + """Keep only [start, end] seconds, delete everything else. + + Returns total count of deleted observations. + """ + t_min, t_max = self.time_range + total = 0 + if start > 0: + total += self.delete_range(0, start - 0.0001) + if end < (t_max - t_min): + total += self.delete_range(end + 0.0001, t_max - t_min + 1) + return total + + async def close(self) -> None: + """Stop recording/playback and close the store.""" + self.stop_recording() + await self.stop_playback() + self._store.stop() + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *exc: Any) -> None: + await self.close() + + def __repr__(self) -> str: + streams = self._store.list_streams() + dur = self.duration + return f"RecordReplay({self._store.config.path!r}, streams={len(streams)}, duration={dur:.1f}s)" diff --git a/dimos/record/test_record_replay.py b/dimos/record/test_record_replay.py new file mode 100644 index 0000000000..08124f4870 --- /dev/null +++ b/dimos/record/test_record_replay.py @@ -0,0 +1,275 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for RecordReplay.""" + +import asyncio +from collections.abc import Callable +from contextlib import suppress +from pathlib import Path +import threading +from typing import Any +from unittest.mock import patch + +import pytest + +from dimos.record.record_replay import RecordReplay + + +class FakeTopic: + """Minimal topic for testing.""" + + def __init__(self, name: str) -> None: + self.topic = name + self.lcm_type = None + + @property + def pattern(self) -> str: + return self.topic + + def __str__(self) -> str: + return self.topic + + +class FakePubSub: + """Minimal PubSub that supports subscribe_all for testing.""" + + def __init__(self) -> None: + self._subscribers: list[Callable[[Any, Any], None]] = [] + self._lock = threading.Lock() + self._started = False + + def start(self) -> None: + self._started = True + + def stop(self) -> None: + self._started = False + + def publish(self, topic: Any, message: Any) -> None: + # Not needed for recording tests + pass + + def subscribe(self, topic: object, callback: Callable[[Any, Any], None]) -> Callable[[], None]: + key = str(topic) + + def filtered(msg: Any, t: Any) -> None: + if str(t) == key: + callback(msg, t) + + with self._lock: + self._subscribers.append(filtered) + + def unsub() -> None: + with self._lock: + with suppress(ValueError): + self._subscribers.remove(filtered) + + return unsub + + def emit(self, topic_name: str, msg: Any) -> None: + """Test helper: simulate a message arriving.""" + topic = FakeTopic(topic_name) + with self._lock: + subs = list(self._subscribers) + for cb in subs: + cb(msg, topic) + + +class SimpleMsg: + """A simple test message (not LCM, uses pickle codec).""" + + def __init__(self, value: float) -> None: + self.value = value + + def __eq__(self, other: object) -> bool: + return isinstance(other, SimpleMsg) and self.value == other.value + + +ALL_TOPICS = [ + FakeTopic("/sensor/lidar"), + FakeTopic("/sensor/odom"), + FakeTopic("/sensor"), + FakeTopic("/data"), + FakeTopic("/a"), + FakeTopic("/from_ps1"), + FakeTopic("/from_ps2"), +] + + +@pytest.fixture +def tmp_db(tmp_path: Path) -> str: + return str(tmp_path / "test_recording.db") + + +class TestRecordReplay: + async def test_record_and_list_streams(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + assert rec.is_recording + + pubsub.emit("/sensor/lidar", SimpleMsg(1.0)) + pubsub.emit("/sensor/odom", SimpleMsg(2.0)) + pubsub.emit("/sensor/lidar", SimpleMsg(3.0)) + await asyncio.sleep(0.05) # let timestamps diverge + + rec.stop_recording() + assert not rec.is_recording + + streams = rec.store.list_streams() + assert "sensor_lidar" in streams + assert "sensor_odom" in streams + + async def test_record_and_query(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + + for i in range(10): + pubsub.emit("/data", SimpleMsg(float(i))) + await asyncio.sleep(0.01) + + rec.stop_recording() + + s = rec.store.stream("data") + assert s.count() == 10 + first = s.first() + assert isinstance(first.data, SimpleMsg) + assert first.data.value == 0.0 + + async def test_duration(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + pubsub.emit("/a", SimpleMsg(0.0)) + await asyncio.sleep(0.1) + pubsub.emit("/a", SimpleMsg(1.0)) + rec.stop_recording() + + assert rec.duration >= 0.05 # at least some duration + + async def test_stream_info(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + for i in range(5): + pubsub.emit("/sensor", SimpleMsg(float(i))) + await asyncio.sleep(0.01) + rec.stop_recording() + + infos = rec.stream_info() + assert len(infos) == 1 + assert infos[0]["name"] == "sensor" + assert infos[0]["count"] == 5 + + async def test_delete_range(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + for i in range(20): + pubsub.emit("/data", SimpleMsg(float(i))) + await asyncio.sleep(0.01) + rec.stop_recording() + + before = rec.store.stream("data").count() + assert before == 20 + + dur = rec.duration + # Delete middle third + deleted = rec.delete_range(dur / 3, 2 * dur / 3) + assert deleted > 0 + + after = rec.store.stream("data").count() + assert after < before + + async def test_trim(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + for i in range(30): + pubsub.emit("/data", SimpleMsg(float(i))) + await asyncio.sleep(0.01) + rec.stop_recording() + + before = rec.store.stream("data").count() + dur = rec.duration + # Trim to middle third + rec.trim(dur / 3, 2 * dur / 3) + + after = rec.store.stream("data").count() + assert after < before + + async def test_repr(self, tmp_db: str) -> None: + async with RecordReplay(tmp_db) as rec: + r = repr(rec) + assert "RecordReplay" in r + assert "streams=0" in r + + async def test_playback_runs(self, tmp_db: str) -> None: + """Test that playback task starts and finishes.""" + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + for i in range(5): + pubsub.emit("/data", SimpleMsg(float(i))) + await asyncio.sleep(0.01) + rec.stop_recording() + + with patch("dimos.record.record_replay.rr", autospec=True, spec_set=True): + rec.play(speed=100.0) # very fast + assert rec.is_playing + async with asyncio.timeout(5.0): + await rec._play_task + assert not rec.is_playing + + async def test_stop_playback(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + for i in range(100): + pubsub.emit("/data", SimpleMsg(float(i))) + await asyncio.sleep(0.005) + rec.stop_recording() + + with patch("dimos.record.record_replay.rr", autospec=True, spec_set=True): + rec.play(speed=0.1) # slow + await asyncio.sleep(0.1) + assert rec.is_playing + await rec.stop_playback() + assert not rec.is_playing + + async def test_seek(self, tmp_db: str) -> None: + pubsub = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([pubsub], topics=ALL_TOPICS) + for i in range(10): + pubsub.emit("/data", SimpleMsg(float(i))) + await asyncio.sleep(0.01) + rec.stop_recording() + + await rec.seek(0.05) + assert rec.position == pytest.approx(0.05, abs=0.01) + + async def test_multiple_pubsubs(self, tmp_db: str) -> None: + ps1 = FakePubSub() + ps2 = FakePubSub() + async with RecordReplay(tmp_db) as rec: + rec.start_recording([ps1, ps2], topics=ALL_TOPICS) + ps1.emit("/from_ps1", SimpleMsg(1.0)) + ps2.emit("/from_ps2", SimpleMsg(2.0)) + rec.stop_recording() + + streams = rec.store.list_streams() + assert "from_ps1" in streams + assert "from_ps2" in streams diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 522faf50be..a17fd1a2e0 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -14,6 +14,7 @@ from __future__ import annotations +import asyncio from collections.abc import Iterable from contextlib import suppress from datetime import datetime, timezone @@ -39,6 +40,7 @@ from dimos.core.global_config import GlobalConfig, global_config from dimos.core.run_registry import get_most_recent, is_pid_alive, stop_entry from dimos.robot.unitree.go2.cli.go2tool import app as go2tool_app +from dimos.utils.cli.recorder.run_recorder import main as recorder_main from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: @@ -190,21 +192,16 @@ def load_config_args(config: type[BaseModel], args: Iterable[str], path: Path) - return kwargs # type: ignore[no-any-return] -@main.command() -def run( +async def _run( ctx: typer.Context, - robot_types: list[str] = typer.Argument(..., help="Blueprints or modules to run"), - daemon: bool = typer.Option(False, "--daemon", "-d", help="Run in background"), - disable: list[str] = typer.Option([], "--disable", help="Module names to disable"), - blueprint_args: list[str] = typer.Option((), "--option", "-o"), - config_path: Path = typer.Option( - CONFIG_DIR / "dimos", "--config", "-c", help="Path to config file" - ), - show_help: bool = typer.Option(False, "--help"), + robot_types: list[str], + daemon: bool, + disable: list[str], + blueprint_args: list[str], + config_path: Path, + show_help: bool, ) -> None: """Start a robot blueprint""" - logger.info("Starting DimOS") - from dimos.core.coordination.blueprints import autoconnect from dimos.core.coordination.module_coordinator import ModuleCoordinator from dimos.core.coordination.process_lifecycle import ( @@ -224,6 +221,11 @@ def run( cli_config_overrides: dict[str, Any] = ctx.obj + # Apply overrides early so blueprint module-level code (e.g. viewer + # selection) sees the correct global_config values at import time. + if cli_config_overrides: + global_config.update(**cli_config_overrides) + # Clean stale registry entries stale = cleanup_stale() if stale: @@ -306,7 +308,7 @@ def run( entry.save() spawn_watchdog(run_id, log_dir=log_dir) install_signal_handlers(entry, coordinator) - coordinator.loop() + await coordinator.loop() else: rpyc_port = coordinator.start_rpyc_service() entry = RunEntry( @@ -327,11 +329,28 @@ def run( # runs with a visible traceback. install_signal_handlers(entry, coordinator, sigint=False) try: - coordinator.loop() + await coordinator.loop() finally: entry.remove() +@main.command() +def run( + ctx: typer.Context, + robot_types: list[str] = typer.Argument(..., help="Blueprints or modules to run"), + daemon: bool = typer.Option(False, "--daemon", "-d", help="Run in background"), + disable: list[str] = typer.Option([], "--disable", help="Module names to disable"), + blueprint_args: list[str] = typer.Option((), "--option", "-o"), + config_path: Path = typer.Option( + CONFIG_DIR / "dimos", "--config", "-c", help="Path to config file" + ), + show_help: bool = typer.Option(False, "--help"), +) -> None: + """Start a robot blueprint""" + logger.info("Starting DimOS") + asyncio.run(_run(ctx, robot_types, daemon, disable, blueprint_args, config_path, show_help)) + + @main.command() def status() -> None: """Show the running DimOS instance.""" @@ -633,6 +652,13 @@ def top(ctx: typer.Context) -> None: dtop_main() +@main.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def recorder(ctx: typer.Context) -> None: + """Record and replay tool — terminal VLC for dimos recordings.""" + sys.argv = ["recorder", *ctx.args] + recorder_main() + + topic_app = typer.Typer(help="Topic commands for pub/sub") main.add_typer(topic_app, name="topic") diff --git a/dimos/robot/test_all_blueprints_generation.py b/dimos/robot/test_all_blueprints_generation.py index d8b0081d7f..8ba7d13f9f 100644 --- a/dimos/robot/test_all_blueprints_generation.py +++ b/dimos/robot/test_all_blueprints_generation.py @@ -32,7 +32,14 @@ "dimos/core/blueprints.py", "dimos/core/test_blueprints.py", } -BLUEPRINT_METHODS = {"transports", "global_config", "remappings", "requirements", "configurators"} +BLUEPRINT_METHODS = { + "transports", + "global_config", + "remappings", + "requirements", + "configurators", + "default_record_modules", +} _EXCLUDED_MODULE_NAMES = {"Module", "ModuleBase", "StreamModule"} diff --git a/dimos/robot/unitree/connection.py b/dimos/robot/unitree/connection.py index 919efc76f6..26dc23169f 100644 --- a/dimos/robot/unitree/connection.py +++ b/dimos/robot/unitree/connection.py @@ -17,7 +17,7 @@ import functools import threading import time -from typing import Any, TypeAlias +from typing import Any, TypeAlias, cast import numpy as np from numpy.typing import NDArray @@ -47,7 +47,7 @@ repair_stale_ts, ) from dimos.robot.unitree.type.lowstate import LowStateMsg -from dimos.robot.unitree.type.odometry import Odometry +from dimos.robot.unitree.type.odometry import Odometry, RawOdometryMessage from dimos.utils.decorators.decorators import simple_mcache from dimos.utils.reactive import backpressure, callback_to_observable @@ -244,11 +244,33 @@ def publish_request(self, topic: str, data: dict[Any, Any]) -> Any: @simple_mcache def raw_lidar_stream(self) -> Observable[RawLidarMsg]: - return backpressure(self.unitree_sub_stream(RTC_TOPIC["ULIDAR_ARRAY"])) + # Drop WebRTC retransmits at the source: under flaky-transport + # conditions Unitree's data channel occasionally redelivers the same + # logical frame, carrying an identical `data.stamp`. Both deliveries + # arrive back-to-back. Keeping both wastes downstream work (decode, + # codec, recording, viz) on a frame consumers already processed and + # leaves recordings with the same body_ts twice in a row. + return backpressure( + self.unitree_sub_stream(RTC_TOPIC["ULIDAR_ARRAY"]).pipe( + ops.distinct_until_changed( + key_mapper=lambda raw: cast("RawLidarMsg", raw)["data"]["stamp"] + ) + ) + ) @simple_mcache def raw_odom_stream(self) -> Observable[Pose]: - return backpressure(self.unitree_sub_stream(RTC_TOPIC["ROBOTODOM"])) + # Same WebRTC-retransmit dedup as raw_lidar_stream — odom just nests + # the stamp one level deeper inside `data.header`. + return backpressure( + self.unitree_sub_stream(RTC_TOPIC["ROBOTODOM"]).pipe( + ops.distinct_until_changed( + key_mapper=lambda raw: cast("RawOdometryMessage", raw)["data"]["header"][ + "stamp" + ] + ) + ) + ) @simple_mcache def lidar_stream(self) -> Observable[PointCloud2]: diff --git a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py index 54a2c0f7c6..3c104e434f 100644 --- a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py +++ b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py @@ -150,6 +150,7 @@ def _go2_rerun_blueprint() -> Any: ) .global_config(n_workers=4, robot_model="unitree_go2") .configurators(ClockSyncConfigurator()) + .default_record_modules(GO2Connection) ) __all__ = [ diff --git a/dimos/robot/unitree/go2/connection.py b/dimos/robot/unitree/go2/connection.py index 98d1423dd6..dd402df207 100644 --- a/dimos/robot/unitree/go2/connection.py +++ b/dimos/robot/unitree/go2/connection.py @@ -311,6 +311,8 @@ def _publish_tf(self, msg: PoseStamped) -> None: def publish_camera_info(self) -> None: while True: + # We want to update timestamps, otherwise recordings etc. would fail + self.camera_info_static.ts = time.time() self.camera_info.publish(self.camera_info_static) time.sleep(1.0) diff --git a/dimos/utils/cli/recorder/run_recorder.py b/dimos/utils/cli/recorder/run_recorder.py new file mode 100644 index 0000000000..0dbc4db5a0 --- /dev/null +++ b/dimos/utils/cli/recorder/run_recorder.py @@ -0,0 +1,542 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""recorder — Terminal VLC for dimos recordings. + +Record from live LCM traffic, play back recordings, trim, seek. +Run ``rerun-bridge`` in another terminal to visualize playback. + +Usage:: + + recorder # interactive — record from LCM + recorder play my_recording.db # play an existing recording + recorder --help +""" + +from __future__ import annotations + +from collections import deque +import sys +import time +from typing import Any + +from rich.text import Text +from textual.app import App, ComposeResult +from textual.color import Color +from textual.containers import Horizontal +from textual.widgets import DataTable, Footer, Header, Static + +from dimos.record.record_replay import RecordReplay +from dimos.utils.cli import theme + +# Braille sparkline constants (same as dtop) +_BRAILLE_BASE = 0x2800 +_LDOTS = (0x00, 0x40, 0x44, 0x46, 0x47) +_RDOTS = (0x00, 0x80, 0xA0, 0xB0, 0xB8) +_SPARK_WIDTH = 16 + +from dimos.record.record_replay import topic_to_stream_name + + +def _heat(ratio: float) -> str: + """Map 0..1 to cyan -> yellow -> red.""" + cyan = Color.parse(theme.CYAN) + yellow = Color.parse(theme.YELLOW) + red = Color.parse(theme.RED) + if ratio <= 0.5: + return cyan.blend(yellow, ratio * 2).hex + return yellow.blend(red, (ratio - 0.5) * 2).hex + + +def _spark(history: deque[float], max_val: float, width: int = _SPARK_WIDTH) -> Text: + """Braille sparkline from a deque of values.""" + n = width * 2 + vals = list(history) + if len(vals) < n: + vals = [0.0] * (n - len(vals)) + vals + else: + vals = vals[-n:] + result = Text() + if max_val <= 0: + max_val = 1.0 + for i in range(0, n, 2): + lv = min(vals[i] / max_val, 1.0) + rv = min(vals[i + 1] / max_val, 1.0) + li = min(int(lv * 4 + 0.5), 4) + ri = min(int(rv * 4 + 0.5), 4) + ch = chr(_BRAILLE_BASE | _LDOTS[li] | _RDOTS[ri]) + result.append(ch, style=_heat(max(lv, rv))) + return result + + +def _fmt_time(seconds: float) -> str: + """Format seconds as MM:SS.s""" + if seconds < 0: + seconds = 0 + m = int(seconds) // 60 + s = seconds - m * 60 + return f"{m:02d}:{s:05.2f}" + + +def _progress_bar(position: float, duration: float, width: int = 40) -> Text: + """Render a progress bar with position indicator.""" + if duration <= 0: + return Text("░" * width, style=theme.DIM) + ratio = min(position / duration, 1.0) + filled = int(ratio * width) + result = Text() + result.append("█" * filled, style=theme.CYAN) + if filled < width: + result.append("▓", style=theme.BRIGHT_CYAN) + result.append("░" * (width - filled - 1), style=theme.DIM) + return result + + +def _short_type(channel: str) -> str: + """Extract the short type name from a channel string.""" + if "#" not in channel: + return "" + return channel.rsplit("#", 1)[-1].rsplit(".", 1)[-1] + + +class RecorderApp(App[None]): + """Terminal VLC for dimos recordings. + + Shows all live LCM topics (like lcmspy). Select topics then press + ``r`` to record, ``space`` to play back, arrow keys to seek, etc. + """ + + CSS_PATH = "../dimos.tcss" + + CSS = f""" + Screen {{ + layout: vertical; + background: {theme.BACKGROUND}; + }} + #streams {{ + height: 1fr; + border: solid {theme.BORDER}; + background: {theme.BG}; + scrollbar-size: 0 0; + }} + #streams > .datatable--header {{ + color: {theme.ACCENT}; + background: transparent; + }} + #streams > .datatable--cursor {{ + background: {theme.BRIGHT_BLACK}; + }} + #timeline {{ + height: 5; + padding: 1 2; + background: {theme.BG}; + border-top: solid {theme.DIM}; + }} + #controls {{ + height: 3; + padding: 0 2; + background: {theme.BG}; + border-top: solid {theme.DIM}; + }} + #status-left {{ + width: 1fr; + }} + #status-right {{ + width: auto; + }} + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ("space", "toggle_select", "Toggle"), + ("a", "select_all", "All"), + ("n", "select_none", "None"), + ("r", "toggle_record", "Rec"), + ("p", "toggle_play", "Play"), + ("s", "stop_all", "Stop"), + ("left", "seek_back", "-5s"), + ("right", "seek_fwd", "+5s"), + ("shift+left", "seek_back_big", "-30s"), + ("shift+right", "seek_fwd_big", "+30s"), + ("[", "mark_trim_start", "In"), + ("]", "mark_trim_end", "Out"), + ("t", "do_trim", "Trim"), + ("d", "do_delete", "Del"), + ] + + def __init__( + self, + db_path: str | None = None, + autoplay: bool = False, + ) -> None: + super().__init__() + from dimos.protocol.service.lcmservice import autoconf + + autoconf(check_only=True) + + if db_path is None: + ts = time.strftime("%Y%m%d_%H%M%S") + self._db_path = f"recording_{ts}.db" + else: + self._db_path = db_path + self._autoplay = autoplay + self._recorder: RecordReplay | None = None + self._lcm: Any = None + self._spy: Any = None + + # Per-stream sparkline history keyed by stream_name + self._freq_history: dict[str, deque[float]] = {} + # Set of selected stream names (for recording) + self._selected: set[str] = set() + + self._trim_in: float | None = None + self._trim_out: float | None = None + + self._table: DataTable[Any] | None = None + + def compose(self) -> ComposeResult: + yield Header(show_clock=True) + self._table = DataTable(zebra_stripes=False, cursor_type="row") + self._table.id = "streams" + self._table.add_column("REC", key="sel", width=5) + self._table.add_column("Topic", key="topic") + self._table.add_column("Type", key="type") + self._table.add_column("Freq", key="freq") + self._table.add_column("Bandwidth", key="bw") + self._table.add_column("Recorded", key="rec") + self._table.add_column("Activity", key="activity") + yield self._table + yield Static(id="timeline") + with Horizontal(id="controls"): + yield Static(id="status-left") + yield Static(id="status-right") + yield Footer() + + def on_mount(self) -> None: + from dimos.protocol.pubsub.impl.lcmpubsub import LCM + from dimos.utils.cli.lcmspy.lcmspy import GraphLCMSpy + + self._lcm = LCM() + + # Live topic discovery via LCM spy (same as lcmspy tool) + self._spy = GraphLCMSpy(graph_log_window=0.5) + self._spy.start() + + self._recorder = RecordReplay(self._db_path) + self.title = f"recorder — {self._recorder.path}" + self.set_interval(0.5, self._refresh) + + if self._autoplay and self._db_path: + self._start_playback() + + async def on_unmount(self) -> None: + if self._recorder: + await self._recorder.close() + if self._spy: + self._spy.stop() + if self._lcm and hasattr(self._lcm, "stop"): + self._lcm.stop() + + def action_toggle_select(self) -> None: + """Toggle selection on the row under the cursor.""" + if self._table is None or self._table.row_count == 0: + return + row_key, _ = self._table.coordinate_to_cell_key(self._table.cursor_coordinate) + name = str(row_key.value) + if name in self._selected: + self._selected.discard(name) + else: + self._selected.add(name) + + def action_select_all(self) -> None: + """Select all visible topics.""" + if self._spy is None: + return + with self._spy._topic_lock: + channels = list(self._spy.topic.keys()) + for ch in channels: + self._selected.add(topic_to_stream_name(ch)) + # Also include any already-recorded streams + if self._recorder: + self._selected.update(self._recorder.store.list_streams()) + + def action_select_none(self) -> None: + self._selected.clear() + + def action_toggle_play(self) -> None: + if self._recorder is None: + return + if self._recorder.is_recording: + return + if self._recorder.is_playing: + if self._recorder.is_paused: + self._recorder.resume() + else: + self._recorder.pause() + else: + self._start_playback() + + def action_toggle_record(self) -> None: + if self._recorder is None: + return + if self._recorder.is_playing: + return + if self._recorder.is_recording: + self._recorder.stop_recording() + else: + topics = self._selected_topics() if self._selected else self._all_topics() + self._recorder.start_recording([self._lcm], topics=topics) + + async def action_stop_all(self) -> None: + if self._recorder is None: + return + self._recorder.stop_recording() + await self._recorder.stop_playback() + + async def action_seek_back(self) -> None: + await self._seek_relative(-5.0) + + async def action_seek_fwd(self) -> None: + await self._seek_relative(5.0) + + async def action_seek_back_big(self) -> None: + await self._seek_relative(-30.0) + + async def action_seek_fwd_big(self) -> None: + await self._seek_relative(30.0) + + def action_mark_trim_start(self) -> None: + if self._recorder: + self._trim_in = self._recorder.position + + def action_mark_trim_end(self) -> None: + if self._recorder: + self._trim_out = self._recorder.position + + async def action_do_trim(self) -> None: + if self._recorder and self._trim_in is not None and self._trim_out is not None: + await self._recorder.stop_playback() + lo, hi = sorted((self._trim_in, self._trim_out)) + self._recorder.trim(lo, hi) + self._trim_in = self._trim_out = None + + async def action_do_delete(self) -> None: + if self._recorder and self._trim_in is not None and self._trim_out is not None: + await self._recorder.stop_playback() + lo, hi = sorted((self._trim_in, self._trim_out)) + self._recorder.delete_range(lo, hi) + self._trim_in = self._trim_out = None + + def _all_topics(self) -> list[Any]: + """All discovered topics from the spy.""" + from dimos.protocol.pubsub.impl.lcmpubsub import Topic as LCMTopic + + if not self._spy: + return [] + return [LCMTopic.from_channel_str(ch) for ch in list(self._spy.topic)] + + def _selected_topics(self) -> list[Any]: + """Map selected stream names back to LCM Topics via the spy.""" + from dimos.protocol.pubsub.impl.lcmpubsub import Topic as LCMTopic + from dimos.record.record_replay import topic_to_stream_name + + if not self._spy: + return [] + return [ + LCMTopic.from_channel_str(ch) + for ch in list(self._spy.topic) + if topic_to_stream_name(ch) in self._selected + ] + + def _start_playback(self) -> None: + if self._recorder is None or self._lcm is None: + return + if hasattr(self._lcm, "start"): + self._lcm.start() + self._recorder.play(speed=1.0) + + async def _seek_relative(self, delta: float) -> None: + if self._recorder: + await self._recorder.seek(self._recorder.position + delta) + + def _refresh(self) -> None: + if self._table is None: + return + assert self._recorder is not None + spy = self._spy + + # Build unified row list: live topics + recorded-only streams + # Each row: (stream_name, channel, spy_topic_or_None) + rows: dict[str, tuple[str, Any]] = {} # stream_name -> (channel, spy_topic) + + if spy is not None: + with spy._topic_lock: + live_topics: dict[str, Any] = dict(spy.topic) # channel -> GraphTopic + for channel, spy_topic in live_topics.items(): + sname = topic_to_stream_name(channel) + rows[sname] = (channel, spy_topic) + + # Add streams that exist in the recording but are not live + recorded_streams = set(self._recorder.store.list_streams()) + for sname in recorded_streams: + if sname not in rows: + rows[sname] = (sname, None) + + # Render table + # Remember cursor position so we can restore it + cursor_row = self._table.cursor_coordinate.row if self._table.row_count > 0 else 0 + self._table.clear(columns=False) + + sorted_names = sorted(rows.keys()) + for sname in sorted_names: + channel, spy_topic = rows[sname] + + # Selection marker + is_sel = sname in self._selected + if is_sel: + sel = Text(" [●] ", style=f"bold {theme.BRIGHT_GREEN}") + else: + sel = Text(" [ ] ", style=theme.DIM) + + # Topic name — green when actively recording, bright when selected + if self._recorder.is_recording and sname in (self._recorder.store.list_streams()): + topic_style = f"bold {theme.BRIGHT_GREEN}" + elif is_sel: + topic_style = theme.BRIGHT_WHITE + else: + topic_style = theme.FOREGROUND + + # Type + type_str = _short_type(channel) if "#" in channel else "" + + # Live freq / bandwidth from spy + if spy_topic is not None: + freq = spy_topic.freq(5.0) + freq_text = Text(f"{freq:.1f} Hz", style=_heat(min(freq / 30.0, 1.0))) + kbps = spy_topic.kbps(5.0) + bw_text = Text(spy_topic.kbps_hr(5.0), style=_heat(min(kbps / 3072, 1.0))) + else: + freq_text = Text("—", style=theme.DIM) + bw_text = Text("—", style=theme.DIM) + + # Recorded count + if sname in recorded_streams: + count = self._recorder.store.stream(sname).count() + rec_text = Text(str(count), style=theme.YELLOW) + else: + rec_text = Text("", style=theme.DIM) + + # Sparkline from spy freq history + if sname not in self._freq_history: + self._freq_history[sname] = deque(maxlen=_SPARK_WIDTH * 2) + if spy_topic is not None: + self._freq_history[sname].append(spy_topic.freq(0.5)) + else: + self._freq_history[sname].append(0.0) + max_f = max(self._freq_history[sname]) if self._freq_history[sname] else 1.0 + activity = _spark(self._freq_history[sname], max_f) + + self._table.add_row( + sel, + Text(sname, style=topic_style), + Text(type_str, style=theme.BLUE), + freq_text, + bw_text, + rec_text, + activity, + key=sname, + ) + + # Restore cursor + if self._table.row_count > 0: + row = min(cursor_row, self._table.row_count - 1) + self._table.move_cursor(row=row) + + # Timeline + duration = self._recorder.duration + position = self._recorder.position + + timeline = Text() + timeline.append(" ") + timeline.append(_fmt_time(position), style=theme.BRIGHT_WHITE) + timeline.append(" ", style=theme.DIM) + timeline.append_text(_progress_bar(position, duration, width=50)) + timeline.append(" ", style=theme.DIM) + timeline.append(_fmt_time(duration), style=theme.FOREGROUND) + + if self._trim_in is not None or self._trim_out is not None: + timeline.append("\n ") + timeline.append("[", style=theme.YELLOW) + timeline.append( + _fmt_time(self._trim_in) if self._trim_in is not None else "--:--", + style=theme.YELLOW if self._trim_in is not None else theme.DIM, + ) + timeline.append(" .. ", style=theme.DIM) + timeline.append( + _fmt_time(self._trim_out) if self._trim_out is not None else "--:--", + style=theme.YELLOW if self._trim_out is not None else theme.DIM, + ) + timeline.append("]", style=theme.YELLOW) + + self.query_one("#timeline", Static).update(timeline) + + # Status bar + status = Text() + if self._recorder.is_recording: + status.append(" ● REC ", style=f"bold on {theme.RED}") + elif self._recorder.is_paused: + status.append(" ❚❚ PAUSED ", style=theme.YELLOW) + elif self._recorder.is_playing: + status.append(" ▶ PLAYING ", style=theme.BRIGHT_GREEN) + else: + status.append(" ■ STOPPED ", style=theme.DIM) + + n_live = len([r for r in rows.values() if r[1] is not None]) + n_sel = len(self._selected) + status.append(f" {n_live} live", style=theme.FOREGROUND) + if n_sel: + status.append(f" {n_sel} selected", style=theme.BRIGHT_GREEN) + if recorded_streams: + status.append(f" {len(recorded_streams)} recorded", style=theme.YELLOW) + + # Contextual hint + if not (self._recorder.is_recording or self._recorder.is_playing): + if n_live > 0 and n_sel == 0: + status.append(" SPACE select, A all, R rec", style=theme.DIM) + elif n_sel > 0: + status.append(" R to record selected", style=theme.DIM) + + self.query_one("#status-left", Static).update(status) + + rhs = Text() + rhs.append(f"{self._recorder.path} ", style=theme.DIM) + self.query_one("#status-right", Static).update(rhs) + + +def main() -> None: + db_path: str | None = None + autoplay = False + + args = sys.argv[1:] + if args and args[0] == "play" and len(args) > 1: + db_path = args[1] + autoplay = True + elif args and not args[0].startswith("-"): + db_path = args[0] + + RecorderApp(db_path=db_path, autoplay=autoplay).run() + + +if __name__ == "__main__": + main() diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index f2e3e51d08..63dd28303a 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -20,13 +20,13 @@ from dataclasses import field from functools import lru_cache import subprocess +import sys import time from typing import ( Any, Literal, Protocol, TypeAlias, - TypeGuard, cast, runtime_checkable, ) @@ -47,7 +47,12 @@ from dimos.utils.logging_config import setup_logger from dimos.visualization.rerun.init import rerun_init -RERUN_GRPC_PORT = 9877 +if sys.version_info >= (3, 13): + from typing import TypeIs +else: + from typing import TypeGuard as TypeIs + +RERUN_GRPC_PORT = 9876 RERUN_WEB_PORT = 9090 # TODO OUT visual annotations @@ -96,20 +101,16 @@ BlueprintFactory: TypeAlias = Callable[[], "Blueprint"] # to_rerun() can return a single archetype or a list of (entity_path, archetype) tuples -RerunMulti: TypeAlias = "list[tuple[str, Archetype]]" +RerunMulti: TypeAlias = list[tuple[str, Archetype]] RerunData: TypeAlias = "Archetype | RerunMulti" -def is_rerun_multi(data: Any) -> TypeGuard[RerunMulti]: +def is_rerun_multi(data: object) -> TypeIs[RerunMulti]: """Check if data is a list of (entity_path, archetype) tuples.""" - return ( - isinstance(data, list) - and bool(data) - and isinstance(data[0], tuple) - and len(data[0]) == 2 - and isinstance(data[0][0], str) - and isinstance(data[0][1], Archetype) - ) + match data: + case [(str(), Archetype()), *_]: + return True + return False @runtime_checkable @@ -294,6 +295,8 @@ def _on_message(self, msg: Any, topic: Any) -> None: @rpc def start(self) -> None: + # Delay import to reduce import time (~2.4s) + super().start() logger.info("Rerun bridge starting", viewer_mode=self.config.viewer_mode) @@ -326,7 +329,7 @@ def start(self) -> None: ) rr.spawn(connect=True, memory_limit=self.config.memory_limit) elif self.config.viewer_mode == "web": - server_uri = rr.serve_grpc() + server_uri = rr.serve_grpc(grpc_port=RERUN_GRPC_PORT) rr.serve_web_viewer(connect_to=server_uri, open_browser=False) elif self.config.viewer_mode == "connect": diff --git a/docs/usage/blueprints.md b/docs/usage/blueprints.md index cc6030be66..5811b80a14 100644 --- a/docs/usage/blueprints.md +++ b/docs/usage/blueprints.md @@ -361,6 +361,32 @@ class SomeSkill(Module): return "result" ``` +## Recording + +Blueprints can declare which modules are sensor sources via `default_record_modules`. When `--record-path` is used, only these modules' Out streams are recorded: + +```python session=blueprint-ex4 +from dimos.robot.unitree.go2.connection import GO2Connection + +unitree_go2 = ( + autoconnect(with_vis, GO2Connection.blueprint(), WebsocketVisModule.blueprint()) + .global_config(n_workers=4, robot_model="unitree_go2") + .default_record_modules(GO2Connection) +) +``` + +Recording captures decoded message objects directly via `Out.subscribe()` — no transport dependency. Codecs are auto-selected (JPEG for images, LCM for standard messages). Sensor timestamps are preserved from `msg.ts`. + +```bash +# Record sensor outputs +dimos --record-path recording.db run unitree-go2 + +# Replay (disables modules whose Outs are in the recording) +dimos --replay-file recording.db --viewer rerun-web run unitree-go2 +``` + +During replay, modules whose Out stream names overlap with the recording are disabled. The recorded data is published to LCM at realtime speed, and `rerun-bridge` handles visualization. + ## Building All you have to do to build a blueprint is call: diff --git a/docs/usage/cli.md b/docs/usage/cli.md index 017b441c7e..09a61cc3cf 100644 --- a/docs/usage/cli.md +++ b/docs/usage/cli.md @@ -16,7 +16,9 @@ dimos [GLOBAL OPTIONS] COMMAND [ARGS] | `--robot-ips` | TEXT | `None` | Multiple robot IPs | | `--simulation` / `--no-simulation` | bool | `False` | Enable MuJoCo simulation | | `--replay` / `--no-replay` | bool | `False` | Use recorded replay data | -| `--replay-db` | TEXT | `go2_bigoffice` | Replay memory2 SQLite database name | +| `--replay-db` | TEXT | `go2_china_office` | Replay memory2 SQLite database name | +| `--replay-file` | TEXT | `None` | Path to a `.db` recording file for replay | +| `--record-path` | TEXT | `None` | Record module outputs to a `.db` file | | `--new-memory` / `--no-new-memory` | bool | `False` | Clear persistent memory on start | | `--viewer` | `rerun\|rerun-web\|rerun-connect\|foxglove\|none` | `rerun` | Visualization backend | | `--n-workers` | INT | `2` | Number of forkserver workers | @@ -78,9 +80,15 @@ dimos run unitree-go2 # Background (returns immediately) dimos run unitree-go2-agentic --daemon -# Replay with Rerun viewer +# Replay with Rerun viewer (legacy pickle datasets) dimos --replay --viewer rerun run unitree-go2 +# Record all sensor outputs to a database +dimos --record-path recording.db run unitree-go2 + +# Replay a recording (disables recorded modules, replays via LCM) +dimos --replay-file recording.db --viewer rerun-web run unitree-go2 + # Real robot dimos run unitree-go2-agentic --robot-ip 192.168.123.161 @@ -91,6 +99,24 @@ dimos run unitree-go2 keyboard-teleop dimos run unitree-go2-agentic --disable OsmSkill WebInput ``` +#### Record/replay end-to-end test + +```bash +# 1) Start a stack and record all configured output streams +dimos --record-path /tmp/recording.db run unitree-go2 + +# 2) Stop the run, then replay the same file +dimos --replay-file /tmp/recording.db --viewer rerun-web run unitree-go2 +``` + +For interactive topic selection, use `dimos recorder` instead of `--record-path`: + +```bash +dimos recorder /tmp/recording.db +``` + +Then select streams with `space` and press `r` to start/stop recording. + When `--daemon` is used, the process: 1. Builds and starts all modules (foreground — you see errors) 2. Runs a health check (polls worker PIDs) @@ -313,6 +339,24 @@ Or run standalone: dtop ``` +### `recorder` + +Record and replay tool — a terminal UI for recording LCM streams, playing back recordings, and trimming. +It allows interactive selection of topics to record. + +```bash +dimos recorder [filename] +``` + +Common controls: +- `space`: select/deselect current stream +- `r`: start/stop recording selected streams +- `p`: play/pause recording +- `s`: stop playback/recording +- `[` / `]`: mark trim in/out +- `t`: trim to marked range +- `d`: delete marked range + ### `rerun-bridge` Launch the Rerun visualization bridge as a standalone process (outside of a blueprint). diff --git a/pyproject.toml b/pyproject.toml index 55a7f0fbd7..227530a50a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,7 @@ dimos = "dimos.robot.cli.dimos:main" rerun-bridge = "dimos.visualization.rerun.bridge:app" doclinks = "dimos.utils.docs.doclinks:main" dtop = "dimos.utils.cli.dtop:main" +recorder = "dimos.utils.cli.recorder.run_recorder:main" [project.urls] Homepage = "https://dimensionalos.com"