18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,24 @@ All notable changes to the `fipsagents` package will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/).

## [0.14.0] - 2026-04-27

### Added

- **`SessionStore.update()`** — partial-update method on the ABC for recording per-session accumulator state without rewriting message history. Signature: `update(session_id, *, cost_data: dict | None = None) -> bool`. Implementations on `Null` (no-op returning `False`), `Sqlite` (Python-side shallow merge), `Postgres` (native `||` JSONB merge), and `Http` (maps to `PATCH /v1/sessions/{id}`); a sketch of the new surface follows this list. First slice of [#104](https://github.com/fips-agents/agent-template/issues/104) (Cost Tracking).
- **`SessionStore.get_cost_data()`** — symmetric reader so the server-side accumulator can read existing totals before writing cumulative ones back. Implemented on `Null` / `Sqlite` / `Postgres`; `Http` raises `NotImplementedError` until the platform exposes a GET endpoint (tracked at [fipsagents-platform#4](https://github.com/fips-agents/fipsagents-platform/issues/4)).
- **Per-turn token-usage persistence** — `OpenAIChatServer` extracts `prompt_tokens` / `completion_tokens` from each turn's terminal `StreamComplete` event (sync and streaming paths) and accumulates `input_tokens`, `output_tokens`, `cached_tokens`, `model`, and `turn_count` onto the session's `cost_data` via `SessionStore.update()`. Persistence failures are caught and logged so cost-tracking issues never break the chat response.
- **`cost_data` column** on the `sessions` table — `TEXT NOT NULL DEFAULT '{}'` on SQLite, `JSONB NOT NULL DEFAULT '{}'::jsonb` on Postgres. Existing databases pick up the column on first connect via idempotent `ALTER TABLE ADD COLUMN` migrations; no operator action required.
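
A minimal sketch of the new surface; `update()`'s signature is quoted from the entry above, while `get_cost_data()`'s exact placement on the ABC and its docstrings are assumptions:

```python
from abc import ABC, abstractmethod


class SessionStore(ABC):
    """Sketch of the 0.14.0 additions only; pre-existing methods omitted."""

    @abstractmethod
    async def update(
        self, session_id: str, *, cost_data: dict | None = None
    ) -> bool:
        """Shallow-merge partial fields onto a session.

        Returns False when the session does not exist; the Null store
        always returns False.
        """

    @abstractmethod
    async def get_cost_data(self, session_id: str) -> dict:
        """Return the session's accumulated cost_data ({} when unset)."""
```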

### Changed

- `SqliteSessionStore.save()` switches from `INSERT OR REPLACE` to an `ON CONFLICT(session_id) DO UPDATE SET messages, updated_at` upsert so `cost_data` survives subsequent message saves; a before/after sketch follows. The Postgres `save()` already had the right shape.
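
A hedged before/after sketch; only the conflict-clause shape is quoted from the entry above, and the exact column list is an assumption:

```python
# Before (assumed shape): INSERT OR REPLACE deletes and re-inserts the
# conflicting row, so columns it doesn't name (including cost_data)
# reset to their DEFAULT values.
OLD_SAVE_SQL = """
INSERT OR REPLACE INTO sessions (session_id, messages, updated_at)
VALUES (?, ?, ?)
"""

# After: the upsert updates only the named columns, so cost_data written
# by update() survives later save() calls.
NEW_SAVE_SQL = """
INSERT INTO sessions (session_id, messages, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(session_id) DO UPDATE SET
    messages = excluded.messages,
    updated_at = excluded.updated_at
"""
```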

### Notes

- HTTP-backed deployments currently fall back to per-turn-delta writes (last-write-wins) for `cost_data` because the platform doesn't yet expose a read endpoint — see [fipsagents-platform#4](https://github.com/fips-agents/fipsagents-platform/issues/4). SQLite/Postgres backends get cumulative semantics for free.
- Cost data shape (`input_tokens`, `output_tokens`, `cached_tokens`, `model`, `turn_count`) is owned by the server layer; pricing, budget enforcement, and aggregation endpoints are deferred follow-ups on [#104](https://github.com/fips-agents/agent-template/issues/104).
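
For illustration, a hypothetical read-back of the accumulator; the import path and constructor arguments are assumptions, while the field names come from this entry:

```python
import asyncio

from fipsagents.server import SqliteSessionStore  # import path assumed


async def main() -> None:
    store = SqliteSessionStore("sessions.db")  # constructor signature assumed
    totals = await store.get_cost_data("sess-123")  # {} for a fresh session
    print(totals.get("input_tokens", 0), totals.get("turn_count", 0))


asyncio.run(main())
```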

## [0.13.0] - 2026-04-27

### Added
2 changes: 1 addition & 1 deletion packages/fipsagents/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "fipsagents"
version = "0.13.0"
version = "0.14.0"
description = "Production-ready AI agent framework for FIPS/OpenShift environments"
readme = "README.md"
license = {file = "LICENSE"}
80 changes: 80 additions & 0 deletions packages/fipsagents/src/fipsagents/server/app.py
@@ -558,6 +558,9 @@ async def _chat_completions(self, request: Request, req: ChatCompletionRequest):
        # Session: save after sync response.
        if req.session_id and self._session_store:
            await self._session_store.save(req.session_id, agent.messages)
            await self._persist_cost_data(
                req.session_id, metrics, model_name,
            )
        if collector:
            await collector.end_request()
        if self._metrics_collector and metrics_start is not None:
@@ -677,6 +680,7 @@ async def _stream(
        self._agent.messages = list(incoming)

        stream_status = "ok"
        captured_metrics: StreamMetrics | None = None
        try:
            events = self._agent.astep_stream(max_iterations=10, **(overrides or {}))
            if self._metrics_collector is not None:
@@ -685,6 +689,18 @@
                )
            if collector:
                events = collector.observe(events)

            # Pass-through observer that snapshots the StreamMetrics
            # from the StreamComplete event so the post-stream
            # cost-data accumulator can read them.
            async def _capture_metrics(stream):
                nonlocal captured_metrics
                async for ev in stream:
                    if isinstance(ev, StreamComplete):
                        captured_metrics = ev.metrics
                    yield ev

            events = _capture_metrics(events)
            async for chunk in stream_events_as_sse(
                events, model_name, trace_id=trace_id,
            ):
@@ -706,6 +722,70 @@
        # Session: save after streaming completes.
        if session_id and self._session_store:
            await self._session_store.save(session_id, self._agent.messages)
            await self._persist_cost_data(
                session_id, captured_metrics, model_name,
            )

    # -- Cost-data accumulator -----------------------------------------------

    async def _persist_cost_data(
        self,
        session_id: str,
        metrics: StreamMetrics | None,
        model_name: str,
    ) -> None:
        """Accumulate this turn's token usage into the session's cost_data.

        Cumulative-for-the-session: read the existing accumulator, add
        this turn's deltas, write it back. Failures are logged and
        swallowed -- cost tracking must never break the chat response.

        Backends that don't support reading cost_data (e.g.
        :class:`HttpSessionStore`) raise :class:`NotImplementedError`
        from ``get_cost_data``; in that case we treat the existing
        total as empty and the next ``update`` records this turn's
        delta only. A follow-up issue tracks exposing the platform
        read endpoint so HTTP-backed deployments get cumulative totals.
        """
        if metrics is None or self._session_store is None:
            return

        prompt = metrics.prompt_tokens
        completion = metrics.completion_tokens
        # Nothing useful to record when the provider didn't report usage.
        if prompt is None and completion is None:
            return

        try:
            existing = await self._session_store.get_cost_data(session_id)
        except NotImplementedError:
            existing = {}
        except Exception:  # noqa: BLE001 — keep chat response alive
            logger.warning(
                "Failed to read cost_data for %s; using empty baseline",
                session_id,
                exc_info=True,
            )
            existing = {}

        new_data = {
            "input_tokens": int(existing.get("input_tokens", 0) or 0)
            + int(prompt or 0),
            "output_tokens": int(existing.get("output_tokens", 0) or 0)
            + int(completion or 0),
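            # Preserved as-is; no per-turn cached-token delta is added here.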
"cached_tokens": int(existing.get("cached_tokens", 0) or 0),
"model": model_name or existing.get("model"),
"turn_count": int(existing.get("turn_count", 0) or 0) + 1,
}

try:
await self._session_store.update(session_id, cost_data=new_data)
except Exception: # noqa: BLE001 — keep chat response alive
logger.warning(
"Failed to persist cost_data for %s",
session_id,
exc_info=True,
)

# -- Run -----------------------------------------------------------------

31 changes: 31 additions & 0 deletions packages/fipsagents/src/fipsagents/server/http.py
@@ -194,6 +194,7 @@ class HttpSessionStore(SessionStore):
    - ``create`` → ``POST /v1/sessions``
    - ``load`` → ``GET /v1/sessions/{id}``
    - ``save`` → ``PUT /v1/sessions/{id}`` (upsert)
    - ``update`` → ``PATCH /v1/sessions/{id}``
    - ``exists`` → ``HEAD /v1/sessions/{id}``
    - ``delete`` → ``DELETE /v1/sessions/{id}``
    - ``delete_before`` → no platform endpoint; logged no-op (the
@@ -233,6 +234,36 @@ async def save(self, session_id: str, messages: list[dict]) -> None:
json={"messages": messages},
)

    async def update(
        self,
        session_id: str,
        *,
        cost_data: dict | None = None,
    ) -> bool:
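        # Nothing to patch: fall back to an existence check so the bool contract holds.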
        if cost_data is None:
            return await self.exists(session_id)
        body: dict[str, Any] = {"cost_data": cost_data}
        status, _ = await self._client.request(
            "PATCH",
            f"/v1/sessions/{session_id}",
            json=body,
            not_found_returns_none=True,
        )
        return status != 404

    async def get_cost_data(self, session_id: str) -> dict:
        # The platform service has no GET /v1/sessions/{id}/cost_data
        # endpoint yet. Until it does, callers must treat the HTTP
        # backend as write-only for cost accumulator state. The server's
        # per-turn accumulator catches NotImplementedError and treats
        # the existing total as empty (so the next write is the turn's
        # delta rather than a true cumulative). A follow-up issue tracks
        # exposing the read endpoint on the platform.
        raise NotImplementedError(
            "HttpSessionStore.get_cost_data: the platform service does "
            "not expose a GET endpoint for cost_data yet."
        )

    async def delete(self, session_id: str) -> bool:
        status, _ = await self._client.request(
            "DELETE",