diff --git a/services/intake/src/nmp/intake/api/v2/experiments/endpoints.py b/services/intake/src/nmp/intake/api/v2/experiments/endpoints.py index 6bbe156a76..8d5d9a1077 100644 --- a/services/intake/src/nmp/intake/api/v2/experiments/endpoints.py +++ b/services/intake/src/nmp/intake/api/v2/experiments/endpoints.py @@ -40,9 +40,11 @@ from nmp.intake.spans.clickhouse_client import ClickHouseSpanClient from nmp.intake.spans.domain import SpanStatus from nmp.intake.spans.experiment_rollup_repository import ( + METRICS_VERSION, ExperimentRollup, ExperimentRollupRepository, ScoreRollup, + metrics_to_rollup, ) from nmp.intake.spans.experiment_session_repository import ExperimentSessionRepository from nmp.intake.spans.storage import make_pagination @@ -366,7 +368,17 @@ async def list_experiments( page_size=page_size, ) responses = [ExperimentResponse.from_entity(e) for e in result.data] - await _hydrate_rollups(workspace=workspace, responses=responses, rollup_repository=rollup_repository) + # Prefer the denormalized metrics written by the refresh worker; fall back to a live + # ClickHouse hydrate for experiments not yet refreshed, written by a different metrics + # version, or whose blob can't be decoded. + needs_live_hydrate = [] + for entity, response in zip(result.data, responses): + rollup = _cached_rollup(response.name, entity.metrics) + if rollup is not None: + _apply_rollup(response, rollup) + else: + needs_live_hydrate.append(response) + await _hydrate_rollups(workspace=workspace, responses=needs_live_hydrate, rollup_repository=rollup_repository) return Page( data=responses, pagination=PaginationData(**result.pagination.model_dump()), @@ -836,6 +848,21 @@ def _apply_is_pinned_filter(parsed: ParsedFilter) -> None: parsed.and_with(null_clause) +def _cached_rollup(name: str, metrics: dict | None) -> ExperimentRollup | None: + """Decode a denormalized metrics blob into a rollup, or ``None`` to fall back to live hydrate. + + Falls back when the blob is absent, was written by a different ``METRICS_VERSION``, or can't be + decoded — so a single stale/corrupt row never fails the whole list response. + """ + if not metrics or metrics.get("version") != METRICS_VERSION: + return None + try: + return metrics_to_rollup(name, metrics) + except Exception: + logger.warning("Discarding unreadable cached metrics for experiment %s; using live hydrate", name) + return None + + async def _hydrate_rollups( *, workspace: str, diff --git a/services/intake/src/nmp/intake/config.py b/services/intake/src/nmp/intake/config.py index d8b8565153..fa0d764863 100644 --- a/services/intake/src/nmp/intake/config.py +++ b/services/intake/src/nmp/intake/config.py @@ -52,3 +52,12 @@ class IntakeConfig(_BaseIntakeConfig): ge=1024, description="Maximum accepted body size for OTLP ingest requests, in bytes.", ) + rollup_refresh_interval_seconds: float = Field( + default=10.0, + gt=0, + description=( + "How often the background worker drains the dirty set and denormalizes ClickHouse rollups " + "onto Experiment entities. Bounds how stale a leaderboard's metrics can be after ingest; " + "ingest never blocks on it." + ), + ) diff --git a/services/intake/src/nmp/intake/entities/experiments.py b/services/intake/src/nmp/intake/entities/experiments.py index 46e95fe8d0..9d00b35530 100644 --- a/services/intake/src/nmp/intake/entities/experiments.py +++ b/services/intake/src/nmp/intake/entities/experiments.py @@ -79,3 +79,13 @@ class Experiment(EntityBase): "Pin state is workspace-shared: every user with workspace access sees the same pinned set." ), ) + + metrics: dict[str, Any] | None = Field( + default=None, + description=( + "System-managed denormalized rollup summary (run_count, cost_usd, latency_ms, per-evaluator " + "scores, refreshed_at) computed from ClickHouse and refreshed by a background worker after " + "ingest. Not accepted on the create/update body; written only by the refresh path so it can " + "drive entity-store sort/filter without a per-read ClickHouse query." + ), + ) diff --git a/services/intake/src/nmp/intake/service.py b/services/intake/src/nmp/intake/service.py index b529177eb4..7c354bd1ce 100644 --- a/services/intake/src/nmp/intake/service.py +++ b/services/intake/src/nmp/intake/service.py @@ -11,6 +11,8 @@ from nmp.intake.config import IntakeConfig from nmp.intake.spans.api import annotations, evaluator_results, spans, traces from nmp.intake.spans.clickhouse_client import ClickHouseSettings, ClickHouseSpanClient +from nmp.intake.spans.experiment_rollup_refresher import ExperimentRollupRefresher +from nmp.intake.spans.experiment_rollup_repository import ExperimentRollupRepository from nmp.intake.spans.ingest import atif, chat_completions, otlp logger = logging.getLogger(__name__) @@ -26,6 +28,8 @@ def __init__(self): super().__init__(name="intake", module_name="nmp.intake") # The client is owned by the service lifecycle; it is absent before startup and after shutdown. self.clickhouse_client: ClickHouseSpanClient | None = None + # Background worker that denormalizes ClickHouse rollups onto Experiment entities. + self.rollup_refresher: ExperimentRollupRefresher | None = None self._ready = False @property @@ -79,12 +83,26 @@ async def on_startup(self) -> None: "clickhouse_database": cfg.clickhouse_config.database, }, ) + entity_client = self.dependency_provider.get_entity_client(as_service=self.name) + if entity_client is not None: + self.rollup_refresher = ExperimentRollupRefresher( + rollup_repository=ExperimentRollupRepository(self.clickhouse_client), + entity_client=entity_client, + interval_seconds=cfg.rollup_refresh_interval_seconds, + ) + self.rollup_refresher.start() + else: + logger.warning("Entity client unavailable; experiment rollup refresher not started") self._ready = True async def on_shutdown(self) -> None: """Close the service-owned ClickHouse client.""" self._ready = False + # Stop the refresher first (its final flush still needs ClickHouse). + if self.rollup_refresher is not None: + await self.rollup_refresher.stop() + self.rollup_refresher = None if self.clickhouse_client is not None: await self.clickhouse_client.close() self.clickhouse_client = None diff --git a/services/intake/src/nmp/intake/spans/api/dependencies.py b/services/intake/src/nmp/intake/spans/api/dependencies.py index 2afe8152fe..faae28d50a 100644 --- a/services/intake/src/nmp/intake/spans/api/dependencies.py +++ b/services/intake/src/nmp/intake/spans/api/dependencies.py @@ -11,6 +11,7 @@ from nmp.intake.spans.annotations_repository import AnnotationsRepository from nmp.intake.spans.clickhouse_client import ClickHouseSpanClient, get_clickhouse_client from nmp.intake.spans.evaluator_results_repository import EvaluatorResultsRepository +from nmp.intake.spans.experiment_rollup_refresher import ExperimentRollupRefresher from nmp.intake.spans.service import IntakeSpansService from nmp.intake.spans.span_repository import SpanRepository from nmp.intake.spans.trace_repository import TraceRepository @@ -84,3 +85,12 @@ def get_spans_service( SpansServiceDep = Annotated[IntakeSpansService, Depends(get_spans_service)] + + +def get_rollup_refresher(request: Request) -> ExperimentRollupRefresher | None: + """Reach the service-owned rollup refresher from the request (absent if startup didn't create one).""" + service = getattr(request.app.state, "intake_service", None) or getattr(request.app.state, "service", None) + return getattr(service, "rollup_refresher", None) if service is not None else None + + +RollupRefresherDep = Annotated[ExperimentRollupRefresher | None, Depends(get_rollup_refresher)] diff --git a/services/intake/src/nmp/intake/spans/experiment_rollup_refresher.py b/services/intake/src/nmp/intake/spans/experiment_rollup_refresher.py new file mode 100644 index 0000000000..fc11cc6b99 --- /dev/null +++ b/services/intake/src/nmp/intake/spans/experiment_rollup_refresher.py @@ -0,0 +1,117 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Background worker that denormalizes ClickHouse rollups onto Experiment entities. + +Ingest marks ``(workspace, experiment_id)`` dirty — a cheap, non-blocking set add. A +background loop drains the dirty set on a fixed interval, recomputes each touched +experiment's rollup via ``get_rollups`` (which uses ``FINAL``, so it's correct under +re-ingest), and writes the summary onto the experiment's system-managed ``metrics`` +field. Bursts coalesce: many ingests for one experiment within an interval collapse to a +single recompute, and ingest latency is never gated on the rollup query. +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone + +from nmp.common.entities.client import EntityClient, EntityConflictError, EntityNotFoundError +from nmp.intake.entities.experiments import Experiment +from nmp.intake.spans.experiment_rollup_repository import ExperimentRollupRepository, rollup_to_metrics + +logger = logging.getLogger(__name__) + + +class ExperimentRollupRefresher: + """Coalesces dirty experiment ids and refreshes their denormalized ``metrics`` on a fixed cadence.""" + + def __init__( + self, + *, + rollup_repository: ExperimentRollupRepository, + entity_client: EntityClient, + interval_seconds: float = 10.0, + ) -> None: + self._rollup_repository = rollup_repository + self._entity_client = entity_client + self._interval_seconds = interval_seconds + self._dirty: set[tuple[str, str]] = set() + self._task: asyncio.Task[None] | None = None + self._stopping = asyncio.Event() + + def mark_dirty(self, *, workspace: str, experiment_id: str) -> None: + """Queue an experiment for refresh. Cheap and non-blocking; safe to call from the ingest path.""" + self._dirty.add((workspace, experiment_id)) + + def pending(self) -> set[tuple[str, str]]: + """Return a copy of the currently-queued ``(workspace, experiment_id)`` pairs (for observability/tests).""" + return set(self._dirty) + + def start(self) -> None: + if self._task is None: + self._stopping.clear() + self._task = asyncio.create_task(self._run()) + + async def stop(self) -> None: + # Signal the loop to exit and let it finish any in-flight flush — we never cancel mid-flush, so a + # detached batch can't be dropped before it's written. Then a final drain covers the cases the loop + # can't (it saw the stop flag before its first flush, or items were enqueued during the last flush). + self._stopping.set() + if self._task is not None: + await self._task + self._task = None + await self.flush() + + async def _run(self) -> None: + while not self._stopping.is_set(): + try: + # Interruptible sleep: wakes early when stop() sets the event so shutdown is prompt. + await asyncio.wait_for(self._stopping.wait(), timeout=self._interval_seconds) + except asyncio.TimeoutError: + pass # interval elapsed; time for a periodic flush + try: + await self.flush() + except Exception: + logger.exception("Experiment rollup refresh cycle failed") + + async def flush(self) -> None: + """Drain the dirty set and write current rollups. Directly callable for deterministic tests.""" + if not self._dirty: + return + batch = self._dirty + self._dirty = set() + by_workspace: dict[str, list[str]] = {} + for workspace, experiment_id in batch: + by_workspace.setdefault(workspace, []).append(experiment_id) + for workspace, experiment_ids in by_workspace.items(): + try: + await self._refresh_workspace(workspace, experiment_ids) + except Exception: + # Re-queue the whole workspace batch for the next cycle (e.g. ClickHouse unavailable). + logger.exception("Failed to refresh experiment rollups for workspace %s; re-queuing", workspace) + for experiment_id in experiment_ids: + self._dirty.add((workspace, experiment_id)) + + async def _refresh_workspace(self, workspace: str, experiment_ids: list[str]) -> None: + rollups = await self._rollup_repository.get_rollups(workspace=workspace, experiment_ids=experiment_ids) + refreshed_at = datetime.now(timezone.utc).isoformat() + for experiment_id in experiment_ids: + rollup = rollups.get(experiment_id) + if rollup is None: + continue + await self._write_metrics(workspace, experiment_id, rollup_to_metrics(rollup, refreshed_at=refreshed_at)) + + async def _write_metrics(self, workspace: str, experiment_id: str, metrics: dict) -> None: + try: + experiment = await self._entity_client.get(Experiment, name=experiment_id, workspace=workspace) + except EntityNotFoundError: + # Deleted between ingest and refresh; nothing to update. + return + experiment.metrics = metrics + try: + await self._entity_client.update(experiment) + except EntityConflictError: + # A concurrent user edit won the optimistic lock; re-queue for the next cycle. + self._dirty.add((workspace, experiment_id)) diff --git a/services/intake/src/nmp/intake/spans/experiment_rollup_repository.py b/services/intake/src/nmp/intake/spans/experiment_rollup_repository.py index bd2f5ca993..53ee9421bf 100644 --- a/services/intake/src/nmp/intake/spans/experiment_rollup_repository.py +++ b/services/intake/src/nmp/intake/spans/experiment_rollup_repository.py @@ -41,6 +41,71 @@ def evaluator_names(self) -> list[str]: return sorted(self.evaluator_scores) +def _score_to_dict(score: ScoreRollup | None) -> dict[str, Any] | None: + if score is None: + return None + return { + "sum": score.sum, + "mean": score.mean, + "median": score.median, + "p90": score.p90, + "p95": score.p95, + "p99": score.p99, + "count": score.count, + } + + +def _score_from_dict(data: dict[str, Any] | None) -> ScoreRollup | None: + if data is None: + return None + return ScoreRollup( + sum=data.get("sum"), + mean=data.get("mean"), + median=data.get("median"), + p90=data.get("p90"), + p95=data.get("p95"), + p99=data.get("p99"), + count=int(data.get("count", 0)), + ) + + +# Bump when the stored metrics shape or how an aggregate is computed changes, so readers can tell +# blobs written by an older rollup version apart (and recompute/fall back rather than trust stale shapes). +METRICS_VERSION = 1 + + +def rollup_to_metrics(rollup: ExperimentRollup, *, refreshed_at: str) -> dict[str, Any]: + """Serialize a rollup into the JSON-safe dict stored on ``Experiment.metrics``.""" + return { + "version": METRICS_VERSION, + "run_count": rollup.run_count, + "model_names": rollup.model_names, + "agent_names": rollup.agent_names, + "agent_versions": rollup.agent_versions, + "evaluators": {name: _score_to_dict(score) for name, score in rollup.evaluator_scores.items()}, + "cost_usd": _score_to_dict(rollup.cost_usd), + "latency_ms": _score_to_dict(rollup.latency_ms), + "refreshed_at": refreshed_at, + } + + +def metrics_to_rollup(experiment_id: str, metrics: dict[str, Any]) -> ExperimentRollup: + """Reconstruct a rollup from a stored ``Experiment.metrics`` dict (inverse of ``rollup_to_metrics``).""" + evaluators = metrics.get("evaluators") or {} + return ExperimentRollup( + experiment_id=experiment_id, + run_count=int(metrics.get("run_count", 0)), + model_names=list(metrics.get("model_names") or []), + agent_names=list(metrics.get("agent_names") or []), + agent_versions=list(metrics.get("agent_versions") or []), + evaluator_scores={ + name: score for name, data in evaluators.items() if (score := _score_from_dict(data)) is not None + }, + cost_usd=_score_from_dict(metrics.get("cost_usd")), + latency_ms=_score_from_dict(metrics.get("latency_ms")), + ) + + class ExperimentRollupRepository: def __init__(self, client: ClickHouseSpanClient) -> None: self._client = client diff --git a/services/intake/src/nmp/intake/spans/ingest/atif.py b/services/intake/src/nmp/intake/spans/ingest/atif.py index 81106d4b55..dbb49c3c7b 100644 --- a/services/intake/src/nmp/intake/spans/ingest/atif.py +++ b/services/intake/src/nmp/intake/spans/ingest/atif.py @@ -10,7 +10,7 @@ from fastapi import APIRouter, Depends, Response, status from nmp.common.entities.client import EntityClient from nmp.common.service.dependencies import get_entity_client -from nmp.intake.spans.api.dependencies import SpansServiceDep, require_workspace_access +from nmp.intake.spans.api.dependencies import RollupRefresherDep, SpansServiceDep, require_workspace_access from nmp.intake.spans.domain import TraceBatch from nmp.intake.spans.ingest.atif_domain import ( AtifAgent, @@ -82,10 +82,12 @@ async def ingest_atif( body: AtifIngestRequest, service: SpansServiceDep, entity_client: EntityClientDep, + refresher: RollupRefresherDep, ) -> Response: + context = body.resolved_evaluation_context() await validate_experiment_context( workspace=workspace, - context=body.resolved_evaluation_context(), + context=context, entity_client=entity_client, ) ingested_at = utc_now() @@ -102,4 +104,6 @@ async def ingest_atif( ingested_at=ingested_at, ) await service.ingest_batch(TraceBatch(spans=spans, evaluator_results=evaluator_results)) + if refresher is not None and context is not None and context.evaluation_id: + refresher.mark_dirty(workspace=workspace, experiment_id=context.evaluation_id) return Response(status_code=status.HTTP_201_CREATED) diff --git a/services/intake/src/nmp/intake/spans/ingest/chat_completions.py b/services/intake/src/nmp/intake/spans/ingest/chat_completions.py index 67d6aa9bbf..43f0d80b87 100644 --- a/services/intake/src/nmp/intake/spans/ingest/chat_completions.py +++ b/services/intake/src/nmp/intake/spans/ingest/chat_completions.py @@ -19,7 +19,7 @@ from fastapi import APIRouter, Depends, status from nmp.common.entities.client import EntityClient from nmp.common.service.dependencies import get_entity_client -from nmp.intake.spans.api.dependencies import SpansServiceDep, require_workspace_access +from nmp.intake.spans.api.dependencies import RollupRefresherDep, SpansServiceDep, require_workspace_access from nmp.intake.spans.domain import IntakeSpan, SpanKind, SpanStatus, TraceBatch from nmp.intake.spans.ingest.evaluation_context import ( ExperimentContextIngestModel, @@ -152,15 +152,19 @@ async def ingest_chat_completion( body: ChatCompletionsIngestRequest, service: SpansServiceDep, entity_client: EntityClientDep, + refresher: RollupRefresherDep, ) -> ChatCompletionsIngestResponse: + context = body.resolved_evaluation_context() await validate_experiment_context( workspace=workspace, - context=body.resolved_evaluation_context(), + context=context, entity_client=entity_client, ) ingested_at = utc_now() span = _chat_completion_to_span(workspace=workspace, body=body, ingested_at=ingested_at) await service.ingest_batch(TraceBatch(spans=[span])) + if refresher is not None and context is not None and context.evaluation_id: + refresher.mark_dirty(workspace=workspace, experiment_id=context.evaluation_id) return ChatCompletionsIngestResponse( session_id=span.session_id, span_id=span.external_span_id, diff --git a/services/intake/tests/integration/spans/test_experiment_metrics_refresh.py b/services/intake/tests/integration/spans/test_experiment_metrics_refresh.py new file mode 100644 index 0000000000..8d4bd7f0c7 --- /dev/null +++ b/services/intake/tests/integration/spans/test_experiment_metrics_refresh.py @@ -0,0 +1,79 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""ATIF ingest marks the touched experiment dirty on the rollup refresher. + +The refresh worker's flush/write logic is covered by the unit tests in +``test_experiment_rollup_refresher.py``; here we verify the ingest-path wiring that +queues experiments for it. +""" + +from __future__ import annotations + +from typing import Any, cast + +from fastapi.testclient import TestClient + +ATIF_INGEST = "/apis/intake/v2/workspaces/default/ingest/atif" +EXPERIMENTS = "/apis/intake/v2/workspaces/default/experiments" +GROUPS = "/apis/intake/v2/workspaces/default/experiment-groups" + + +def _ensure_group(client: TestClient, name: str = "metrics-refresh-group") -> str: + response = client.post(GROUPS, json={"name": name}) + if response.status_code == 409: + response = client.get(f"{GROUPS}/{name}") + response.raise_for_status() + return response.json()["id"] + + +def _service(client: TestClient) -> Any: + # client.app is a FastAPI app at runtime but typed as a bare ASGI callable. + state = cast(Any, client.app).state + return getattr(state, "intake_service", None) or getattr(state, "service", None) + + +def _atif_body(*, experiment_id: str, score: float) -> dict[str, Any]: + return { + "schema_version": "ATIF-v1.7", + "session_id": f"{experiment_id}-session", + "experiment_context": {"experiment_id": experiment_id, "test_case_id": "case-1"}, + "extra": {"task_name": "case-1", "verifier_result": {"rewards": {"reward": score}}}, + "agent": {"name": "sample-agent", "version": "1.0.0", "model_name": "provider/sample-model"}, + "steps": [], + } + + +def test_atif_ingest_marks_experiment_dirty(client: TestClient) -> None: + group_id = _ensure_group(client) + experiment_id = "metrics-dirty-exp" + created = client.post( + EXPERIMENTS, + json={"name": experiment_id, "experiment_group_id": group_id, "dataset_name": "ds"}, + ) + assert created.status_code == 201, created.text + + response = client.post(ATIF_INGEST, json=_atif_body(experiment_id=experiment_id, score=1.0)) + assert response.status_code == 201, response.text + + refresher = _service(client).rollup_refresher + assert refresher is not None + assert ("default", experiment_id) in refresher.pending() + + +def test_atif_ingest_without_experiment_context_marks_nothing(client: TestClient) -> None: + response = client.post( + ATIF_INGEST, + json={ + "schema_version": "ATIF-v1.6", + "session_id": "no-context-session", + "extra": {"verifier_result": {"rewards": {"reward": 1.0}}}, + "agent": {"name": "sample-agent", "version": "1.0.0"}, + "steps": [], + }, + ) + assert response.status_code == 201, response.text + + refresher = _service(client).rollup_refresher + assert refresher is not None + assert refresher.pending() == set() diff --git a/services/intake/tests/test_experiment_metrics_cache_gate.py b/services/intake/tests/test_experiment_metrics_cache_gate.py new file mode 100644 index 0000000000..d9a7009ae9 --- /dev/null +++ b/services/intake/tests/test_experiment_metrics_cache_gate.py @@ -0,0 +1,36 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""The list read path only trusts denormalized metrics on a version+decode match, else falls back.""" + +from __future__ import annotations + +from nmp.intake.api.v2.experiments.endpoints import _cached_rollup +from nmp.intake.spans.experiment_rollup_repository import METRICS_VERSION, ExperimentRollup, rollup_to_metrics + + +def _metrics() -> dict: + rollup = ExperimentRollup(experiment_id="exp", run_count=3) + return rollup_to_metrics(rollup, refreshed_at="2026-06-23T00:00:00+00:00") + + +def test_decodes_current_version() -> None: + rollup = _cached_rollup("exp", _metrics()) + assert rollup is not None + assert rollup.run_count == 3 + + +def test_none_when_absent() -> None: + assert _cached_rollup("exp", None) is None + assert _cached_rollup("exp", {}) is None + + +def test_none_on_version_mismatch() -> None: + stale = _metrics() + stale["version"] = METRICS_VERSION + 1 + assert _cached_rollup("exp", stale) is None + + +def test_none_on_malformed_blob() -> None: + # Right version, but a shape that blows up decode -> treated as cache-miss, not an exception. + assert _cached_rollup("exp", {"version": METRICS_VERSION, "evaluators": "not-a-dict"}) is None diff --git a/services/intake/tests/test_experiment_rollup_refresher.py b/services/intake/tests/test_experiment_rollup_refresher.py new file mode 100644 index 0000000000..ef316bb783 --- /dev/null +++ b/services/intake/tests/test_experiment_rollup_refresher.py @@ -0,0 +1,166 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for the debounced experiment rollup refresher.""" + +from __future__ import annotations + +from typing import cast + +import pytest +from nmp.common.entities.client import EntityClient, EntityConflictError, EntityNotFoundError +from nmp.intake.entities.experiments import Experiment +from nmp.intake.spans.experiment_rollup_refresher import ExperimentRollupRefresher +from nmp.intake.spans.experiment_rollup_repository import ( + ExperimentRollup, + ExperimentRollupRepository, + ScoreRollup, + metrics_to_rollup, + rollup_to_metrics, +) + + +def _sample_rollup(experiment_id: str) -> ExperimentRollup: + return ExperimentRollup( + experiment_id=experiment_id, + run_count=3, + model_names=["provider/model"], + agent_names=["agent"], + agent_versions=["1.0"], + evaluator_scores={"reward": ScoreRollup(sum=2.0, mean=0.667, median=1.0, p90=1.0, p95=1.0, p99=1.0, count=3)}, + cost_usd=ScoreRollup(sum=0.6, mean=0.2, median=0.2, p90=0.3, p95=0.3, p99=0.3, count=3), + latency_ms=ScoreRollup(sum=3000.0, mean=1000.0, median=1000.0, p90=1500.0, p95=1500.0, p99=1500.0, count=3), + ) + + +class _FakeRollupRepo: + def __init__(self, *, error: Exception | None = None) -> None: + self.calls: list[tuple[str, list[str]]] = [] + self._error = error + + async def get_rollups(self, *, workspace: str, experiment_ids: list[str]) -> dict[str, ExperimentRollup]: + self.calls.append((workspace, list(experiment_ids))) + if self._error is not None: + raise self._error + return {experiment_id: _sample_rollup(experiment_id) for experiment_id in experiment_ids} + + +class _FakeEntityClient: + def __init__(self, *, get_error: Exception | None = None, update_error: Exception | None = None) -> None: + self.updated: list[Experiment] = [] + self.get_calls: list[tuple[str, str]] = [] + self._get_error = get_error + self._update_error = update_error + + async def get(self, entity_type: type[Experiment], *, name: str, workspace: str) -> Experiment: + self.get_calls.append((name, workspace)) + if self._get_error is not None: + raise self._get_error + return Experiment(name=name, workspace=workspace, experiment_group_id="grp", dataset_name="ds") + + async def update(self, entity: Experiment) -> Experiment: + if self._update_error is not None: + raise self._update_error + self.updated.append(entity) + return entity + + +def _refresher(repo: _FakeRollupRepo, entity_client: _FakeEntityClient) -> ExperimentRollupRefresher: + return ExperimentRollupRefresher( + rollup_repository=cast(ExperimentRollupRepository, repo), + entity_client=cast(EntityClient, entity_client), + interval_seconds=999, + ) + + +def test_rollup_metrics_round_trip() -> None: + rollup = _sample_rollup("exp-1") + restored = metrics_to_rollup("exp-1", rollup_to_metrics(rollup, refreshed_at="2026-06-23T00:00:00+00:00")) + assert restored.run_count == rollup.run_count + assert restored.model_names == rollup.model_names + assert restored.evaluator_scores["reward"] == rollup.evaluator_scores["reward"] + assert restored.cost_usd == rollup.cost_usd + assert restored.latency_ms == rollup.latency_ms + + +@pytest.mark.asyncio +async def test_flush_writes_denormalized_metrics() -> None: + repo = _FakeRollupRepo() + entity_client = _FakeEntityClient() + refresher = _refresher(repo, entity_client) + + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + refresher.mark_dirty(workspace="default", experiment_id="exp-b") + await refresher.flush() + + # One batched rollup query for the workspace, covering both dirty experiments. + assert len(repo.calls) == 1 + workspace, ids = repo.calls[0] + assert workspace == "default" + assert set(ids) == {"exp-a", "exp-b"} + + # Each experiment got its metrics written, and the dirty set is drained. + assert {entity.name for entity in entity_client.updated} == {"exp-a", "exp-b"} + written = entity_client.updated[0].metrics + assert written is not None + assert written["version"] == 1 + assert written["run_count"] == 3 + assert written["evaluators"]["reward"]["mean"] == pytest.approx(0.667) + assert written["cost_usd"]["mean"] == pytest.approx(0.2) + assert "refreshed_at" in written + assert refresher.pending() == set() + + +def test_mark_dirty_dedupes() -> None: + refresher = _refresher(_FakeRollupRepo(), _FakeEntityClient()) + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + assert refresher.pending() == {("default", "exp-a")} + + +@pytest.mark.asyncio +async def test_flush_noop_when_clean() -> None: + repo = _FakeRollupRepo() + await _refresher(repo, _FakeEntityClient()).flush() + assert repo.calls == [] + + +@pytest.mark.asyncio +async def test_missing_experiment_is_skipped() -> None: + entity_client = _FakeEntityClient(get_error=EntityNotFoundError("gone")) + refresher = _refresher(_FakeRollupRepo(), entity_client) + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + await refresher.flush() + assert entity_client.updated == [] + assert refresher.pending() == set() # not re-queued; the experiment no longer exists + + +@pytest.mark.asyncio +async def test_update_conflict_requeues() -> None: + entity_client = _FakeEntityClient(update_error=EntityConflictError("version mismatch")) + refresher = _refresher(_FakeRollupRepo(), entity_client) + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + await refresher.flush() + # A concurrent edit won the optimistic lock; the experiment is re-queued for the next cycle. + assert refresher.pending() == {("default", "exp-a")} + + +@pytest.mark.asyncio +async def test_stop_flushes_pending_without_loss() -> None: + entity_client = _FakeEntityClient() + refresher = _refresher(_FakeRollupRepo(), entity_client) + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + refresher.start() + # stop() signals the loop to exit and lets it run a final drain — no mid-flush cancellation. + await refresher.stop() + assert {entity.name for entity in entity_client.updated} == {"exp-a"} + assert refresher.pending() == set() + + +@pytest.mark.asyncio +async def test_rollup_query_failure_requeues() -> None: + repo = _FakeRollupRepo(error=RuntimeError("clickhouse down")) + refresher = _refresher(repo, _FakeEntityClient()) + refresher.mark_dirty(workspace="default", experiment_id="exp-a") + await refresher.flush() + assert refresher.pending() == {("default", "exp-a")} diff --git a/web/packages/sdk/generated/agents/schema/DeploymentLogsResponse.ts b/web/packages/sdk/generated/agents/schema/DeploymentLogsResponse.ts index b2dd176038..41701a5090 100644 --- a/web/packages/sdk/generated/agents/schema/DeploymentLogsResponse.ts +++ b/web/packages/sdk/generated/agents/schema/DeploymentLogsResponse.ts @@ -6,7 +6,7 @@ * Do not edit manually. * agents (plugin) */ -import type { LogLine } from './LogLine.ts'; +import type { LogLine } from './LogLine'; /** * Response body for ``GET /deployments/{name}/logs``.