Skip to content

Commit b2cf818

Browse files
committed
Fix thread scope propagation in AtomicMemory provider
## Summary

Thread scope (`Scope.thread`) was silently dropped before reaching the Core API. Ingest, search, and list operations never forwarded `session_id` on the wire, so thread-scoped reads returned unfiltered results and thread-scoped writes stored memories without a session association.

This fix propagates `thread` as `session_id` on every route where Core honors it, strips it from routes where Core does not (get, delete, expand), and validates that responses echo back a matching `session_id` for thread-scoped requests.

## Changes

- Add a `thread: str | None` field to `UserScope` and `WorkspaceScope`.
- Add an `include_thread` flag to `scope_to_fields` and `scope_to_query_pairs`; emit `session_id` when opted in.
- Add `strip_read_filters` to `scope_mapper` — it drops both `agent_scope` and `thread` for routes (get, delete, expand) that do not accept those filters.
- Fix `strip_agent_scope` to preserve `thread` when clearing the workspace filter.
- Propagate `session_id` on ingest and search request bodies in `provider.py` / `async_provider.py`.
- Extract a `_build_list_path` helper that appends `session_id` to the list query string when thread is set; use it in both the sync and async providers.
- Use `include_thread=True` in `scope_to_fields` / `scope_to_query_pairs` calls for ingest, search, and list in `handle_impl.py` and `async_handle_impl.py`.
- Add `_build_memory_scope` in `handle_impl.py` and `_build_scope` in `mappers.py`: project `session_id` from the backend response back into the returned scope, and raise `ValueError` when a thread-scoped request receives a response without a matching `session_id`.
- Add comprehensive test coverage across `test_provider.py`, `test_async_provider.py`, `test_handle_base.py`, `test_mappers.py`, and `test_scope_mapper.py` for all affected operations.

## Why

The Core API uses `session_id` to partition memories by conversation thread.
Without forwarding `thread` as `session_id`, thread-scoped ingests write memories outside any session, thread-scoped searches return the full unfiltered memory set, and there is no way to verify that the backend actually applied the requested scope. The validation step on responses closes a silent correctness gap where a mismatch between the requested and returned session could go undetected.

## Validation

All new tests are exercised via `respx` mocks against the full HTTP path.

Tests cover:

- `session_id` is present in the request body/query for ingest, search, list, package, and `search_as_of`.
- `session_id` is absent from get and delete query strings (routes that do not filter by thread).
- Expand strips `thread` from the returned memory scope.
- The response mapper raises on a missing or mismatched `session_id` when thread scope was requested.
- `strip_read_filters` and `strip_agent_scope` preserve or clear `thread` as expected.
1 parent e448096 commit b2cf818

12 files changed

Lines changed: 451 additions & 24 deletions

File tree

atomicmemory/providers/atomicmemory/async_handle_impl.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
scope_to_fields,
4848
scope_to_query_pairs,
4949
strip_agent_scope,
50+
strip_read_filters,
5051
)
5152

5253
Route = Callable[[str], str]
@@ -93,7 +94,7 @@ async def expand(self, refs: list[str], scope: MemoryScope) -> list[AtomicMemory
9394
method="POST",
9495
json=body,
9596
)
96-
echoed = strip_agent_scope(scope)
97+
echoed = strip_read_filters(scope)
9798
return [_to_atomic_memory(m, echoed) for m in raw.get("memories", [])]
9899

99100
async def list(
@@ -103,7 +104,7 @@ async def list(
103104
) -> AtomicMemoryListResultPage:
104105
opts = _coerce_list_options(options)
105106
_assert_list_options_scope_compat(scope, opts)
106-
pairs: list[tuple[str, str]] = scope_to_query_pairs(scope)
107+
pairs: list[tuple[str, str]] = scope_to_query_pairs(scope, include_thread=True)
107108
if opts.limit is not None:
108109
pairs.append(("limit", str(opts.limit)))
109110
if opts.offset is not None:
@@ -127,14 +128,16 @@ async def list(
127128
)
128129

129130
async def get(self, id: str, scope: MemoryScope) -> AtomicMemoryMemory | None:
130-
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(scope))}")
131+
unfiltered_scope = strip_read_filters(scope)
132+
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(unfiltered_scope))}")
131133
raw = await afetch_json_or_none(self._client, self._http, path)
132134
if raw is None:
133135
return None
134-
return _to_atomic_memory(raw, strip_agent_scope(scope))
136+
return _to_atomic_memory(raw, unfiltered_scope)
135137

