Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion services/intake/src/nmp/intake/api/v2/experiments/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
from nmp.intake.spans.clickhouse_client import ClickHouseSpanClient
from nmp.intake.spans.domain import SpanStatus
from nmp.intake.spans.experiment_rollup_repository import (
METRICS_VERSION,
ExperimentRollup,
ExperimentRollupRepository,
ScoreRollup,
metrics_to_rollup,
)
from nmp.intake.spans.experiment_session_repository import ExperimentSessionRepository
from nmp.intake.spans.storage import make_pagination
Expand Down Expand Up @@ -366,7 +368,17 @@ async def list_experiments(
page_size=page_size,
)
responses = [ExperimentResponse.from_entity(e) for e in result.data]
await _hydrate_rollups(workspace=workspace, responses=responses, rollup_repository=rollup_repository)
# Prefer the denormalized metrics written by the refresh worker; fall back to a live
# ClickHouse hydrate for experiments not yet refreshed, written by a different metrics
# version, or whose blob can't be decoded.
needs_live_hydrate = []
for entity, response in zip(result.data, responses):
rollup = _cached_rollup(response.name, entity.metrics)
if rollup is not None:
_apply_rollup(response, rollup)
else:
needs_live_hydrate.append(response)
await _hydrate_rollups(workspace=workspace, responses=needs_live_hydrate, rollup_repository=rollup_repository)
return Page(
data=responses,
pagination=PaginationData(**result.pagination.model_dump()),
Expand Down Expand Up @@ -836,6 +848,21 @@ def _apply_is_pinned_filter(parsed: ParsedFilter) -> None:
parsed.and_with(null_clause)


def _cached_rollup(name: str, metrics: dict | None) -> ExperimentRollup | None:
"""Decode a denormalized metrics blob into a rollup, or ``None`` to fall back to live hydrate.

Falls back when the blob is absent, was written by a different ``METRICS_VERSION``, or can't be
decoded — so a single stale/corrupt row never fails the whole list response.
"""
if not metrics or metrics.get("version") != METRICS_VERSION:
return None
try:
return metrics_to_rollup(name, metrics)
except Exception:
logger.warning("Discarding unreadable cached metrics for experiment %s; using live hydrate", name)
return None


async def _hydrate_rollups(
*,
workspace: str,
Expand Down
9 changes: 9 additions & 0 deletions services/intake/src/nmp/intake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,12 @@ class IntakeConfig(_BaseIntakeConfig):
ge=1024,
description="Maximum accepted body size for OTLP ingest requests, in bytes.",
)
rollup_refresh_interval_seconds: float = Field(
default=10.0,
gt=0,
description=(
"How often the background worker drains the dirty set and denormalizes ClickHouse rollups "
"onto Experiment entities. Bounds how stale a leaderboard's metrics can be after ingest; "
"ingest never blocks on it."
),
)
10 changes: 10 additions & 0 deletions services/intake/src/nmp/intake/entities/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,13 @@ class Experiment(EntityBase):
"Pin state is workspace-shared: every user with workspace access sees the same pinned set."
),
)

metrics: dict[str, Any] | None = Field(
default=None,
description=(
"System-managed denormalized rollup summary (run_count, cost_usd, latency_ms, per-evaluator "
"scores, refreshed_at) computed from ClickHouse and refreshed by a background worker after "
"ingest. Not accepted on the create/update body; written only by the refresh path so it can "
"drive entity-store sort/filter without a per-read ClickHouse query."
),
)
18 changes: 18 additions & 0 deletions services/intake/src/nmp/intake/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from nmp.intake.config import IntakeConfig
from nmp.intake.spans.api import annotations, evaluator_results, spans, traces
from nmp.intake.spans.clickhouse_client import ClickHouseSettings, ClickHouseSpanClient
from nmp.intake.spans.experiment_rollup_refresher import ExperimentRollupRefresher
from nmp.intake.spans.experiment_rollup_repository import ExperimentRollupRepository
from nmp.intake.spans.ingest import atif, chat_completions, otlp

