diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ee7168..4b07956 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,31 @@ All notable changes to AgentFlow are documented in this file. ### Added +- **DV2 raw vault migrated from ClickHouse to PostgreSQL** with a cloud + supplier / product reference (`warehouse/agentflow/dv2/`). Hubs, links, and + satellites are emitted in a PostgreSQL dialect (`argMax` → `DISTINCT ON`, + `splitByString` → `split_part`); both ingestion feeds (the X5 loader and the + reference loader) repoint to a shared, parameterized `pg_vault_writer`; and + OLTP → vault promotion runs as an in-database `INSERT … SELECT` now that OLTP + and the vault share one engine. The ClickHouse dialect regenerates + byte-for-byte and is retained for optional mart-serving. Each generated + `INSERT` is parsed by `sqlglot` in `tests/unit/test_dv2_postgres_ingestion.py` + to assert every interpolated column exists in the committed DDL. (#91) +- **PyIceberg sink backed by a real MinIO object store** — the REST catalog + now writes through `S3FileIO` to `s3://agentflow-lake/warehouse` (the same + bucket the Flink path uses) instead of an ephemeral `/tmp/warehouse` + `HadoopFileIO`. A self-contained `docker-compose.iceberg.yml` (MinIO + + bucket-init + REST catalog) and env-overridable credentials in + `config/iceberg.yaml` make the local catalog object-store-backed; a no-Docker + guard asserts an `s3://` warehouse never triggers a local `mkdir`. (#92) +- **Event-driven OLTP → vault freshness via PostgreSQL `LISTEN`/`NOTIFY`** — + `AFTER INSERT/UPDATE` triggers on each `ops_` table emit + `pg_notify('dv2_vault_refresh', …)`, and a guarded listener runs an + idempotent promote on each event (push, not polling), the PostgreSQL + equivalent of the ClickHouse `MaterializedPostgreSQL` CDC path. Lag is + observed on the server clock (`db_now`) to stay free of host/container + clock skew. The driver-agnostic core is covered no-Docker by + `tests/unit/test_dv2_freshness_listen_notify.py`. (#93) - OpenSSF Scorecard $0 supply-chain security posture channel: `.github/workflows/scorecard.yml` (`ossf/scorecard-action@v2.4.3`) runs on push to `main`, weekly, and on branch-protection changes with top-level @@ -41,6 +66,13 @@ All notable changes to AgentFlow are documented in this file. ### Changed +- Dependency maintenance batch (consolidated from eight Dependabot PRs): the + Docker `python` base digest, `cp-kafka-connect-base` 7.9.7 → 7.9.8, + `apache-flink` 2.2.1 → 2.3.0 (validated by the Flink smoke job), and the SDK + `vitest` / `schemathesis` bumps, plus the GitHub Pages action major bumps + (`checkout` v7, `configure-pages` v6, `upload-pages-artifact` v5, + `deploy-pages` v5) — all SHA-pinned and validated on `main` by the Deploy + Pages and Flink Smoke runs. (#94) - Production CDC onboarding, PMF/pricing evidence, a production-hardware benchmark, and an external pen-test attestation are documented as out of scope for the current plan. Their acceptance criteria require external diff --git a/README.md b/README.md index 34125f2..5659b4c 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ > Event-native metrics layer: business metrics that move when events happen — measured **1.1 s p50** event-to-metric freshness on production defaults. Live entity lookups, typed contracts, dual-language SDKs, and release-gated delivery for people, dashboards, services, and AI agents alike. -[![Release gate](https://img.shields.io/badge/release_gate-v1.4_published-brightgreen)](docs/dv2-multi-branch/RELEASE_STATUS.md) +[![Release gate](https://img.shields.io/badge/release_gate-v1.5_published-brightgreen)](docs/dv2-multi-branch/RELEASE_STATUS.md) [![codecov](https://codecov.io/gh/brownjuly2003-code/agentflow/branch/main/graph/badge.svg)](https://codecov.io/gh/brownjuly2003-code/agentflow) [![Python](https://img.shields.io/badge/python-3.11+-blue)](pyproject.toml) [![License](https://img.shields.io/badge/license-MIT-blue)](LICENSE) @@ -24,7 +24,7 @@ Consumers are whoever needs the number now: humans, dashboards, downstream servi - **Measured event-to-metric freshness** — an event entering the pipeline is reflected in `GET /v1/metrics/*` in **1.06 s p50 / 1.99 s p95** on production defaults (event-driven cache invalidation, no webhook registration), tunable to **238 ms p50**; a plain TTL cache on the same pipeline sits at ~15 s. Reproducible via `python scripts/benchmark_freshness.py` → [freshness benchmark](docs/freshness-benchmark.md) - **Lineage as a contract** — all six metrics declare their source events, serving table, and a 2.5 s p95 staleness budget in versioned contracts, exposed through `/v1/catalog` and `/v1/contracts` and pinned by tests against the actual write path -- **Published release line through `v1.4.0`** on PyPI (`agentflow-runtime`, `agentflow-client`) and npm (`@yuliaedomskikh/agentflow-client`) via OIDC Trusted Publishers with SLSA provenance on every artifact +- **Published release line through `v1.5.0`** on PyPI (`agentflow-runtime`, `agentflow-client`) and npm (`@yuliaedomskikh/agentflow-client`) via OIDC Trusted Publishers with SLSA provenance on every artifact - **Tested and gated** — 960+ unit tests plus a broad Windows no-Docker suite; CI enforces 12 required status checks (lint, schema, unit, integration, helm, perf, terraform, bandit, safety, npm-audit, trivy, contract) through branch protection - **Dual SDK parity** across Python and TypeScript — retries, circuit breakers, batching, pagination, contract pinning, idempotency keys, `as_of` historical reads — over sub-second entity lookups (p50 `38–55 ms`, p99 `167 ms` on local hardware) - **Security in the hot path** — tenant isolation on every read surface, parameterized queries, `sqlglot` AST validation for NL-to-SQL, fail-closed auth, secret scrubbing, and a Bandit gate for new findings @@ -147,12 +147,12 @@ python scripts/bandit_diff.py .bandit-baseline.json .tmp/bandit-current.json ## Status -**`v1.4.0` is the current release line** — PyPI `agentflow-runtime` / +**`v1.5.0` is the current release line** — PyPI `agentflow-runtime` / `agentflow-client` and npm `@yuliaedomskikh/agentflow-client`, all published via OIDC Trusted Publishers with SLSA provenance attestations. CI on `main` is green across all 12 required checks. -The `v1.1.0` → `v1.4.0` arc landed in four increments on top of a security +The `v1.1.0` → `v1.5.0` arc landed in five increments on top of a security audit-closure sprint: - **`v1.1.0`** — audit closure: tenant isolation across every read @@ -170,6 +170,18 @@ audit-closure sprint: templates, contract/DORA CI hardening, repo hygiene, and a dependency wave (`mypy`, Terraform AWS provider, TypeScript, GitHub Actions, Vitest). No runtime API changes from `v1.3.0`. +- **`v1.5.0`** — security & correctness hardening: argon2id key hashing + with an O(1) peppered lookup index (M-C4), an NL→SQL guard bypass fix + (typed `read_csv` / `read_parquet` scan functions now denied in + projection position), `sqlglot` control-byte and mutation-target + repairs, and a strict-`mypy` expansion across the orchestration and + freshness slices. No public API changes. + +Beyond the tagged line, `main` carries post-`v1.5.0` work pending the next +tag: the DV2 raw vault migrated from ClickHouse to PostgreSQL with a cloud +supplier reference, the PyIceberg sink backed by a real MinIO object store, +and event-driven OLTP→vault freshness via PostgreSQL `LISTEN`/`NOTIFY`. See +the `[Unreleased]` section of the [changelog](CHANGELOG.md) for details. ### Scope diff --git a/requirements.txt b/requirements.txt index 5116e44..e749357 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,14 @@ +# Supplemental runtime pins — NOT the full dependency set. +# +# The source of truth for installing AgentFlow is pyproject.toml +# ([project.dependencies] + the optional-dependency extras). Install the +# project itself with `pip install .` (or `.[dev,cloud,...]`); this file does +# not stand alone. +# +# It pins the OpenTelemetry observability stack explicitly so the e2e, +# mutation, and staging-deploy workflows install a consistent OTel floor on +# top of the package install (`pip install -r requirements.txt`), and so the +# security.yml Safety scan resolves it alongside pyproject's runtime deps. opentelemetry-sdk>=1.41,<2 opentelemetry-exporter-otlp-proto-grpc>=1.41,<2 opentelemetry-instrumentation-fastapi>=0.62b0,<1 diff --git a/src/serving/cache.py b/src/serving/cache.py index 7a02f8a..97c371f 100644 --- a/src/serving/cache.py +++ b/src/serving/cache.py @@ -2,7 +2,6 @@ import inspect import json -from datetime import timedelta from typing import Any, cast import structlog @@ -68,10 +67,10 @@ async def set(self, key: str, value: dict, ttl: int = 30) -> None: ) return try: - await self._redis.setex( + await self._redis.set( key, - timedelta(seconds=ttl), json.dumps(jsonable_encoder(value)), + ex=ttl, ) except Exception as exc: logger.warning( diff --git a/tests/chaos/conftest.py b/tests/chaos/conftest.py index 8079c9c..4850565 100644 --- a/tests/chaos/conftest.py +++ b/tests/chaos/conftest.py @@ -158,9 +158,9 @@ def __init__(self, host: str, port: int) -> None: async def get(self, key: str): return await self._command("GET", key) - async def setex(self, key: str, ttl, value: str): - seconds = int(ttl.total_seconds()) if hasattr(ttl, "total_seconds") else int(ttl) - await self._command("SETEX", key, str(seconds), value) + async def set(self, key: str, value: str, ex=None): + seconds = int(ex.total_seconds()) if hasattr(ex, "total_seconds") else int(ex) + await self._command("SET", key, value, "EX", str(seconds)) async def keys(self, pattern: str): return await self._command("KEYS", pattern) diff --git a/tests/integration/test_tenant_isolation.py b/tests/integration/test_tenant_isolation.py index 10cc7a1..812eabf 100644 --- a/tests/integration/test_tenant_isolation.py +++ b/tests/integration/test_tenant_isolation.py @@ -195,7 +195,7 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: + async def set(self, key: str, value: str, ex=None) -> None: self.data[key] = value async def keys(self, pattern: str): @@ -265,7 +265,7 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: + async def set(self, key: str, value: str, ex=None) -> None: self.data[key] = value async def keys(self, pattern: str): diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py index 4811187..19d0cf7 100644 --- a/tests/unit/test_cache.py +++ b/tests/unit/test_cache.py @@ -18,7 +18,7 @@ def __init__(self): self.deleted: list[tuple[str, ...]] = [] self.set_calls: list[tuple[str, object, str]] = [] self.raise_on_get: Exception | None = None - self.raise_on_setex: Exception | None = None + self.raise_on_set: Exception | None = None self.raise_on_keys: Exception | None = None self.closed = False @@ -27,10 +27,10 @@ async def get(self, key: str): raise self.raise_on_get return self.data.get(key) - async def setex(self, key: str, ttl, value: str): - if self.raise_on_setex is not None: - raise self.raise_on_setex - self.set_calls.append((key, ttl, value)) + async def set(self, key: str, value: str, ex=None): + if self.raise_on_set is not None: + raise self.raise_on_set + self.set_calls.append((key, ex, value)) self.data[key] = value async def keys(self, pattern: str): @@ -139,7 +139,7 @@ async def test_query_cache_set_serializes_payload_with_ttl(): key, ttl, value = redis_client.set_calls[0] assert key == "metric:revenue:1h:now" - assert int(ttl.total_seconds()) == 45 + assert ttl == 45 assert json.loads(value) == {"value": 99.0} @@ -215,7 +215,7 @@ def test_metric_endpoint_warns_and_serves_uncached_when_redis_is_unavailable(mon monkeypatch.setattr(cache_module, "logger", logger) redis_client = FakeRedis() redis_client.raise_on_get = RuntimeError("redis down") - redis_client.raise_on_setex = RuntimeError("redis down") + redis_client.raise_on_set = RuntimeError("redis down") engine = EngineStub() client = _build_client(QueryCache(redis_client=redis_client), engine) diff --git a/tests/unit/test_entity_cache.py b/tests/unit/test_entity_cache.py index 5a337ad..4f97f52 100644 --- a/tests/unit/test_entity_cache.py +++ b/tests/unit/test_entity_cache.py @@ -28,8 +28,8 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: - self.set_calls.append((key, ttl, value)) + async def set(self, key: str, value: str, ex=None) -> None: + self.set_calls.append((key, ex, value)) self.data[key] = value async def delete(self, *keys: str) -> None: @@ -101,7 +101,7 @@ def test_entity_endpoint_returns_miss_then_hit_header_and_populates_cache() -> N assert engine.calls == [("order", "ORD-20260401-0001", None, "acme")] key, ttl, cached_payload = redis_client.set_calls[0] assert key == cache_key - assert int(ttl.total_seconds()) == ENTITY_TTL_SECONDS + assert ttl == ENTITY_TTL_SECONDS assert json.loads(cached_payload)["payload"]["entity_id"] == "ORD-20260401-0001" diff --git a/tests/unit/test_versioning.py b/tests/unit/test_versioning.py index 0f81e2a..62cdb0c 100644 --- a/tests/unit/test_versioning.py +++ b/tests/unit/test_versioning.py @@ -306,7 +306,7 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: + async def set(self, key: str, value: str, ex=None) -> None: self.data[key] = value async def keys(self, pattern: str):