136138
async def delete(self, id: str, scope: MemoryScope) -> None:
137-
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(scope))}")
139+
unfiltered_scope = strip_read_filters(scope)
140+
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(unfiltered_scope))}")
138141
try:
139142
await afetch_void(self._client, self._http, path, method="DELETE")
140143
except ProviderError as exc:
@@ -152,7 +155,7 @@ async def _post_ingest(
152155
) -> AtomicMemoryIngestResult:
153156
assert_scope_allows_visibility(scope, input.visibility)
154157
body: dict[str, Any] = {
155-
**scope_to_fields(scope),
158+
**scope_to_fields(scope, include_thread=True),
156159
"conversation": input.conversation,
157160
"source_site": input.source_site,
158161
"source_url": input.source_url or "",
@@ -173,7 +176,7 @@ async def _post_search(
173176
scope: MemoryScope,
174177
) -> AtomicMemorySearchResultPage:
175178
body: dict[str, Any] = {
176-
**scope_to_fields(scope, include_agent_scope=True),
179+
**scope_to_fields(scope, include_agent_scope=True, include_thread=True),
177180
"query": request.query,
178181
}
179182
if request.limit is not None:

atomicmemory/providers/atomicmemory/async_provider.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from atomicmemory.providers.atomicmemory.path import normalize_api_version
5656
from atomicmemory.providers.atomicmemory.provider import (
5757
_build_ingest_body,
58+
_build_list_path,
5859
_build_package_body,
5960
_build_search_body,
6061
_qs,
@@ -137,7 +138,7 @@ async def do_delete(self, ref: MemoryRef) -> None:
137138
async def do_list(self, request: ListRequest) -> ListResultPage:
138139
offset = int(request.cursor) if request.cursor else 0
139140
limit = request.limit if request.limit is not None else 20
140-
path = self._route(f"/memories/list?user_id={_qs(request.scope.user)}&limit={limit}&offset={offset}")
141+
path = self._route(_build_list_path(request.scope, limit, offset))
141142
raw = await afetch_json(self._require_client(), self._http_options, path)
142143
memories = [to_memory(m, request.scope) for m in raw.get("memories", [])]
143144
next_offset = offset + len(memories)

atomicmemory/providers/atomicmemory/handle.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class UserScope(BaseModel):
3535
model_config = ConfigDict(extra="forbid", populate_by_name=True)
3636
kind: Literal["user"] = "user"
3737
user_id: str = Field(alias="userId")
38+
thread: str | None = None
3839

3940

4041
class WorkspaceScope(BaseModel):
@@ -45,6 +46,7 @@ class WorkspaceScope(BaseModel):
4546
user_id: str = Field(alias="userId")
4647
workspace_id: str = Field(alias="workspaceId")
4748
agent_id: str = Field(alias="agentId")
49+
thread: str | None = None
4850
agent_scope: AgentScope | None = Field(default=None, alias="agentScope")
4951

5052

atomicmemory/providers/atomicmemory/handle_impl.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
scope_to_fields,
4242
scope_to_query_pairs,
4343
strip_agent_scope,
44+
strip_read_filters,
4445
)
4546

4647
Route = Callable[[str], str]
@@ -97,7 +98,7 @@ def expand(self, refs: list[str], scope: MemoryScope) -> list[AtomicMemoryMemory
9798
method="POST",
9899
json=body,
99100
)
100-
echoed = strip_agent_scope(scope)
101+
echoed = strip_read_filters(scope)
101102
return [_to_atomic_memory(m, echoed) for m in raw.get("memories", [])]
102103

103104
def list(
@@ -107,7 +108,7 @@ def list(
107108
) -> AtomicMemoryListResultPage:
108109
opts = _coerce_list_options(options)
109110
_assert_list_options_scope_compat(scope, opts)
110-
pairs: list[tuple[str, str]] = scope_to_query_pairs(scope)
111+
pairs: list[tuple[str, str]] = scope_to_query_pairs(scope, include_thread=True)
111112
if opts.limit is not None:
112113
pairs.append(("limit", str(opts.limit)))
113114
if opts.offset is not None:
@@ -131,14 +132,16 @@ def list(
131132
)
132133

133134
def get(self, id: str, scope: MemoryScope) -> AtomicMemoryMemory | None:
134-
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(scope))}")
135+
unfiltered_scope = strip_read_filters(scope)
136+
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(unfiltered_scope))}")
135137
raw = fetch_json_or_none(self._client, self._http, path)
136138
if raw is None:
137139
return None
138-
return _to_atomic_memory(raw, strip_agent_scope(scope))
140+
return _to_atomic_memory(raw, unfiltered_scope)
139141

