Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
aa05448
Memory2 record/replay functionality
Mar 29, 2026
4d7f98d
Merge branch 'dev' into record-replay
Mar 29, 2026
8f763c1
Fix async
Mar 30, 2026
4e646d6
Load in viewer as separate source
Mar 31, 2026
6c7ff5d
Fix playback
Mar 31, 2026
ff0adfc
Use AllPubSub
Apr 4, 2026
9083096
Recording fixes
Apr 6, 2026
8315178
Fix playback
Apr 7, 2026
2b9a476
Merge branch 'dev' into record-replay
Apr 9, 2026
c473fda
Merge branch 'dev' into record-replay
Apr 9, 2026
46b38b8
Fix duplicate recording timestamps
Dreamsorcerer Apr 10, 2026
99e6013
Fixes
Dreamsorcerer Apr 10, 2026
81157b4
Revert flake.nix
Dreamsorcerer Apr 10, 2026
9d69576
Cleanup
Dreamsorcerer Apr 10, 2026
6bf01da
Import time
Dreamsorcerer Apr 10, 2026
7b6d0c1
Docs
Dreamsorcerer Apr 10, 2026
a20eaf7
Merge branch 'dev' into record-replay
Dreamsorcerer Apr 13, 2026
4e6ea2c
Typing fixes
Dreamsorcerer Apr 13, 2026
ad6debf
Typing fixes
Dreamsorcerer Apr 13, 2026
b11e938
Revert
Dreamsorcerer Apr 13, 2026
bc563b7
Remove init files
Dreamsorcerer Apr 13, 2026
2d6f159
Fix tests
Dreamsorcerer Apr 13, 2026
5b4bdf4
Fix
Dreamsorcerer Apr 13, 2026
2dfc23e
Fix
Dreamsorcerer Apr 13, 2026
787bd66
Fix
Dreamsorcerer Apr 13, 2026
d9f6ff3
Adjust imports
Dreamsorcerer Apr 13, 2026
480f853
Fix args
Dreamsorcerer Apr 13, 2026
28de1d2
Fixes
Dreamsorcerer Apr 13, 2026
34a20ba
Fix test
Dreamsorcerer Apr 14, 2026
4cbf61e
Merge dev into record-replay
Copilot Apr 21, 2026
c3588c0
docs: add record/replay usage and test steps
Copilot Apr 21, 2026
d0f1b63
adressing Paul pr comments
leshy May 5, 2026
15e1844
Merge remote-tracking branch 'origin/dev' into ivan/record-replay-wrap
leshy May 5, 2026
2977242
removed store metadata intervention
leshy May 5, 2026
452130d
webrtc retransmit issue fix
leshy May 5, 2026
08337ce
webrtc lidar timestamps have been fixed
leshy May 5, 2026
edf2b76
record/replay should use wall clock for msg ts, not record time
leshy May 5, 2026
d66a7bb
final cleanup
leshy May 5, 2026
5434e17
sql transaction fix
leshy May 5, 2026
c292aa7
Merge remote-tracking branch 'origin/dev' into ivan/record-replay-wrap
leshy May 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,8 @@ htmlcov/
.coverage
.coverage.*

# Created from simulation
MUJOCO_LOG.TXT

# Memory2 autorecord
recording*.db
8 changes: 5 additions & 3 deletions dimos/control/examples/twist_base_keyboard_teleop.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@

from __future__ import annotations

import asyncio

from dimos.control.blueprints.mobile import coordinator_mock_twist_base
from dimos.core.coordination.module_coordinator import ModuleCoordinator
from dimos.robot.unitree.keyboard_teleop import KeyboardTeleop


def main() -> None:
async def main() -> None:
"""Run mock twist base + keyboard teleop."""
coord = ModuleCoordinator.build(coordinator_mock_twist_base)
teleop = ModuleCoordinator.build(KeyboardTeleop.blueprint())
Expand All @@ -52,9 +54,9 @@ def main() -> None:
teleop.start()

# Block until Ctrl+C — loop() handles KeyboardInterrupt and calls stop()
coord.loop()
await coord.loop()
teleop.stop()


