From e25cc415e44b2c81fe1c4eb4e9d6f044d905f5e3 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 00:35:50 +0300 Subject: [PATCH 1/2] fix(cache): use redis set(ex=) instead of deprecated setex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Redis.setex` is `@deprecated_function` in redis-py 8.0.0 ("Use 'set' instead") and emits a DeprecationWarning from the query-cache hot path on every cached write. Switch `QueryCache.set` to `set(key, value, ex=ttl)`, which is behaviorally identical (int seconds), and drop the now-unused `timedelta` import. All six in-repo Redis test doubles (unit cache/entity_cache/versioning, integration tenant-isolation, chaos RESP client) implemented `setex`; they move to `set(self, key, value, ex=None)` with the matching argument order, and the chaos RESP client now issues `SET … EX` over the wire. The two `set_calls` ttl assertions compare the integer `ex` directly. Verified no-Docker: ruff + mypy clean, full unit suite 1096 passed / 1 skipped (the redis.setex DeprecationWarning is gone). The integration and chaos doubles change symmetrically and are validated by their CI jobs. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/serving/cache.py | 5 ++--- tests/chaos/conftest.py | 6 +++--- tests/integration/test_tenant_isolation.py | 4 ++-- tests/unit/test_cache.py | 14 +++++++------- tests/unit/test_entity_cache.py | 6 +++--- tests/unit/test_versioning.py | 2 +- 6 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/serving/cache.py b/src/serving/cache.py index 7a02f8a..97c371f 100644 --- a/src/serving/cache.py +++ b/src/serving/cache.py @@ -2,7 +2,6 @@ import inspect import json -from datetime import timedelta from typing import Any, cast import structlog @@ -68,10 +67,10 @@ async def set(self, key: str, value: dict, ttl: int = 30) -> None: ) return try: - await self._redis.setex( + await self._redis.set( key, - timedelta(seconds=ttl), json.dumps(jsonable_encoder(value)), + ex=ttl, ) except Exception as exc: logger.warning( diff --git a/tests/chaos/conftest.py b/tests/chaos/conftest.py index 8079c9c..4850565 100644 --- a/tests/chaos/conftest.py +++ b/tests/chaos/conftest.py @@ -158,9 +158,9 @@ def __init__(self, host: str, port: int) -> None: async def get(self, key: str): return await self._command("GET", key) - async def setex(self, key: str, ttl, value: str): - seconds = int(ttl.total_seconds()) if hasattr(ttl, "total_seconds") else int(ttl) - await self._command("SETEX", key, str(seconds), value) + async def set(self, key: str, value: str, ex=None): + seconds = int(ex.total_seconds()) if hasattr(ex, "total_seconds") else int(ex) + await self._command("SET", key, value, "EX", str(seconds)) async def keys(self, pattern: str): return await self._command("KEYS", pattern) diff --git a/tests/integration/test_tenant_isolation.py b/tests/integration/test_tenant_isolation.py index 10cc7a1..812eabf 100644 --- a/tests/integration/test_tenant_isolation.py +++ b/tests/integration/test_tenant_isolation.py @@ -195,7 +195,7 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: + async def set(self, key: str, value: str, ex=None) -> None: self.data[key] = value async def keys(self, pattern: str): @@ -265,7 +265,7 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: + async def set(self, key: str, value: str, ex=None) -> None: self.data[key] = value async def keys(self, pattern: str): diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py index 4811187..19d0cf7 100644 --- a/tests/unit/test_cache.py +++ b/tests/unit/test_cache.py @@ -18,7 +18,7 @@ def __init__(self): self.deleted: list[tuple[str, ...]] = [] self.set_calls: list[tuple[str, object, str]] = [] self.raise_on_get: Exception | None = None - self.raise_on_setex: Exception | None = None + self.raise_on_set: Exception | None = None self.raise_on_keys: Exception | None = None self.closed = False @@ -27,10 +27,10 @@ async def get(self, key: str): raise self.raise_on_get return self.data.get(key) - async def setex(self, key: str, ttl, value: str): - if self.raise_on_setex is not None: - raise self.raise_on_setex - self.set_calls.append((key, ttl, value)) + async def set(self, key: str, value: str, ex=None): + if self.raise_on_set is not None: + raise self.raise_on_set + self.set_calls.append((key, ex, value)) self.data[key] = value async def keys(self, pattern: str): @@ -139,7 +139,7 @@ async def test_query_cache_set_serializes_payload_with_ttl(): key, ttl, value = redis_client.set_calls[0] assert key == "metric:revenue:1h:now" - assert int(ttl.total_seconds()) == 45 + assert ttl == 45 assert json.loads(value) == {"value": 99.0} @@ -215,7 +215,7 @@ def test_metric_endpoint_warns_and_serves_uncached_when_redis_is_unavailable(mon monkeypatch.setattr(cache_module, "logger", logger) redis_client = FakeRedis() redis_client.raise_on_get = RuntimeError("redis down") - redis_client.raise_on_setex = RuntimeError("redis down") + redis_client.raise_on_set = RuntimeError("redis down") engine = EngineStub() client = _build_client(QueryCache(redis_client=redis_client), engine) diff --git a/tests/unit/test_entity_cache.py b/tests/unit/test_entity_cache.py index 5a337ad..4f97f52 100644 --- a/tests/unit/test_entity_cache.py +++ b/tests/unit/test_entity_cache.py @@ -28,8 +28,8 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: - self.set_calls.append((key, ttl, value)) + async def set(self, key: str, value: str, ex=None) -> None: + self.set_calls.append((key, ex, value)) self.data[key] = value async def delete(self, *keys: str) -> None: @@ -101,7 +101,7 @@ def test_entity_endpoint_returns_miss_then_hit_header_and_populates_cache() -> N assert engine.calls == [("order", "ORD-20260401-0001", None, "acme")] key, ttl, cached_payload = redis_client.set_calls[0] assert key == cache_key - assert int(ttl.total_seconds()) == ENTITY_TTL_SECONDS + assert ttl == ENTITY_TTL_SECONDS assert json.loads(cached_payload)["payload"]["entity_id"] == "ORD-20260401-0001" diff --git a/tests/unit/test_versioning.py b/tests/unit/test_versioning.py index 0f81e2a..62cdb0c 100644 --- a/tests/unit/test_versioning.py +++ b/tests/unit/test_versioning.py @@ -306,7 +306,7 @@ def __init__(self) -> None: async def get(self, key: str): return self.data.get(key) - async def setex(self, key: str, ttl, value: str) -> None: + async def set(self, key: str, value: str, ex=None) -> None: self.data[key] = value async def keys(self, pattern: str): From f7c9e7514d64cfe5485908486bae86779190d151 Mon Sep 17 00:00:00 2001 From: JuliaEdom Date: Sun, 28 Jun 2026 00:36:09 +0300 Subject: [PATCH 2/2] docs: sync version story to v1.5.0 and clarify requirements.txt The README Status section and release badge still described v1.4.0 as the current line even though v1.5.0 is tagged and published, and the CHANGELOG [Unreleased] section did not record the DV2 re-architecture already on main. - README: badge v1.4 -> v1.5, "current release line" -> v1.5.0, extend the release arc to five increments with a v1.5.0 bullet (argon2id O(1) key hashing, NL->SQL guard bypass fix, strict-mypy expansion), and add a note that main carries post-v1.5.0 work pending the next tag. - CHANGELOG [Unreleased]: document the DV2 raw vault migration ClickHouse -> PostgreSQL (#91), the PyIceberg sink backed by real MinIO (#92), the LISTEN/NOTIFY OLTP->vault freshness (#93), and the dependency batch (#94). - requirements.txt: add a header explaining it is a supplemental OTel pin set installed on top of the pyproject package by the e2e/mutation/staging workflows and the security Safety scan, not the full dependency set (pyproject.toml is the source of truth). load_requirements() skips comment lines and `pip -r` ignores them, so the header is non-breaking. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 32 ++++++++++++++++++++++++++++++++ README.md | 20 ++++++++++++++++---- requirements.txt | 11 +++++++++++ 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ee7168..4b07956 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,31 @@ All notable changes to AgentFlow are documented in this file. ### Added +- **DV2 raw vault migrated from ClickHouse to PostgreSQL** with a cloud + supplier / product reference (`warehouse/agentflow/dv2/`). Hubs, links, and + satellites are emitted in a PostgreSQL dialect (`argMax` → `DISTINCT ON`, + `splitByString` → `split_part`); both ingestion feeds (the X5 loader and the + reference loader) repoint to a shared, parameterized `pg_vault_writer`; and + OLTP → vault promotion runs as an in-database `INSERT … SELECT` now that OLTP + and the vault share one engine. The ClickHouse dialect regenerates + byte-for-byte and is retained for optional mart-serving. Each generated + `INSERT` is parsed by `sqlglot` in `tests/unit/test_dv2_postgres_ingestion.py` + to assert every interpolated column exists in the committed DDL. (#91) +- **PyIceberg sink backed by a real MinIO object store** — the REST catalog + now writes through `S3FileIO` to `s3://agentflow-lake/warehouse` (the same + bucket the Flink path uses) instead of an ephemeral `/tmp/warehouse` + `HadoopFileIO`. A self-contained `docker-compose.iceberg.yml` (MinIO + + bucket-init + REST catalog) and env-overridable credentials in + `config/iceberg.yaml` make the local catalog object-store-backed; a no-Docker + guard asserts an `s3://` warehouse never triggers a local `mkdir`. (#92) +- **Event-driven OLTP → vault freshness via PostgreSQL `LISTEN`/`NOTIFY`** — + `AFTER INSERT/UPDATE` triggers on each `ops_` table emit + `pg_notify('dv2_vault_refresh', …)`, and a guarded listener runs an + idempotent promote on each event (push, not polling), the PostgreSQL + equivalent of the ClickHouse `MaterializedPostgreSQL` CDC path. Lag is + observed on the server clock (`db_now`) to stay free of host/container + clock skew. The driver-agnostic core is covered no-Docker by + `tests/unit/test_dv2_freshness_listen_notify.py`. (#93) - OpenSSF Scorecard $0 supply-chain security posture channel: `.github/workflows/scorecard.yml` (`ossf/scorecard-action@v2.4.3`) runs on push to `main`, weekly, and on branch-protection changes with top-level @@ -41,6 +66,13 @@ All notable changes to AgentFlow are documented in this file. ### Changed +- Dependency maintenance batch (consolidated from eight Dependabot PRs): the + Docker `python` base digest, `cp-kafka-connect-base` 7.9.7 → 7.9.8, + `apache-flink` 2.2.1 → 2.3.0 (validated by the Flink smoke job), and the SDK + `vitest` / `schemathesis` bumps, plus the GitHub Pages action major bumps + (`checkout` v7, `configure-pages` v6, `upload-pages-artifact` v5, + `deploy-pages` v5) — all SHA-pinned and validated on `main` by the Deploy + Pages and Flink Smoke runs. (#94) - Production CDC onboarding, PMF/pricing evidence, a production-hardware benchmark, and an external pen-test attestation are documented as out of scope for the current plan. Their acceptance criteria require external diff --git a/README.md b/README.md index 34125f2..5659b4c 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ > Event-native metrics layer: business metrics that move when events happen — measured **1.1 s p50** event-to-metric freshness on production defaults. Live entity lookups, typed contracts, dual-language SDKs, and release-gated delivery for people, dashboards, services, and AI agents alike. -[![Release gate](https://img.shields.io/badge/release_gate-v1.4_published-brightgreen)](docs/dv2-multi-branch/RELEASE_STATUS.md) +[![Release gate](https://img.shields.io/badge/release_gate-v1.5_published-brightgreen)](docs/dv2-multi-branch/RELEASE_STATUS.md) [![codecov](https://codecov.io/gh/brownjuly2003-code/agentflow/branch/main/graph/badge.svg)](https://codecov.io/gh/brownjuly2003-code/agentflow) [![Python](https://img.shields.io/badge/python-3.11+-blue)](pyproject.toml) [![License](https://img.shields.io/badge/license-MIT-blue)](LICENSE) @@ -24,7 +24,7 @@ Consumers are whoever needs the number now: humans, dashboards, downstream servi - **Measured event-to-metric freshness** — an event entering the pipeline is reflected in `GET /v1/metrics/*` in **1.06 s p50 / 1.99 s p95** on production defaults (event-driven cache invalidation, no webhook registration), tunable to **238 ms p50**; a plain TTL cache on the same pipeline sits at ~15 s. Reproducible via `python scripts/benchmark_freshness.py` → [freshness benchmark](docs/freshness-benchmark.md) - **Lineage as a contract** — all six metrics declare their source events, serving table, and a 2.5 s p95 staleness budget in versioned contracts, exposed through `/v1/catalog` and `/v1/contracts` and pinned by tests against the actual write path -- **Published release line through `v1.4.0`** on PyPI (`agentflow-runtime`, `agentflow-client`) and npm (`@yuliaedomskikh/agentflow-client`) via OIDC Trusted Publishers with SLSA provenance on every artifact +- **Published release line through `v1.5.0`** on PyPI (`agentflow-runtime`, `agentflow-client`) and npm (`@yuliaedomskikh/agentflow-client`) via OIDC Trusted Publishers with SLSA provenance on every artifact - **Tested and gated** — 960+ unit tests plus a broad Windows no-Docker suite; CI enforces 12 required status checks (lint, schema, unit, integration, helm, perf, terraform, bandit, safety, npm-audit, trivy, contract) through branch protection - **Dual SDK parity** across Python and TypeScript — retries, circuit breakers, batching, pagination, contract pinning, idempotency keys, `as_of` historical reads — over sub-second entity lookups (p50 `38–55 ms`, p99 `167 ms` on local hardware) - **Security in the hot path** — tenant isolation on every read surface, parameterized queries, `sqlglot` AST validation for NL-to-SQL, fail-closed auth, secret scrubbing, and a Bandit gate for new findings @@ -147,12 +147,12 @@ python scripts/bandit_diff.py .bandit-baseline.json .tmp/bandit-current.json ## Status -**`v1.4.0` is the current release line** — PyPI `agentflow-runtime` / +**`v1.5.0` is the current release line** — PyPI `agentflow-runtime` / `agentflow-client` and npm `@yuliaedomskikh/agentflow-client`, all published via OIDC Trusted Publishers with SLSA provenance attestations. CI on `main` is green across all 12 required checks. -The `v1.1.0` → `v1.4.0` arc landed in four increments on top of a security +The `v1.1.0` → `v1.5.0` arc landed in five increments on top of a security audit-closure sprint: - **`v1.1.0`** — audit closure: tenant isolation across every read @@ -170,6 +170,18 @@ audit-closure sprint: templates, contract/DORA CI hardening, repo hygiene, and a dependency wave (`mypy`, Terraform AWS provider, TypeScript, GitHub Actions, Vitest). No runtime API changes from `v1.3.0`. +- **`v1.5.0`** — security & correctness hardening: argon2id key hashing + with an O(1) peppered lookup index (M-C4), an NL→SQL guard bypass fix + (typed `read_csv` / `read_parquet` scan functions now denied in + projection position), `sqlglot` control-byte and mutation-target + repairs, and a strict-`mypy` expansion across the orchestration and + freshness slices. No public API changes. + +Beyond the tagged line, `main` carries post-`v1.5.0` work pending the next +tag: the DV2 raw vault migrated from ClickHouse to PostgreSQL with a cloud +supplier reference, the PyIceberg sink backed by a real MinIO object store, +and event-driven OLTP→vault freshness via PostgreSQL `LISTEN`/`NOTIFY`. See +the `[Unreleased]` section of the [changelog](CHANGELOG.md) for details. ### Scope diff --git a/requirements.txt b/requirements.txt index 5116e44..e749357 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,14 @@ +# Supplemental runtime pins — NOT the full dependency set. +# +# The source of truth for installing AgentFlow is pyproject.toml +# ([project.dependencies] + the optional-dependency extras). Install the +# project itself with `pip install .` (or `.[dev,cloud,...]`); this file does +# not stand alone. +# +# It pins the OpenTelemetry observability stack explicitly so the e2e, +# mutation, and staging-deploy workflows install a consistent OTel floor on +# top of the package install (`pip install -r requirements.txt`), and so the +# security.yml Safety scan resolves it alongside pyproject's runtime deps. opentelemetry-sdk>=1.41,<2 opentelemetry-exporter-otlp-proto-grpc>=1.41,<2 opentelemetry-instrumentation-fastapi>=0.62b0,<1