Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fd32f63
feat: add unified /data/ read surface; remove legacy index/search/export
rorybyrne Jun 9, 2026
f290b6a
test: add DB-free contract tests for the /data/ surface
rorybyrne Jun 9, 2026
d9f50a4
fix: map malformed /data/ cursor to 400 and add DataConfig sub-config
rorybyrne Jun 9, 2026
9c95694
fix: scope feature-table streams and counts to the requested schema
rorybyrne Jun 10, 2026
1d79804
refactor: drop dead SortOrder enum from data filter model
rorybyrne Jun 10, 2026
8c65980
build: allow narrowing test-integration-pg to a pytest target
rorybyrne Jun 10, 2026
a614c97
fix: make created_at feature cursors round-trip instead of 500ing
rorybyrne Jun 10, 2026
57c4f4a
fix: coerce records cursor values by sort-column type, not column name
rorybyrne Jun 11, 2026
0c08fe2
refactor: unify cursor coercion on column types; share the keyset-aft…
rorybyrne Jun 11, 2026
3f6b434
fix: encode the srn as the cursor sort value for records sorted by id
rorybyrne Jun 11, 2026
2f20fa0
fix: select the cursor tiebreak key by table kind, not key presence
rorybyrne Jun 11, 2026
2c95f7e
chore: remove stale legacy-router docstring, narrow cursor except, re…
rorybyrne Jun 11, 2026
1d8f784
refactor: make PaginationParams the single owner of the limit-clamp p…
rorybyrne Jun 11, 2026
ffcf49e
refactor: replace route-local id@version parsing with a RecordRef val…
rorybyrne Jun 11, 2026
711957a
refactor: source the page-size ceiling from DataConfig.max_page_limit
rorybyrne Jun 11, 2026
5c340f6
refactor: move manifest-structure knowledge into DataCatalogService.r…
rorybyrne Jun 11, 2026
522323d
refactor: move serializers/format registry to the route layer; adapte…
rorybyrne Jun 11, 2026
b70c4ae
refactor: route /stats through a GetStats query handler instead of in…
rorybyrne Jun 11, 2026
d29e8e8
refactor: Keyset on QueryPlan owns the pagination contract for both e…
rorybyrne Jun 11, 2026
ee7ae13
feat: restore Router → QueryHandler → Service layering on the /data/ …
rorybyrne Jun 11, 2026
5424114
refactor: split the data read-store god adapter along the service seam
rorybyrne Jun 11, 2026
a9e5e50
test: use plain strings for HookName in resolve_table tests (HookName…
rorybyrne Jun 11, 2026
33f8c77
refactor: replace data-adapter module functions with a SchemaFeatureR…
rorybyrne Jun 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ test-integration:
@TEST=1 uv run pytest tests/integration -v --tb=short -x

# Run integration tests with PG: ensure DB running → wipe → create → migrate → test → wipe
test-integration-pg:
# Optionally narrow to a pytest target, e.g. `just test-integration-pg tests/integration/test_foo.py`
test-integration-pg target="tests/integration":
POSTGRES_PORT={{TEST_PG_PORT}} just --justfile ../Justfile db-up
@just test-db-drop
@just test-db-create
Expand All @@ -116,7 +117,7 @@ test-integration-pg:
OSA_DATABASE__URL="{{TEST_DB_URL}}" \
OSA_AUTH__JWT__SECRET="test-secret-for-integration-tests-minimum-32-chars" \
TEST=1 \
uv run pytest tests/integration -v --tb=short -x; \
uv run pytest "{{target}}" -v --tb=short -x; \
just test-db-drop

# === Dependencies ===
Expand Down
21 changes: 15 additions & 6 deletions server/osa/application/api/rest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dishka import Provider as DishkaProvider
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from slowapi.errors import RateLimitExceeded
from sqlalchemy.ext.asyncio import AsyncEngine

from osa.application.api.v1.errors import map_osa_error
Expand All @@ -16,17 +17,16 @@
auth,
conventions,
depositions,
discovery,
events,
ingestions,
health,
ontologies,
records,
schemas,
search,
stats,
validation,
)
from osa.application.api.v1.routes import data as data_routes
from osa.application.api.v1.routes.data._limiter import limiter
from osa.application.di import create_container
from osa.config import DEV_JWT_SECRET, Config
from osa.domain.shared.authorization.startup import validate_all_handlers
Expand Down Expand Up @@ -194,16 +194,25 @@ def create_app(
app_instance.include_router(admin.router, prefix="/api/v1")
app_instance.include_router(auth.router, prefix="/api/v1")
app_instance.include_router(events.router, prefix="/api/v1")
app_instance.include_router(records.router, prefix="/api/v1")
app_instance.include_router(search.router, prefix="/api/v1")
app_instance.include_router(stats.router, prefix="/api/v1")
app_instance.include_router(ontologies.router, prefix="/api/v1")
app_instance.include_router(schemas.router, prefix="/api/v1")
app_instance.include_router(conventions.router, prefix="/api/v1")
app_instance.include_router(depositions.router, prefix="/api/v1")
app_instance.include_router(ingestions.router, prefix="/api/v1")
app_instance.include_router(validation.router, prefix="/api/v1")
app_instance.include_router(discovery.router, prefix="/api/v1")
app_instance.include_router(data_routes.router, prefix="/api/v1")

# POST /data/* rate limiting (slowapi). The shared limiter is attached to
# app state and its 429 handler registered; GET routes are not limited.
app_instance.state.limiter = limiter

@app_instance.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    # Fixed JSON error body returned whenever slowapi raises RateLimitExceeded;
    # neither the request nor the exception details are echoed back.
    payload = {
        "code": "rate_limited",
        "message": "POST rate limit exceeded (10/min/IP).",
    }
    return JSONResponse(content=payload, status_code=429)

# Global OSA error handler - maps domain and infrastructure errors to HTTP responses
@app_instance.exception_handler(OSAError)
Expand Down
2 changes: 2 additions & 0 deletions server/osa/application/api/v1/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
InvalidStateError,
NotFoundError,
OSAError,
ReservedNameError,
ValidationError,
)