logger = logging.getLogger(__name__)
Expand All @@ -26,6 +28,8 @@ def __init__(self):
super().__init__(name="intake", module_name="nmp.intake")
# The client is owned by the service lifecycle; it is absent before startup and after shutdown.
self.clickhouse_client: ClickHouseSpanClient | None = None
# Background worker that denormalizes ClickHouse rollups onto Experiment entities.
self.rollup_refresher: ExperimentRollupRefresher | None = None
self._ready = False

@property
Expand Down Expand Up @@ -79,12 +83,26 @@ async def on_startup(self) -> None:
"clickhouse_database": cfg.clickhouse_config.database,
},
)
entity_client = self.dependency_provider.get_entity_client(as_service=self.name)
if entity_client is not None:
self.rollup_refresher = ExperimentRollupRefresher(
rollup_repository=ExperimentRollupRepository(self.clickhouse_client),
entity_client=entity_client,
interval_seconds=cfg.rollup_refresh_interval_seconds,
)
self.rollup_refresher.start()
else:
logger.warning("Entity client unavailable; experiment rollup refresher not started")
self._ready = True

async def on_shutdown(self) -> None:
"""Close the service-owned ClickHouse client."""

self._ready = False
# Stop the refresher first (its final flush still needs ClickHouse).
if self.rollup_refresher is not None:
await self.rollup_refresher.stop()
self.rollup_refresher = None
if self.clickhouse_client is not None:
await self.clickhouse_client.close()
self.clickhouse_client = None
Expand Down
10 changes: 10 additions & 0 deletions services/intake/src/nmp/intake/spans/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from nmp.intake.spans.annotations_repository import AnnotationsRepository
from nmp.intake.spans.clickhouse_client import ClickHouseSpanClient, get_clickhouse_client
from nmp.intake.spans.evaluator_results_repository import EvaluatorResultsRepository
from nmp.intake.spans.experiment_rollup_refresher import ExperimentRollupRefresher
from nmp.intake.spans.service import IntakeSpansService
from nmp.intake.spans.span_repository import SpanRepository
from nmp.intake.spans.trace_repository import TraceRepository
Expand Down Expand Up @@ -84,3 +85,12 @@ def get_spans_service(


SpansServiceDep = Annotated[IntakeSpansService, Depends(get_spans_service)]


def get_rollup_refresher(request: Request) -> ExperimentRollupRefresher | None:
"""Reach the service-owned rollup refresher from the request (absent if startup didn't create one)."""
service = getattr(request.app.state, "intake_service", None) or getattr(request.app.state, "service", None)
return getattr(service, "rollup_refresher", None) if service is not None else None


RollupRefresherDep = Annotated[ExperimentRollupRefresher | None, Depends(get_rollup_refresher)]
117 changes: 117 additions & 0 deletions services/intake/src/nmp/intake/spans/experiment_rollup_refresher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Background worker that denormalizes ClickHouse rollups onto Experiment entities.

Ingest marks ``(workspace, experiment_id)`` dirty — a cheap, non-blocking set add. A
background loop drains the dirty set on a fixed interval, recomputes each touched
experiment's rollup via ``get_rollups`` (which uses ``FINAL``, so it's correct under
re-ingest), and writes the summary onto the experiment's system-managed ``metrics``
field. Bursts coalesce: many ingests for one experiment within an interval collapse to a
single recompute, and ingest latency is never gated on the rollup query.
"""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone

from nmp.common.entities.client import EntityClient, EntityConflictError, EntityNotFoundError
from nmp.intake.entities.experiments import Experiment
from nmp.intake.spans.experiment_rollup_repository import ExperimentRollupRepository, rollup_to_metrics

logger = logging.getLogger(__name__)


class ExperimentRollupRefresher:
"""Coalesces dirty experiment ids and refreshes their denormalized ``metrics`` on a fixed cadence."""

def __init__(
self,
*,
rollup_repository: ExperimentRollupRepository,
entity_client: EntityClient,
interval_seconds: float = 10.0,
) -> None:
self._rollup_repository = rollup_repository
self._entity_client = entity_client
self._interval_seconds = interval_seconds
self._dirty: set[tuple[str, str]] = set()
self._task: asyncio.Task[None] | None = None
self._stopping = asyncio.Event()

def mark_dirty(self, *, workspace: str, experiment_id: str) -> None:
"""Queue an experiment for refresh. Cheap and non-blocking; safe to call from the ingest path."""
self._dirty.add((workspace, experiment_id))

def pending(self) -> set[tuple[str, str]]:
"""Return a copy of the currently-queued ``(workspace, experiment_id)`` pairs (for observability/tests)."""
return set(self._dirty)

def start(self) -> None:
if self._task is None:
self._stopping.clear()
self._task = asyncio.create_task(self._run())

async def stop(self) -> None:
# Signal the loop to exit and let it finish any in-flight flush — we never cancel mid-flush, so a
# detached batch can't be dropped before it's written. Then a final drain covers the cases the loop
# can't (it saw the stop flag before its first flush, or items were enqueued during the last flush).
self._stopping.set()
if self._task is not None:
await self._task
self._task = None
await self.flush()
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async def _run(self) -> None:
while not self._stopping.is_set():
try:
# Interruptible sleep: wakes early when stop() sets the event so shutdown is prompt.
await asyncio.wait_for(self._stopping.wait(), timeout=self._interval_seconds)
except asyncio.TimeoutError:
pass # interval elapsed; time for a periodic flush
try:
await self.flush()
except Exception:
logger.exception("Experiment rollup refresh cycle failed")

async def flush(self) -> None:
"""Drain the dirty set and write current rollups. Directly callable for deterministic tests."""
if not self._dirty:
return
batch = self._dirty
self._dirty = set()
by_workspace: dict[str, list[str]] = {}
for workspace, experiment_id in batch:
by_workspace.setdefault(workspace, []).append(experiment_id)
for workspace, experiment_ids in by_workspace.items():
try:
await self._refresh_workspace(workspace, experiment_ids)
except Exception:
# Re-queue the whole workspace batch for the next cycle (e.g. ClickHouse unavailable).
logger.exception("Failed to refresh experiment rollups for workspace %s; re-queuing", workspace)
for experiment_id in experiment_ids:
self._dirty.add((workspace, experiment_id))

async def _refresh_workspace(self, workspace: str, experiment_ids: list[str]) -> None:
rollups = await self._rollup_repository.get_rollups(workspace=workspace, experiment_ids=experiment_ids)
refreshed_at = datetime.now(timezone.utc).isoformat()
for experiment_id in experiment_ids:
rollup = rollups.get(experiment_id)
if rollup is None:
continue
await self._write_metrics(workspace, experiment_id, rollup_to_metrics(rollup, refreshed_at=refreshed_at))

async def _write_metrics(self, workspace: str, experiment_id: str, metrics: dict) -> None:
try:
experiment = await self._entity_client.get(Experiment, name=experiment_id, workspace=workspace)
except EntityNotFoundError:
# Deleted between ingest and refresh; nothing to update.
return
experiment.metrics = metrics
try:
await self._entity_client.update(experiment)
except EntityConflictError:
# A concurrent user edit won the optimistic lock; re-queue for the next cycle.
self._dirty.add((workspace, experiment_id))
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,71 @@ def evaluator_names(self) -> list[str]:
return sorted(self.evaluator_scores)


def _score_to_dict(score: ScoreRollup | None) -> dict[str, Any] | None:
if score is None:
return None
return {
"sum": score.sum,
"mean": score.mean,
"median": score.median,
"p90": score.p90,
"p95": score.p95,
"p99": score.p99,
"count": score.count,
}


def _score_from_dict(data: dict[str, Any] | None) -> ScoreRollup | None:
if data is None:
return None
return ScoreRollup(
sum=data.get("sum"),
mean=data.get("mean"),
median=data.get("median"),
p90=data.get("p90"),
p95=data.get("p95"),
p99=data.get("p99"),
count=int(data.get("count", 0)),
)


# Bump when the stored metrics shape or how an aggregate is computed changes, so readers can tell
# blobs written by an older rollup version apart (and recompute/fall back rather than trust stale shapes).
METRICS_VERSION = 1

Comment thread
shanaiabuggy marked this conversation as resolved.

def rollup_to_metrics(rollup: ExperimentRollup, *, refreshed_at: str) -> dict[str, Any]:
"""Serialize a rollup into the JSON-safe dict stored on ``Experiment.metrics``."""
return {
"version": METRICS_VERSION,
"run_count": rollup.run_count,
"model_names": rollup.model_names,
"agent_names": rollup.agent_names,
"agent_versions": rollup.agent_versions,
"evaluators": {name: _score_to_dict(score) for name, score in rollup.evaluator_scores.items()},
"cost_usd": _score_to_dict(rollup.cost_usd),
"latency_ms": _score_to_dict(rollup.latency_ms),
"refreshed_at": refreshed_at,
}


def metrics_to_rollup(experiment_id: str, metrics: dict[str, Any]) -> ExperimentRollup:
"""Reconstruct a rollup from a stored ``Experiment.metrics`` dict (inverse of ``rollup_to_metrics``)."""
evaluators = metrics.get("evaluators") or {}
return ExperimentRollup(
experiment_id=experiment_id,
run_count=int(metrics.get("run_count", 0)),
model_names=list(metrics.get("model_names") or []),
agent_names=list(metrics.get("agent_names") or []),
agent_versions=list(metrics.get("agent_versions") or []),
evaluator_scores={
name: score for name, data in evaluators.items() if (score := _score_from_dict(data)) is not None
},
cost_usd=_score_from_dict(metrics.get("cost_usd")),
latency_ms=_score_from_dict(metrics.get("latency_ms")),
)


class ExperimentRollupRepository:
def __init__(self, client: ClickHouseSpanClient) -> None:
self._client = client
Expand Down
8 changes: 6 additions & 2 deletions services/intake/src/nmp/intake/spans/ingest/atif.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastapi import APIRouter, Depends, Response, status
from nmp.common.entities.client import EntityClient
from nmp.common.service.dependencies import get_entity_client
from nmp.intake.spans.api.dependencies import SpansServiceDep, require_workspace_access
from nmp.intake.spans.api.dependencies import RollupRefresherDep, SpansServiceDep, require_workspace_access
from nmp.intake.spans.domain import TraceBatch
from nmp.intake.spans.ingest.atif_domain import (
AtifAgent,
Expand Down Expand Up @@ -82,10 +82,12 @@ async def ingest_atif(
body: AtifIngestRequest,
service: SpansServiceDep,
entity_client: EntityClientDep,
refresher: RollupRefresherDep,
) -> Response:
context = body.resolved_evaluation_context()
await validate_experiment_context(
workspace=workspace,
context=body.resolved_evaluation_context(),
context=context,
entity_client=entity_client,
)
ingested_at = utc_now()
Expand All @@ -102,4 +104,6 @@ async def ingest_atif(
ingested_at=ingested_at,
)
await service.ingest_batch(TraceBatch(spans=spans, evaluator_results=evaluator_results))
if refresher is not None and context is not None and context.evaluation_id:
refresher.mark_dirty(workspace=workspace, experiment_id=context.evaluation_id)
return Response(status_code=status.HTTP_201_CREATED)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from fastapi import APIRouter, Depends, status
from nmp.common.entities.client import EntityClient
from nmp.common.service.dependencies import get_entity_client
from nmp.intake.spans.api.dependencies import SpansServiceDep, require_workspace_access
from nmp.intake.spans.api.dependencies import RollupRefresherDep, SpansServiceDep, require_workspace_access
from nmp.intake.spans.domain import IntakeSpan, SpanKind, SpanStatus, TraceBatch
from nmp.intake.spans.ingest.evaluation_context import (
ExperimentContextIngestModel,
Expand Down Expand Up @@ -152,15 +152,19 @@ async def ingest_chat_completion(
body: ChatCompletionsIngestRequest,
service: SpansServiceDep,
entity_client: EntityClientDep,
refresher: RollupRefresherDep,
) -> ChatCompletionsIngestResponse:
context = body.resolved_evaluation_context()
await validate_experiment_context(
workspace=workspace,
context=body.resolved_evaluation_context(),
context=context,
entity_client=entity_client,
)
ingested_at = utc_now()
span = _chat_completion_to_span(workspace=workspace, body=body, ingested_at=ingested_at)
await service.ingest_batch(TraceBatch(spans=[span]))
if refresher is not None and context is not None and context.evaluation_id:
refresher.mark_dirty(workspace=workspace, experiment_id=context.evaluation_id)
return ChatCompletionsIngestResponse(
session_id=span.session_id,
span_id=span.external_span_id,
Expand Down
Loading
Loading