32 changes: 32 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,31 @@ All notable changes to AgentFlow are documented in this file.

### Added

- **DV2 raw vault migrated from ClickHouse to PostgreSQL** with a cloud
supplier / product reference (`warehouse/agentflow/dv2/`). Hubs, links, and
satellites are emitted in a PostgreSQL dialect (`argMax` → `DISTINCT ON`,
`splitByString` → `split_part`); both ingestion feeds (the X5 loader and the
reference loader) repoint to a shared, parameterized `pg_vault_writer`; and
OLTP → vault promotion runs as an in-database `INSERT … SELECT` now that OLTP
and the vault share one engine. The ClickHouse dialect regenerates
byte-for-byte and is retained for optional mart-serving. Each generated
`INSERT` is parsed by `sqlglot` in `tests/unit/test_dv2_postgres_ingestion.py`
to assert every interpolated column exists in the committed DDL. (#91)
- **PyIceberg sink backed by a real MinIO object store** — the REST catalog
now writes through `S3FileIO` to `s3://agentflow-lake/warehouse` (the same
bucket the Flink path uses) instead of an ephemeral `/tmp/warehouse`
`HadoopFileIO`. A self-contained `docker-compose.iceberg.yml` (MinIO +
bucket-init + REST catalog) and env-overridable credentials in
`config/iceberg.yaml` make the local catalog object-store-backed; a no-Docker
guard asserts an `s3://` warehouse never triggers a local `mkdir`. (#92)
- **Event-driven OLTP → vault freshness via PostgreSQL `LISTEN`/`NOTIFY`** —
`AFTER INSERT/UPDATE` triggers on each `ops_<branch>` table emit
`pg_notify('dv2_vault_refresh', …)`, and a guarded listener runs an
idempotent promote on each event (push, not polling), the PostgreSQL
equivalent of the ClickHouse `MaterializedPostgreSQL` CDC path. Lag is
observed on the server clock (`db_now`) to stay free of host/container
clock skew. The driver-agnostic core is covered no-Docker by
`tests/unit/test_dv2_freshness_listen_notify.py`. (#93)
- OpenSSF Scorecard $0 supply-chain security posture channel:
`.github/workflows/scorecard.yml` (`ossf/scorecard-action@v2.4.3`) runs on
push to `main`, weekly, and on branch-protection changes with top-level
@@ -41,6 +66,13 @@ All notable changes to AgentFlow are documented in this file.

### Changed

- Dependency maintenance batch (consolidated from eight Dependabot PRs): the
Docker `python` base digest, `cp-kafka-connect-base` 7.9.7 → 7.9.8,
`apache-flink` 2.2.1 → 2.3.0 (validated by the Flink smoke job), and the SDK
`vitest` / `schemathesis` bumps, plus the GitHub Pages action major bumps
(`checkout` v7, `configure-pages` v6, `upload-pages-artifact` v5,
`deploy-pages` v5) — all SHA-pinned and validated on `main` by the Deploy
Pages and Flink Smoke runs. (#94)
- Production CDC onboarding, PMF/pricing evidence, a production-hardware
benchmark, and an external pen-test attestation are documented as out of
scope for the current plan. Their acceptance criteria require external
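The PyIceberg entry above mentions a no-Docker guard asserting that an `s3://` warehouse never triggers a local `mkdir`. A minimal sketch of that idea follows, assuming a hypothetical helper named `prepare_warehouse`; the name, the scheme list, and the return convention are illustrative and not taken from the repository.

```python
from pathlib import Path
from urllib.parse import urlparse

# Assumption: these schemes denote an object store; the real guard may use a different list.
REMOTE_SCHEMES = {"s3", "s3a", "s3n"}


def prepare_warehouse(uri: str) -> bool:
    """Create the warehouse directory only when the URI is a local path.

    Returns True if a local directory was ensured, False if the URI points
    at an object store and nothing was created on disk.
    """
    parsed = urlparse(uri)
    if parsed.scheme in REMOTE_SCHEMES:
        return False  # object-store warehouse: never touch the local filesystem
    local_path = Path(parsed.path if parsed.scheme == "file" else uri)
    local_path.mkdir(parents=True, exist_ok=True)
    return True
```

A test in this style can patch `Path.mkdir` and assert it was not called for `s3://agentflow-lake/warehouse`, which needs neither Docker nor MinIO.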
20 changes: 16 additions & 4 deletions README.md
@@ -2,7 +2,7 @@

> Event-native metrics layer: business metrics that move when events happen — measured **1.1 s p50** event-to-metric freshness on production defaults. Live entity lookups, typed contracts, dual-language SDKs, and release-gated delivery for people, dashboards, services, and AI agents alike.

-[![Release gate](https://img.shields.io/badge/release_gate-v1.4_published-brightgreen)](docs/dv2-multi-branch/RELEASE_STATUS.md)
+[![Release gate](https://img.shields.io/badge/release_gate-v1.5_published-brightgreen)](docs/dv2-multi-branch/RELEASE_STATUS.md)
[![codecov](https://codecov.io/gh/brownjuly2003-code/agentflow/branch/main/graph/badge.svg)](https://codecov.io/gh/brownjuly2003-code/agentflow)
[![Python](https://img.shields.io/badge/python-3.11+-blue)](pyproject.toml)
[![License](https://img.shields.io/badge/license-MIT-blue)](LICENSE)
@@ -24,7 +24,7 @@ Consumers are whoever needs the number now: humans, dashboards, downstream servi

- **Measured event-to-metric freshness** — an event entering the pipeline is reflected in `GET /v1/metrics/*` in **1.06 s p50 / 1.99 s p95** on production defaults (event-driven cache invalidation, no webhook registration), tunable to **238 ms p50**; a plain TTL cache on the same pipeline sits at ~15 s. Reproducible via `python scripts/benchmark_freshness.py` → [freshness benchmark](docs/freshness-benchmark.md)
- **Lineage as a contract** — all six metrics declare their source events, serving table, and a 2.5 s p95 staleness budget in versioned contracts, exposed through `/v1/catalog` and `/v1/contracts` and pinned by tests against the actual write path
-- **Published release line through `v1.4.0`** on PyPI (`agentflow-runtime`, `agentflow-client`) and npm (`@yuliaedomskikh/agentflow-client`) via OIDC Trusted Publishers with SLSA provenance on every artifact
+- **Published release line through `v1.5.0`** on PyPI (`agentflow-runtime`, `agentflow-client`) and npm (`@yuliaedomskikh/agentflow-client`) via OIDC Trusted Publishers with SLSA provenance on every artifact
- **Tested and gated** — 960+ unit tests plus a broad Windows no-Docker suite; CI enforces 12 required status checks (lint, schema, unit, integration, helm, perf, terraform, bandit, safety, npm-audit, trivy, contract) through branch protection
- **Dual SDK parity** across Python and TypeScript — retries, circuit breakers, batching, pagination, contract pinning, idempotency keys, `as_of` historical reads — over sub-second entity lookups (p50 `38–55 ms`, p99 `167 ms` on local hardware)
- **Security in the hot path** — tenant isolation on every read surface, parameterized queries, `sqlglot` AST validation for NL-to-SQL, fail-closed auth, secret scrubbing, and a Bandit gate for new findings
@@ -147,12 +147,12 @@ python scripts/bandit_diff.py .bandit-baseline.json .tmp/bandit-current.json

## Status

-**`v1.4.0` is the current release line** — PyPI `agentflow-runtime` /
+**`v1.5.0` is the current release line** — PyPI `agentflow-runtime` /
`agentflow-client` and npm `@yuliaedomskikh/agentflow-client`, all
published via OIDC Trusted Publishers with SLSA provenance attestations.
CI on `main` is green across all 12 required checks.

-The `v1.1.0` → `v1.4.0` arc landed in four increments on top of a security
+The `v1.1.0` → `v1.5.0` arc landed in five increments on top of a security
audit-closure sprint:

- **`v1.1.0`** — audit closure: tenant isolation across every read
@@ -170,6 +170,18 @@ audit-closure sprint:
templates, contract/DORA CI hardening, repo hygiene, and a dependency
wave (`mypy`, Terraform AWS provider, TypeScript, GitHub Actions,
Vitest). No runtime API changes from `v1.3.0`.
+- **`v1.5.0`** — security & correctness hardening: argon2id key hashing
+with an O(1) peppered lookup index (M-C4), an NL→SQL guard bypass fix
+(typed `read_csv` / `read_parquet` scan functions now denied in
+projection position), `sqlglot` control-byte and mutation-target
+repairs, and a strict-`mypy` expansion across the orchestration and
+freshness slices. No public API changes.
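One possible reading of "key hashing with an O(1) peppered lookup index" is sketched below: a keyed digest of the API key (HMAC under a server-side pepper) finds the stored record in a single dictionary lookup, and a slow salted hash then verifies the key. This is an illustration under stated assumptions, not the project's implementation. `KeyStore`, `lookup_index`, and `slow_hash` are hypothetical names, and `hashlib.scrypt` stands in for argon2id because argon2id is not in the Python standard library.

```python
import hashlib
import hmac
import os
from typing import Dict, Optional, Tuple

PEPPER = os.urandom(32)  # assumption: a server-side secret stored outside the database


def lookup_index(api_key: str) -> str:
    """Deterministic keyed digest, used only to locate the record."""
    return hmac.new(PEPPER, api_key.encode(), hashlib.sha256).hexdigest()


def slow_hash(api_key: str, salt: bytes) -> bytes:
    """Stand-in for argon2id: scrypt from the standard library."""
    return hashlib.scrypt(api_key.encode(), salt=salt, n=2**12, r=8, p=1)


class KeyStore:
    """Hypothetical in-memory store: index -> (salt, slow hash, tenant)."""

    def __init__(self) -> None:
        self._records: Dict[str, Tuple[bytes, bytes, str]] = {}

    def register(self, api_key: str, tenant: str) -> None:
        salt = os.urandom(16)
        self._records[lookup_index(api_key)] = (salt, slow_hash(api_key, salt), tenant)

    def authenticate(self, api_key: str) -> Optional[str]:
        record = self._records.get(lookup_index(api_key))  # one lookup, no scan
        if record is None:
            return None
        salt, expected, tenant = record
        if hmac.compare_digest(slow_hash(api_key, salt), expected):
            return tenant
        return None
```

The point of the index is that authentication computes the slow hash once per request instead of once per stored key.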

+Beyond the tagged line, `main` carries post-`v1.5.0` work pending the next
+tag: the DV2 raw vault migrated from ClickHouse to PostgreSQL with a cloud
+supplier reference, the PyIceberg sink backed by a real MinIO object store,
+and event-driven OLTP→vault freshness via PostgreSQL `LISTEN`/`NOTIFY`. See
+the `[Unreleased]` section of the [changelog](CHANGELOG.md) for details.
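The `LISTEN`/`NOTIFY` freshness path can be pictured as a driver-agnostic core: it receives notifications, runs an idempotent promote for each relevant one, and measures lag against the database clock rather than the host clock. The sketch below is a hedged illustration. `Notification`, `FreshnessListener`, the ISO 8601 payload format, and the two guard conditions are assumptions made for the example; the actual `pg_notify` payload is not shown in this diff.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Iterable, List


@dataclass
class Notification:
    channel: str
    payload: str  # assumption: the trigger sends the server-side event time as ISO 8601


@dataclass
class FreshnessListener:
    promote: Callable[[], None]      # idempotent OLTP -> vault promote
    db_now: Callable[[], datetime]   # server clock, e.g. the result of SELECT now()
    channel: str = "dv2_vault_refresh"
    lags: List[timedelta] = field(default_factory=list)

    def handle(self, notifications: Iterable[Notification]) -> int:
        """Run one promote per relevant notification; return how many ran."""
        ran = 0
        for note in notifications:
            if note.channel != self.channel:
                continue  # guard: ignore other channels
            try:
                emitted_at = datetime.fromisoformat(note.payload)
            except ValueError:
                continue  # guard: skip malformed payloads instead of crashing
            self.promote()
            # Both timestamps come from the database, so host/container skew cancels out.
            self.lags.append(self.db_now() - emitted_at)
            ran += 1
        return ran
```

Because the core only takes callables and plain objects, it can be exercised with stubs and no running PostgreSQL, which matches the no-Docker testing approach the changelog describes.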

### Scope

11 changes: 11 additions & 0 deletions requirements.txt
@@ -1,3 +1,14 @@
# Supplemental runtime pins — NOT the full dependency set.
#
# The source of truth for installing AgentFlow is pyproject.toml
# ([project.dependencies] + the optional-dependency extras). Install the
# project itself with `pip install .` (or `.[dev,cloud,...]`); this file does
# not stand alone.
#
# It pins the OpenTelemetry observability stack explicitly so the e2e,
# mutation, and staging-deploy workflows install a consistent OTel floor on
# top of the package install (`pip install -r requirements.txt`), and so the
# security.yml Safety scan resolves it alongside pyproject's runtime deps.
opentelemetry-sdk>=1.41,<2
opentelemetry-exporter-otlp-proto-grpc>=1.41,<2
opentelemetry-instrumentation-fastapi>=0.62b0,<1
5 changes: 2 additions & 3 deletions src/serving/cache.py
@@ -2,7 +2,6 @@

 import inspect
 import json
-from datetime import timedelta
 from typing import Any, cast

 import structlog
@@ -68,10 +67,10 @@ async def set(self, key: str, value: dict, ttl: int = 30) -> None:
             )
             return
         try:
-            await self._redis.setex(
+            await self._redis.set(
                 key,
-                timedelta(seconds=ttl),
                 json.dumps(jsonable_encoder(value)),
+                ex=ttl,
             )
         except Exception as exc:
             logger.warning(
6 changes: 3 additions & 3 deletions tests/chaos/conftest.py
@@ -158,9 +158,9 @@ def __init__(self, host: str, port: int) -> None:
     async def get(self, key: str):
         return await self._command("GET", key)

-    async def setex(self, key: str, ttl, value: str):
-        seconds = int(ttl.total_seconds()) if hasattr(ttl, "total_seconds") else int(ttl)
-        await self._command("SETEX", key, str(seconds), value)
+    async def set(self, key: str, value: str, ex=None):
+        seconds = int(ex.total_seconds()) if hasattr(ex, "total_seconds") else int(ex)
+        await self._command("SET", key, value, "EX", str(seconds))

     async def keys(self, pattern: str):
         return await self._command("KEYS", pattern)
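The move from `setex(key, ttl, value)` to `set(key, value, ex=ttl)` changes the argument order and turns the TTL into an optional keyword; in Redis, `SET key value EX seconds` has the same effect as `SETEX key seconds value`. Note that the `set` in this hunk defaults `ex` to `None` but would evaluate `int(None)` and raise `TypeError` if called without a TTL. The sketch below shows one way a test double could tolerate that case. `InMemoryRedis` and `ttl_seconds` are hypothetical names, and the `ex=None` handling is an assumption rather than behavior taken from this PR.

```python
from datetime import timedelta
from typing import Dict, List, Optional, Tuple, Union

TTL = Union[int, timedelta, None]


def ttl_seconds(ex: TTL) -> Optional[int]:
    """Normalize an `ex` argument to whole seconds, or None for no expiry."""
    if ex is None:
        return None
    if isinstance(ex, timedelta):
        return int(ex.total_seconds())
    return int(ex)


class InMemoryRedis:
    """Hypothetical test double mirroring the `set(key, value, ex=...)` shape."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}
        self.commands: List[Tuple[str, ...]] = []  # wire-level commands that would be sent

    async def set(self, key: str, value: str, ex: TTL = None) -> None:
        seconds = ttl_seconds(ex)
        if seconds is None:
            self.commands.append(("SET", key, value))
        else:
            self.commands.append(("SET", key, value, "EX", str(seconds)))
        self.data[key] = value

    async def get(self, key: str) -> Optional[str]:
        return self.data.get(key)
```

Recording the would-be commands lets a test assert both the stored value and the TTL that reached the wire, without a Redis server.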
4 changes: 2 additions & 2 deletions tests/integration/test_tenant_isolation.py
@@ -195,7 +195,7 @@ def __init__(self) -> None:
     async def get(self, key: str):
         return self.data.get(key)

-    async def setex(self, key: str, ttl, value: str) -> None:
+    async def set(self, key: str, value: str, ex=None) -> None:
         self.data[key] = value

     async def keys(self, pattern: str):
@@ -265,7 +265,7 @@ def __init__(self) -> None:
     async def get(self, key: str):
         return self.data.get(key)

-    async def setex(self, key: str, ttl, value: str) -> None:
+    async def set(self, key: str, value: str, ex=None) -> None:
         self.data[key] = value

     async def keys(self, pattern: str):
14 changes: 7 additions & 7 deletions tests/unit/test_cache.py
@@ -18,7 +18,7 @@ def __init__(self):
         self.deleted: list[tuple[str, ...]] = []
         self.set_calls: list[tuple[str, object, str]] = []
         self.raise_on_get: Exception | None = None
-        self.raise_on_setex: Exception | None = None
+        self.raise_on_set: Exception | None = None
         self.raise_on_keys: Exception | None = None
         self.closed = False

@@ -27,10 +27,10 @@ async def get(self, key: str):
             raise self.raise_on_get
         return self.data.get(key)

-    async def setex(self, key: str, ttl, value: str):
-        if self.raise_on_setex is not None:
-            raise self.raise_on_setex
-        self.set_calls.append((key, ttl, value))
+    async def set(self, key: str, value: str, ex=None):
+        if self.raise_on_set is not None:
+            raise self.raise_on_set
+        self.set_calls.append((key, ex, value))
         self.data[key] = value

     async def keys(self, pattern: str):
@@ -139,7 +139,7 @@ async def test_query_cache_set_serializes_payload_with_ttl():

     key, ttl, value = redis_client.set_calls[0]
     assert key == "metric:revenue:1h:now"
-    assert int(ttl.total_seconds()) == 45
+    assert ttl == 45
     assert json.loads(value) == {"value": 99.0}


@@ -215,7 +215,7 @@ def test_metric_endpoint_warns_and_serves_uncached_when_redis_is_unavailable(mon
     monkeypatch.setattr(cache_module, "logger", logger)
     redis_client = FakeRedis()
     redis_client.raise_on_get = RuntimeError("redis down")
-    redis_client.raise_on_setex = RuntimeError("redis down")
+    redis_client.raise_on_set = RuntimeError("redis down")
     engine = EngineStub()
     client = _build_client(QueryCache(redis_client=redis_client), engine)

6 changes: 3 additions & 3 deletions tests/unit/test_entity_cache.py
@@ -28,8 +28,8 @@ def __init__(self) -> None:
     async def get(self, key: str):
         return self.data.get(key)

-    async def setex(self, key: str, ttl, value: str) -> None:
-        self.set_calls.append((key, ttl, value))
+    async def set(self, key: str, value: str, ex=None) -> None:
+        self.set_calls.append((key, ex, value))
         self.data[key] = value

     async def delete(self, *keys: str) -> None:
@@ -101,7 +101,7 @@ def test_entity_endpoint_returns_miss_then_hit_header_and_populates_cache() -> N
     assert engine.calls == [("order", "ORD-20260401-0001", None, "acme")]
     key, ttl, cached_payload = redis_client.set_calls[0]
     assert key == cache_key
-    assert int(ttl.total_seconds()) == ENTITY_TTL_SECONDS
+    assert ttl == ENTITY_TTL_SECONDS
     assert json.loads(cached_payload)["payload"]["entity_id"] == "ORD-20260401-0001"


2 changes: 1 addition & 1 deletion tests/unit/test_versioning.py
@@ -306,7 +306,7 @@ def __init__(self) -> None:
     async def get(self, key: str):
         return self.data.get(key)

-    async def setex(self, key: str, ttl, value: str) -> None:
+    async def set(self, key: str, value: str, ex=None) -> None:
         self.data[key] = value

     async def keys(self, pattern: str):