if __name__ == "__main__":
main()
asyncio.run(main())
5 changes: 5 additions & 0 deletions dimos/core/coordination/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class Blueprint:
)
requirement_checks: tuple[Callable[[], str | None], ...] = field(default_factory=tuple)
configurator_checks: "tuple[SystemConfigurator, ...]" = field(default_factory=tuple)
record_modules: tuple[type[ModuleBase], ...] = field(default_factory=tuple)

@classmethod
def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint":
Expand All @@ -163,6 +164,9 @@ def create(cls, module: type[ModuleBase], **kwargs: Any) -> "Blueprint":
def disabled_modules(self, *modules: type[ModuleBase]) -> "Blueprint":
return replace(self, disabled_modules_tuple=self.disabled_modules_tuple + modules)

def default_record_modules(self, *modules: type[ModuleBase]) -> "Blueprint":
return replace(self, record_modules=self.record_modules + modules)

def config(self) -> type:
configs = {
b.module.name: (get_type_hints(b.module)["config"] | None, None)
Expand Down Expand Up @@ -227,6 +231,7 @@ def autoconnect(*blueprints: Blueprint) -> Blueprint:
remapping_map=MappingProxyType(all_remappings),
requirement_checks=all_requirement_checks,
configurator_checks=all_configurator_checks,
record_modules=tuple(module for bp in blueprints for module in bp.record_modules),
)


Expand Down
90 changes: 86 additions & 4 deletions dimos/core/coordination/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@

from __future__ import annotations

import asyncio
from collections import defaultdict
from collections.abc import Mapping, MutableMapping
import dataclasses
import importlib
import inspect
from pathlib import Path
import shutil
import sys
import threading
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, cast