Expand All @@ -24,6 +25,7 @@
InvalidStateError: 409,
ConflictError: 409,
AuthorizationError: 403,
ReservedNameError: 400,
}


Expand Down
47 changes: 47 additions & 0 deletions server/osa/application/api/v1/routes/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Unified ``/data/`` read surface router.

Subroutes are registered by the user-story phases:
- catalog + manifest (``GET /data``, ``GET /data/{schema}``) — US3
- single record by ID (``GET /data/records/{id}``) — US4
- records table matrix (``/data/{schema}/records*``) — US1/US2 via the factory
- feature table matrix (``/data/{schema}/{feature}*``) — US5 via the factory
"""

from __future__ import annotations

from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter

from osa.application.api.v1.routes.data import (
catalog,
features_table,
records,
records_table,
)
from osa.domain.data.model.catalog import NodeCatalog

# Top-level router for the read surface: everything registered or included
# below inherits the ``/data`` prefix and the ``data`` tag.
router = APIRouter(prefix="/data", tags=["data"], route_class=DishkaRoute)

# ``GET /data`` (empty sub-path) is registered directly on the prefixed router.
router.add_api_route(
    "",
    catalog.get_node_catalog,
    methods=["GET"],
    operation_id="data_get_node_catalog",
    response_model=NodeCatalog,
)

# Table-shaped routes (records + feature tables) are registered via the factory
# onto a DishkaRoute-enabled subrouter. Records is registered before the feature
# table's ``/{schema}/{feature}`` catch-all so ``/{schema}/records`` matches the
# records routes rather than being captured as a feature named "records".
tables_router = APIRouter(route_class=DishkaRoute)
records_table.register(tables_router)
features_table.register(tables_router)

# Order matters: literal ``/records/{id}`` and the ``/{schema}/records*`` table
# routes must precede the manifest's ``/{schema}`` catch-all so a record fetch
# or table read isn't captured as a schema manifest lookup.
# Resulting include order: single-record routes, then table routes, then the
# manifest catch-all last.
router.include_router(records.router)
router.include_router(tables_router)
router.include_router(catalog.manifest_router)
20 changes: 20 additions & 0 deletions server/osa/application/api/v1/routes/data/_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Shared slowapi limiter for ``/data/`` POST routes (research §5).

POST routes accept structured ``FilterExpr`` bodies that can express expensive
queries, so they are rate-limited per source IP. GET routes are intentionally
unlimited — stable GET URLs are exactly what we want CDNs to cache and
consumers to curl on a cron.

Default: 10 requests/minute/IP on POST routes. The limiter instance is shared
(module singleton) and registered on the FastAPI app at startup; route handlers
apply it via ``@limiter.limit(POST_RATE_LIMIT)``.
"""

from __future__ import annotations

from slowapi import Limiter
from slowapi.util import get_remote_address

# Limit string handed to ``@limiter.limit(...)`` on POST routes
# (10 requests per minute, per the module docstring).
POST_RATE_LIMIT = "10/minute"