140142
def delete(self, id: str, scope: MemoryScope) -> None:
141-
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(scope))}")
143+
unfiltered_scope = strip_read_filters(scope)
144+
path = self._route(f"/memories/{quote(id, safe='')}?{urlencode(scope_to_query_pairs(unfiltered_scope))}")
142145
try:
143146
fetch_void(self._client, self._http, path, method="DELETE")
144147
except ProviderError as exc:
@@ -160,7 +163,7 @@ def _post_ingest(
160163
) -> AtomicMemoryIngestResult:
161164
assert_scope_allows_visibility(scope, input.visibility)
162165
body: dict[str, Any] = {
163-
**scope_to_fields(scope),
166+
**scope_to_fields(scope, include_thread=True),
164167
"conversation": input.conversation,
165168
"source_site": input.source_site,
166169
"source_url": input.source_url or "",
@@ -181,7 +184,7 @@ def _post_search(
181184
scope: MemoryScope,
182185
) -> AtomicMemorySearchResultPage:
183186
body: dict[str, Any] = {
184-
**scope_to_fields(scope, include_agent_scope=True),
187+
**scope_to_fields(scope, include_agent_scope=True, include_thread=True),
185188
"query": request.query,
186189
}
187190
if request.limit is not None:
@@ -263,7 +266,7 @@ def _to_atomic_memory(raw: dict[str, Any], scope: MemoryScope) -> AtomicMemoryMe
263266
payload: dict[str, Any] = {
264267
"id": raw["id"],
265268
"content": raw.get("content") or "",
266-
"scope": scope,
269+
"scope": _build_memory_scope(raw, scope),
267270
"created_at": _parse_iso(raw.get("created_at")) or _now_utc(),
268271
}
269272
if raw.get("updated_at"):
@@ -274,6 +277,23 @@ def _to_atomic_memory(raw: dict[str, Any], scope: MemoryScope) -> AtomicMemoryMe
274277
return AtomicMemoryMemory.model_validate(payload)
275278

276279

280+
def _build_memory_scope(raw: dict[str, Any], requested_scope: MemoryScope) -> MemoryScope:
281+
"""Validate and project Core ``session_id`` back into namespace scope."""
282+
session_id = raw.get("session_id")
283+
if requested_scope.thread is not None:
284+
if not session_id:
285+
raise ValueError(
286+
"atomicmemory provider: backend response missing required `session_id` for thread-scoped request"
287+
)
288+
if session_id != requested_scope.thread:
289+
raise ValueError(
290+
"atomicmemory provider: backend response `session_id` did not match requested thread scope"
291+
)
292+
if not session_id:
293+
return requested_scope
294+
return requested_scope.model_copy(update={"thread": session_id})
295+
296+
277297
def _to_atomic_search_result(raw: dict[str, Any], scope: MemoryScope) -> AtomicMemorySearchResult:
278298
similarity = _coalesce(raw.get("semantic_similarity"), raw.get("similarity"))
279299
ranking_score = _coalesce(raw.get("ranking_score"), raw.get("score"))

atomicmemory/providers/atomicmemory/mappers.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,37 @@ def to_memory(raw: dict[str, Any], scope: Scope) -> Memory:
5151
return Memory(
5252
id=raw["id"],
5353
content=raw["content"],
54-
scope=scope,
54+
scope=_build_scope(raw, scope),
5555
created_at=created_at,
5656
provenance=_build_provenance(raw),
5757
metadata=_build_metadata(raw),
5858
)
5959

6060

61+
def _build_scope(raw: dict[str, Any], scope: Scope) -> Scope:
62+
"""Merge backend-projected scope fields and validate scoped reads."""
63+
namespace = raw.get("namespace")
64+
session_id = raw.get("session_id")
65+
if scope.namespace is not None and namespace is not None and namespace != scope.namespace:
66+
raise ValueError("atomicmemory provider: backend response `namespace` did not match requested namespace scope")
67+
if scope.thread is not None:
68+
if not session_id:
69+
raise ValueError(
70+
"atomicmemory provider: backend response missing required `session_id` for thread-scoped request"
71+
)
72+
if session_id != scope.thread:
73+
raise ValueError(
74+
"atomicmemory provider: backend response `session_id` did not match requested thread scope"
75+
)
76+
77+
updates: dict[str, Any] = {}
78+
if namespace:
79+
updates["namespace"] = namespace
80+
if session_id:
81+
updates["thread"] = session_id
82+
return scope.model_copy(update=updates)
83+
84+
6185
def _build_provenance(raw: dict[str, Any]) -> Provenance | None:
6286
fields: dict[str, Any] = {}
6387
if "source_site" in raw and raw["source_site"] is not None:

atomicmemory/providers/atomicmemory/provider.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from datetime import datetime
1111
from typing import Any
12-
from urllib.parse import quote
12+
from urllib.parse import quote, urlencode
1313

1414
import httpx
1515

@@ -31,6 +31,7 @@
3131
MemoryVersion,
3232
PackageFormat,
3333
PackageRequest,
34+
Scope,
3435
SearchRequest,
3536
SearchResult,
3637
SearchResultPage,
@@ -129,7 +130,7 @@ def do_delete(self, ref: MemoryRef) -> None:
129130
def do_list(self, request: ListRequest) -> ListResultPage:
130131
offset = int(request.cursor) if request.cursor else 0
131132
limit = request.limit if request.limit is not None else 20
132-
path = self._route(f"/memories/list?user_id={_qs(request.scope.user)}&limit={limit}&offset={offset}")
133+
path = self._route(_build_list_path(request.scope, limit, offset))
133134
raw = fetch_json(self._require_client(), self._http_options, path)
134135
memories = [to_memory(m, request.scope) for m in raw.get("memories", [])]
135136
next_offset = offset + len(memories)
@@ -264,6 +265,8 @@ def _build_ingest_body(input: IngestInput) -> dict[str, Any]:
264265
"source_site": input.provenance.source if input.provenance and input.provenance.source else "sdk",
265266
"source_url": input.provenance.source_url if input.provenance and input.provenance.source_url else "",
266267
}
268+
if input.scope.thread is not None:
269+
body["session_id"] = input.scope.thread
267270
if input.mode == "verbatim":
268271
body["skip_extraction"] = True
269272
if input.metadata:
@@ -282,6 +285,8 @@ def _build_search_body(request: SearchRequest) -> dict[str, Any]:
282285
body["threshold"] = request.threshold
283286
if request.scope.namespace is not None:
284287
body["namespace_scope"] = request.scope.namespace
288+
if request.scope.thread is not None:
289+
body["session_id"] = request.scope.thread
285290
return body
286291