from dimos.core.coordination.rpyc_server import RpycServer
Expand All @@ -31,6 +35,7 @@
from dimos.core.module import ModuleBase, ModuleSpec
from dimos.core.resource import Resource
from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport
from dimos.record.record_replay import RecordReplay
from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance
from dimos.utils.generic import short_id
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -257,6 +262,25 @@ def build(
if "g" in blueprint_args:
global_config.update(**blueprint_args.pop("g"))

# Auto-replay if --replay-file is set
replay_file = global_config.replay_file
if replay_file:
logger.info("Auto-replay from %s", replay_file)
# Strip replay_file from all override sources so the nested build()
# inside replay() does not re-enter this branch.
global_config.replay_file = None
clean_bp = dataclasses.replace(
blueprint,
global_config_overrides=MappingProxyType(
{
k: v
for k, v in blueprint.global_config_overrides.items()
if k != "replay_file"
}
),
)
return cls.replay(clean_bp, replay_file, blueprint_args=dict(blueprint_args))

_run_configurators(blueprint)
_check_requirements(blueprint)
_verify_no_name_conflicts(blueprint)
Expand All @@ -272,10 +296,68 @@ def build(
coordinator.build_all_modules()
coordinator.start_all_modules()

if global_config.record_path:
# Delete existing file, don't append to it.
Path(global_config.record_path).unlink(missing_ok=True)
record_modules = blueprint.record_modules
for bp in blueprint.active_blueprints:
if bp.module in record_modules:
instance = coordinator.get_instance(bp.module)
if instance is not None:
instance.start_recording_outputs(global_config.record_path)

_log_blueprint_graph(blueprint, coordinator)

return coordinator

@classmethod
def replay(
cls,
blueprint: Blueprint,
recording_path: str,
*,
speed: float = 1.0,
blueprint_args: MutableMapping[str, Any] | None = None,
) -> ModuleCoordinator:
"""Build with a recording replacing some module outputs."""
recording = RecordReplay(recording_path)
recorded_streams = set(recording.store.list_streams())
if not recorded_streams:
raise ValueError("Recording is empty — no streams to replay")

modules_to_disable: list[type[ModuleBase]] = []
for bp in blueprint.blueprints:
out_names = {c.name for c in bp.streams if c.direction == "out"}
if not out_names:
continue
covered = out_names & recorded_streams
if covered:
modules_to_disable.append(bp.module)
uncovered = out_names - covered
if uncovered:
logger.warning(
"Replay: disabling %s (partial: replaying %s, missing %s)",
bp.module.__name__,
covered,
uncovered,
)
else:
logger.info("Replay: disabling %s (all OUTs covered)", bp.module.__name__)

if not modules_to_disable:
logger.warning(
"Replay: no modules disabled - recording streams %s"
"don't match any module OUT names",
recorded_streams,
)

patched = blueprint.disabled_modules(*modules_to_disable)
coordinator = cls.build(patched, blueprint_args)

recording.play(speed=speed)

return coordinator
Comment thread
leshy marked this conversation as resolved.

Comment thread
leshy marked this conversation as resolved.
def load_blueprint(
self,
blueprint: Blueprint,
Expand Down Expand Up @@ -512,11 +594,11 @@ def _restart_module(

return new_proxy

def loop(self) -> None:
stop = threading.Event()
async def loop(self) -> None:
stop = asyncio.Event()
try:
stop.wait()
except KeyboardInterrupt:
await stop.wait()
except (KeyboardInterrupt, asyncio.CancelledError):
return
finally:
self.stop()
Expand Down
2 changes: 2 additions & 0 deletions dimos/core/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class GlobalConfig(BaseSettings):
simulation: bool = False
replay: bool = False
replay_db: str = "go2_short"
replay_file: str | None = None
record_path: str | None = None
new_memory: bool = False
viewer: ViewerBackend = "rerun"
n_workers: int = 2
Expand Down
33 changes: 33 additions & 0 deletions dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import json
import sys
import threading
import time
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -40,6 +41,7 @@
from dimos.core.resource import CompositeResource
from dimos.core.rpc_client import RpcCall
from dimos.core.stream import In, Out, RemoteOut, Transport
from dimos.memory2.store.sqlite import SqliteStore
from dimos.protocol.rpc.pubsubrpc import LCMRPC
from dimos.protocol.rpc.spec import DEFAULT_RPC_TIMEOUT, DEFAULT_RPC_TIMEOUTS, RPCSpec
from dimos.protocol.service.spec import BaseConfig, Configurable
Expand All @@ -57,6 +59,7 @@
from dimos.core.coordination.blueprints import Blueprint
from dimos.core.introspection.module.info import ModuleInfo
from dimos.core.rpc_client import RPCClient
from dimos.memory2.stream import Stream

if sys.version_info >= (3, 13):
from typing import TypeVar
Expand Down Expand Up @@ -120,12 +123,14 @@ class ModuleBase(Configurable, CompositeResource):
_module_closed: bool = False
_module_closed_lock: threading.Lock
_loop_thread_timeout: float = 2.0
_rec_store: SqliteStore | None = None
_main_gen: AsyncGenerator[None, None] | None = None
_tools: dict[str, Any]
_tools_lock: threading.Lock

def __init__(self, config_args: dict[str, Any]) -> None:
super().__init__(**config_args)
self._rec_unsubs: list[Callable[[], None]] = []
self._module_closed_lock = threading.Lock()
self._tools = {}
self._tools_lock = threading.Lock()
Expand Down Expand Up @@ -170,6 +175,7 @@ def start(self) -> None:
def stop(self) -> None:
self._stop_main()
super().stop()
self.stop_recording_outputs()
self._close_module()

def _close_module(self) -> None:
Expand Down Expand Up @@ -408,6 +414,33 @@ def blueprint(self) -> _BlueprintPartial:

return partial(Blueprint.create, self) # type: ignore[arg-type]

@rpc
def start_recording_outputs(self, db_path: str) -> None:
from dimos.memory2.store.sqlite import SqliteStore

if self._rec_store is not None:
raise RuntimeError("Recording already in progress")
self._rec_store = SqliteStore(path=db_path)
for name, out in self.outputs.items():
stream = self._rec_store.stream(name, out.type)

def cb(msg: Any, _stream: "Stream[object]" = stream) -> None:
# Storage ts is wall-clock at record time. Sensor / message
# timestamps live on the payload (e.g. ``msg.ts``) and are
# preserved there. Storage ts must be unique within a stream.
_stream.append(msg, ts=time.time())

self._rec_unsubs.append(out.subscribe(cb))

@rpc
def stop_recording_outputs(self) -> None:
Comment thread
Dreamsorcerer marked this conversation as resolved.
for unsub in self._rec_unsubs:
unsub()
self._rec_unsubs = []
if self._rec_store is not None:
self._rec_store.stop()
self._rec_store = None

@rpc
def set_module_ref(self, name: str, module_ref: "RPCClient") -> None:
setattr(self, name, module_ref)
Expand Down
2 changes: 1 addition & 1 deletion dimos/core/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_classmethods() -> None:
# Check that we have the expected RPC methods
assert "navigate_to" in class_rpcs, "navigate_to should be in rpcs"
assert "start" in class_rpcs, "start should be in rpcs"
assert len(class_rpcs) == 7
assert len(class_rpcs) == 9

# Check that the values are callable
assert callable(class_rpcs["navigate_to"]), "navigate_to should be callable"
Expand Down
2 changes: 2 additions & 0 deletions dimos/experimental/security_demo/security_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ def _cancel_current_goal(self) -> None:
with self._lock:
pose = self._latest_pose
if pose is not None:
# We want to update timestamps, otherwise recordings etc. would fail
pose.ts = time.time()
self.goal_request.publish(pose)

def _transition_to(self, new_state: State) -> None:
Expand Down
12 changes: 12 additions & 0 deletions dimos/memory2/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

from contextlib import suppress
from dataclasses import replace
from typing import TYPE_CHECKING, Any, Generic, TypeVar

Expand Down Expand Up @@ -241,6 +242,17 @@ def _iterate_live(
finally:
sub.dispose()

def delete_range(self, t1: float, t2: float) -> int:
"""Delete observations in [t1, t2] from all stores. Returns count deleted."""
ids = self.metadata_store.delete_range(t1, t2)
for obs_id in ids:
if self.blob_store is not None:
with suppress(KeyError):
self.blob_store.delete(self.name, obs_id)
if self.vector_store is not None:
self.vector_store.delete(self.name, obs_id)
return len(ids)

def count(self, query: StreamQuery) -> int:
if query.search_vec:
return sum(1 for _ in self.iterate(query))
Expand Down
4 changes: 2 additions & 2 deletions dimos/memory2/codecs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import importlib
from typing import Any, Protocol, TypeVar, runtime_checkable

from dimos.msgs.sensor_msgs.Image import Image

T = TypeVar("T")


Expand All @@ -33,8 +35,6 @@ def codec_for(payload_type: type[Any] | None = None) -> Codec[Any]:
from dimos.memory2.codecs.pickle import PickleCodec

if payload_type is not None:
from dimos.msgs.sensor_msgs.Image import Image

if issubclass(payload_type, Image):
from dimos.memory2.codecs.jpeg import JpegCodec

Expand Down
5 changes: 5 additions & 0 deletions dimos/memory2/observationstore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,10 @@ def fetch_by_ids(self, ids: list[int]) -> list[Observation[T]]:
"""Batch fetch by id (for vector search results)."""
...

@abstractmethod
def delete_range(self, t1: float, t2: float) -> list[int]:
"""Delete observations with ts in [t1, t2]. Returns deleted IDs."""
...

def serialize(self) -> dict[str, Any]:
return {"class": qual(type(self)), "config": self.config.model_dump()}
10 changes: 10 additions & 0 deletions dimos/memory2/observationstore/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,13 @@ def fetch_by_ids(self, ids: list[int]) -> list[Observation[T]]:
id_set = set(ids)
with self._lock:
return [obs for obs in self._observations if obs.id in id_set]

def delete_range(self, t1: float, t2: float) -> list[int]:
"""Delete observations with ts in [t1, t2]. Returns deleted IDs."""
with self._lock:
to_delete = [obs for obs in self._observations if t1 <= obs.ts <= t2]
ids = [obs.id for obs in to_delete]
self._observations = deque(
obs for obs in self._observations if not (t1 <= obs.ts <= t2)
)
return ids
Loading
Loading