# Module-level singleton shared by the route handlers and the app; requests are
# bucketed by the key ``get_remote_address`` returns (the source IP).
limiter = Limiter(key_func=get_remote_address)
44 changes: 44 additions & 0 deletions server/osa/application/api/v1/routes/data/_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Shared request parsing for table routes — sort spec + filter body."""

from __future__ import annotations

from pydantic import BaseModel

from osa.domain.data.model.filter import FilterExpr
from osa.domain.data.model.query_plan import SortDirection, SortSpec
from osa.domain.shared.error import ValidationError


class FilterRequestBody(BaseModel):
    """POST body shared by every table format (records + feature)."""

    # Structured filter expression; optional, defaults to ``None``.
    filter: FilterExpr | None = None
    # Pagination cursor string; optional, defaults to ``None``.
    # NOTE(review): presumably the ``next_cursor`` of a previous page — confirm.
    cursor: str | None = None
    # Unbounded at the edge: the table-read handler clamps to
    # [1, DataConfig.max_page_limit] — over-large requests get the max page, not a 422.
    limit: int = 50
    # Raw sort expression, kept as a string at this layer.
    # NOTE(review): presumably parsed by ``parse_sort`` (``col[:asc|:desc],...``) — confirm.
    sort: str | None = None


def parse_sort(raw: str | None) -> list[SortSpec]:
    """Parse ``col[:asc|:desc],col2[:asc|:desc]`` → SortSpec list (empty if None).

    Whitespace around tokens, column names and directions is ignored and
    directions are matched case-insensitively. A column without a direction
    sorts ascending. Empty tokens (for example from a trailing comma) are
    skipped.

    Args:
        raw: The raw sort expression, or ``None``/empty for "no explicit sort".

    Returns:
        One ``SortSpec`` per non-empty token, in the order given.

    Raises:
        ValidationError: If a token has no column name (e.g. ``":desc"``) or
            its direction is not ``asc``/``desc``. Both are reported against
            the ``sort`` field.
    """
    if not raw:
        return []
    specs: list[SortSpec] = []
    for part in raw.split(","):
        token = part.strip()
        if not token:
            continue
        # ``partition`` splits on the first ":" only, like ``split(":", 1)``;
        # ``sep`` is empty when the token carries no direction at all.
        column, sep, direction = token.partition(":")
        column = column.strip()
        if not column:
            # Reject ":desc" and friends here instead of handing an
            # empty column name to the query planner.
            raise ValidationError(
                f"Missing sort column in {token!r}; expected 'col[:asc|:desc]'.",
                field="sort",
            )
        if sep:
            try:
                dir_enum = SortDirection(direction.strip().lower())
            except ValueError as exc:
                raise ValidationError(
                    f"Invalid sort direction in {token!r}; expected 'asc' or 'desc'.",
                    field="sort",
                ) from exc
        else:
            dir_enum = SortDirection.ASC
        specs.append(SortSpec(column=column, direction=dir_enum))
    return specs
97 changes: 97 additions & 0 deletions server/osa/application/api/v1/routes/data/_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Response assembly for table reads — streaming and paginated paths.

Two shapes share one entry point :func:`build_table_response`:

* **Streaming** (``.csv`` / ``.csv.gz``): pre-flight pulls the first row inside
a try (research §4). A parse/validation/planner error raised before the first
row propagates to the route → mapped to a 4xx/404 *before any bytes*. On
success the first row is chained back in and the whole iterator is streamed.

* **Paginated** (JSON): consume up to ``limit + 1`` rows; if a ``limit+1``-th
  row exists there's a next page, so derive ``next_cursor`` from the last
  returned row via ``plan.keyset`` (the sort value plus the table's tiebreak
  key). The bounded page (``limit`` ≤ ``DataConfig.max_page_limit``) is then
  rendered by the JSON serializer.
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Mapping, Sequence
from typing import Any

from fastapi.responses import StreamingResponse

from osa.application.api.v1.routes.data.formats import DataResponseFormat
from osa.domain.data.model.manifest import ColumnSpec
from osa.domain.data.model.query_plan import QueryPlan


async def build_table_response(
    rows: AsyncIterator[Mapping[str, Any]],
    fmt: DataResponseFormat,
    columns: Sequence[ColumnSpec],
    plan: QueryPlan,
) -> StreamingResponse:
    """Assemble the HTTP response for a table read.

    Dispatches on ``fmt.paginated``: paginated formats go through the
    cursor-bearing page path (which needs ``plan``); every other format is
    streamed row by row.
    """
    if not fmt.paginated:
        return await _streaming_response(rows, fmt, columns)
    return await _paginated_response(rows, fmt, columns, plan)


async def _streaming_response(
    rows: AsyncIterator[Mapping[str, Any]],
    fmt: DataResponseFormat,
    columns: Sequence[ColumnSpec],
) -> StreamingResponse:
    """Stream every row through ``fmt``'s serializer.

    The first row is pulled eagerly so that an error raised while producing it
    propagates to the caller before the response object exists. That row is
    then replayed ahead of the remainder of the iterator.
    """
    source = rows.__aiter__()
    # Pre-flight: surface setup/validation errors before the first byte.
    try:
        head = await source.__anext__()
    except StopAsyncIteration:
        has_head = False
    else:
        has_head = True

    async def replay() -> AsyncIterator[Mapping[str, Any]]:
        # Put the pre-fetched row back in front, then drain what is left.
        if has_head:
            yield head
        async for item in source:
            yield item

    body = fmt.make_serializer().stream(replay(), columns)
    return StreamingResponse(body, media_type=fmt.media_type)


async def _paginated_response(
    rows: AsyncIterator[Mapping[str, Any]],
    fmt: DataResponseFormat,
    columns: Sequence[ColumnSpec],
    plan: QueryPlan,
) -> StreamingResponse:
    """Render one bounded page plus an optional ``next_cursor``.

    Reads at most ``plan.pagination.limit + 1`` rows from ``rows``. The extra
    row is only a probe: if it exists there is a further page, and the cursor
    is derived from the last row actually returned. The probe row itself is
    discarded.

    Args:
        rows: Source of result rows.
        fmt: Response format; supplies the serializer and media type.
        columns: Column specs handed to the serializer.
        plan: Query plan; supplies the page limit and the keyset used to
            encode the cursor.

    Returns:
        A ``StreamingResponse`` over the in-memory page.
    """
    limit = plan.pagination.limit
    page: list[Mapping[str, Any]] = []
    has_more = False
    try:
        async for row in rows:
            if len(page) == limit:
                # A (limit+1)-th row exists, so there is a next page.
                has_more = True
                break
            page.append(row)
    finally:
        # Breaking out of ``async for`` leaves an async generator suspended at
        # its ``yield``; its cleanup would otherwise wait for garbage
        # collection. Close it now when the source supports it. Closing an
        # already exhausted generator is a no-op.
        aclose = getattr(rows, "aclose", None)
        if aclose is not None:
            await aclose()

    next_cursor = _next_cursor(page, plan) if has_more else None

    async def page_iter() -> AsyncIterator[Mapping[str, Any]]:
        # Serializers take an async iterator, so re-expose the buffered page.
        for row in page:
            yield row

    serializer = fmt.make_serializer()
    return StreamingResponse(
        serializer.stream(page_iter(), columns, next_cursor=next_cursor),
        media_type=fmt.media_type,
    )


def _next_cursor(page: list[Mapping[str, Any]], plan: QueryPlan) -> str | None:
    """Encode the cursor for the page after ``page``; ``None`` for an empty page."""
    if not page:
        return None
    # Tiebreak selection and sort=id aliasing live on plan.keyset — the same
    # object the store builds its ORDER BY / after-condition from, so the
    # encode side cannot drift from the decode side.
    last_row = page[-1]
    return plan.keyset.cursor_from_row(last_row)
41 changes: 41 additions & 0 deletions server/osa/application/api/v1/routes/data/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Catalog & manifest routes — ``GET /data`` and ``GET /data/{schema}``.

JSON-only (no format suffix). Reserved schema names (``records``, ``datasets``)
and unknown schemas surface as 404 via the handler's ``NotFoundError``.

The node-catalog handler is registered directly on the prefixed ``/data``
router (an empty sub-path can't be ``include_router``-ed); the manifest handler
lives on ``manifest_router`` so its ``/{schema}`` catch-all can be ordered
after the literal ``/records/{id}`` route.
"""