287292

@@ -298,3 +303,15 @@ def _build_package_body(request: PackageRequest) -> dict[str, Any]:
298303
def _qs(value: str | None) -> str:
299304
"""URL-encode a query-string value; empty string when falsy."""
300305
return quote(value, safe="") if value else ""
306+
307+
308+
def _build_list_path(scope: Scope, limit: int, offset: int) -> str:
309+
"""Build the Core list path, including optional thread scope."""
310+
pairs = [
311+
("user_id", scope.user or ""),
312+
("limit", str(limit)),
313+
("offset", str(offset)),
314+
]
315+
if scope.thread is not None:
316+
pairs.append(("session_id", scope.thread))
317+
return f"/memories/list?{urlencode(pairs)}"

atomicmemory/providers/atomicmemory/scope_mapper.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111
from typing import Any
1212

1313
from atomicmemory.core.errors import ValidationError
14-
from atomicmemory.providers.atomicmemory.handle import MemoryScope, WorkspaceScope
14+
from atomicmemory.providers.atomicmemory.handle import MemoryScope, UserScope, WorkspaceScope
1515

1616

1717
def scope_to_fields(
1818
scope: MemoryScope,
1919
*,
2020
include_agent_scope: bool = False,
21+
include_thread: bool = False,
2122
) -> dict[str, Any]:
2223
"""Translate a `MemoryScope` to wire-format request fields.
2324
@@ -26,28 +27,36 @@ def scope_to_fields(
2627
include_agent_scope: Emit ``agent_scope`` on the wire. Defaults
2728
to ``False``; only the search routes opt in (core ignores
2829
``agent_scope`` on expand/list/get/delete).
30+
include_thread: Emit ``session_id`` on routes Core honors:
31+
ingest, search, and list.
2932
3033
Returns:
3134
A dict with ``user_id`` always set, plus ``workspace_id`` /
3235
``agent_id`` (and optionally ``agent_scope``) for workspace
3336
scopes.
3437
"""
3538
if not isinstance(scope, WorkspaceScope):
36-
return {"user_id": scope.user_id}
37-
fields: dict[str, Any] = {
39+
user_fields: dict[str, Any] = {"user_id": scope.user_id}
40+
if include_thread and scope.thread is not None:
41+
user_fields["session_id"] = scope.thread
42+
return user_fields
43+
workspace_fields: dict[str, Any] = {
3844
"user_id": scope.user_id,
3945
"workspace_id": scope.workspace_id,
4046
"agent_id": scope.agent_id,
4147
}
4248
if include_agent_scope and scope.agent_scope is not None:
43-
fields["agent_scope"] = scope.agent_scope
44-
return fields
49+
workspace_fields["agent_scope"] = scope.agent_scope
50+
if include_thread and scope.thread is not None:
51+
workspace_fields["session_id"] = scope.thread
52+
return workspace_fields
4553

4654

4755
def scope_to_query_pairs(
4856
scope: MemoryScope,
4957
*,
5058
include_agent_scope: bool = False,
59+
include_thread: bool = False,
5160
) -> list[tuple[str, str]]:
5261
"""Translate a scope to ``[(key, value)]`` pairs for query strings.
5362
@@ -66,6 +75,8 @@ def scope_to_query_pairs(
6675
pairs.extend(("agent_scope", v) for v in value)
6776
else:
6877
pairs.append(("agent_scope", value))
78+
if include_thread and scope.thread is not None:
79+
pairs.append(("session_id", scope.thread))
6980
return pairs
7081

7182

@@ -91,6 +102,18 @@ def strip_agent_scope(scope: MemoryScope) -> MemoryScope:
91102
"""
92103
if not isinstance(scope, WorkspaceScope):
93104
return scope
105+
return WorkspaceScope(
106+
user_id=scope.user_id,
107+
workspace_id=scope.workspace_id,
108+
agent_id=scope.agent_id,
109+
thread=scope.thread,
110+
)
111+
112+
113+
def strip_read_filters(scope: MemoryScope) -> MemoryScope:
114+
"""Drop filters the target route did not apply before echoing scope."""
115+
if not isinstance(scope, WorkspaceScope):
116+
return UserScope(user_id=scope.user_id)
94117
return WorkspaceScope(
95118
user_id=scope.user_id,
96119
workspace_id=scope.workspace_id,

0 commit comments

Comments
 (0)