From 51efecc7a999e3f33fd3289b1e6aaec6a7687651 Mon Sep 17 00:00:00 2001 From: Jheng-Yi Lin Date: Tue, 12 May 2026 18:26:37 -0700 Subject: [PATCH 1/5] spec(manipulation/memory): object memory tracker on memory2, propose architecture design for integrating memory2 with current manipulation stack --- dimos/manipulation/memory/spec.py | 222 ++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 dimos/manipulation/memory/spec.py diff --git a/dimos/manipulation/memory/spec.py b/dimos/manipulation/memory/spec.py new file mode 100644 index 0000000000..cae92c6d6c --- /dev/null +++ b/dimos/manipulation/memory/spec.py @@ -0,0 +1,222 @@ +"""Object Memory Tracker — specification. + +A memory2-backed object tracker that takes over the identity-management +role currently done in-RAM by ``ObjectDB``. Solves two practical +problems with the current per-frame tracker: + +1. **Stable labels across scans.** YOLO classifies the same physical + object as different classes between frames ("trash can" / + "basket" / "container"). Today, ``Object.update_object`` + overwrites the name on every match (``object.py:88``), so the + public label is most-recent-wins. The tracker votes across all + detections that contributed to an identity. + +2. **Memory between actions.** Today, ``ObjectDB`` has no lifecycle management on + objects, no notion of confidence, and no persistence + across process restarts. Move an object A → B and the system sees + two objects. Lose one scan to camera occlusion and risk a duplicate + identity. Restart the process and the workspace memory (due to storing in RAM) is gone. + + This tracker models object existence as a **continuous belief** that + decays without detections and is reinforced by them. Memory2 is + the source of truth; a synchronous replay on ``start()`` rebuilds + the in-RAM cache from memory2 before any new detections arrive, so + the robot has memory between sessions. + + +Architecture +------------ + +:: + + camera + | + v + ObjectSceneRegistrationModule (detection only — no ObjectDB) + | + | raw_detections: list[DetObject] + v + ObjectMemoryTracker --tracked_objects--> PickAndPlaceModule + | ^ + | | sync replay on start() rebuilds cache from memory2 + v | + +-- memory2 (source of truth) ------+ + | object_observations (dense) | + | object_events (sparse) | + +------------------------------------+ + | + v + @skill recall(name) + + +Belief model +------------ + +For each tracked object:: + + confidence(now) = exp(-(now - last_seen_ts) / time_constant_s) + +Pure function of time since the last detection. Three thresholds split +the range:: + + confident c >= active_threshold (0.5) -> in published snapshot + tentative match_threshold <= c < active -> still match-eligible + (this band gives + occlusion robustness) + lost c < lost_threshold (0.1) -> emit LOST event, + move to _lost bucket + +With ``time_constant_s = 15``: a single missed scan at 1-5 s cadence +barely moves the needle; sustained ~30 s absence drops into tentative; +~45 s absence flips to lost. + + +Match — four tiers, Python over the cache +----------------------------------------- + +Per detection, the matcher tries (in order, falling through on miss): + +1. **track-id** — YOLOE's tracker assigns ids that survive short + occlusions. O(1) lookup against ``_state``. +2. **tight spatial** — within ``distance_threshold`` of any object + whose confidence is at least ``match_threshold`` (active OR + tentative). Name-agnostic to absorb YOLO label flicker. +3. 
**drift** — wider ``reacquire_radius``, same voted name, against + ``_state``. Handles "object moved slowly while watched." +4. **re-acquisition** — wider radius, same voted name, against + ``_lost``. Handles "object briefly disappeared and came back + somewhere else (moved)." + +No match → APPEARED event, new identity. Matching is in-process +Python with ram cache; memory2 is only written to, not queried, on this path. + + +Data flow / streams +------------------- + +Every matched detection appends to ``object_observations``. Lifecycle +transitions (APPEARED / PROMOTED / LABEL_CHANGED / MOVED / LOST) +append to ``object_events``. The in-RAM cache (``_state`` for active +identities, ``_lost`` for recently-lost) is updated **inline** on every +write — the emit helpers ``append`` to memory2 and then synchronously +apply the same record to the cache. memory2 holds the durable record; +the cache is a write-through index over it. + +On process ``start()``, the cache is rebuilt by **synchronous replay** +of both streams (``stream.to_list()``), completing before the tracker +accepts any new detections. This gives cross-session memory without +any async-backfill race. + +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Literal, Protocol, runtime_checkable + +from pydantic import Field + +from dimos.memory2.module import MemoryModuleConfig + +if TYPE_CHECKING: + from pathlib import Path + + from dimos.agents.annotation import skill + from dimos.core.stream import In, Out + from dimos.perception.detection.type.detection3d.object import Object as DetObject + + +EventKind = Literal["APPEARED", "PROMOTED", "LABEL_CHANGED", "MOVED", "LOST"] + + +# --------------------------------------------------------------------- streams + +STREAM_OBSERVATIONS = "object_observations" +STREAM_EVENTS = "object_events" + + +# ----------------------------------------------------------------- data shapes + + +@dataclass +class ObjectObservation: + """Payload of one observation in the ``object_observations`` stream.""" + + object_id: str # tracker's identity (NOT detection.object_id) + detection: DetObject # perception Object — stored as-is + + +@dataclass +class ObjectEvent: + """Payload of one event (lifecycle transition) in the ``object_events`` stream.""" + + kind: EventKind + object_id: str # tracker's identity (NOT detection.object_id) + name: str # voted name at the time of the event + detection: DetObject # snapshot of the relevant Object — stored as-is + details: dict[str, Any] = field(default_factory=dict) + + +# ----------------------------------------------------------------------- config + + +class ObjectMemoryTrackerConfig(MemoryModuleConfig): + """Config for tracker.""" + + # Spatial matching + distance_threshold: float = 0.2 # Tier 2 match radius(m), the distance close enough to believe it's the same object + reacquire_radius: float = 1.0 # Tier 3 & 4 match radius(m). Used for drift (object moved while watched) and re-acquisition (object came back after being lost or moved). 
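+    # Tier 2 is name-agnostic (absorbs label flicker); tiers 3-4 additionally
+    # require the voted name to match, which is why reacquire_radius can be
+    # much wider than distance_threshold without merging distinct objects.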
+ + # Identity promotion + min_detections_for_permanent: int = 6 # minimum number of detections to promote an object + + # Continuous-belief existence model + time_constant_s: float = 15.0 # decay time-constant in confidence(now) = exp(-Δt/τ) + active_threshold: float = 0.5 # ≥ this: published in tracked_objects + match_threshold: float = 0.2 # ≥ this: still match-eligible (tentative band) + lost_threshold: float = 0.1 # < this: emit LOST, move to _lost + recent_lost_window_s: float = 60.0 # how long _lost is searchable for reacquire + + # Persistence + db_path: str | Path = Field(default="manipulation_object_memory.db") + + +# ---------------------------------------------------------------------- protocol + + +@runtime_checkable +class ObjectMemoryTrackerSpec(Protocol): + """Protocol for the memory2-backed object lifecycle tracker. + + On start, opens two memory2 streams (observations + events), rebuilds + its in-RAM cache via synchronous replay from past sessions, and begins + consuming raw detections. Publishes confident objects to manipulation + and exposes cross-session recall as an agent skill. + """ + + config: ObjectMemoryTrackerConfig + + # Inputs + raw_detections: In[list[DetObject]] + """Per-scan batch of detections.""" + + # Outputs + tracked_objects: Out[list[DetObject]] + """Confident objects (confidence ≥ ``active_threshold``) published per scan""" + + object_events: Out[ObjectEvent] + """Live lifecycle events.""" + + def recall(self, name: str) -> str: + """Where did I last see something I labelled ? + + Decorated ``@skill`` in the implementation. Queries + ``object_events.tags(name=name).last()`` — works after process + restart because memory2 is the source of truth, not the in-RAM + cache. + + Returns a human-readable summary, or + ``"I have no memory of any ."`` when the events stream + has no matching entry. + """ + ... From b34dab89fe0070949521774cb1172f9f8878fbe2 Mon Sep 17 00:00:00 2001 From: Jheng-Yi Lin Date: Wed, 13 May 2026 16:34:39 -0700 Subject: [PATCH 2/5] updated memory2-native and object lifecycle management integration with manipulation stack --- dimos/manipulation/memory/spec.py | 424 +++++++++++++++++++++++------- 1 file changed, 322 insertions(+), 102 deletions(-) diff --git a/dimos/manipulation/memory/spec.py b/dimos/manipulation/memory/spec.py index cae92c6d6c..4cfc98d96a 100644 --- a/dimos/manipulation/memory/spec.py +++ b/dimos/manipulation/memory/spec.py @@ -1,27 +1,4 @@ -"""Object Memory Tracker — specification. - -A memory2-backed object tracker that takes over the identity-management -role currently done in-RAM by ``ObjectDB``. Solves two practical -problems with the current per-frame tracker: - -1. **Stable labels across scans.** YOLO classifies the same physical - object as different classes between frames ("trash can" / - "basket" / "container"). Today, ``Object.update_object`` - overwrites the name on every match (``object.py:88``), so the - public label is most-recent-wins. The tracker votes across all - detections that contributed to an identity. - -2. **Memory between actions.** Today, ``ObjectDB`` has no lifecycle management on - objects, no notion of confidence, and no persistence - across process restarts. Move an object A → B and the system sees - two objects. Lose one scan to camera occlusion and risk a duplicate - identity. Restart the process and the workspace memory (due to storing in RAM) is gone. - - This tracker models object existence as a **continuous belief** that - decays without detections and is reinforced by them. 
Memory2 is - the source of truth; a synchronous replay on ``start()`` rebuilds - the in-RAM cache from memory2 before any new detections arrive, so - the robot has memory between sessions. +""" Memory2-native perception for manipulation. Architecture @@ -32,80 +9,206 @@ camera | v - ObjectSceneRegistrationModule (detection only — no ObjectDB) + RGBDCameraRecorder ==> color_image ──auto-subscribe──┐ + ==> depth_image | + ==> camera_info v + SemanticSearch + (continuous CLIP, + brightness/sharpness + filtered) + | + v + color_image_embedded + | + ┌──────────── pulled on trigger ──────────────┘ + v + LazyPerceptionModule ◀── watched_names: set[str] ──┐ + | @skill find_objects(prompts) | + | + startup scan (one-time on boot) | + | + 10s heartbeat: | + | scan(default_prompts + watched_names) | + | composes: CLIP search → peaks → VLM → 3D | + | | + | raw_detections: list[DetObject] | + v | + ObjectMemoryTracker ─────────────────────────────────┘ + | object_observations (private) + | object_events (public API) | - | raw_detections: list[DetObject] v - ObjectMemoryTracker --tracked_objects--> PickAndPlaceModule - | ^ - | | sync replay on start() rebuilds cache from memory2 - v | - +-- memory2 (source of truth) ------+ - | object_observations (dense) | - | object_events (sparse) | - +------------------------------------+ - | - v - @skill recall(name) - - -Belief model ------------- + tracked_objects: list[DetObject] + | + v + PickAndPlaceModule (manipulation — no API change) + + events ─→ @skill recall(name) + + All streams live in one shared SQLite store (recording.db). + + +The lazy pipeline (LazyPerceptionModule) +---------------------------------------- + +Inside ``find_objects(prompts)``, the heartbeat callback, and the +one-time startup scan: + +1. Split comma-separated prompts → run one class at a time. Moondream's + ``query_detections`` sets every detection's ``.name`` to the query + string, so per-class is required for correct labeling. + +2. CLIP-embed the prompt → query vector. + +3. ``color_image_embedded.search(query_vec).order_by("ts").materialize()`` + — pull a similarity-ranked, time-sorted result set. + +4. ``.transform(peaks(...))`` — extract temporal *peaks* (local maxima + of similarity over time), so the VLM gets diverse candidate frames + rather than near-duplicates of the same scene. + +5. Filter peaks by ``lookback_window_s`` and ``min_peak_score``, cap + at ``max_frames_per_call``. + +6. For each surviving peak frame: pull aligned depth and the latest + camera_info, run VLM detection, project via + ``Object.from_2d_to_list`` (camera→world transform from the + recorder's pose). + +7. Publish the resulting ``list[Object]`` on ``detections_out``. + -For each tracked object:: +Startup workspace scan + dynamic watch list +------------------------------------------- + +Two mechanisms keep the tracker's lifecycle model coherent for any +object in the scene, not just the ones the agent happens to query for. + +**1. Startup scan (one-time on boot).** Shortly after ``start()`` +(default ~5s, to let SemanticSearch populate the embedding index), +``LazyPerceptionModule`` runs a one-time scan with +``startup_prompts``. Each prompt is processed through the same lazy +pipeline. Detected objects flow into the tracker → seed identities +appear in ``tracked_objects`` immediately, without waiting for the +agent to ask. + +``startup_prompts`` is blueprint-configured (curated for the +workspace, e.g. ``["cup", "bowl", "plate", "fork", "knife"]`` for a +kitchen setup). 
With an empty list, the startup scan is skipped. + +**2. Dynamic watch list (every heartbeat).** The tracker publishes +``watched_names: Out[set[str]]`` — the union of voted names in +``_state`` (currently tracked) and ``_lost`` (within +``recent_lost_window_s``). ``LazyPerceptionModule`` subscribes via +its ``watched_names_in`` port and caches the latest set. + +Each heartbeat scans the union ``default_prompts ∪ watched_names``. +So once any object enters the tracker — via agent call, startup +scan, or earlier heartbeat — its name is automatically added to the +heartbeat's watch list. LOST events fire when objects actually leave +the scene (heartbeat scanned and didn't find them), not just because +they fell off the static prompt list. + +**Why this matters.** Without these two mechanisms, the lifecycle +model is technically there but operationally broken for any object +outside ``default_prompts``. The agent calls ``find_objects("fork")``, +fork appears, ~45s later LOST fires as a false negative because +nothing was looking for the fork anymore. The dynamic watch list +closes that gap; the startup scan ensures the tracker isn't empty on +boot. + + +The tracker (ObjectMemoryTracker) +----------------------------------------------------- + +The tracker is **detector-agnostic** — anything publishing +``list[DetObject]`` on ``raw_detections`` works. v3 just plugs +``LazyPerceptionModule`` in front of it; the contract is the same. + +**Belief model.** For each tracked object:: confidence(now) = exp(-(now - last_seen_ts) / time_constant_s) -Pure function of time since the last detection. Three thresholds split -the range:: +Pure function of time since the last detection. Three bands:: - confident c >= active_threshold (0.5) -> in published snapshot - tentative match_threshold <= c < active -> still match-eligible - (this band gives - occlusion robustness) - lost c < lost_threshold (0.1) -> emit LOST event, - move to _lost bucket + confident c >= active_threshold (0.5) -> in published snapshot + tentative match_threshold <= c < active -> still match-eligible + (occlusion robustness) + lost c < lost_threshold (0.1) -> emit LOST event, + move to _lost bucket -With ``time_constant_s = 15``: a single missed scan at 1-5 s cadence -barely moves the needle; sustained ~30 s absence drops into tentative; -~45 s absence flips to lost. +With ``time_constant_s = 15`` and ``background_scan_period_s = 10``: +LOST fires roughly ``tau * ln(10) + heartbeat_period ≈ 45s`` after an +object actually disappears, gated by ``lookback_window_s`` (the lazy +pipeline's recency filter on peak frames). +**Four-tier match.** Per detection, the matcher tries in order: -Match — four tiers, Python over the cache ------------------------------------------ +1. ``track-id`` — O(1) lookup. YOLOE assigned ids; under v3 VLM + detections have ``track_id = -1`` so this tier is mostly inert. +2. ``tight spatial`` — within ``distance_threshold``, name-agnostic. + Absorbs label flicker. +3. ``drift`` — wider ``reacquire_radius``, same voted name, against + ``_state``. Silent (no event). Handles "object moved while + watched." +4. ``re-acquisition`` — wider radius, same voted name, against + ``_lost``. Emits ``MOVED``. Handles "object briefly disappeared + and came back somewhere else." -Per detection, the matcher tries (in order, falling through on miss): +No match → ``APPEARED`` event, new identity. -1. **track-id** — YOLOE's tracker assigns ids that survive short - occlusions. O(1) lookup against ``_state``. -2. 
**tight spatial** — within ``distance_threshold`` of any object - whose confidence is at least ``match_threshold`` (active OR - tentative). Name-agnostic to absorb YOLO label flicker. -3. **drift** — wider ``reacquire_radius``, same voted name, against - ``_state``. Handles "object moved slowly while watched." -4. **re-acquisition** — wider radius, same voted name, against - ``_lost``. Handles "object briefly disappeared and came back - somewhere else (moved)." -No match → APPEARED event, new identity. Matching is in-process -Python with ram cache; memory2 is only written to, not queried, on this path. +Streams — six total in one shared SQLite store +---------------------------------------------- +Sensor streams (recorder writes, lazy reads, everyone replays): -Data flow / streams -------------------- + color_image JPEG-encoded RGB frames + world pose + depth_image depth frames (one row per camera tick) + camera_info intrinsics (rarely change, .last() suffices) -Every matched detection appends to ``object_observations``. Lifecycle -transitions (APPEARED / PROMOTED / LABEL_CHANGED / MOVED / LOST) -append to ``object_events``. The in-RAM cache (``_state`` for active -identities, ``_lost`` for recently-lost) is updated **inline** on every -write — the emit helpers ``append`` to memory2 and then synchronously -apply the same record to the cache. memory2 holds the durable record; -the cache is a write-through index over it. +Embedding stream (SemanticSearch writes, lazy reads): + + color_image_embedded CLIP vector + the image bytes, vec0-indexed + for similarity search + +Tracker streams: + + object_observations private — one row per matched detection, + drives the voted-name histogram and the + cache rebuild on restart + object_events public API — APPEARED / PROMOTED / + LABEL_CHANGED / MOVED / LOST. Consumed by + recall(name), audit, monitoring. + + +Open vocab + cross-session memory +--------------------------------- + +The system is fully open-vocab. The agent passes any natural-language +description to ``find_objects``; the VLM handles it. The system is +*reactively* open-vocab on the agent side, and *proactively* open-vocab +on the heartbeat side once an object has entered the tracker (via the +dynamic watch list described above). + +Three sources of prompts feed the lazy pipeline: + +- **Agent-supplied** (per-call) — argument to ``find_objects``. +- **Static seed** — ``default_prompts`` from the blueprint config; + scanned by every heartbeat regardless of tracker state. Use this + for objects you want monitored even before they've been seen + (alerting use cases). +- **Dynamic** — names currently in the tracker (``watched_names``); + automatically added once an object is observed. Use this for + "keep tracking whatever the agent discovered." + +Cross-session memory: ``tracker.start()`` synchronously replays both +tracker streams (``stream.to_list()``) before accepting new detections. +The tracker recovers all identities, lost-bucket state, and label +histograms from the durable record. ``recall(name)`` queries +``object_events.tags(name=name).last()`` directly — works after +process restart because memory2 is the source of truth. After replay, +the tracker re-publishes ``watched_names`` so the heartbeat picks up +where the previous session left off. -On process ``start()``, the cache is rebuilt by **synchronous replay** -of both streams (``stream.to_list()``), completing before the tracker -accepts any new detections. This gives cross-session memory without -any async-backfill race. 
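+
+For reference, a minimal sketch of the belief-model banding described
+above — illustrative only; the ``band`` helper and its return labels are
+not part of the spec::
+
+    import math
+    import time
+
+    def band(last_seen_ts: float, cfg: ObjectMemoryTrackerConfig) -> str:
+        c = math.exp(-(time.time() - last_seen_ts) / cfg.time_constant_s)
+        if c >= cfg.active_threshold:    # 0.5 — published in tracked_objects
+            return "confident"
+        if c >= cfg.match_threshold:     # 0.2 — still match-eligible
+            return "tentative"
+        if c >= cfg.lost_threshold:      # 0.1 — no longer matchable, LOST not yet emitted
+            return "fading"
+        return "lost"                    # emit LOST, move to _lost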
""" @@ -116,13 +219,19 @@ from pydantic import Field -from dimos.memory2.module import MemoryModuleConfig +from dimos.memory2.module import MemoryModuleConfig, RecorderConfig +from dimos.models.embedding.base import EmbeddingModel +from dimos.models.embedding.clip import CLIPModel +from dimos.models.vl.base import VlModel +from dimos.models.vl.moondream import MoondreamVlModel if TYPE_CHECKING: from pathlib import Path - from dimos.agents.annotation import skill + from dimos.agents.annotation import skill # noqa: F401 (referenced in docstrings) from dimos.core.stream import In, Out + from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo + from dimos.msgs.sensor_msgs.Image import Image from dimos.perception.detection.type.detection3d.object import Object as DetObject @@ -140,7 +249,9 @@ @dataclass class ObjectObservation: - """Payload of one observation in the ``object_observations`` stream.""" + """Payload of one observation in the ``object_observations`` stream. + **Private, internal for ObjectMemoryTracker** + """ object_id: str # tracker's identity (NOT detection.object_id) detection: DetObject # perception Object — stored as-is @@ -148,27 +259,125 @@ class ObjectObservation: @dataclass class ObjectEvent: - """Payload of one event (lifecycle transition) in the ``object_events`` stream.""" + """Payload of one lifecycle event in the ``object_events`` stream. + + **Public API** + """ kind: EventKind - object_id: str # tracker's identity (NOT detection.object_id) + object_id: str # tracker's identity name: str # voted name at the time of the event detection: DetObject # snapshot of the relevant Object — stored as-is details: dict[str, Any] = field(default_factory=dict) -# ----------------------------------------------------------------------- config +# ============================================================================= +# RGBDCameraRecorder — records color / depth / intrinsics to memory2 +# ============================================================================= + + +class RGBDCameraRecorderConfig(RecorderConfig): + """Config for the generic RGBD-camera recorder.""" + + db_path: str | Path = "recording.db" + + +@runtime_checkable +class RGBDCameraRecorderSpec(Protocol): + """Protocol for the generic RGBD-camera recorder.""" + + config: RGBDCameraRecorderConfig + + color_image: In[Image] + depth_image: In[Image] + camera_info: In[CameraInfo] + + +# ============================================================================= +# LazyPerceptionModule — agent-callable lazy detector + heartbeat +# ============================================================================= + + +class LazyPerceptionModuleConfig(MemoryModuleConfig): + """Config for the lazy perception module.""" + + db_path: str | Path = "recording.db" # shared with Recorder + SemanticSearch + + # Models (blueprint can override) + vlm_provider: type[VlModel] = MoondreamVlModel + embedding_model: type[EmbeddingModel] = CLIPModel + + # Heartbeat — gives lifecycle events real-world timing. + # Set to 0 to disable; lifecycle then fires only on agent calls. + background_scan_period_s: float = 10.0 + + # Static seed prompts the heartbeat ALWAYS scans for, regardless of + # tracker state. Use for "alert me if X appears" patterns. The + # heartbeat also scans tracker.watched_names dynamically, so curated + # objects appearing in the scene get tracked even if not in this list. 
+ default_prompts: list[str] = Field(default_factory=list) + + # Startup workspace scan — runs once after start() to seed the tracker + # with whatever is already in view. Disabled if startup_prompts is empty. + startup_workspace_scan: bool = True + startup_prompts: list[str] = Field(default_factory=list) + startup_delay_s: float = 5.0 # wait this long after start() for SemanticSearch + + # Lazy pipeline tuning + lookback_window_s: float = 60.0 # peaks must be within this recent window + peak_distance: float = 1.0 # peaks() distance kwarg (seconds) + peak_prominence: float = 0.02 # peaks() prominence kwarg + min_peak_score: float = 0.20 # absolute floor on peak similarity + max_frames_per_call: int = 2 # cap VLM calls per (scan × prompt class) + + +@runtime_checkable +class LazyPerceptionModuleSpec(Protocol): + """Protocol for the lazy perception module.""" + + config: LazyPerceptionModuleConfig + + # Inputs + watched_names_in: In[set[str]] + """Subscribed to ``ObjectMemoryTracker.watched_names``. Cached + locally; on each heartbeat the latest cached set is unioned with + ``default_prompts`` to determine what to scan for. Initial state + is empty (heartbeat only scans ``default_prompts``) until the + tracker publishes its first update.""" + + # Outputs + detections_out: Out[list[DetObject]] + """Wired by the blueprint to ``ObjectMemoryTracker.raw_detections``.""" + + def find_objects(self, prompts: str) -> str: + """Run the lazy detection pipeline. + + Decorated ``@skill`` in the implementation. ``prompts`` is a + comma-separated list of natural-language object classes; the + pipeline splits, embeds, searches, peaks, VLM-detects, and + publishes ``list[Object]`` for the tracker. + + Returns a human-readable summary string. Bounded to recent + frames by ``lookback_window_s`` — for older lookups use the + tracker's ``recall(name)`` skill. + """ + ... + + +# ============================================================================= +# ObjectMemoryTracker — v1 lifecycle + new watched_names port for v3 +# ============================================================================= class ObjectMemoryTrackerConfig(MemoryModuleConfig): - """Config for tracker.""" + """Config for the object lifecycle tracker.""" # Spatial matching - distance_threshold: float = 0.2 # Tier 2 match radius(m), the distance close enough to believe it's the same object - reacquire_radius: float = 1.0 # Tier 3 & 4 match radius(m). Used for drift (object moved while watched) and re-acquisition (object came back after being lost or moved). + distance_threshold: float = 0.2 # Tier 2 match radius (m) + reacquire_radius: float = 1.0 # Tier 3 & 4 match radius (m) # Identity promotion - min_detections_for_permanent: int = 6 # minimum number of detections to promote an object + min_detections_for_permanent: int = 6 # observations before PROMOTED fires # Continuous-belief existence model time_constant_s: float = 15.0 # decay time-constant in confidence(now) = exp(-Δt/τ) @@ -178,20 +387,22 @@ class ObjectMemoryTrackerConfig(MemoryModuleConfig): recent_lost_window_s: float = 60.0 # how long _lost is searchable for reacquire # Persistence - db_path: str | Path = Field(default="manipulation_object_memory.db") - - -# ---------------------------------------------------------------------- protocol + db_path: str | Path = Field(default="recording.db") @runtime_checkable class ObjectMemoryTrackerSpec(Protocol): """Protocol for the memory2-backed object lifecycle tracker. 
- On start, opens two memory2 streams (observations + events), rebuilds - its in-RAM cache via synchronous replay from past sessions, and begins - consuming raw detections. Publishes confident objects to manipulation - and exposes cross-session recall as an agent skill. + On start, opens two memory2 streams (observations + events), + rebuilds its in-RAM cache via synchronous replay, and begins + consuming raw detections. Publishes confident objects to + manipulation, exposes cross-session recall as an agent skill, and + publishes the union of currently-tracked and recently-lost voted + names so the upstream detector can scan for them automatically. + + Detector-agnostic: anything publishing ``list[DetObject]`` on + ``raw_detections`` works. """ config: ObjectMemoryTrackerConfig @@ -202,18 +413,27 @@ class ObjectMemoryTrackerSpec(Protocol): # Outputs tracked_objects: Out[list[DetObject]] - """Confident objects (confidence ≥ ``active_threshold``) published per scan""" + """Confident objects (confidence ≥ ``active_threshold``) published + per scan.""" object_events: Out[ObjectEvent] """Live lifecycle events.""" + watched_names: Out[set[str]] + """Union of voted names in ``_state`` (currently tracked) and + ``_lost`` (within ``recent_lost_window_s``). Re-published on every + cache update so subscribers (e.g., ``LazyPerceptionModule``'s + heartbeat) automatically scan for newly-discovered objects. + Republished once after the start-time replay so the heartbeat + picks up cross-session state.""" + def recall(self, name: str) -> str: """Where did I last see something I labelled ? Decorated ``@skill`` in the implementation. Queries ``object_events.tags(name=name).last()`` — works after process - restart because memory2 is the source of truth, not the in-RAM - cache. + restart because memory2 is the source of truth, not the + in-RAM cache. Unbounded recency (full event history). Returns a human-readable summary, or ``"I have no memory of any ."`` when the events stream From 4204b7ae598a9425a48b6b0c6e3d757eb92e97c9 Mon Sep 17 00:00:00 2001 From: Jheng-Yi Lin Date: Wed, 13 May 2026 16:57:11 -0700 Subject: [PATCH 3/5] docstring clean up for cleaner reading --- dimos/manipulation/memory/spec.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dimos/manipulation/memory/spec.py b/dimos/manipulation/memory/spec.py index 4cfc98d96a..9ea2ff1fe5 100644 --- a/dimos/manipulation/memory/spec.py +++ b/dimos/manipulation/memory/spec.py @@ -120,8 +120,9 @@ ----------------------------------------------------- The tracker is **detector-agnostic** — anything publishing -``list[DetObject]`` on ``raw_detections`` works. v3 just plugs -``LazyPerceptionModule`` in front of it; the contract is the same. +``list[DetObject]`` on ``raw_detections`` works. ``LazyPerceptionModule`` +is what plugs in front of it here; the contract doesn't depend on +which detector is used. **Belief model.** For each tracked object:: @@ -142,8 +143,10 @@ **Four-tier match.** Per detection, the matcher tries in order: -1. ``track-id`` — O(1) lookup. YOLOE assigned ids; under v3 VLM - detections have ``track_id = -1`` so this tier is mostly inert. +1. ``track-id`` — O(1) lookup against the detector's tracker id, if + it assigns one. VLM-based detection does not (``track_id = -1``), + so this tier is mostly inert under the current pipeline; legacy + detectors that supply ids would still use it. 2. ``tight spatial`` — within ``distance_threshold``, name-agnostic. Absorbs label flicker. 3. 
``drift`` — wider ``reacquire_radius``, same voted name, against @@ -365,7 +368,7 @@ def find_objects(self, prompts: str) -> str: # ============================================================================= -# ObjectMemoryTracker — v1 lifecycle + new watched_names port for v3 +# ObjectMemoryTracker — identity, lifecycle events, watched_names port # ============================================================================= From 4f7199dd97f4d94d78b7700289e9442e3e96bdfa Mon Sep 17 00:00:00 2001 From: Jheng-Yi Lin Date: Wed, 13 May 2026 20:13:48 -0700 Subject: [PATCH 4/5] memory2-native design, drop tracker/lifecyle manage, expose @skills for agent to query on memory2 --- dimos/manipulation/memory/spec.py | 467 ++++++++++-------------------- 1 file changed, 148 insertions(+), 319 deletions(-) diff --git a/dimos/manipulation/memory/spec.py b/dimos/manipulation/memory/spec.py index 9ea2ff1fe5..6f2b25a457 100644 --- a/dimos/manipulation/memory/spec.py +++ b/dimos/manipulation/memory/spec.py @@ -22,205 +22,120 @@ | ┌──────────── pulled on trigger ──────────────┘ v - LazyPerceptionModule ◀── watched_names: set[str] ──┐ - | @skill find_objects(prompts) | - | + startup scan (one-time on boot) | - | + 10s heartbeat: | - | scan(default_prompts + watched_names) | - | composes: CLIP search → peaks → VLM → 3D | - | | - | raw_detections: list[DetObject] | - v | - ObjectMemoryTracker ─────────────────────────────────┘ - | object_observations (private) - | object_events (public API) + LazyPerceptionModule + | @skill find_objects(prompt) + | .search(vec).filter(sim>=thr).order_by("ts", desc).first() + | → VLM detect → 3D project + | @skill find_objects_near(prompt, x, y, z, radius) + | .near((x,y,z), r).search(vec).filter(sim>=thr).order_by("ts", desc).first() + | → VLM detect → 3D project + | @skill recall(name) + | .search(vec).order_by("ts", desc).first() + | → return camera pose + timestamp (no VLM, cheaper) | + | latest_detections: Out[list[DetObject]] v - tracked_objects: list[DetObject] - | - v - PickAndPlaceModule (manipulation — no API change) - - events ─→ @skill recall(name) + PickAndPlaceModule (manipulation — reads latest_detections) All streams live in one shared SQLite store (recording.db). -The lazy pipeline (LazyPerceptionModule) ----------------------------------------- - -Inside ``find_objects(prompts)``, the heartbeat callback, and the -one-time startup scan: - -1. Split comma-separated prompts → run one class at a time. Moondream's - ``query_detections`` sets every detection's ``.name`` to the query - string, so per-class is required for correct labeling. - -2. CLIP-embed the prompt → query vector. - -3. ``color_image_embedded.search(query_vec).order_by("ts").materialize()`` - — pull a similarity-ranked, time-sorted result set. - -4. ``.transform(peaks(...))`` — extract temporal *peaks* (local maxima - of similarity over time), so the VLM gets diverse candidate frames - rather than near-duplicates of the same scene. - -5. Filter peaks by ``lookback_window_s`` and ``min_peak_score``, cap - at ``max_frames_per_call``. - -6. For each surviving peak frame: pull aligned depth and the latest - camera_info, run VLM detection, project via - ``Object.from_2d_to_list`` (camera→world transform from the - recorder's pose). - -7. Publish the resulting ``list[Object]`` on ``detections_out``. 
- - -Startup workspace scan + dynamic watch list -------------------------------------------- - -Two mechanisms keep the tracker's lifecycle model coherent for any -object in the scene, not just the ones the agent happens to query for. +Three skills, three query shapes +-------------------------------- -**1. Startup scan (one-time on boot).** Shortly after ``start()`` -(default ~5s, to let SemanticSearch populate the embedding index), -``LazyPerceptionModule`` runs a one-time scan with -``startup_prompts``. Each prompt is processed through the same lazy -pipeline. Detected objects flow into the tracker → seed identities -appear in ``tracked_objects`` immediately, without waiting for the -agent to ask. +Each skill is a one-line composition of memory2 primitives. Every +skill returns the **most recent confident match** along with its +timestamp — the agent sees how fresh the data is and decides if it's +fresh enough to act on. No time-window parameter on the API; "current" +is whatever memory2 has most recently. -``startup_prompts`` is blueprint-configured (curated for the -workspace, e.g. ``["cup", "bowl", "plate", "fork", "knife"]`` for a -kitchen setup). With an empty list, the startup scan is skipped. +- ``find_objects(prompt)`` — **semantic + most recent**. Search the + embedding stream, filter to confident matches, take the most recent. + Run VLM + 3D projection on that frame. Returns ``list[Object]`` (via + ``latest_detections``) plus formatted summary with timestamp. -**2. Dynamic watch list (every heartbeat).** The tracker publishes -``watched_names: Out[set[str]]`` — the union of voted names in -``_state`` (currently tracked) and ``_lost`` (within -``recent_lost_window_s``). ``LazyPerceptionModule`` subscribes via -its ``watched_names_in`` port and caches the latest set. +- ``find_objects_near(prompt, x, y, z, radius)`` — **spatial + semantic + + most recent**. Same as ``find_objects`` but pre-filters via + memory2's R*Tree index to frames recorded when the camera pose was + within ``radius`` of ``(x, y, z)``. -Each heartbeat scans the union ``default_prompts ∪ watched_names``. -So once any object enters the tracker — via agent call, startup -scan, or earlier heartbeat — its name is automatically added to the -heartbeat's watch list. LOST events fire when objects actually leave -the scene (heartbeat scanned and didn't find them), not just because -they fell off the static prompt list. +- ``recall(name)`` — **cheaper cousin of find_objects**. Same query + shape but skips VLM detection and 3D projection; returns the camera + pose at the matching frame plus timestamp. Use for "where was I when + I saw X" questions where exact 3D object pose isn't needed. -**Why this matters.** Without these two mechanisms, the lifecycle -model is technically there but operationally broken for any object -outside ``default_prompts``. The agent calls ``find_objects("fork")``, -fork appears, ~45s later LOST fires as a false negative because -nothing was looking for the fork anymore. The dynamic watch list -closes that gap; the startup scan ensures the tracker isn't empty on -boot. +**Freshness contract is in the response.** Every result includes +``(seen N seconds ago)`` so the agent can reason: "2 seconds is fresh +— act"; "5 minutes is stale — re-query or skip." -The tracker (ObjectMemoryTracker) ------------------------------------------------------ +The lazy pipeline +------------------ -The tracker is **detector-agnostic** — anything publishing -``list[DetObject]`` on ``raw_detections`` works. 
``LazyPerceptionModule`` -is what plugs in front of it here; the contract doesn't depend on -which detector is used. +All three skills share the same shape: -**Belief model.** For each tracked object:: +1. CLIP-embed the prompt → query vector. - confidence(now) = exp(-(now - last_seen_ts) / time_constant_s) +2. Build a memory2 query on ``color_image_embedded`` by composing the + relevant filters (``.near`` / ``.search`` / ``.filter`` / + ``.order_by``). All filters push down to SQL / R*Tree / vec0 + indexes via memory2's store backend. -Pure function of time since the last detection. Three bands:: +3. Take ``.first()`` — the most recent confident match. Single + observation. - confident c >= active_threshold (0.5) -> in published snapshot - tentative match_threshold <= c < active -> still match-eligible - (occlusion robustness) - lost c < lost_threshold (0.1) -> emit LOST event, - move to _lost bucket - -With ``time_constant_s = 15`` and ``background_scan_period_s = 10``: -LOST fires roughly ``tau * ln(10) + heartbeat_period ≈ 45s`` after an -object actually disappears, gated by ``lookback_window_s`` (the lazy -pipeline's recency filter on peak frames). - -**Four-tier match.** Per detection, the matcher tries in order: +4. (find_objects / find_objects_near only:) Pull aligned depth via + ``.at(obs.ts, tolerance)`` and latest intrinsics via ``.last()``. + Run VLM detection on the color frame, project via + ``Object.from_2d_to_list`` (camera→world transform from the + recorder's pose). -1. ``track-id`` — O(1) lookup against the detector's tracker id, if - it assigns one. VLM-based detection does not (``track_id = -1``), - so this tier is mostly inert under the current pipeline; legacy - detectors that supply ids would still use it. -2. ``tight spatial`` — within ``distance_threshold``, name-agnostic. - Absorbs label flicker. -3. ``drift`` — wider ``reacquire_radius``, same voted name, against - ``_state``. Silent (no event). Handles "object moved while - watched." -4. ``re-acquisition`` — wider radius, same voted name, against - ``_lost``. Emits ``MOVED``. Handles "object briefly disappeared - and came back somewhere else." +5. Publish ``list[Object]`` on ``latest_detections``. Return a + formatted summary that **includes the observation timestamp** so + the agent can read freshness. -No match → ``APPEARED`` event, new identity. +``PickAndPlaceModule`` reads ``latest_detections`` to act on named +targets. Multiple instances in the result are handled either by +prompt qualifiers ("cup on the right" → VLM returns one detection in +the frame) or by the agent reading the returned positions and +choosing. -Streams — six total in one shared SQLite store ----------------------------------------------- +Streams (recorded continuously) +------------------------------- -Sensor streams (recorder writes, lazy reads, everyone replays): +:: color_image JPEG-encoded RGB frames + world pose depth_image depth frames (one row per camera tick) camera_info intrinsics (rarely change, .last() suffices) - -Embedding stream (SemanticSearch writes, lazy reads): - - color_image_embedded CLIP vector + the image bytes, vec0-indexed - for similarity search - -Tracker streams: - - object_observations private — one row per matched detection, - drives the voted-name histogram and the - cache rebuild on restart - object_events public API — APPEARED / PROMOTED / - LABEL_CHANGED / MOVED / LOST. Consumed by - recall(name), audit, monitoring. 
+ color_image_embedded CLIP vector + image bytes, vec0-indexed + for similarity search (populated by SemanticSearch) Open vocab + cross-session memory --------------------------------- -The system is fully open-vocab. The agent passes any natural-language -description to ``find_objects``; the VLM handles it. The system is -*reactively* open-vocab on the agent side, and *proactively* open-vocab -on the heartbeat side once an object has entered the tracker (via the -dynamic watch list described above). - -Three sources of prompts feed the lazy pipeline: - -- **Agent-supplied** (per-call) — argument to ``find_objects``. -- **Static seed** — ``default_prompts`` from the blueprint config; - scanned by every heartbeat regardless of tracker state. Use this - for objects you want monitored even before they've been seen - (alerting use cases). -- **Dynamic** — names currently in the tracker (``watched_names``); - automatically added once an object is observed. Use this for - "keep tracking whatever the agent discovered." - -Cross-session memory: ``tracker.start()`` synchronously replays both -tracker streams (``stream.to_list()``) before accepting new detections. -The tracker recovers all identities, lost-bucket state, and label -histograms from the durable record. ``recall(name)`` queries -``object_events.tags(name=name).last()`` directly — works after -process restart because memory2 is the source of truth. After replay, -the tracker re-publishes ``watched_names`` so the heartbeat picks up -where the previous session left off. +The agent passes any natural-language description to ``find_objects`` +or ``find_objects_near`` (``"red mug with handle"``, +``"the screwdriver near the laptop"``). The VLM handles visual +disambiguation; the agent can encode spatial qualifiers in the prompt, +or call ``find_objects_near`` to use memory2's R*Tree index for +camera-pose-bounded queries. +Cross-session memory is implicit: memory2 persists every observation +in ``recording.db``. Skills query the full embedding history regardless +of when the current process started — no replay logic, no bespoke +loaders. ``recall(name)`` returning "(seen 3 hours ago)" is normal and +correct after a process restart; the agent reads the timestamp and +decides. """ from __future__ import annotations -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Literal, Protocol, runtime_checkable - -from pydantic import Field +from pathlib import Path +from typing import TYPE_CHECKING, Protocol, runtime_checkable from dimos.memory2.module import MemoryModuleConfig, RecorderConfig from dimos.models.embedding.base import EmbeddingModel @@ -229,8 +144,6 @@ from dimos.models.vl.moondream import MoondreamVlModel if TYPE_CHECKING: - from pathlib import Path - from dimos.agents.annotation import skill # noqa: F401 (referenced in docstrings) from dimos.core.stream import In, Out from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo @@ -238,42 +151,6 @@ from dimos.perception.detection.type.detection3d.object import Object as DetObject -EventKind = Literal["APPEARED", "PROMOTED", "LABEL_CHANGED", "MOVED", "LOST"] - - -# --------------------------------------------------------------------- streams - -STREAM_OBSERVATIONS = "object_observations" -STREAM_EVENTS = "object_events" - - -# ----------------------------------------------------------------- data shapes - - -@dataclass -class ObjectObservation: - """Payload of one observation in the ``object_observations`` stream. 
- **Private, internal for ObjectMemoryTracker** - """ - - object_id: str # tracker's identity (NOT detection.object_id) - detection: DetObject # perception Object — stored as-is - - -@dataclass -class ObjectEvent: - """Payload of one lifecycle event in the ``object_events`` stream. - - **Public API** - """ - - kind: EventKind - object_id: str # tracker's identity - name: str # voted name at the time of the event - detection: DetObject # snapshot of the relevant Object — stored as-is - details: dict[str, Any] = field(default_factory=dict) - - # ============================================================================= # RGBDCameraRecorder — records color / depth / intrinsics to memory2 # ============================================================================= @@ -297,41 +174,22 @@ class RGBDCameraRecorderSpec(Protocol): # ============================================================================= -# LazyPerceptionModule — agent-callable lazy detector + heartbeat +# LazyPerceptionModule — agent-callable open-vocab perception # ============================================================================= class LazyPerceptionModuleConfig(MemoryModuleConfig): """Config for the lazy perception module.""" - db_path: str | Path = "recording.db" # shared with Recorder + SemanticSearch + db_path: str | Path = "recording.db" # Models (blueprint can override) vlm_provider: type[VlModel] = MoondreamVlModel embedding_model: type[EmbeddingModel] = CLIPModel - # Heartbeat — gives lifecycle events real-world timing. - # Set to 0 to disable; lifecycle then fires only on agent calls. - background_scan_period_s: float = 10.0 - - # Static seed prompts the heartbeat ALWAYS scans for, regardless of - # tracker state. Use for "alert me if X appears" patterns. The - # heartbeat also scans tracker.watched_names dynamically, so curated - # objects appearing in the scene get tracked even if not in this list. - default_prompts: list[str] = Field(default_factory=list) - - # Startup workspace scan — runs once after start() to seed the tracker - # with whatever is already in view. Disabled if startup_prompts is empty. - startup_workspace_scan: bool = True - startup_prompts: list[str] = Field(default_factory=list) - startup_delay_s: float = 5.0 # wait this long after start() for SemanticSearch - - # Lazy pipeline tuning - lookback_window_s: float = 60.0 # peaks must be within this recent window - peak_distance: float = 1.0 # peaks() distance kwarg (seconds) - peak_prominence: float = 0.02 # peaks() prominence kwarg - min_peak_score: float = 0.20 # absolute floor on peak similarity - max_frames_per_call: int = 2 # cap VLM calls per (scan × prompt class) + # Similarity threshold — observations below this don't count as a + # confident match for find_objects / find_objects_near. + min_similarity: float = 0.20 @runtime_checkable @@ -340,106 +198,77 @@ class LazyPerceptionModuleSpec(Protocol): config: LazyPerceptionModuleConfig - # Inputs - watched_names_in: In[set[str]] - """Subscribed to ``ObjectMemoryTracker.watched_names``. Cached - locally; on each heartbeat the latest cached set is unioned with - ``default_prompts`` to determine what to scan for. Initial state - is empty (heartbeat only scans ``default_prompts``) until the - tracker publishes its first update.""" - - # Outputs - detections_out: Out[list[DetObject]] - """Wired by the blueprint to ``ObjectMemoryTracker.raw_detections``.""" - - def find_objects(self, prompts: str) -> str: - """Run the lazy detection pipeline. 
- - Decorated ``@skill`` in the implementation. ``prompts`` is a - comma-separated list of natural-language object classes; the - pipeline splits, embeds, searches, peaks, VLM-detects, and - publishes ``list[Object]`` for the tracker. - - Returns a human-readable summary string. Bounded to recent - frames by ``lookback_window_s`` — for older lookups use the - tracker's ``recall(name)`` skill. + latest_detections: Out[list[DetObject]] + """Wired by the blueprint to the manipulation stack + (e.g., ``PickAndPlaceModule.objects``). Cache of the most recent + ``find_objects`` / ``find_objects_near`` result. Manipulation + reads from here to act on the named target.""" + + def find_objects(self, prompt: str) -> str: + """Find objects matching ``prompt``. Returns the most recent + confident match with timestamp. + + Decorated ``@skill`` in the implementation. Composes + ``.search()`` + ``.filter(similarity >= min_similarity)`` + + ``.order_by("ts", desc=True)`` + ``.first()`` over + ``color_image_embedded``, then runs VLM detection on the + matching frame and projects to 3D. Publishes ``list[Object]`` + on ``latest_detections``. + + Open-vocab: ``prompt`` can be any natural language + (``"red mug"``, ``"the cup on the right"``). Comma-separated + prompts are split and processed per class because VLM + ``query_detections`` labels every result with the literal + query string. + + Returns a human-readable summary including the observation + timestamp (``"(seen 3s ago)"``) so the agent can judge + freshness. Returns a no-match message when no observation + meets ``min_similarity``. """ ... - -# ============================================================================= -# ObjectMemoryTracker — identity, lifecycle events, watched_names port -# ============================================================================= - - -class ObjectMemoryTrackerConfig(MemoryModuleConfig): - """Config for the object lifecycle tracker.""" - - # Spatial matching - distance_threshold: float = 0.2 # Tier 2 match radius (m) - reacquire_radius: float = 1.0 # Tier 3 & 4 match radius (m) - - # Identity promotion - min_detections_for_permanent: int = 6 # observations before PROMOTED fires - - # Continuous-belief existence model - time_constant_s: float = 15.0 # decay time-constant in confidence(now) = exp(-Δt/τ) - active_threshold: float = 0.5 # ≥ this: published in tracked_objects - match_threshold: float = 0.2 # ≥ this: still match-eligible (tentative band) - lost_threshold: float = 0.1 # < this: emit LOST, move to _lost - recent_lost_window_s: float = 60.0 # how long _lost is searchable for reacquire - - # Persistence - db_path: str | Path = Field(default="recording.db") - - -@runtime_checkable -class ObjectMemoryTrackerSpec(Protocol): - """Protocol for the memory2-backed object lifecycle tracker. - - On start, opens two memory2 streams (observations + events), - rebuilds its in-RAM cache via synchronous replay, and begins - consuming raw detections. Publishes confident objects to - manipulation, exposes cross-session recall as an agent skill, and - publishes the union of currently-tracked and recently-lost voted - names so the upstream detector can scan for them automatically. - - Detector-agnostic: anything publishing ``list[DetObject]`` on - ``raw_detections`` works. 
- """ - - config: ObjectMemoryTrackerConfig - - # Inputs - raw_detections: In[list[DetObject]] - """Per-scan batch of detections.""" - - # Outputs - tracked_objects: Out[list[DetObject]] - """Confident objects (confidence ≥ ``active_threshold``) published - per scan.""" - - object_events: Out[ObjectEvent] - """Live lifecycle events.""" - - watched_names: Out[set[str]] - """Union of voted names in ``_state`` (currently tracked) and - ``_lost`` (within ``recent_lost_window_s``). Re-published on every - cache update so subscribers (e.g., ``LazyPerceptionModule``'s - heartbeat) automatically scan for newly-discovered objects. - Republished once after the start-time replay so the heartbeat - picks up cross-session state.""" + def find_objects_near( + self, + prompt: str, + x: float, + y: float, + z: float, + radius: float = 1.0, + ) -> str: + """Find objects matching ``prompt``, in frames recorded when + the camera was within ``radius`` meters of ``(x, y, z)``. + + Decorated ``@skill`` in the implementation. Same query shape + as ``find_objects`` but with an extra ``.near((x, y, z), radius)`` + filter that uses memory2's R*Tree spatial index. Otherwise + identical: take most recent confident match → VLM → 3D project + → publish on ``latest_detections``. + + ``.near()`` filters by the camera's pose at record time, NOT + by the detected object's position. Use for "frames captured + while looking at this workspace area"; object-position + filtering is downstream of detection. + """ + ... def recall(self, name: str) -> str: - """Where did I last see something I labelled ? - - Decorated ``@skill`` in the implementation. Queries - ``object_events.tags(name=name).last()`` — works after process - restart because memory2 is the source of truth, not the - in-RAM cache. Unbounded recency (full event history). - - Returns a human-readable summary, or - ``"I have no memory of any ."`` when the events stream - has no matching entry. + """Where did I last see something matching ``name``? + + Decorated ``@skill`` in the implementation. Cheaper cousin of + ``find_objects``: composes ``.search()`` + + ``.order_by("ts", desc=True)`` + ``.first()`` over + ``color_image_embedded`` — returns the most recent confident + semantic match across the full embedding history, but **does + not run VLM**. Returns the camera pose at the matching frame + plus timestamp. + + Works after process restart because memory2's SQLite store is + the persistence layer; the query touches the same db that + previous sessions wrote. + + Returns a human-readable summary with timestamp + (``"Last saw 'cup' near (1.0, 0.5, 0.9) (seen 3min ago)"``), or + ``"No memory of ."`` when nothing matches. """ ... From e03d9890a0940dfeb49a37cfeeff0144b162bda0 Mon Sep 17 00:00:00 2001 From: Jheng-Yi Lin Date: Thu, 14 May 2026 13:46:28 -0700 Subject: [PATCH 5/5] feat(manipulation/memory2): RGBDCameraRecorder + LazyPerceptionModule + xarm6 blueprint - New dimos/manipulation/memory2/ package: RGBDCameraRecorder (records RGBD streams + continuous CLIP embed; resume-if-exists semantics) and LazyPerceptionModule with three @skill methods (find_objects, find_objects_near, recall) composing memory2 query primitives. - xarm6_perception and xarm6_perception_agent blueprints + agent prompt. - PickAndPlaceModule._on_objects also writes _detection_snapshot so pick() works on publish-driven detections (xArm7 path strictly additive). - db_path silo'd to recording_xarm6.db. 
---
 dimos/manipulation/blueprints.py              | 171 +++++++++++++++++++++++++
 dimos/manipulation/memory2/__init__.py        |  24 ++
 dimos/manipulation/memory2/lazy_perception.py | 277 ++++++++++++++++++
 dimos/manipulation/memory2/recorder.py        |  86 ++++++
 .../manipulation/{memory => memory2}/spec.py  | 138 +++++----
 dimos/manipulation/pick_and_place_module.py   |  18 +-
 dimos/robot/all_blueprints.py                 |   2 +
 7 files changed, 664 insertions(+), 52 deletions(-)
 create mode 100644 dimos/manipulation/memory2/__init__.py
 create mode 100644 dimos/manipulation/memory2/lazy_perception.py
 create mode 100644 dimos/manipulation/memory2/recorder.py
 rename dimos/manipulation/{memory => memory2}/spec.py (61%)

diff --git a/dimos/manipulation/blueprints.py b/dimos/manipulation/blueprints.py
index 1c006c1d04..d0e339418c 100644
--- a/dimos/manipulation/blueprints.py
+++ b/dimos/manipulation/blueprints.py
@@ -37,6 +37,7 @@
 from dimos.core.transport import LCMTransport
 from dimos.hardware.sensors.camera.realsense.camera import RealSenseCamera
 from dimos.manipulation.manipulation_module import ManipulationModule
+from dimos.manipulation.memory2 import LazyPerceptionModule, RGBDCameraRecorder
 from dimos.manipulation.pick_and_place_module import PickAndPlaceModule
 from dimos.msgs.geometry_msgs.Quaternion import Quaternion
 from dimos.msgs.geometry_msgs.Transform import Transform
@@ -338,8 +339,178 @@
 )
 
 
+# ============================================================================
+# XArm6 perception — memory2-native pipeline (open-vocab VLM, no tracker)
+# ============================================================================
+# Mirrors the xArm7 xarm_perception blueprint but uses the new memory2-native
+# perception modules (RGBDCameraRecorder + LazyPerceptionModule) in place of
+# ObjectSceneRegistrationModule. RGBDCameraRecorder also runs the continuous
+# CLIP embed pipeline (no separate SemanticSearch module — see spec.py for the
+# "why one module" reasoning). The PickAndPlaceModule API is unchanged — its
+# `objects: In[list[Object]]` port is now fed by LazyPerceptionModule.objects.
+#
+# TF chain: world → link6 (ManipulationModule) → camera_link (RealSense)
+# Usage: launch coordinator-xarm6 in one terminal, then xarm6-perception-agent in another:
+#   Terminal 1: XARM6_IP=<robot-ip> dimos run coordinator-xarm6
+#   Terminal 2: XARM6_IP=<robot-ip> dimos run xarm6-perception-agent
+# Without the coordinator the arm won't actually move (joint_state LCM transport
+# expects an external coordinator publisher).
+#
+# Camera transform: PLACEHOLDER reusing xArm7 values. Replace with measured
+# hand-eye calibration for the xArm6 mount on first hardware run if positions
+# returned by find_objects are off by a consistent offset.
+_XARM6_PERCEPTION_CAMERA_TRANSFORM = Transform(
+    translation=Vector3(x=0.06693724, y=-0.0309563, z=0.00691482),
+    rotation=Quaternion(0.70513398, 0.00535696, 0.70897578, -0.01052180),  # xyzw
+)
+
+_xarm6_perception_cfg = _catalog_xarm6(
+    name="arm",
+    adapter_type="xarm" if global_config.xarm6_ip else "mock",
+    address=global_config.xarm6_ip,
+    pitch=math.radians(45),
+    add_gripper=True,
+    tf_extra_links=["link6"],
+)
+
+xarm6_perception = (
+    autoconnect(
+        PickAndPlaceModule.blueprint(
+            robots=[_xarm6_perception_cfg.to_robot_model_config()],
+            planning_timeout=10.0,
+            enable_viz=True,
+            floor_z=-0.02,
+        ),
+        RealSenseCamera.blueprint(
+            base_frame_id="link6",
+            base_transform=_XARM6_PERCEPTION_CAMERA_TRANSFORM,
+        ),
+        # Memory2-native perception pipeline — replaces ObjectSceneRegistrationModule.
+        # RGBDCameraRecorder records color/depth/intrinsics AND continuously
+        # CLIP-embeds color frames into color_image_embedded.
+        # db_path is xArm6-specific to silo from other memory2 users and to avoid
+        # picking up stale embeddings from a different camera mount.
+        # Resume-if-exists: delete recording_xarm6.db manually for a clean slate.
+        RGBDCameraRecorder.blueprint(db_path="recording_xarm6.db"),
+        LazyPerceptionModule.blueprint(db_path="recording_xarm6.db"),
+        vis_module("foxglove"),
+    )
+    .transports(
+        {
+            ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState),
+        }
+    )
+    .global_config(n_workers=4)
+)
+
+
+# XArm6 perception + LLM agent. Three perception skills (find_objects,
+# find_objects_near, recall) replace the previous look / scan_objects from
+# the tracker-based pipeline. The agent reads the (seen Ns ago) timestamp
+# in each response and decides whether to re-query before acting.
+# Usage: launch coordinator-xarm6 in one terminal, then xarm6-perception-agent in another:
+# Terminal 1: XARM6_IP=<robot-ip> dimos run coordinator-xarm6
+# Terminal 2: XARM6_IP=<robot-ip> dimos run xarm6-perception-agent
+# Without the coordinator the arm won't actually move (the joint_state LCM transport
+# expects an external coordinator publisher).
+_MANIPULATION_AGENT_SYSTEM_PROMPT_XARM6 = """\
+You are a robotic manipulation assistant controlling an xArm6 robot arm with an \
+eye-in-hand RealSense camera and a gripper.
+
+# Skills
+
+## Perception
+- **find_objects <prompt>**: Find objects matching the prompt in the current scene. \
+Returns 3D positions and a "(seen Ns ago)" timestamp so you can judge freshness. \
+The prompt is any natural language — use spatial qualifiers ("the cup on the right") \
+or attributes ("the red mug") to disambiguate when multiple instances exist. \
+Example: "what cups are there?" → find_objects("cup"); "find the red mug" → \
+find_objects("red mug").
+
+- **find_objects_near <prompt> <x> <y> <z> [radius]**: Same as find_objects but \
+narrows to frames captured when the camera was within `radius` meters of (x, y, z). \
+Use when you know which workspace area to scan — cheaper than a full-scene query. \
+Example: "find a screwdriver near the workbench at 1.0, 0.5, 0.8" → \
+find_objects_near("screwdriver", 1.0, 0.5, 0.8, 0.5).
+
+- **recall <name>**: Where did I last see something matching <name>? Cheaper than \
+find_objects (no VLM; returns the camera pose at the matching frame plus a timestamp), \
+across the full session history including previous process runs. Use for "where was the \
+cup earlier?" / "have I ever seen a wrench?" type questions.
+
+## Pick & Place
+- **pick <object_name>**: Pick up an object that find_objects most recently \
+detected. Use the EXACT name from the find_objects output. When multiple instances \
+match, add a spatial or attribute qualifier in your find_objects prompt ("cup on \
+the right") so the VLM returns just one — then pick operates unambiguously.
+- **place <x> <y> <z>**: Place a held object at explicit world-frame coordinates. \
+Example: "place it at 0.4, 0.3, 0.1"
+- **drop_on <object_name>**: Drop a held object onto another detected object. \
+Automatically compensates for camera occlusion. Example: "drop it in the bowl".
+- **place_back**: Return a held object to its original pick position.
+- **pick_and_place <object_name> <x> <y> <z>**: Pick then place in one command.
+
+## Motion
+- **move_to_pose <x> <y> <z> [roll pitch yaw]**: Move the end-effector to an absolute \
+world-frame pose (meters / radians).
+- **move_to_joints <j1> <j2> <j3> <j4> <j5> <j6>**: Move to a 6-DOF joint configuration (radians).
+- **go_home**: Move to the home/observe position.
+- **go_init**: Return to startup position. Use after pick/place as a safe rest pose. + +## Gripper +- **open_gripper / close_gripper / set_gripper**: Direct gripper control. + +## Status & Recovery +- **get_robot_state**: Current joint positions, end-effector pose, gripper state. +- **get_scene_info**: Latest detection snapshot (from the most recent find_objects call). +- **reset**: Clear a FAULT state and return to IDLE. +- **clear_perception_obstacles**: Remove detected obstacles from the planning world. \ +Use when planning fails with COLLISION_AT_START. + +# Freshness and re-querying +- Every find_objects / find_objects_near response includes "(seen Ns ago)". +- A few seconds old → fresh, safe to act on. +- Minutes old → scene may have changed; re-query before acting. +- If you just picked or placed an object, the scene changed — re-query before the \ +next action. + +# Disambiguation (multiple instances) +- If find_objects returns multiple positions, either: + 1. Re-query with a spatial/attribute qualifier ("cup on the right", "red cup"), or + 2. Read the positions yourself and choose by reasoning (e.g., "right" = larger Y; \ +"closest" = smallest distance from gripper). +- Don't rely on identity across calls — find_objects always reflects the most recent \ +detection. The same name in two calls may be the same physical object or different ones. + +# Rules +- Use the EXACT object name from find_objects output. Do NOT substitute similar names. +- "drop it in/on [object]" → use **drop_on**. "place it at [coords]" → use **place**. +- "bring it back" → pick, then **go_init**. +- "bring it to me" / "hand it over" → pick, then move toward user (≈ X=0, Y=0.5). +- NEVER open the gripper while holding an object unless executing place / drop_on / \ +when the user explicitly asks. +- After pick or place, return to init with **go_init** unless another action follows. + +# Coordinate System +World frame (meters): X = forward, Y = left, Z = up. Z = 0 is robot base. +Typical working area: X 0.3–0.7, Y −0.5 to 0.5, Z 0.05–0.5. + +# Error Recovery +If planning fails with COLLISION_AT_START: call **clear_perception_obstacles**, then \ +**reset**, then retry. +""" + +xarm6_perception_agent = autoconnect( + xarm6_perception, + McpServer.blueprint(), + McpClient.blueprint(system_prompt=_MANIPULATION_AGENT_SYSTEM_PROMPT_XARM6), +) + + __all__ = [ "dual_xarm6_planner", + "xarm6_perception", + "xarm6_perception_agent", "xarm6_planner_only", "xarm7_planner_coordinator", "xarm7_planner_coordinator_agent", diff --git a/dimos/manipulation/memory2/__init__.py b/dimos/manipulation/memory2/__init__.py new file mode 100644 index 0000000000..59906b9c29 --- /dev/null +++ b/dimos/manipulation/memory2/__init__.py @@ -0,0 +1,24 @@ +"""Manipulation × memory2 integration. + +Three agent-callable perception skills built on memory2 primitives: +``find_objects``, ``find_objects_near``, ``recall``. See ``spec.py`` +for the architecture docstring, configs, and Protocol definitions. 
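+
+Typical wiring, taken from the ``xarm6_perception`` blueprint in
+``dimos/manipulation/blueprints.py`` — both modules point at the same
+``db_path`` so the perception skills query the store the recorder writes::
+
+    RGBDCameraRecorder.blueprint(db_path="recording_xarm6.db"),
+    LazyPerceptionModule.blueprint(db_path="recording_xarm6.db"),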
+""" + +from dimos.manipulation.memory2.lazy_perception import LazyPerceptionModule +from dimos.manipulation.memory2.recorder import RGBDCameraRecorder +from dimos.manipulation.memory2.spec import ( + LazyPerceptionModuleConfig, + LazyPerceptionModuleSpec, + RGBDCameraRecorderConfig, + RGBDCameraRecorderSpec, +) + +__all__ = [ + "LazyPerceptionModule", + "LazyPerceptionModuleConfig", + "LazyPerceptionModuleSpec", + "RGBDCameraRecorder", + "RGBDCameraRecorderConfig", + "RGBDCameraRecorderSpec", +] diff --git a/dimos/manipulation/memory2/lazy_perception.py b/dimos/manipulation/memory2/lazy_perception.py new file mode 100644 index 0000000000..718e8af159 --- /dev/null +++ b/dimos/manipulation/memory2/lazy_perception.py @@ -0,0 +1,277 @@ +"""LazyPerceptionModule — agent-callable open-vocab perception. + +Three @skill methods, each a one-line composition of memory2 query +primitives. See ``spec.py`` for the architecture docstring. +""" + +from __future__ import annotations + +from collections.abc import Callable +import time +from typing import Any + +from dimos.agents.annotation import skill +from dimos.core.core import rpc +from dimos.core.stream import Out +from dimos.manipulation.memory2.spec import LazyPerceptionModuleConfig +from dimos.memory2.module import MemoryModule +from dimos.models.embedding.base import EmbeddingModel +from dimos.models.vl.base import VlModel +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.perception.detection.type.detection3d.object import Object as DetObject +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +def _relative_time(ts: float) -> str: + """Format a past timestamp as a human-readable age.""" + delta = max(0.0, time.time() - ts) + if delta < 60: + return f"{int(delta)}s ago" + if delta < 3600: + return f"{int(delta // 60)}min ago" + return f"{int(delta // 3600)}h ago" + + +class LazyPerceptionModule(MemoryModule): + """Lazy memory2-native open-vocab object detector. + + Three skills, each a one-line memory2 composition. Stateless: + every call is an independent query → VLM (find_objects only) + → 3D → publish. + """ + + config: LazyPerceptionModuleConfig + + objects: Out[list[DetObject]] + + _vlm: VlModel | None = None + _clip: EmbeddingModel | None = None + + @rpc + def start(self) -> None: + super().start() + self._vlm = self.register_disposable(self.config.vlm_provider()) + self._vlm.start() + self._clip = self.register_disposable(self.config.embedding_model()) + self._clip.start() + + # ------------------------------------------------------------------ skills + + @skill + def find_objects(self, prompt: str) -> str: + """Find objects matching ``prompt``. Returns most recent confident + match with timestamp. + + Open-vocab: ``prompt`` can be any natural language. Comma-separated + prompts are split and processed per class because Moondream's + ``query_detections`` labels every result with the literal query + string. + """ + prompt_list = [p.strip() for p in prompt.split(",") if p.strip()] + if not prompt_list: + self.objects.publish([]) + return "No prompts provided." 
+ + all_objects: list[DetObject] = [] + most_recent_ts: float | None = None + for single in prompt_list: + objs, ts = self._find_and_project( + single, + build_query=lambda stream, vec: stream.search(vec), + ) + all_objects.extend(objs) + if ts is not None and (most_recent_ts is None or ts > most_recent_ts): + most_recent_ts = ts + + self.objects.publish(all_objects) + if not all_objects: + return f"No confident '{prompt}' match in memory." + + age = _relative_time(most_recent_ts) if most_recent_ts is not None else "unknown age" + lines = [self._fmt_object_line(o) for o in all_objects] + return f"Found {len(all_objects)} object(s) matching '{prompt}' (seen {age}):\n" + "\n".join(lines) + + @skill + def find_objects_near( + self, + prompt: str, + x: float, + y: float, + z: float, + radius: float = 1.0, + ) -> str: + """Find objects matching ``prompt`` in frames recorded when the + camera was within ``radius`` meters of ``(x, y, z)``. + + ``.near()`` filters by the camera's pose at record time, NOT by + the detected object's position. Note: applied as a Python + post-filter after vector search; R*Tree pre-gating of the + vector index is a memory2 follow-up. + """ + prompt_list = [p.strip() for p in prompt.split(",") if p.strip()] + if not prompt_list: + self.objects.publish([]) + return "No prompts provided." + + pose = (x, y, z) + all_objects: list[DetObject] = [] + most_recent_ts: float | None = None + for single in prompt_list: + objs, ts = self._find_and_project( + single, + build_query=lambda stream, vec: stream.near(pose, radius).search(vec), + ) + all_objects.extend(objs) + if ts is not None and (most_recent_ts is None or ts > most_recent_ts): + most_recent_ts = ts + + self.objects.publish(all_objects) + if not all_objects: + return f"No confident '{prompt}' match near ({x:.2f}, {y:.2f}, {z:.2f}) within {radius}m." + + age = _relative_time(most_recent_ts) if most_recent_ts is not None else "unknown age" + lines = [self._fmt_object_line(o) for o in all_objects] + return ( + f"Found {len(all_objects)} object(s) matching '{prompt}' " + f"near ({x:.2f}, {y:.2f}, {z:.2f}) within {radius}m (seen {age}):\n" + + "\n".join(lines) + ) + + @skill + def recall(self, name: str) -> str: + """Where did I last see something matching ``name``? + + Cheaper than ``find_objects``: no VLM, no 3D projection. Returns + the camera pose at the most recent confident semantic match plus + timestamp. Works across process restarts because memory2's SQLite + store is the persistence layer. + """ + if self._clip is None: + return f"No memory of '{name}'." + try: + vec = self._clip.embed_text(name) + obs = ( + self.store.streams.color_image_embedded + .search(vec) + .filter(lambda o: (o.similarity or 0) >= self.config.min_similarity) + .order_by("ts", desc=True) + .first() + ) + except (AttributeError, LookupError): + return f"No memory of '{name}'." + except Exception as e: + logger.warning("recall(%r) failed: %s", name, e) + return f"No memory of '{name}'." + + age = _relative_time(obs.ts) + if obs.pose: + x, y, z = obs.pose[0], obs.pose[1], obs.pose[2] + return f"Last saw '{name}' with camera near ({x:.2f}, {y:.2f}, {z:.2f}) ({age})." + return f"Last saw '{name}' (camera pose unknown) ({age})." + + # ----------------------------------------------------------- internal + + def _find_and_project( + self, + prompt: str, + build_query: Callable[[Any, Any], Any], + ) -> tuple[list[DetObject], float | None]: + """Composed memory2 pipeline for ONE prompt class. + + Returns (objects, observation_ts). 
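+        The two callers pass, respectively::
+
+            lambda stream, vec: stream.search(vec)                      # find_objects
+            lambda stream, vec: stream.near(pose, radius).search(vec)   # find_objects_near
+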
``build_query`` is a callable + ``(stream, query_vec) -> filtered Stream`` so find_objects / + find_objects_near can share the rest of the pipeline. + """ + if self._clip is None: + return [], None + try: + vec = self._clip.embed_text(prompt) + obs = ( + build_query(self.store.streams.color_image_embedded, vec) + .filter(lambda o: (o.similarity or 0) >= self.config.min_similarity) + .order_by("ts", desc=True) + .first() + ) + except (AttributeError, LookupError): + return [], None + except Exception as e: + logger.warning("_find_and_project(%r): %s", prompt, e) + return [], None + + return self._detect_and_project_one(obs, prompt), obs.ts + + def _detect_and_project_one(self, color_obs: Any, prompt: str) -> list[DetObject]: + """VLM detection + 3D projection for ONE peak frame, ONE prompt class.""" + if self._vlm is None: + return [] + + # Aligned depth + latest intrinsics. .first()/.last() raise LookupError on empty. + try: + depth_obs = self.store.streams.depth_image.at( + color_obs.ts, tolerance=0.1 + ).first() + info_obs = self.store.streams.camera_info.last() + except LookupError: + logger.warning("missing depth/info near ts=%.3f", color_obs.ts) + return [] + except AttributeError: + logger.warning("depth_image or camera_info stream not yet available") + return [] + + # VLM can hang (network) or OOM (CUDA). Treat failure as no-detection. + try: + dets_2d = self._vlm.query_detections(color_obs.data, prompt) + except Exception as e: + logger.warning("VLM failed (prompt=%r ts=%.3f): %s", prompt, color_obs.ts, e) + return [] + if not dets_2d.detections: + return [] + + camera_transform = self._camera_transform_from_pose(color_obs.pose) + try: + # from_2d_to_list's annotation says ImageDetections2D[Detection2DSeg] + # but the implementation at object.py:205-215 handles bbox-only + # detections too (synthesizes a rectangular mask). Moondream returns + # bbox-only — works at runtime. + return DetObject.from_2d_to_list( + detections_2d=dets_2d, # type: ignore[arg-type] + color_image=color_obs.data, + depth_image=depth_obs.data, + camera_info=info_obs.data, + camera_transform=camera_transform, + max_distance=self.config.max_distance, + use_aabb=self.config.use_aabb, + max_obstacle_width=self.config.max_obstacle_width, + ) + except Exception as e: + logger.warning("from_2d_to_list failed at ts=%.3f: %s", color_obs.ts, e) + return [] + + def _camera_transform_from_pose(self, pose: Any) -> Transform | None: + """Build a Transform from the recorder's pose-stamped observation. + + Observation.pose is always a 7-tuple (x, y, z, qx, qy, qz, qw) or None + (memory2/observationstore/sqlite.py:_decompose_pose). + Object.from_2d_to_list(camera_transform=T) applies T to a camera-frame + pointcloud to produce world-frame output — T is camera→world; the + recorder's pose-in-world IS that. NO .inverse(). 
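+
+        Example (hypothetical pose tuple; identity rotation, xyzw order)::
+
+            (0.4, 0.0, 0.3,  0.0, 0.0, 0.0, 1.0)
+            # -> Transform(translation=Vector3(0.4, 0.0, 0.3),
+            #              rotation=Quaternion(0.0, 0.0, 0.0, 1.0))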
+ """ + if pose is None: + return None + try: + x, y, z, qx, qy, qz, qw = pose + except (TypeError, ValueError): + logger.warning("unexpected pose shape: %r", type(pose).__name__) + return None + return Transform( + translation=Vector3(x, y, z), + rotation=Quaternion(qx, qy, qz, qw), + ) + + @staticmethod + def _fmt_object_line(o: DetObject) -> str: + return f" - {o.name} at ({o.center.x:.2f}, {o.center.y:.2f}, {o.center.z:.2f})" diff --git a/dimos/manipulation/memory2/recorder.py b/dimos/manipulation/memory2/recorder.py new file mode 100644 index 0000000000..287c5a298c --- /dev/null +++ b/dimos/manipulation/memory2/recorder.py @@ -0,0 +1,86 @@ +"""RGBDCameraRecorder — Recorder + continuous CLIP embed for RGBD streams. + +Two things this module does on top of the base ``Recorder`` pattern at +``dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py:49-53``: + +1. **Continuous CLIP embed pipeline.** Same logic as ``memory2.SemanticSearch``, + but co-located here so it shares the recorder's store. See ``spec.py`` for + the "why in one module" reasoning (memory2's SubjectNotifier is per-store). + +2. **Resume-if-exists semantics.** ``start()`` overrides ``Recorder.start()`` + to skip the overwrite-or-fail gate. If ``recording.db`` exists, we open it + and append (cross-session memory). If it doesn't, SQLite creates it fresh. + No flag flipping, no manual file deletion between runs. Users wanting a + clean slate delete the file themselves. +""" + +from __future__ import annotations + +from typing import Any + +from dimos.core.core import rpc +from dimos.core.stream import In +from dimos.manipulation.memory2.spec import RGBDCameraRecorderConfig +from dimos.memory2.embed import EmbedImages +from dimos.memory2.module import Recorder +from dimos.memory2.stream import Stream as MemoryStream +from dimos.memory2.transform import QualityWindow +from dimos.models.embedding.base import EmbeddingModel +from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo +from dimos.msgs.sensor_msgs.Image import Image +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +class RGBDCameraRecorder(Recorder): + """Records RGBD streams (color + depth + intrinsics) and continuously + CLIP-embeds qualifying color frames into ``color_image_embedded``.""" + + color_image: In[Image] + depth_image: In[Image] + camera_info: In[CameraInfo] + + config: RGBDCameraRecorderConfig + + _embedding_model: EmbeddingModel | None = None + _embeddings: MemoryStream[Any] | None = None + + @rpc + def start(self) -> None: + + super(Recorder, self).start() + + if self.config.g.replay: + logger.info( + "Replay mode active — recording disabled, leaving %s untouched", + self.config.db_path, + ) + return + + if not self.inputs: + logger.warning("RGBDCameraRecorder has no In ports — nothing to record") + return + + # Replicate Recorder.start()'s port-to-stream registration. Touching + # self.store lazily opens the SqliteStore at db_path (existing file or new). + for name, port in self.inputs.items(): + stream: MemoryStream[Any] = self.store.stream(name, port.type) + self._port_to_stream(name, port, stream) + logger.info("Recording %s (%s)", name, port.type.__name__) + + # Continuous CLIP embed — same store, same notifier, .live() works. 
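+        # The chain below: take live color frames, drop dark ones (brightness gate),
+        # select by sharpness over a 0.5 s QualityWindow, CLIP-embed in batches of 2,
+        # save into color_image_embedded, and drain on a background thread.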
+ self._embedding_model = self.register_disposable(self.config.embedding_model()) + self._embedding_model.start() + self._embeddings = self.store.stream("color_image_embedded", Image) + + # fmt: off + self.store.streams.color_image \ + .live() \ + .filter(lambda obs: obs.data.brightness > 0.1) \ + .transform(QualityWindow(lambda img: img.sharpness, window=0.5)) \ + .transform(EmbedImages(self._embedding_model, batch_size=2)) \ + .save(self._embeddings) \ + .drain_thread() + # fmt: on + logger.info("RGBDCameraRecorder: continuous CLIP embed pipeline active") diff --git a/dimos/manipulation/memory/spec.py b/dimos/manipulation/memory2/spec.py similarity index 61% rename from dimos/manipulation/memory/spec.py rename to dimos/manipulation/memory2/spec.py index 6f2b25a457..f81dc14916 100644 --- a/dimos/manipulation/memory/spec.py +++ b/dimos/manipulation/memory2/spec.py @@ -9,35 +9,39 @@ camera | v - RGBDCameraRecorder ==> color_image ──auto-subscribe──┐ - ==> depth_image | - ==> camera_info v - SemanticSearch - (continuous CLIP, - brightness/sharpness - filtered) - | - v - color_image_embedded - | - ┌──────────── pulled on trigger ──────────────┘ - v - LazyPerceptionModule - | @skill find_objects(prompt) + RGBDCameraRecorder + ├── records: color_image, depth_image, camera_info + └── continuous CLIP embed (brightness/sharpness filtered) + → color_image_embedded + | + ┌──────────── pulled on trigger (read-only) ──┐ + v | + LazyPerceptionModule | + | @skill find_objects(prompt) | + | .search(vec).filter(sim>=thr).order_by("ts", desc).first() + | → VLM detect → 3D project | + | @skill find_objects_near(prompt, x, y, z, radius) | + | .near((x,y,z), r).search(vec).filter(...).order_by(...).first() + | → VLM detect → 3D project | + | @skill recall(name) | | .search(vec).filter(sim>=thr).order_by("ts", desc).first() - | → VLM detect → 3D project - | @skill find_objects_near(prompt, x, y, z, radius) - | .near((x,y,z), r).search(vec).filter(sim>=thr).order_by("ts", desc).first() - | → VLM detect → 3D project - | @skill recall(name) - | .search(vec).order_by("ts", desc).first() | → return camera pose + timestamp (no VLM, cheaper) - | - | latest_detections: Out[list[DetObject]] - v - PickAndPlaceModule (manipulation — reads latest_detections) + | | + | objects: Out[list[DetObject]] | + v | + PickAndPlaceModule (manipulation — In[objects]) | + | + Shared SQLite store (recording.db) ─────────────────────┘ + Recorder writes; LazyPerceptionModule opens its own SqliteStore + on the same file (read-only via WAL). - All streams live in one shared SQLite store (recording.db). + Why recording + embedding in ONE module: memory2's SubjectNotifier + is in-memory per-store, so a separate-module embedder couldn't + receive .live() notifications from the recorder's writes even on + the same db_path. Keeping them in one module = one store = one + notifier = live pipeline works. The LazyPerceptionModule reads + only at skill-call time (no .live()), so cross-instance SQLite + reads (WAL) are sufficient there. Three skills, three query shapes @@ -52,12 +56,16 @@ - ``find_objects(prompt)`` — **semantic + most recent**. Search the embedding stream, filter to confident matches, take the most recent. Run VLM + 3D projection on that frame. Returns ``list[Object]`` (via - ``latest_detections``) plus formatted summary with timestamp. + the ``objects`` port) plus formatted summary with timestamp. - ``find_objects_near(prompt, x, y, z, radius)`` — **spatial + semantic - + most recent**. 
Same as ``find_objects`` but pre-filters via - memory2's R*Tree index to frames recorded when the camera pose was - within ``radius`` of ``(x, y, z)``. + + most recent**. Same as ``find_objects`` plus a camera-pose proximity + filter. NOTE on indexing: memory2 currently runs vector ``.search()`` + as a top-K vec0 query and applies the spatial ``.near()`` predicate in + Python afterward — the R*Tree index does NOT pre-filter the embedding + search. Correct results, but for very long recordings the top-K cap + can elide spatially-near low-similarity frames. Pushing R*Tree + pre-filtering through ``Backend._vector_search`` is a memory2 follow-up. - ``recall(name)`` — **cheaper cousin of find_objects**. Same query shape but skips VLM detection and 3D projection; returns the camera @@ -78,8 +86,10 @@ 2. Build a memory2 query on ``color_image_embedded`` by composing the relevant filters (``.near`` / ``.search`` / ``.filter`` / - ``.order_by``). All filters push down to SQL / R*Tree / vec0 - indexes via memory2's store backend. + ``.order_by``). ``.search`` pushes through the vec0 vector index; + tag/time/SQL-expressible filters push to SQL. ``.near`` runs as a + Python post-filter after vector search (R*Tree pre-gating of vector + search is a memory2 follow-up). 3. Take ``.first()`` — the most recent confident match. Single observation. @@ -90,11 +100,21 @@ ``Object.from_2d_to_list`` (camera→world transform from the recorder's pose). -5. Publish ``list[Object]`` on ``latest_detections``. Return a - formatted summary that **includes the observation timestamp** so - the agent can read freshness. +5. Publish ``list[Object]`` on ``objects``. Return a formatted summary + that **includes the observation timestamp** so the agent can read + freshness. -``PickAndPlaceModule`` reads ``latest_detections`` to act on named +**Snapshot contract.** Every publish on ``objects`` replaces +``PickAndPlaceModule._detection_snapshot`` — the cache ``pick`` / +``place`` read. So: + +- A successful ``find_objects("cup")`` → snapshot = [cup objects]; ``pick("cup")`` works. +- An interleaved ``find_objects("apple")`` returning 0 matches → snapshot = []; later + ``pick("cup")`` fails ("not found"). Re-query before acting. +- This is intentional. Each ``find_objects`` call is a fresh world model, not an + accumulating one. The agent prompt makes this explicit. + +``PickAndPlaceModule`` reads ``objects`` to act on named targets. Multiple instances in the result are handled either by prompt qualifiers ("cup on the right" → VLM returns one detection in the frame) or by the agent reading the returned positions and @@ -110,7 +130,8 @@ depth_image depth frames (one row per camera tick) camera_info intrinsics (rarely change, .last() suffices) color_image_embedded CLIP vector + image bytes, vec0-indexed - for similarity search (populated by SemanticSearch) + for similarity search (populated by the + recorder's continuous embed pipeline) Open vocab + cross-session memory @@ -120,8 +141,9 @@ or ``find_objects_near`` (``"red mug with handle"``, ``"the screwdriver near the laptop"``). The VLM handles visual disambiguation; the agent can encode spatial qualifiers in the prompt, -or call ``find_objects_near`` to use memory2's R*Tree index for -camera-pose-bounded queries. +or call ``find_objects_near`` for camera-pose-bounded queries (Python +spatial post-filter; see the ``find_objects_near`` note above for the +indexing limitation). Cross-session memory is implicit: memory2 persists every observation in ``recording.db``. 
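+
+For example (hypothetical numbers), the very first skill call after a restart
+can still answer from an earlier session's recording::
+
+    recall("wrench")
+    # -> "Last saw 'wrench' with camera near (0.61, -0.20, 0.35) (14h ago)."
+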
Skills query the full embedding history regardless @@ -130,6 +152,12 @@ correct after a process restart; the agent reads the timestamp and decides. +``RGBDCameraRecorder`` uses **resume-if-exists**: it opens ``recording.db`` +if the file is there (cross-session memory works out of the box) and +creates it fresh if it isn't. ``RecorderConfig.overwrite`` is inert in +this subclass — see ``recorder.py``. Users wanting a clean slate delete +the file manually before launching. + """ from __future__ import annotations @@ -157,14 +185,15 @@ class RGBDCameraRecorderConfig(RecorderConfig): - """Config for the generic RGBD-camera recorder.""" + """Config for the RGBD-camera recorder + continuous embedder.""" db_path: str | Path = "recording.db" + embedding_model: type[EmbeddingModel] = CLIPModel @runtime_checkable class RGBDCameraRecorderSpec(Protocol): - """Protocol for the generic RGBD-camera recorder.""" + """Protocol for the RGBD-camera recorder + continuous embedder.""" config: RGBDCameraRecorderConfig @@ -191,6 +220,11 @@ class LazyPerceptionModuleConfig(MemoryModuleConfig): # confident match for find_objects / find_objects_near. min_similarity: float = 0.20 + # 3D projection knobs threaded into ``Object.from_2d_to_list``. + max_distance: float = 1.0 + use_aabb: bool = True + max_obstacle_width: float = 0.06 + @runtime_checkable class LazyPerceptionModuleSpec(Protocol): @@ -198,11 +232,12 @@ class LazyPerceptionModuleSpec(Protocol): config: LazyPerceptionModuleConfig - latest_detections: Out[list[DetObject]] - """Wired by the blueprint to the manipulation stack - (e.g., ``PickAndPlaceModule.objects``). Cache of the most recent - ``find_objects`` / ``find_objects_near`` result. Manipulation - reads from here to act on the named target.""" + objects: Out[list[DetObject]] + """Wired by the blueprint to ``PickAndPlaceModule.objects``. Cache + of the most recent ``find_objects`` / ``find_objects_near`` result. + Manipulation reads from here to act on the named target. Named + ``objects`` to match the existing manipulation In port for + autoconnect-by-name.""" def find_objects(self, prompt: str) -> str: """Find objects matching ``prompt``. Returns the most recent @@ -213,7 +248,7 @@ def find_objects(self, prompt: str) -> str: ``.order_by("ts", desc=True)`` + ``.first()`` over ``color_image_embedded``, then runs VLM detection on the matching frame and projects to 3D. Publishes ``list[Object]`` - on ``latest_detections``. + on the ``objects`` port. Open-vocab: ``prompt`` can be any natural language (``"red mug"``, ``"the cup on the right"``). Comma-separated @@ -240,10 +275,11 @@ def find_objects_near( the camera was within ``radius`` meters of ``(x, y, z)``. Decorated ``@skill`` in the implementation. Same query shape - as ``find_objects`` but with an extra ``.near((x, y, z), radius)`` - filter that uses memory2's R*Tree spatial index. Otherwise - identical: take most recent confident match → VLM → 3D project - → publish on ``latest_detections``. + as ``find_objects`` plus an extra ``.near((x, y, z), radius)`` + predicate. ``.near()`` runs as a Python post-filter today — + memory2's R*Tree index does not pre-gate the vector search. + Take most recent confident match → VLM → 3D project → publish + on the ``objects`` port. ``.near()`` filters by the camera's pose at record time, NOT by the detected object's position. 
Use for "frames captured diff --git a/dimos/manipulation/pick_and_place_module.py b/dimos/manipulation/pick_and_place_module.py index 9ca223832c..613c73d10e 100644 --- a/dimos/manipulation/pick_and_place_module.py +++ b/dimos/manipulation/pick_and_place_module.py @@ -130,8 +130,24 @@ def start(self) -> None: logger.info("PickAndPlaceModule started") def _on_objects(self, objects: list[DetObject]) -> None: - """Callback when objects received from perception (runs on RxPY thread pool).""" + """Callback when objects received from perception (runs on RxPY thread pool). + + Also writes ``_detection_snapshot`` so ``pick``/``place`` can act on + publish-driven detections (e.g., LazyPerceptionModule.find_objects) + without an explicit refresh_obstacles()/look() cycle. + + Snapshot has TWO writers now: + - this callback (publish-driven, xArm6-style continuous flow) + - ``refresh_obstacles()`` (scan-driven, xArm7-style episodic flow) + For the xArm7 path with ObjectSceneRegistrationModule (publishes + only non-empty ``all_permanent``, monotonically), the callback + keeps the snapshot self-healed to the latest cached set — strictly + additive vs the previous scan→freeze semantics. The previous + "freeze at refresh time" was a workaround for cache volatility, + not a hard contract. + """ try: + self._detection_snapshot = list(objects) if self._world_monitor is not None: self._world_monitor.on_objects(objects) except Exception as e: diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index b88159dd8e..3e164b4695 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -108,6 +108,8 @@ "xarm-perception-agent": "dimos.manipulation.blueprints:xarm_perception_agent", "xarm-perception-sim": "dimos.manipulation.blueprints:xarm_perception_sim", "xarm-perception-sim-agent": "dimos.manipulation.blueprints:xarm_perception_sim_agent", + "xarm6-perception": "dimos.manipulation.blueprints:xarm6_perception", + "xarm6-perception-agent": "dimos.manipulation.blueprints:xarm6_perception_agent", "xarm6-planner-only": "dimos.manipulation.blueprints:xarm6_planner_only", "xarm7-planner-coordinator": "dimos.manipulation.blueprints:xarm7_planner_coordinator", "xarm7-planner-coordinator-agent": "dimos.manipulation.blueprints:xarm7_planner_coordinator_agent",