from __future__ import annotations

from dishka.integrations.fastapi import DishkaRoute, FromDishka
from fastapi import APIRouter

from osa.domain.data.model.catalog import NodeCatalog
from osa.domain.data.model.manifest import SchemaManifest
from osa.domain.data.query.catalog import (
GetNodeCatalog,
GetNodeCatalogHandler,
GetSchemaManifest,
GetSchemaManifestHandler,
)

manifest_router = APIRouter(route_class=DishkaRoute)


async def get_node_catalog(handler: FromDishka[GetNodeCatalogHandler]) -> NodeCatalog:
    """List schemas hosted at this node."""
    # The query carries no parameters; the injected handler does all the work.
    query = GetNodeCatalog()
    return await handler.run(query)


@manifest_router.get(
    "/{schema}",
    operation_id="data_get_schema_manifest",
    response_model=SchemaManifest,
)
async def get_schema_manifest(
    schema: str,
    handler: FromDishka[GetSchemaManifestHandler],
) -> SchemaManifest:
    """Machine-readable manifest for a schema (`<id>` or `<id>@<semver>`)."""
    # The raw path segment is forwarded untouched; the handler resolves it.
    query = GetSchemaManifest(schema=schema)
    manifest = await handler.run(query)
    return manifest
Loading
Loading