feat: unified /data/ read surface; remove legacy index/search/export #139
Replace the fragmented consumer-read surface (/discovery, /records/{srn},
/search) with a single /data/ URL family owned by a new `data` domain:
- Node catalog (GET /data) and machine-readable schema manifests (GET /data/{schema}).
- Single record fetch by bare id (GET /data/records/{id}[@{version}]), returning
both id and srn.
- Records and feature-table reads at /data/{schema}/records and
/data/{schema}/{feature} as paginated JSON or streamed CSV / gzipped CSV, with
bounded memory via a server-side cursor and pre-flight error surfacing before
any bytes.
- FilterExpr POST body (rate-limited per route), keyset pagination, and a
reserved-name policy (records/datasets) enforced at registration and read.
Delete the index (ChromaDB), search, export, and discovery domains, the
sdk/index package, the discovery infra adapter, and the legacy routes; drop the
/stats index-counts field in favour of a /data hyperlink. chromadb and
sentence-transformers are no longer dependencies.
|
US6 deleted the only file in tests/contract/ (an index-coupled worker test),
leaving the directory empty. CI's server-contract job runs `pytest tests/contract`,
which errors (exit 4) when the path doesn't exist on a fresh checkout.
Add a self-sufficient, DB-free contract test asserting the legacy read routes
(/discovery, /records/{srn}, /search) now 404 and the new /data/ operation IDs
are registered and unique — the SDK-codegen contract — without touching Postgres
(the contract job has no DB service).
@greptile
Greptile Summary
This PR replaces the fragmented
Confidence Score: 5/5
Safe to merge. The new /data/ surface is well-structured, the 1170-test suite covers all the critical paths, and all the issues identified in the prior review round have been addressed in the implementation. Every concern raised in the previous review has been resolved: malformed cursors now map to 400 via
No files require special attention.
|
| Filename | Overview |
|---|---|
| server/osa/application/api/v1/routes/data/_streaming.py | Response assembly for paginated JSON and streaming CSV/gzip. Pre-flight error surfacing works correctly for both paths. Paginated path abandons the generator after consuming limit+1 rows without explicit aclose(). |
| server/osa/domain/data/model/query_plan.py | Clean IR design: Keyset owns both encode and decode sides of cursor, preventing drift. encode_cursor uses default=str for datetime safety. TableKind-keyed tiebreak prevents hook column named srn from hijacking feature pagination. |
| server/osa/infrastructure/data/postgres_table_read_store.py | Feature streaming scoped to schema via records JOIN + records_scope predicate. Cursor errors wrapped to ValidationError (400). _coerce_cursor_value handles DateTime, Date, and Integer/BigInteger. Previously flagged issues all resolved. |
| server/osa/infrastructure/data/postgres_catalog_read_store.py | Clean catalog/manifest/record-by-id implementation. Feature count uses SchemaFeatureReader.count_rows which scopes to schema. get_record_by_id uses LIKE with LIKE-escaped ID and filters by domain wildcard (intentional cross-domain lookup). |
| server/osa/infrastructure/data/schema_feature_reader.py | Properly scopes feature table row counts and streaming to the requested schema via JOIN through records table + schema_id/schema_version predicates. Single source of truth for schema-scoped feature access. |
| server/osa/domain/data/service/data_query.py | Filter bounds validated against config.data.* keys (correctly renamed from discovery_*). Validation runs in generator body so it surfaces as 4xx before any bytes on both streaming and paginated paths. |
| server/osa/domain/data/model/filter.py | FilterExpr discriminated union with MetadataFieldRef / FeatureFieldRef. SortOrder dead class from previous review is gone. Operator compatibility tables for both FieldType and JSON primitive types. |
| server/osa/config.py | New DataConfig nested model for /data/ filter tree bounds. discovery_* keys removed; all config now under config.data.*. Clean separation from legacy discovery config. |
| server/osa/application/api/v1/routes/data/tables.py | Metaprogrammed table-route factory registers longest-suffix-first to prevent greedy path-param match on dotted format suffixes. Operation ID uniqueness check prevents SDK codegen collisions. |
| server/osa/domain/data/query/read_table.py | Handler validates schema existence (resolve_table) before constructing QueryPlan, ensuring NotFoundError propagates synchronously before the async generator is touched. |
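The `tables.py` row above mentions two registration-time rules: longest-suffix-first ordering and an operation ID uniqueness check. A minimal sketch of both ideas follows. The suffix list and both function names are assumptions made for illustration, not the PR's actual factory.

```python
# Hypothetical format suffixes; the real registry lives with the routes.
FORMAT_SUFFIXES = ["", ".csv", ".csv.gz"]


def registration_order(suffixes: list[str]) -> list[str]:
    """Return suffixes longest first, so '.csv.gz' is registered before '.csv'."""
    return sorted(suffixes, key=len, reverse=True)


def assert_unique_operation_ids(operation_ids: list[str]) -> None:
    """Raise if two routes would share an operation ID (an SDK codegen collision)."""
    seen: set[str] = set()
    for op_id in operation_ids:
        if op_id in seen:
            raise ValueError(f"duplicate operation id: {op_id}")
        seen.add(op_id)
```

Registering the most specific suffix first means a bare `{feature}` path parameter never gets the chance to swallow `citations.csv.gz` as a feature name.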
Sequence Diagram
sequenceDiagram
participant C as Client
participant R as Route
participant H as ReadRecordsTableHandler
participant CS as DataCatalogService
participant QS as DataQueryService
participant PS as PostgresTableReadStore
participant PG as Postgres
C->>+R: "GET /data/{schema}/records[.csv]"
R->>+H: run(ReadRecordsTable)
H->>+CS: resolve_table(schema, RECORDS)
CS->>PG: get_latest_schema_id / get_schema_manifest
PG-->>CS: SchemaManifest + ResolvedTable
CS-->>-H: ResolvedTable(schema_id, columns)
H->>QS: stream_records(QueryPlan) lazy generator
H-->>-R: TableRead(plan, columns, rows)
R->>+R: build_table_response(rows, fmt, columns, plan)
alt Paginated JSON
R->>+PS: __anext__ x limit+1
PS->>+PG: DECLARE cursor / FETCH
PG-->>-PS: row mappings
PS-->>-R: page rows
R-->>C: StreamingResponse JSON + next_cursor
else Streaming CSV / CSV.gz
R->>+PS: pre-flight __anext__
PS->>PG: DECLARE cursor / FETCH first batch
PG-->>PS: rows
PS-->>-R: first row or 4xx on error
R-->>-C: StreamingResponse unbounded stream
end
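The pre-flight `__anext__` step in the diagram can be sketched with a plain async generator. This is an illustration under assumptions: `rows_for`, `preflight`, and the stand-in `NotFoundError` are hypothetical names, and the real route wraps the result in a `StreamingResponse` rather than collecting it into a list.

```python
import asyncio
from typing import Any, AsyncIterator

_EMPTY = object()  # sentinel: the stream had no rows at all


class NotFoundError(Exception):
    """Stand-in for a domain error that should map to a 4xx response."""


async def rows_for(schema: str) -> AsyncIterator[dict[str, Any]]:
    # Validation lives in the generator body, so it only runs on the first pull.
    if schema != "known":
        raise NotFoundError(f"unknown schema: {schema}")
    for i in range(3):
        yield {"id": i}


async def preflight(rows: AsyncIterator[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]:
    """Pull the first row eagerly so errors surface before any bytes are sent."""
    try:
        first: Any = await rows.__anext__()
    except StopAsyncIteration:
        first = _EMPTY

    async def replay() -> AsyncIterator[dict[str, Any]]:
        # Re-emit the row consumed during pre-flight, then the rest of the stream.
        if first is not _EMPTY:
            yield first
            async for row in rows:
                yield row

    return replay()


async def collect(schema: str) -> list[dict[str, Any]]:
    stream = await preflight(rows_for(schema))
    return [row async for row in stream]
```

Because the first pull happens before the response object exists, an unknown schema raises inside `preflight` and can still be turned into a status code; once streaming has begun, the status line is already on the wire.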
Reviews (8): Last reviewed commit: "refactor: replace data-adapter module fu..."

    cursor_after = None
    if plan.pagination.cursor is not None:
        from osa.domain.data.model.query_plan import decode_cursor

        decoded = decode_cursor(str(plan.pagination.cursor))
        sort_value = self._coerce_cursor_value(decoded["s"], primary.column)
        cursor_after = page.after((sort_value, decoded["id"]))
    return page.order_by(), cursor_after

Malformed cursor produces HTTP 500 instead of 400
decode_cursor() raises ValueError on invalid base64 or missing s/id keys, and int(decoded["id"]) in _features_sort (line ~324) raises ValueError for non-integer IDs. Neither is caught here; they propagate through the async generator's first __anext__() pull as raw ValueError, which is not an OSAError subclass, so app.py's unhandled_exception_handler returns 500. A user who supplies a corrupted cursor (e.g. truncated base64) gets a generic 500 instead of a descriptive 400. Wrap both calls in a try/except ValueError and re-raise as ValidationError(…, field="cursor").
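The suggested wrap can be sketched as below. This is a self-contained illustration, not the PR's code: `ValidationError` here is a local stand-in for the domain error, `decode_cursor` is a simplified reimplementation, and `cursor_or_400` is a hypothetical helper name.

```python
import base64
import json
from typing import Any


class ValidationError(Exception):
    """Stand-in for the domain error that the API layer maps to HTTP 400."""

    def __init__(self, message: str, field: str, code: str) -> None:
        super().__init__(message)
        self.field = field
        self.code = code


def decode_cursor(cursor: str) -> dict[str, Any]:
    """Decode urlsafe-base64 JSON; raises ValueError on anything malformed."""
    # binascii.Error, UnicodeDecodeError and JSONDecodeError all subclass ValueError.
    payload = json.loads(base64.urlsafe_b64decode(cursor.encode()))
    if not isinstance(payload, dict) or "s" not in payload or "id" not in payload:
        raise ValueError("cursor is missing 's' or 'id'")
    return payload


def cursor_or_400(cursor: str) -> dict[str, Any]:
    """Translate any decode failure into a client-facing validation error."""
    try:
        return decode_cursor(cursor)
    except ValueError as exc:
        raise ValidationError(str(exc), field="cursor", code="invalid_cursor") from exc
```

The same `try`/`except` would also need to cover any later coercion such as `int(decoded["id"])`, since that raises `ValueError` too.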

    select_cols = [ft.c.id, ft.c.record_srn, ft.c.created_at, *data_columns(ft)]
    stmt = select(*select_cols).select_from(ft)
    if conditions:
        stmt = stmt.where(and_(*conditions))
    stmt = stmt.order_by(*order_keys)

    result = await self.session.stream(stmt)
    try:
        async for row in result.mappings():
            yield dict(row)
    finally:
        await result.close()

Feature stream not scoped to the requested schema
The SELECT against the feature table (ft) has no predicate on record_srn → records.schema_id. Every row ever written to the hook's Postgres table is returned, regardless of which schema's records it belongs to. feature_tables_table has a global UNIQUE(hook_name) constraint, meaning one physical table exists per hook name; if any convention for any other schema also lists a hook with the same name, its rows are included in the response for the requested schema. The URL GET /data/{schema}/{feature} implies schema-scoped data — add a JOIN to records (or a subquery) filtering by schema_id = plan.schema_id.id.root to enforce that boundary at the SQL level.

    if depth > self.config.discovery_max_filter_depth:
        raise ValidationError(
            f"Filter tree depth {depth} exceeds maximum "
            f"{self.config.discovery_max_filter_depth}.",
            field="filter",
            code="filter_depth_exceeded",
        )
    predicates = list(_iter_predicates(expr))
    if len(predicates) > self.config.discovery_max_predicates:
        raise ValidationError(
            f"Filter tree has {len(predicates)} predicates, exceeds maximum "
            f"{self.config.discovery_max_predicates}.",
            field="filter",
            code="filter_predicates_exceeded",
        )
    distinct_hooks = {p.field.hook for p in predicates if isinstance(p.field, FeatureFieldRef)}
    if len(distinct_hooks) > self.config.discovery_max_cross_domain_joins:
        raise ValidationError(
            f"Filter joins {len(distinct_hooks)} feature hooks, exceeds maximum "
            f"{self.config.discovery_max_cross_domain_joins}.",
            field="filter",

discovery_* config key references survive in the new data service
_validate_filter_bounds reads config.discovery_max_filter_depth, config.discovery_max_predicates, and config.discovery_max_cross_domain_joins. The PR removes the discovery domain, so these keys are semantically stale — they work today because the config model still exposes them, but they carry the old domain name and will silently pull the wrong limit if the config is ever renamed or the discovery section removed. Consider aliasing or renaming to data_max_* config keys alongside the domain rename.
Greptile review follow-ups on the unified /data/ surface (PR #139):
- Malformed pagination cursors (corrupt base64, missing s/id keys, or a non-integer feature id) raised bare ValueError from decode_cursor / int(decoded["id"]) inside the stream's pre-flight pull. ValueError is not an OSAError, so the global handler returned 500. Wrap decode+coerce in both _records_sort and _features_sort and re-raise as ValidationError(field="cursor", code="invalid_cursor") -> 400.
- Promote the three flat discovery_max_* config fields into a DataConfig sub-config (config.data.max_filter_depth / max_predicates / max_feature_joins), shedding the dead 'discovery' domain name now that the domain is removed. Overridable via OSA_DATA__MAX_* (nested env).
- Break a latent circular import: persistence/__init__ no longer eagerly imports the DI provider, so importing a persistence leaf module (e.g. feature_table) doesn't drag in the whole DI graph. application/di.py imports PersistenceProvider from its module directly.

Feature-table schema-scoping concern tracked separately in #140.
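To make the `DataConfig` bounds concrete, here is a rough sketch of depth and predicate-count checks against a nested config object. The three field names come from the commit message; the default values, the dict-based filter tree shape, and the helper names are assumptions made for the example (the real code works on `FilterExpr` and raises the domain `ValidationError`).

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass(frozen=True)
class DataConfig:
    # Field names follow the commit message; the default values are invented.
    max_filter_depth: int = 5
    max_predicates: int = 20
    max_feature_joins: int = 3


def filter_depth(expr: dict[str, Any]) -> int:
    """Depth of a filter tree; a leaf predicate has depth 1."""
    children = expr.get("children")
    if not children:
        return 1
    return 1 + max(filter_depth(child) for child in children)


def iter_predicates(expr: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield every leaf predicate in the tree."""
    children = expr.get("children")
    if children is None:
        yield expr
        return
    for child in children:
        yield from iter_predicates(child)


def validate_filter_bounds(expr: dict[str, Any], config: DataConfig) -> None:
    """Reject filters that exceed the configured depth or predicate count."""
    depth = filter_depth(expr)
    if depth > config.max_filter_depth:
        raise ValueError(f"Filter tree depth {depth} exceeds maximum {config.max_filter_depth}.")
    count = sum(1 for _ in iter_predicates(expr))
    if count > config.max_predicates:
        raise ValueError(f"Filter tree has {count} predicates, exceeds maximum {config.max_predicates}.")
```

Grouping the limits under one nested object keeps the bound names tied to the `data` domain rather than to a domain that no longer exists.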
@greptile

    async def _feature_count(self, ft: sa.Table) -> int:
        return int((await self.session.execute(select(func.count()).select_from(ft))).scalar_one())

_feature_count returns total rows across all schemas
_feature_count issues SELECT count(*) FROM ft with no predicate, so when two schemas register a hook with the same name (e.g. Schema A and Schema B both have a convention that includes hook "citations"), the physical features.citations table is shared and the returned row_count in GET /data/{schema} manifests the total row count across both schemas rather than just the requested schema's records. The same root cause that affects the streaming path (already tracked) shows up here as incorrect data in the manifest response.
A features.<hook> table is shared by every convention that registers the
hook name, across schemas (CreateFeatureTables swallows the ConflictError
on re-registration). _stream_features and _feature_count read the table
unscoped, so /data/{schema}/{feature} leaked rows belonging to other
schemas and the manifest row_count was inflated. Join records and filter
by schema_id + schema_version in both paths.
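The effect of the join can be reproduced on a toy dataset. The sketch below uses SQLite from the standard library as a stand-in for Postgres, with invented table contents and simplified column sets; only the `record_srn`, `schema_id`, and `schema_version` names are taken from the discussion above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE records (srn TEXT PRIMARY KEY, schema_id TEXT, schema_version TEXT);
    CREATE TABLE citations (id INTEGER PRIMARY KEY, record_srn TEXT, doi TEXT);
    INSERT INTO records VALUES
        ('urn:a:1', 'schema-a', '1.0'),
        ('urn:b:1', 'schema-b', '1.0');
    INSERT INTO citations (record_srn, doi) VALUES
        ('urn:a:1', '10.1/x'), ('urn:a:1', '10.1/y'), ('urn:b:1', '10.1/z');
    """
)

# Unscoped: counts rows written for every schema that shares the hook table.
UNSCOPED = "SELECT count(*) FROM citations"

# Scoped: join through records and filter on the requested schema.
SCOPED = """
    SELECT count(*)
    FROM citations AS ft
    JOIN records AS r ON r.srn = ft.record_srn
    WHERE r.schema_id = ? AND r.schema_version = ?
"""


def unscoped_count() -> int:
    return conn.execute(UNSCOPED).fetchone()[0]


def scoped_count(schema_id: str, schema_version: str) -> int:
    return conn.execute(SCOPED, (schema_id, schema_version)).fetchone()[0]
```

With this data the unscoped count is 3 while `scoped_count("schema-a", "1.0")` is 2, which is the inflation the manifest's `row_count` showed before the fix.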
@greptile

    def encode_cursor(sort_value: Any, id_value: Any) -> str:
        """Encode a cursor as urlsafe base64 of ``{"s": sort_value, "id": id_value}``."""
        payload = {"s": sort_value, "id": id_value}
        return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()

Cursor encoding fails with TypeError for datetime sort values
Feature rows are yielded as raw dict(row) mappings (no flattening), so a datetime/date column value from Postgres (e.g. created_at, or any feature column with json_type string + format date-time) reaches encode_cursor as a Python datetime object. Bare json.dumps has no handler for datetime, so calling GET /data/{schema}/{feature}?sort=created_at on any schema with more than one page of results produces a 500. The JsonSerializer already uses default=str for the same reason — the fix is identical here. The _coerce_feature_cursor_value and _coerce_cursor_value decoders already expect ISO strings, so the round-trip is consistent once the encoder writes them correctly.
Suggested change:

    def encode_cursor(sort_value: Any, id_value: Any) -> str:
        """Encode a cursor as urlsafe base64 of ``{"s": sort_value, "id": id_value}``."""
        payload = {"s": sort_value, "id": id_value}
        return base64.urlsafe_b64encode(json.dumps(payload, default=str).encode()).decode()

Feature rows are raw DB mappings, so paginating a feature table sorted by created_at handed a Python datetime to encode_cursor, where bare json.dumps raised TypeError -> 500. Encode with default=str (as the JSON serializer already does), and coerce the decoded ISO string back to a datetime in _coerce_feature_cursor_value — created_at is an implicit column with no ColumnDef, so the existing format-driven coercion missed it and asyncpg would reject the str bind against DateTime(timezone=True).
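A quick round trip shows why `default=str` plus a decode-side coercion is enough. The `encode_cursor` body matches the suggested change; `decode_cursor` and `coerce_datetime` are simplified stand-ins written for this sketch, not the store's `_coerce_cursor_value`.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Any


def encode_cursor(sort_value: Any, id_value: Any) -> str:
    """Encode a cursor; default=str turns datetimes into their string form."""
    payload = {"s": sort_value, "id": id_value}
    return base64.urlsafe_b64encode(json.dumps(payload, default=str).encode()).decode()


def decode_cursor(cursor: str) -> dict[str, Any]:
    """Inverse of encode_cursor; the sort value comes back as a plain string."""
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))


def coerce_datetime(value: str) -> datetime:
    """Turn the decoded string back into a datetime before binding it."""
    return datetime.fromisoformat(value)


created_at = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
decoded = decode_cursor(encode_cursor(created_at, 7))
restored = coerce_datetime(decoded["s"])
```

Note that `str(datetime)` separates date and time with a space rather than `T`; `datetime.fromisoformat` accepts either, so `restored` compares equal to `created_at`.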
@greptile
_coerce_cursor_value only recognised created_at/published_at by name, so paginating records sorted by a FieldType.DATE (or date-time) metadata column bound the cursor's ISO string against the dynamic table's sa.Date/sa.DateTime column — asyncpg rejects the str bind on page 2+. Dispatch on the resolved sort expression's SQLAlchemy type instead, which also covers the old name-based cases.
…er path
The records and feature sorts each had their own cursor handling: a type-driven coercer on the records side, a FeatureSchema-driven one on the feature side (which needed a name-based special case for the implicit created_at column), and two copies of the decode/coerce/after block with function-local decode_cursor imports. Collapse to one _cursor_after helper and one _coerce_cursor_value that dispatches on the bound column's SQLAlchemy type, covering DATE/TIMESTAMP/BIGINT for both table kinds, and drop the now-dead fschema parameter from _features_sort.
@greptile
sort=id on records aliases to the srn column in the store, but _next_cursor encoded the flattened row's bare id as the cursor's sort value. The keyset condition compared srn > '<bare id>', which every SRN satisfies (they all start with 'urn:'), so next_cursor kept regenerating and pagination never advanced. Encode the tiebreaker value (records srn / feature id) as the sort value whenever the sort column is id — the store compares both cursor components against that same column.
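The stuck-pagination behaviour is easy to reproduce with plain string comparison. The SRN shape and helper names below are invented for the illustration, and Python compares by code point, which may differ from the database's collation, so treat this as a model of the bug rather than the store's actual query.

```python
# Hypothetical SRN shape; only the 'urn:' prefix is taken from the commit message.
SRNS = ["urn:osa:rec:0001", "urn:osa:rec:0002", "urn:osa:rec:0003"]


def next_page(srns: list[str], after: str, limit: int) -> list[str]:
    """Keyset step: rows strictly greater than the cursor value, in sort order."""
    return [srn for srn in sorted(srns) if srn > after][:limit]


def bare_id(srn: str) -> str:
    """The flattened row's bare id, i.e. the last SRN segment."""
    return srn.rsplit(":", 1)[-1]


page_one = next_page(SRNS, after="", limit=2)

# Buggy cursor: the bare id sorts below every 'urn:...' value, so page one repeats.
stuck = next_page(SRNS, after=bare_id(page_one[-1]), limit=2)

# Fixed cursor: compare the srn column against an srn value.
advanced = next_page(SRNS, after=page_one[-1], limit=2)
```

Here `stuck` equals `page_one`, while `advanced` contains only the third record.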
@greptile

    def _next_cursor(page: list[Mapping[str, Any]], plan: QueryPlan) -> str | None:
        if not page:
            return None
        last = page[-1]
        # The keyset tiebreaker is the records srn / feature id (see
        # PostgresDataReadStore). ``sort=id`` aliases to that same column in the
        # store, so the cursor's sort value must be the tiebreaker value too —
        # encoding the bare record id would compare it against the srn column,
        # match every row, and never advance.
        tiebreak = last.get("srn", last.get("id"))
        sort_column = plan.sort[0].column
        sort_value = tiebreak if sort_column == "id" else last.get(sort_column)
        return encode_cursor(sort_value, tiebreak)

Wrong tiebreaker for feature rows when a hook column is named "srn"
last.get("srn", last.get("id")) is designed to return the SRN for records rows and fall back to the integer id for feature rows — but the fallback only fires when "srn" is absent from the dict. A hook that declares a data column named "srn" (valid per PgIdentifier and not blocked by AUTO_COLUMN_NAMES) puts that column's string value into the result mapping, so last.get("srn") returns the hook column value instead of the integer ft.c.id PK that _features_sort uses as its tiebreaker. On the next page fetch _coerce_cursor_value(hook_srn_string, BigInteger_column) calls int(hook_srn_string) and raises ValueError → _invalid_cursor → 400, silently breaking all subsequent pagination for that feature table.
Use plan.table_kind to select the correct key unconditionally — you will also need to import TableKind at the top of _streaming.py.
_next_cursor fell back from 'srn' to 'id' by dict lookup, but a hook may legally declare a data column named 'srn' (only id/record_srn/created_at are reserved feature auto columns). That column's string value would hijack the feature tiebreaker and 400 on the next page when coerced against the BIGINT id column. Dispatch on plan.table_kind instead: records tiebreak on srn, features on id, unconditionally.
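The difference between the dict-lookup fallback and dispatching on the table kind can be sketched as follows. `TableKind` is named in the review, but its members as written here, the `TIEBREAK_KEY` mapping, and both helper functions are assumptions for the example.

```python
from enum import Enum
from typing import Any, Mapping


class TableKind(Enum):
    # Member names are assumed; the review only names the TableKind type.
    RECORDS = "records"
    FEATURE = "feature"


# Records tiebreak on srn, features on the integer id, unconditionally.
TIEBREAK_KEY = {TableKind.RECORDS: "srn", TableKind.FEATURE: "id"}


def tiebreak_by_lookup(row: Mapping[str, Any]) -> Any:
    """The old behaviour: prefer 'srn' if the row happens to have one."""
    return row.get("srn", row.get("id"))


def tiebreak_by_kind(row: Mapping[str, Any], kind: TableKind) -> Any:
    """The fixed behaviour: the table kind decides, not the row's keys."""
    return row[TIEBREAK_KEY[kind]]


# A feature row whose hook declares a data column that is also called 'srn'.
feature_row = {"id": 42, "record_srn": "urn:osa:rec:0001", "srn": "hook-value"}
```

For `feature_row`, the lookup version returns `"hook-value"`, which cannot be coerced against a BIGINT id column, while the kind-keyed version returns `42`.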
@greptile
…r owns statement timeout
DataResponseFormat carried an HTTP media type and a Postgres timeout literal inside domain/data/model: presentation and infrastructure concerns. The registry and serializers now live with the routes that consume them; the media type is derived from the serializer (single owner); the timeout is a timedelta execution budget threaded through the DataReadStore port and applied by the Postgres adapter via SET LOCAL (integer ms). Routes no longer inject AsyncSession; _runtime.py is gone.
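Converting a `timedelta` budget into the integer-millisecond `SET LOCAL` statement might look like the sketch below. A helper called `statement_timeout_sql` is mentioned later in this PR, but its signature and body here are guesses made for illustration.

```python
from datetime import timedelta


def statement_timeout_sql(budget: timedelta) -> str:
    """Render a per-transaction Postgres statement timeout in whole milliseconds."""
    # Floor division by a 1 ms timedelta yields an exact integer, with no float rounding.
    millis = budget // timedelta(milliseconds=1)
    if millis <= 0:
        raise ValueError("execution budget must be positive")
    return f"SET LOCAL statement_timeout = {millis}"
```

Because the value is an integer computed by the adapter rather than user input, formatting it into the statement does not open an injection path, and `SET LOCAL` confines the setting to the current transaction.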
…jecting RecordRepository
…ncode and decode
The tiebreak column choice and the sort=id aliasing rule were re-derived independently by _next_cursor (route) and the store's sort builders: the split that produced three cursor bugs. plan.keyset is now the single source of truth: the store maps its column names to SQL expressions, the response assembly calls keyset.cursor_from_row.
…surface
The data domain's query/ folder was empty and routes called services directly: the only domain bypassing the documented entry-point pattern, and with it the __auth__ gate the deferred private-schema auth model will need. ReadRecordsTable/ReadFeatureTable handlers now own table resolution, plan construction, and config-driven limit clamping (routes no longer inject Config); GetNodeCatalog/GetSchemaManifest/GetDataRecord cover the catalog reads. The QueryHandler result TypeVar is unbounded so handlers can return domain read models and streaming reads without ceremonial wrappers.
postgres_data_read_store.py was a 665-line adapter carrying both the streaming read path and the catalog/manifest/record-by-id reads behind one DataReadStore port. Split into DataTableReadStore (PostgresTableReadStore: streaming, filter SQL, keyset) and DataCatalogReadStore (PostgresCatalogReadStore: catalog, manifest, record lookup), matching the DataQueryService / DataCatalogService split. Schema→feature-table resolution shared by both adapters lives in schema_features.py.
… is Annotated[str], not callable)
…eader and class methods
No module-level functions in the /data/ adapters. The shared schema->feature resolution (schema_features.py's four loose functions, two doing session I/O) becomes a SchemaFeatureReader composed by both read stores. Per-file helpers (statement_timeout_sql, _escape_like, _invalid_cursor, _apply_scalar_op, _feature_column_specs) move onto their store classes as static methods.
@greptile
Summary
Replaces the fragmented consumer-read surface (`/discovery`, `/records/{srn}`, `/search`) with a single `/data/` URL family owned by a new `data` domain, and deletes the now-dead `index` (ChromaDB), `search`, `export`, and `discovery` domains.

New `/data/` surface
- `GET /data`: node catalog (published schemas + their table resources).
- `GET /data/{schema}`: machine-readable schema manifest (fields, table resources, row counts).
- `GET /data/records/{id}[@{version}]`: single record by bare internal id; returns both `id` and `srn`.
- `GET|POST /data/{schema}/records[.csv|.csv.gz]`: records table as paginated JSON (GET) or streamed CSV / gzipped CSV; POST takes a `FilterExpr` body (rate-limited per route).
- `GET|POST /data/{schema}/{feature}[.csv|.csv.gz]`: feature-table dump for a schema's registered hook, same engine/shape as records.

Streaming uses a Postgres server-side cursor (bounded memory regardless of result size) with pre-flight error surfacing: parse/validation/unknown-schema errors return 4xx/404 before any bytes. Keyset pagination, reserved-name policy (`records`/`datasets`) enforced at both registration and read.

Removals (footprint reduction)
- `index`, `search`, `export`, and `discovery` domains; `infrastructure/index`; `sdk/index`; the discovery infra adapter; and the legacy `/discovery`, `/records/{srn}`, `/search` routes (+ their wiring and DI providers).
- `/stats` drops the index-counts field in favour of a `data_url` pointer.
- `chromadb` and `sentence-transformers` are no longer dependencies.

Test plan
- `ruff` clean; `ty` shows only the pre-existing project baseline (no new diagnostics, no `unresolved-import`).
- `/data/` integration/e2e tests against a real Postgres (records + feature streaming, JSON pagination, CSV/gzip, filter composition, rate-limit, version pinning, bounded-memory + cursor-release guarantees, legacy-route 404s).
- `test_worker*.py` suite; AND/OR/NOT filter compilation re-covered by a new `/data/` e2e test.

Follow-ups (not in this PR)
- `docker build` + image-size comparison vs `main` (the dependency removal that shrinks it is included here).
- `Indexes` entry from Principle V.

Implements the 137-feat-unified-data spec (`specs/137-feat-unified-data/`).