diff --git a/pyproject.toml b/pyproject.toml index bf3ae6e..b0168af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -207,15 +207,19 @@ asyncio_mode = "auto" # mutates: scripts/mutation_report.py drives the gate from its own # MODULE_TARGETS, which now mutates sdk/agentflow/retry.py, # src/serving/semantic_layer/sql_guard.py, src/serving/masking.py, -# src/serving/api/rate_limiter.py, src/serving/semantic_layer/query/sql_builder.py -# AND src/serving/semantic_layer/query/nl_queries.py live. The serving modules are -# mutated as a top-level `serving` package against duckdb-free narrow tests: -# mutmut's trampoline rejects a module name starting with `src.`, which (not -# duckdb) was the real blocker. The remaining declared serving surface -- the auth -# manager / key_rotation -- stays declared-only for now: its unit tests pull the -# duckdb-backed query engine, so mutating it in isolation needs a duckdb-free test -# (the pattern the six live modules now use); the blocker is the test import -# chain, not the module. See scripts/mutation_report.py. +# src/serving/api/rate_limiter.py, src/serving/semantic_layer/query/sql_builder.py, +# src/serving/semantic_layer/query/nl_queries.py AND src/serving/api/auth/manager.py +# live. The serving modules are mutated as a top-level `serving` package against +# duckdb-free narrow tests: mutmut's trampoline rejects a module name starting with +# `src.`, which (not duckdb) was the real blocker. manager.py runs at an honest +# 0.80 threshold (not 0.90): it is a large stateful auth class whose residual +# survivors are equivalent mutants (structured-logging args, model_copy updates +# equal to their defaults, redis-url strings masked by the `_redis = None` +# override, the env-only-dead write path) -- every behaviour-reachable mutant, +# including every auth-bypass and throttle off-by-one, is killed. The remaining +# declared serving surface -- auth/key_rotation -- stays declared-only until it +# gets a duckdb-free test of its own (the pattern the live modules use); the +# blocker is the test import chain, not the module. See scripts/mutation_report.py. paths_to_mutate = [ "src/serving/api/auth/manager.py", "src/serving/api/auth/key_rotation.py", diff --git a/scripts/mutation_report.py b/scripts/mutation_report.py index a74c378..132ac81 100644 --- a/scripts/mutation_report.py +++ b/scripts/mutation_report.py @@ -30,20 +30,23 @@ class ModuleTarget: # (a) copy the module so it imports as a top-level package and (b) pair it with a # NARROW test that does not pull the duckdb-backed engine import chain. So # retry.py mutates as agentflow.retry (from sdk/agentflow), and sql_guard, -# masking, rate_limiter, sql_builder and nl_queries mutate as serving.* (from -# src/serving) against duckdb-free tests. Each duckdb-free test also avoids -# fixtures and calls the module's methods directly: under mutate_only_covered_lines -# a fixture-built object left every method line uncovered, so only __init__ got -# mutated. rate_limiter additionally imports `from src.constants import ...`; its -# test registers a tiny src.constants stub before importing the module, because -# the serving workspace copies src/serving -> top-level `serving` without `src`. -# sql_builder and nl_queries live under the query package whose __init__ imports -# the duckdb-backed QueryEngine, so their tests also stub -# serving.semantic_layer.query.{engine,contracts} (and the src.* helpers) before -# import. The only remaining declared-but-not-live serving surface is auth -# (manager / key_rotation), whose tests still need the duckdb engine; it stays -# declared-only in the [tool.mutmut] policy until it gets duckdb-free unit tests -# of its own. +# masking, rate_limiter, sql_builder, nl_queries and auth/manager mutate as +# serving.* (from src/serving) against duckdb-free tests. Each duckdb-free test +# also avoids fixtures and calls the module's methods directly: under +# mutate_only_covered_lines a fixture-built object left every method line +# uncovered, so only __init__ got mutated. rate_limiter additionally imports +# `from src.constants import ...`; its test registers a tiny src.constants stub +# before importing the module, because the serving workspace copies src/serving +# -> top-level `serving` without `src`. sql_builder and nl_queries live under the +# query package whose __init__ imports the duckdb-backed QueryEngine, so their +# tests also stub serving.semantic_layer.query.{engine,contracts} (and the src.* +# helpers) before import. auth/manager imports as the auth package whose __init__ +# imports duckdb plus the key_rotation/usage_table chain, but manager.py itself +# never calls duckdb (all usage-table I/O lives in usage_table.py), so its test +# swaps in a fake top-level `duckdb` module and mutates manager duckdb-free. The +# only remaining declared-but-not-live serving surface is auth/key_rotation, +# which uses the duckdb connection directly; it stays declared-only in the +# [tool.mutmut] policy until it gets a duckdb-free unit test of its own. MODULE_TARGETS = { Path("agentflow/retry.py"): ModuleTarget( threshold=0.75, @@ -69,6 +72,25 @@ class ModuleTarget: threshold=0.90, tests=("tests/unit/test_nl_queries_mutation.py",), ), + # manager.py runs at 0.80, not the 0.90 the pure-function guards (sql_guard, + # masking, sql_builder, ...) hold. It is a ~400-line stateful auth class whose + # surviving mutants are dominated by EQUIVALENTS that no behaviour-level test + # can kill: structured-logging arguments (the auth logger event names / kwargs), + # `model_copy(update=...)` dicts whose mutated field equals its default + # ("matched_slot" already defaults to "current"; "key"==api_key on a plaintext + # match), the redis-url strings masked by the `_redis = None` override under the + # duckdb-free harness, and the config-file write path that is dead under the + # env-only test. Every BEHAVIOUR-reachable mutant is killed -- crucially every + # auth bypass (the verify_api_key argument-swap mutants on the indexed / legacy + # / previous-key paths and in _matches_key_material) and every rate-limit / + # failed-auth throttle off-by-one. Local mutmut (py3.10) scores 405/483 = 83.9%; + # 0.80 leaves headroom for equivalent-mutant noise while still enforcing a real + # floor (the do-nothing baseline was 76.5%). key_rotation is the next target and + # stays declared-only until it gets its own duckdb-free test. + Path("serving/api/auth/manager.py"): ModuleTarget( + threshold=0.80, + tests=("tests/unit/test_auth_manager_mutation.py",), + ), } STATUS_BY_EXIT_CODE = { diff --git a/tests/unit/test_auth_manager_mutation.py b/tests/unit/test_auth_manager_mutation.py new file mode 100644 index 0000000..9221717 --- /dev/null +++ b/tests/unit/test_auth_manager_mutation.py @@ -0,0 +1,1055 @@ +"""Narrow, duckdb-free mutation test for the API-key auth manager +(src/serving/api/auth/manager.py). + +This is the test the mutation gate runs against +``serving/api/auth/manager.py`` (see scripts/mutation_report.py MODULE_TARGETS). +manager.py is the request-authentication boundary: ``authenticate()`` resolves a +presented key to a ``TenantKey`` (or ``None``), ``tenant_key_allowed_tables`` / +``is_entity_allowed`` are the tenant-isolation gates, and the failed-auth window +trio is the per-IP brute-force throttle (audit_28_06_26.md auth surface). A +surviving mutant in any of these is an auth-bypass, a cross-tenant read or a +disabled throttle -- exactly what a mutation gate should pin. + +Design rules, shared with test_rate_limiter_mutation.py / +test_sql_builder_mutation.py / test_nl_queries_mutation.py (see fable_handoff.md +cont.16-21): + +1. **duckdb-free.** Importing ``serving.api.auth.manager`` runs the auth package + ``__init__`` (``import duckdb`` + ``.key_rotation`` + ``.usage_table`` import + chain), and real duckdb's lazy ``_duckdb._sqltypes`` import crashes mutmut's + coverage-instrumented stats pass (the same break ci.yml works around with + ``coverage run``; see .github/workflows/ci.yml). manager.py *itself* never + calls duckdb -- every usage-table read/write lives in usage_table.py -- so a + fake top-level ``duckdb`` module that satisfies the import keeps the mutation + target genuinely duckdb-free. All other deps (rate_limiter, security, + audit_publisher) are duckdb-free already. + +2. **No fixtures for the subject -- inline construction + direct method calls.** + With ``mutate_only_covered_lines = true`` a fixture-built manager left method + lines uncovered (only ``__init__`` mutated, score 0%). ``_build_manager`` is a + plain helper called inside each test, and the methods under test are called + directly so coverage attributes every line. + +3. **Workspace discrimination by top-level ``serving``.** mutmut's mutants/ + workspace copies src/serving to a TOP-LEVEL ``serving`` package; ordinary + pytest has no top-level ``serving`` (only src.serving). Gate the harness stubs + on ``find_spec("serving")`` -- NOT ``import src``, which stays importable via + the editable install even inside the workspace (cont.21 duckdb-crash root + cause). Under ordinary pytest no stub is installed and the real modules load. +""" + +from __future__ import annotations + +import sys +import types +from datetime import date + + +def _in_mutation_workspace() -> bool: + # mutmut's mutants/ workspace copies src/serving to a TOP-LEVEL `serving` + # package (scripts/mutation_report.py prepare_workspace); ordinary pytest has + # no top-level `serving` (only src.serving), so its presence cleanly marks the + # harness. `import src` does NOT discriminate: the editable install keeps the + # real `src` importable even inside the workspace. + import importlib.util + + try: + return importlib.util.find_spec("serving") is not None + except (ImportError, ValueError): + return False + + +def _ensure_module(name: str) -> types.ModuleType: + module = sys.modules.get(name) + if module is None: + module = types.ModuleType(name) + sys.modules[name] = module + return module + + +def _install_harness_stubs() -> None: + # The auth package import chain pulls duckdb at import time + # (auth/__init__.py `import duckdb`; key_rotation / usage_table / + # duckdb_connection). Replace the whole `duckdb` module with a fake that + # supplies the names those modules reference at import / type-check time. + # manager.py never executes a duckdb call, so the fake changes no logic in + # the mutation target; it only keeps the import chain off real duckdb's + # coverage-breaking native extension. Force-overwrite (not _ensure_module) so + # a duckdb already pulled in by the coverage harness can't shadow the fake. + duckdb_stub = types.ModuleType("duckdb") + + class _DuckDBError(Exception): + pass + + duckdb_stub.Error = _DuckDBError + duckdb_stub.IOException = type("IOException", (_DuckDBError,), {}) + duckdb_stub.DuckDBPyConnection = object + + def _connect(*_args: object, **_kwargs: object) -> object: + raise _DuckDBError("duckdb is stubbed out in the mutation harness") + + duckdb_stub.connect = _connect + sys.modules["duckdb"] = duckdb_stub + + +if _in_mutation_workspace(): + _install_harness_stubs() + +try: # mutation-harness workspace exposes it as a top-level package + from serving.api.auth import manager as manager_module +except ImportError: # ordinary pytest sees it under the src package + from src.serving.api.auth import manager as manager_module + +import pytest + +AuthManager = manager_module.AuthManager +TenantKey = manager_module.TenantKey +tenant_key_allowed_tables = manager_module.tenant_key_allowed_tables +get_current_tenant_id = manager_module.get_current_tenant_id +_CURRENT_TENANT_ID = manager_module._CURRENT_TENANT_ID +DEFAULT_RATE_LIMIT_RPM = manager_module.DEFAULT_RATE_LIMIT_RPM +FAILED_AUTH_WINDOW_SECONDS = manager_module.FAILED_AUTH_WINDOW_SECONDS +DEFAULT_RATE_LIMIT_WINDOW_SECONDS = manager_module.DEFAULT_RATE_LIMIT_WINDOW_SECONDS +DEFAULT_ROTATION_GRACE_PERIOD_SECONDS = manager_module.DEFAULT_ROTATION_GRACE_PERIOD_SECONDS + + +# --------------------------------------------------------------------------- # +# Inline construction helpers (no fixtures for the subject). +# --------------------------------------------------------------------------- # + + +class FrozenClock: + def __init__(self, now: float = 1_000.0) -> None: + self.now = now + + def __call__(self) -> float: + return self.now + + +def _key(**overrides: object) -> TenantKey: + base: dict[str, object] = { + "key": "plain-key", + "name": "n", + "tenant": "acme", + "created_at": date(2026, 1, 1), + } + base.update(overrides) + return TenantKey(**base) # type: ignore[arg-type] + + +def _build_manager(**overrides: object) -> AuthManager: + # api_keys_path=None -> env-only config, no file I/O. A neutral db_path + # (not the magic "agentflow_api.duckdb") skips the pipeline-path derivation + # branch, and the path is never connected because no usage is recorded here. + params: dict[str, object] = { + "api_keys_path": None, + "db_path": "usage.duckdb", + "time_source": FrozenClock(1_000.0), + } + params.update(overrides) + return AuthManager(**params) # type: ignore[arg-type] + + +class _FullRemainingLimiter: + """Redis-backed limiter that fails open (full quota remaining) while keeping + a live `_redis` handle -- the condition that triggers `check_rate_limit`'s + in-memory secondary window.""" + + def __init__(self) -> None: + self._redis = object() + + async def check(self, key: str, rpm: int) -> tuple[bool, int, int]: + return True, rpm, 0 + + +# --------------------------------------------------------------------------- # +# Tenant isolation: tenant_key_allowed_tables. +# --------------------------------------------------------------------------- # + + +class TestTenantKeyAllowedTables: + def test_none_tenant_key_returns_all_tables_from_list(self) -> None: + assert tenant_key_allowed_tables(None, ["a", "b", "c"]) == ["a", "b", "c"] + + def test_none_tenant_key_returns_all_table_values_from_mapping(self) -> None: + catalog = {"customer": "dim_customer", "order": "fct_order"} + assert tenant_key_allowed_tables(None, catalog) == ["dim_customer", "fct_order"] + + def test_none_allowed_entity_types_returns_all_tables(self) -> None: + tk = _key(allowed_entity_types=None) + assert tenant_key_allowed_tables(tk, ["a", "b"]) == ["a", "b"] + + def test_filters_mapping_by_entity_type(self) -> None: + tk = _key(allowed_entity_types=["customer"]) + catalog = {"customer": "dim_customer", "order": "fct_order"} + assert tenant_key_allowed_tables(tk, catalog) == ["dim_customer"] + + def test_filters_mapping_by_table_name_when_entity_type_differs(self) -> None: + # The allowlist matches either the entity type OR the resolved table + # name, so naming the physical table also grants it. + tk = _key(allowed_entity_types=["fct_order"]) + catalog = {"customer": "dim_customer", "order": "fct_order"} + assert tenant_key_allowed_tables(tk, catalog) == ["fct_order"] + + def test_filters_list_input_by_allowed(self) -> None: + tk = _key(allowed_entity_types=["a"]) + assert tenant_key_allowed_tables(tk, ["a", "b"]) == ["a"] + + def test_empty_allowlist_excludes_everything(self) -> None: + tk = _key(allowed_entity_types=[]) + assert tenant_key_allowed_tables(tk, ["a", "b"]) == [] + + +# --------------------------------------------------------------------------- # +# TenantKey key-material validation. +# --------------------------------------------------------------------------- # + + +class TestValidateKeyMaterial: + def test_key_only_is_valid(self) -> None: + assert _key(key="k", key_hash=None).key == "k" + + def test_key_hash_only_is_valid(self) -> None: + assert _key(key=None, key_hash="h").key_hash == "h" + + def test_both_key_and_hash_is_valid(self) -> None: + tk = _key(key="k", key_hash="h") + assert tk.key == "k" + assert tk.key_hash == "h" + + def test_neither_key_nor_hash_raises(self) -> None: + from pydantic import ValidationError + + with pytest.raises(ValidationError, match="Either key or key_hash must be provided"): + _key(key=None, key_hash=None) + + +# --------------------------------------------------------------------------- # +# authenticate(): the request-auth boundary. +# --------------------------------------------------------------------------- # + + +class TestAuthenticate: + def test_plaintext_match_returns_current_slot_copy_with_presented_key(self) -> None: + m = _build_manager() + m.keys_by_value = {"plain-secret": _key(key="plain-secret", tenant="acme")} + out = m.authenticate("plain-secret") + assert out is not None + assert out.key == "plain-secret" + assert out.tenant == "acme" + assert out.matched_slot == "current" + + def test_plaintext_mismatch_returns_none(self) -> None: + m = _build_manager() + m.keys_by_value = {"plain-secret": _key(key="plain-secret")} + assert m.authenticate("not-the-secret") is None + + def test_entry_without_runtime_key_is_skipped_not_matched(self) -> None: + # item.key is None -> the `continue` guard; a None runtime key must never + # be compared/matched. No hashed/indexed entries -> overall None. + m = _build_manager() + m.keys_by_value = {"slot": _key(key=None, key_hash="h")} + assert m.authenticate("anything") is None + + def test_indexed_lookup_verifies_and_remembers_runtime_key( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + m = _build_manager() + entry = _key(key=None, key_hash="stored-hash", key_lookup="digest-x", tenant="t1") + m._keys_by_lookup = {"digest-x": entry} + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "digest-x") + # The stub requires the PRESENTED key as verify's first arg, so a mutant + # that passes None (or the hash) instead of api_key fails to authenticate. + monkeypatch.setattr( + manager_module, + "verify_api_key", + lambda value, h: value == "real-key" and h == "stored-hash", + ) + out = m.authenticate("real-key") + assert out is not None + assert out.matched_slot == "current" + assert out.key == "real-key" + assert out.tenant == "t1" + # _remember_runtime_key cached the plaintext->hash binding. + assert m._runtime_plaintext_by_hash["stored-hash"] == "real-key" + assert m.keys_by_value["real-key"].key_hash == "stored-hash" + + def test_indexed_lookup_wrong_hash_does_not_match( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + m = _build_manager() + entry = _key(key=None, key_hash="stored-hash", key_lookup="digest-x") + m._keys_by_lookup = {"digest-x": entry} + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "digest-x") + monkeypatch.setattr(manager_module, "verify_api_key", lambda value, h: False) + assert m.authenticate("real-key") is None + + def test_legacy_scan_matches_only_unindexed_hashed_entries( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + m = _build_manager() + indexed = _key(key=None, key_hash="indexed-hash", key_lookup="has-digest") + unindexed = _key(key=None, key_hash="legacy-hash", key_lookup=None, tenant="legacy") + m._hashed_keys = [indexed, unindexed] + # compute_key_lookup misses the index so the O(n) fallback runs; verify + # only accepts the legacy hash. The indexed entry must be skipped by the + # `key_lookup is not None -> continue` guard, not verified. + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "no-index-hit") + seen: list[tuple[str, str]] = [] + + def _verify(value: str, h: str) -> bool: + seen.append((value, h)) + return value == "real-key" and h == "legacy-hash" + + monkeypatch.setattr(manager_module, "verify_api_key", _verify) + out = m.authenticate("real-key") + assert out is not None + assert out.tenant == "legacy" + assert out.matched_slot == "current" + assert out.key == "real-key" # the presented key is bound onto the match copy + # the plaintext->hash binding is remembered under the PRESENTED key + assert m._runtime_plaintext_by_hash["legacy-hash"] == "real-key" + assert "indexed-hash" not in [h for _, h in seen] # indexed entry skipped + + def test_no_configured_keys_returns_none(self) -> None: + m = _build_manager() + assert m.authenticate("whatever") is None + + +# --------------------------------------------------------------------------- # +# _remember_runtime_key: plaintext cache binding. +# --------------------------------------------------------------------------- # + + +class TestRememberRuntimeKey: + def test_no_hash_is_not_cached(self) -> None: + m = _build_manager() + matched = _key(key="plain", key_hash=None) + m._remember_runtime_key("plain", matched) + assert m._runtime_plaintext_by_hash == {} + + def test_hashed_match_is_cached_under_its_hash(self) -> None: + m = _build_manager() + matched = _key(key="real-key", key_hash="the-hash") + m._remember_runtime_key("real-key", matched) + assert m._runtime_plaintext_by_hash["the-hash"] == "real-key" + assert m.keys_by_value["real-key"] is matched + + +# --------------------------------------------------------------------------- # +# _matches_key_material: revoke-path matcher. +# --------------------------------------------------------------------------- # + + +class TestMatchesKeyMaterial: + def test_plaintext_key_match(self) -> None: + m = _build_manager() + assert m._matches_key_material(_key(key="secret", key_hash=None), "secret") is True + + def test_plaintext_key_mismatch_without_hash_returns_false(self) -> None: + m = _build_manager() + assert m._matches_key_material(_key(key="secret", key_hash=None), "wrong") is False + + def test_literal_key_hash_match(self) -> None: + m = _build_manager() + item = _key(key=None, key_hash="stored-hash-literal") + assert m._matches_key_material(item, "stored-hash-literal") is True + + def test_cached_plaintext_match(self) -> None: + m = _build_manager() + item = _key(key=None, key_hash="bcrypt-ish-hash") + m._runtime_plaintext_by_hash["bcrypt-ish-hash"] = "cached-plain" + assert m._matches_key_material(item, "cached-plain") is True + + def test_unrelated_value_against_non_bcrypt_hash_returns_false(self) -> None: + m = _build_manager() + item = _key(key=None, key_hash="not-a-valid-bcrypt-hash") + assert m._matches_key_material(item, "anything") is False + + def test_previous_key_hash_branch_verifies_presented_value( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + # `_matches_key_material` is the revoke-path matcher (key_rotation.py). + # Its previous_key_hash branch -- `item.previous_key_hash is not None and + # verify_api_key(value, item.previous_key_hash)` -- must verify the + # PRESENTED value against the PREVIOUS hash. Pins arg order and arg + # presence so a None/dropped-arg mutant on that verify call dies. + m = _build_manager() + item = _key(key=None, key_hash="cur-hash") + item = item.model_copy(update={"previous_key_hash": "prev-hash"}) + monkeypatch.setattr( + manager_module, + "verify_api_key", + lambda value, h: value == "old-secret" and h == "prev-hash", + ) + assert m._matches_key_material(item, "old-secret") is True + assert m._matches_key_material(item, "wrong-secret") is False + + +# --------------------------------------------------------------------------- # +# Per-IP failed-auth brute-force throttle. +# --------------------------------------------------------------------------- # + + +def _policy(limit: int) -> types.SimpleNamespace: + return types.SimpleNamespace(max_failed_auth_per_ip_per_hour=limit) + + +class TestFailedAuthThrottle: + def test_record_failed_auth_trips_strictly_above_limit(self) -> None: + m = _build_manager(time_source=FrozenClock(1_000.0)) + m.security_policy = _policy(2) + assert m.record_failed_auth("1.2.3.4") is False # 1 > 2 -> no + assert m.record_failed_auth("1.2.3.4") is False # 2 > 2 -> no + assert m.record_failed_auth("1.2.3.4") is True # 3 > 2 -> tripped + + def test_is_failed_auth_limited_reads_window_without_appending(self) -> None: + m = _build_manager(time_source=FrozenClock(1_000.0)) + m.security_policy = _policy(1) + m.record_failed_auth("ip") # window size 1 + # is_failed_auth_limited must NOT append; 1 > 1 is False. + assert m.is_failed_auth_limited("ip") is False + assert m.is_failed_auth_limited("ip") is False + m.record_failed_auth("ip") # window size 2 + assert m.is_failed_auth_limited("ip") is True # 2 > 1 + + def test_record_failed_auth_evicts_stamps_outside_window(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + m.security_policy = _policy(5) + m.record_failed_auth("ip") + clock.now = 1_000.0 + FAILED_AUTH_WINDOW_SECONDS + 1.0 + m.record_failed_auth("ip") + # The stale stamp fell strictly outside the cutoff; only the new one left. + assert len(m._failed_auth_windows["ip"]) == 1 + + def test_stamp_exactly_at_cutoff_is_excluded(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + m.security_policy = _policy(5) + m.record_failed_auth("ip") # stamp at 1000.0 + clock.now = 1_000.0 + FAILED_AUTH_WINDOW_SECONDS # cutoff == old stamp + # window keeps stamp only if stamp > cutoff; 1000.0 > 1000.0 is False. + m.record_failed_auth("ip") + assert len(m._failed_auth_windows["ip"]) == 1 + + def test_is_failed_auth_limited_drops_stamp_exactly_at_cutoff(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + m.security_policy = _policy(0) + # stamp == now - FAILED_AUTH_WINDOW == cutoff. is_failed_auth_limited's own + # filter keeps a stamp only if stamp > cutoff, so the cutoff stamp is + # dropped -> empty window -> 0 > 0 is False. A `>=` mutant on that read-path + # filter keeps it -> 1 > 0 -> True, so this pins the strict comparison. + m._failed_auth_windows["ip"] = [1_000.0 - FAILED_AUTH_WINDOW_SECONDS] + assert m.is_failed_auth_limited("ip") is False + + def test_clear_failed_auth_removes_ip(self) -> None: + m = _build_manager(time_source=FrozenClock(1_000.0)) + m.security_policy = _policy(5) + m.record_failed_auth("ip") + m.clear_failed_auth("ip") + assert "ip" not in m._failed_auth_windows + + def test_clear_failed_auth_unknown_ip_is_noop(self) -> None: + m = _build_manager(time_source=FrozenClock(1_000.0)) + m.security_policy = _policy(5) + m.clear_failed_auth("never-seen") # pop(..., None) must not raise + assert "never-seen" not in m._failed_auth_windows + + +# --------------------------------------------------------------------------- # +# _sweep_expired_windows: opportunistic GC of rate / failed-auth dicts. +# --------------------------------------------------------------------------- # + + +class TestSweepExpiredWindows: + def test_drops_fully_expired_rate_window(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + m._rate_windows["k"] = [1_000.0 - DEFAULT_RATE_LIMIT_WINDOW_SECONDS - 5.0] + m._sweep_expired_windows() + assert "k" not in m._rate_windows + + def test_keeps_live_rate_window_stamps(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + live = 1_000.0 - 1.0 + m._rate_windows["k"] = [1_000.0 - DEFAULT_RATE_LIMIT_WINDOW_SECONDS - 5.0, live] + m._sweep_expired_windows() + assert m._rate_windows["k"] == [live] + + def test_drops_fully_expired_failed_auth_window(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + m._failed_auth_windows["ip"] = [1_000.0 - FAILED_AUTH_WINDOW_SECONDS - 5.0] + m._sweep_expired_windows() + assert "ip" not in m._failed_auth_windows + + +# --------------------------------------------------------------------------- # +# In-memory rate limiting (is_rate_limited / check_rate_limit). +# --------------------------------------------------------------------------- # + + +class TestInMemoryRateLimiting: + def test_is_rate_limited_blocks_after_rpm_in_window(self) -> None: + m = _build_manager(time_source=FrozenClock(1_000.0)) + tenant_key = _key(rate_limit_rpm=2) + assert m.is_rate_limited(tenant_key) is False + assert m.is_rate_limited(tenant_key) is False + assert m.is_rate_limited(tenant_key) is True + + def test_is_rate_limited_evicts_stale_stamps(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + tenant_key = _key(rate_limit_rpm=1) + assert m.is_rate_limited(tenant_key) is False + assert m.is_rate_limited(tenant_key) is True + clock.now = 1_000.0 + DEFAULT_RATE_LIMIT_WINDOW_SECONDS + 1.0 + # Old stamp expired -> allowed again. + assert m.is_rate_limited(tenant_key) is False + + def test_is_rate_limited_tracks_keys_independently(self) -> None: + # Each tenant key gets its own window (keyed by _rate_limit_key). A mutant + # that drops the key (windows collapse onto one) would let key "a"'s + # traffic throttle the unrelated key "b" -- a cross-tenant rate-limit leak. + m = _build_manager(time_source=FrozenClock(1_000.0)) + key_a = _key(key="a", rate_limit_rpm=1) + key_b = _key(key="b", rate_limit_rpm=1) + assert m.is_rate_limited(key_a) is False + assert m.is_rate_limited(key_a) is True # "a" now at its own limit + assert m.is_rate_limited(key_b) is False # "b" is independent + + def test_is_rate_limited_drops_stamp_exactly_at_cutoff(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + tenant_key = _key(key="k", rate_limit_rpm=1) + assert m.is_rate_limited(tenant_key) is False # stamp recorded at 1000 + clock.now = 1_000.0 + DEFAULT_RATE_LIMIT_WINDOW_SECONDS # cutoff == old stamp + # the window keeps a stamp only if stamp > cutoff; 1000 > 1000 is False, so + # the stamp is evicted and the caller is allowed. A `>=` mutant keeps it and + # would block at exactly the window edge. + assert m.is_rate_limited(tenant_key) is False + + @pytest.mark.asyncio + async def test_check_rate_limit_applies_local_window_when_redis_reports_full(self) -> None: + m = _build_manager( + time_source=FrozenClock(1_000.0), + rate_limiter=_FullRemainingLimiter(), + ) + tenant_key = _key(rate_limit_rpm=2) + first = await m.check_rate_limit(tenant_key) + second = await m.check_rate_limit(tenant_key) + third = await m.check_rate_limit(tenant_key) + assert first == (True, 1, 1_060) + assert second == (True, 0, 1_060) + assert third == (False, 0, 1_060) + + @pytest.mark.asyncio + async def test_check_rate_limit_reset_uses_first_window_stamp(self) -> None: + # rpm=1 -> the local secondary window holds exactly ONE stamp when reset_at + # is computed as int(window[0] + WINDOW). A window[1] mutant would IndexError + # on this single-element window, so reset_at pins the [0] index. + m = _build_manager( + time_source=FrozenClock(1_000.0), + rate_limiter=_FullRemainingLimiter(), + ) + tenant_key = _key(rate_limit_rpm=1) + allowed, remaining, reset_at = await m.check_rate_limit(tenant_key) + assert allowed is True + assert remaining == 0 + assert reset_at == 1_060 + # second call: window already at rpm -> blocked, reset still from window[0] + assert await m.check_rate_limit(tenant_key) == (False, 0, 1_060) + + +# --------------------------------------------------------------------------- # +# _rate_limit_key, is_entity_allowed, configured-key accessors. +# --------------------------------------------------------------------------- # + + +class TestKeyingAndAuthorization: + def test_rate_limit_key_prefers_plaintext_key(self) -> None: + m = _build_manager() + assert m._rate_limit_key(_key(key="k", key_hash="h")) == "k" + + def test_rate_limit_key_falls_back_to_hash_when_no_plaintext(self) -> None: + m = _build_manager() + assert m._rate_limit_key(_key(key=None, key_hash="h")) == "h" + + def test_is_entity_allowed_true_when_unrestricted(self) -> None: + m = _build_manager() + assert m.is_entity_allowed(_key(allowed_entity_types=None), "user") is True + + def test_is_entity_allowed_true_when_in_allowlist(self) -> None: + m = _build_manager() + assert m.is_entity_allowed(_key(allowed_entity_types=["user"]), "user") is True + + def test_is_entity_allowed_false_when_not_in_allowlist(self) -> None: + m = _build_manager() + assert m.is_entity_allowed(_key(allowed_entity_types=["order"]), "user") is False + + def test_has_configured_keys_false_when_empty(self) -> None: + m = _build_manager() + assert m.has_configured_keys() is False + + def test_has_configured_keys_true_with_plaintext_key(self) -> None: + m = _build_manager() + m.keys_by_value = {"k": _key(key="k")} + assert m.has_configured_keys() is True + + def test_has_configured_keys_true_with_hashed_key(self) -> None: + m = _build_manager() + m._hashed_keys = [_key(key=None, key_hash="h")] + assert m.has_configured_keys() is True + + +# --------------------------------------------------------------------------- # +# get_current_tenant_id contextvar. +# --------------------------------------------------------------------------- # + + +class TestCurrentTenantId: + def test_returns_default_when_context_unset(self) -> None: + assert get_current_tenant_id(default="fallback-tenant") == "fallback-tenant" + + def test_returns_context_value_when_set(self) -> None: + token = _CURRENT_TENANT_ID.set("tenant-x") + try: + assert get_current_tenant_id() == "tenant-x" + finally: + _CURRENT_TENANT_ID.reset(token) + + +# --------------------------------------------------------------------------- # +# Legacy AGENTFLOW_API_KEYS env parsing. +# --------------------------------------------------------------------------- # + + +class TestLegacyEnvKeys: + def test_empty_env_returns_empty(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.delenv("AGENTFLOW_API_KEYS", raising=False) + assert m._legacy_env_keys() == [] + + def test_whitespace_env_returns_empty(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.setenv("AGENTFLOW_API_KEYS", " ") + assert m._legacy_env_keys() == [] + + def test_key_with_name(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.setenv("AGENTFLOW_API_KEYS", "abc123:Alice") + keys = m._legacy_env_keys() + assert len(keys) == 1 + assert keys[0].key == "abc123" + assert keys[0].name == "Alice" + assert keys[0].tenant == "default" + assert keys[0].rate_limit_rpm == DEFAULT_RATE_LIMIT_RPM + + def test_key_without_colon_gets_unnamed(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.setenv("AGENTFLOW_API_KEYS", "loosekey") + keys = m._legacy_env_keys() + assert len(keys) == 1 + assert keys[0].key == "loosekey" + assert keys[0].name == "unnamed" + + def test_multiple_keys_skip_empty_segments(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.setenv("AGENTFLOW_API_KEYS", "k1:n1, k2:n2 ,,") + keys = m._legacy_env_keys() + assert [(k.key, k.name) for k in keys] == [("k1", "n1"), ("k2", "n2")] + + +# --------------------------------------------------------------------------- # +# __init__ config branches + lifecycle. +# --------------------------------------------------------------------------- # + + +class TestInitConfigBranches: + def test_derives_api_db_path_from_pipeline_duckdb_path( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("AGENTFLOW_USAGE_DB_PATH", raising=False) + monkeypatch.setenv("DUCKDB_PATH", "/data/pipeline.duckdb") + m = _build_manager(db_path="agentflow_api.duckdb") + assert m.db_path.name == "pipeline_api.duckdb" + + def test_falls_back_on_invalid_rotation_grace_period( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.setenv("AGENTFLOW_ROTATION_GRACE_PERIOD_SECONDS", "not-an-int") + m = _build_manager() + assert m.rotation_grace_period_seconds == DEFAULT_ROTATION_GRACE_PERIOD_SECONDS + + def test_valid_rotation_grace_period_is_used(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AGENTFLOW_ROTATION_GRACE_PERIOD_SECONDS", "7") + m = _build_manager() + assert m.rotation_grace_period_seconds == 7 + + def test_rotation_grace_period_floor_is_one(self, monkeypatch: pytest.MonkeyPatch) -> None: + # max(1, int(env)) floors the grace at 1; a configured 1 stays 1 and is + # not bumped (pins the literal floor against a max(2, ...) mutant). + monkeypatch.setenv("AGENTFLOW_ROTATION_GRACE_PERIOD_SECONDS", "1") + m = _build_manager() + assert m.rotation_grace_period_seconds == 1 + + def test_derives_db_path_preserves_explicit_pipeline_suffix( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("AGENTFLOW_USAGE_DB_PATH", raising=False) + monkeypatch.setenv("DUCKDB_PATH", "/data/pipeline.db") + m = _build_manager(db_path="agentflow_api.duckdb") + # pipeline_path.suffix (".db") is truthy -> `suffix or ".duckdb"` keeps ".db" + assert m.db_path.name == "pipeline_api.db" + + def test_derives_db_path_defaults_suffix_when_pipeline_has_none( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("AGENTFLOW_USAGE_DB_PATH", raising=False) + monkeypatch.setenv("DUCKDB_PATH", "/data/pipeline") + m = _build_manager(db_path="agentflow_api.duckdb") + # no suffix -> the `or ".duckdb"` fallback supplies the literal ".duckdb" + assert m.db_path.name == "pipeline_api.duckdb" + + def test_load_with_env_only_config_has_no_keys(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("AGENTFLOW_API_KEYS", raising=False) + m = _build_manager() + m.load() + assert m.configured_key_count == 0 + + +# --------------------------------------------------------------------------- # +# Full __init__ state: pin every index / window / config assignment so a mutant +# that drops or swaps one is killed (not merely executed). +# --------------------------------------------------------------------------- # + + +class TestInitState: + def test_construction_initializes_empty_indexes_and_windows( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("AGENTFLOW_ADMIN_KEY", raising=False) + monkeypatch.delenv("AGENTFLOW_ROTATION_GRACE_PERIOD_SECONDS", raising=False) + monkeypatch.delenv("REDIS_URL", raising=False) + clock = FrozenClock(1_234.0) + m = _build_manager(time_source=clock) + assert m.api_keys_path is None + assert str(m.db_path) == "usage.duckdb" + assert m.admin_key is None + assert m.time_source is clock + assert m.keys_by_value == {} + assert m._keys_by_id == {} + assert m._hashed_keys == [] + assert m._keys_by_lookup == {} + assert m._previous_keys_by_lookup == {} + assert m._loaded_keys == [] + assert m._runtime_plaintext_by_hash == {} + assert dict(m._rate_windows) == {} + assert dict(m._failed_auth_windows) == {} + assert m.rotation_grace_period_seconds == DEFAULT_ROTATION_GRACE_PERIOD_SECONDS + assert m.security_config_path is not None + assert m.security_policy is not None + assert m.rate_limiter is not None + assert m.audit_publisher is not None + assert m._key_rotator._manager is m + + def test_admin_key_param_overrides_env(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AGENTFLOW_ADMIN_KEY", "from-env") + m = _build_manager(admin_key="explicit") + assert m.admin_key == "explicit" + + def test_admin_key_falls_back_to_env_when_unset(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AGENTFLOW_ADMIN_KEY", "from-env") + m = _build_manager(admin_key=None) + assert m.admin_key == "from-env" + + def test_injected_rate_limiter_is_used_verbatim(self) -> None: + limiter = _FullRemainingLimiter() + m = _build_manager(rate_limiter=limiter) + assert m.rate_limiter is limiter + + def test_default_rate_limiter_has_redis_handle_disabled_without_url( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("REDIS_URL", raising=False) + m = _build_manager() + assert m.rate_limiter._redis is None + + def test_neutral_db_path_is_not_rewritten_even_with_pipeline_env( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + monkeypatch.delenv("AGENTFLOW_USAGE_DB_PATH", raising=False) + monkeypatch.setenv("DUCKDB_PATH", "/data/pipeline.duckdb") + # db_path != the magic "agentflow_api.duckdb" -> the derivation guard is + # False and the path is left untouched. + m = _build_manager(db_path="custom.duckdb") + assert str(m.db_path) == "custom.duckdb" + + def test_usage_db_path_env_blocks_derivation(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AGENTFLOW_USAGE_DB_PATH", "/explicit.duckdb") + monkeypatch.setenv("DUCKDB_PATH", "/data/pipeline.duckdb") + # AGENTFLOW_USAGE_DB_PATH is set -> `is None` guard is False -> the magic + # name is NOT rewritten despite DUCKDB_PATH being present. + m = _build_manager(db_path="agentflow_api.duckdb") + assert str(m.db_path) == "agentflow_api.duckdb" + + +# --------------------------------------------------------------------------- # +# load(): rebuild the value / id / hash / lookup indexes from a populated config +# and prune the runtime plaintext cache. Empty-config load is covered above; this +# exercises and pins the per-key loop body and the cache-GC. +# --------------------------------------------------------------------------- # + + +class TestLoadRebuildsIndexes: + _CONFIG_YAML = ( + "keys:\n" + " - key_id: id-plain\n" + " key: plain1\n" + " name: n1\n" + " tenant: t1\n" + " created_at: 2026-01-01\n" + " - key_id: id-hash\n" + " key_hash: hash2\n" + " key_lookup: lookup2\n" + " name: n2\n" + " tenant: t2\n" + " created_at: 2026-01-01\n" + ) + + def test_load_indexes_plaintext_and_hashed_keys( + self, monkeypatch: pytest.MonkeyPatch, tmp_path: object + ) -> None: + monkeypatch.delenv("AGENTFLOW_API_KEYS", raising=False) + keys_file = tmp_path / "api_keys.yaml" # type: ignore[operator] + keys_file.write_text(self._CONFIG_YAML, encoding="utf-8") + m = _build_manager(api_keys_path=keys_file) + m.load() + + assert m.configured_key_count == 2 + # plaintext key -> keys_by_value with a current-slot copy + assert "plain1" in m.keys_by_value + assert m.keys_by_value["plain1"].matched_slot == "current" + assert m.keys_by_value["plain1"].tenant == "t1" + # both ids indexed -> to the actual TenantKey entries, not None + assert set(m._keys_by_id) == {"id-plain", "id-hash"} + assert m._keys_by_id["id-hash"].key_id == "id-hash" + assert m._keys_by_id["id-plain"].key == "plain1" + # only the hashed entry lands in _hashed_keys and the lookup index + assert [k.key_hash for k in m._hashed_keys] == ["hash2"] + assert "lookup2" in m._keys_by_lookup + assert m._keys_by_lookup["lookup2"].key_id == "id-hash" + # no previous-rotation entries in this config -> the index is reset to {} + assert m._previous_keys_by_lookup == {} + # a fresh policy object is loaded (not left None) and used by the throttle + assert m.security_policy is not None + # _rate_windows stays a list-defaultdict: an unseen key yields [], not KeyError + assert m._rate_windows["never-seen"] == [] + + def test_load_rebuilds_runtime_cache_for_live_hash_and_prunes_stale( + self, monkeypatch: pytest.MonkeyPatch, tmp_path: object + ) -> None: + monkeypatch.delenv("AGENTFLOW_API_KEYS", raising=False) + keys_file = tmp_path / "api_keys.yaml" # type: ignore[operator] + keys_file.write_text(self._CONFIG_YAML, encoding="utf-8") + m = _build_manager(api_keys_path=keys_file) + # A live hash (present in config) keeps its cached plaintext and is + # re-indexed into keys_by_value; a stale hash (gone from config) is GC'd. + m._runtime_plaintext_by_hash = {"hash2": "runtime-plain", "stale-hash": "x"} + m.load() + assert m._runtime_plaintext_by_hash == {"hash2": "runtime-plain"} + assert "runtime-plain" in m.keys_by_value + assert m.keys_by_value["runtime-plain"].key_hash == "hash2" + # the re-indexed copy carries the runtime plaintext as its current key + assert m.keys_by_value["runtime-plain"].key == "runtime-plain" + assert m.keys_by_value["runtime-plain"].matched_slot == "current" + + +# --------------------------------------------------------------------------- # +# authenticate(): previous-key (rotation grace) resolution paths. +# --------------------------------------------------------------------------- # + + +class TestAuthenticatePreviousKey: + def test_indexed_previous_key_resolves_to_previous_slot( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + m = _build_manager() + entry = _key(key=None, key_hash="cur", key_lookup="cur-lk") + entry = entry.model_copy( + update={"previous_key_hash": "prev-hash", "previous_key_lookup": "prev-lk"} + ) + m._previous_keys_by_lookup = {"prev-lk": entry} + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "prev-lk") + # is_previous_key_active must receive the matched ENTRY and verify must + # receive the PRESENTED key -- both pinned so an arg-swap-to-None mutant dies. + monkeypatch.setattr(m._key_rotator, "is_previous_key_active", lambda item: item is entry) + monkeypatch.setattr( + manager_module, + "verify_api_key", + lambda value, h: value == "old-key" and h == "prev-hash", + ) + out = m.authenticate("old-key") + assert out is not None + assert out.matched_slot == "previous" + assert out.key == "old-key" + + def test_indexed_previous_inactive_does_not_match( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + m = _build_manager() + entry = _key(key=None, key_hash="cur", key_lookup="cur-lk") + entry = entry.model_copy( + update={"previous_key_hash": "prev-hash", "previous_key_lookup": "prev-lk"} + ) + m._previous_keys_by_lookup = {"prev-lk": entry} + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "prev-lk") + monkeypatch.setattr(m._key_rotator, "is_previous_key_active", lambda item: False) + monkeypatch.setattr(manager_module, "verify_api_key", lambda value, h: True) + assert m.authenticate("old-key") is None + + def test_legacy_previous_scan_matches_unindexed_previous_key( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + m = _build_manager() + entry = _key(key=None, key_hash="cur") + entry = entry.model_copy( + update={"previous_key_hash": "prev-hash", "previous_key_lookup": None} + ) + m._loaded_keys = [entry] + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "no-hit") + monkeypatch.setattr(m._key_rotator, "is_previous_key_active", lambda item: item is entry) + monkeypatch.setattr( + manager_module, + "verify_api_key", + lambda value, h: value == "old-key" and h == "prev-hash", + ) + out = m.authenticate("old-key") + assert out is not None + assert out.matched_slot == "previous" + assert out.key == "old-key" + + def test_legacy_previous_scan_skips_inactive_previous_key( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + # An INACTIVE previous key (rotation grace expired) must be skipped by the + # guard `if not is_previous_key_active(item) or previous_key_hash is None: + # continue`. The `or`->`and` mutant stops skipping it and would verify an + # expired key -- a real rotation-grace bypass. + m = _build_manager() + entry = _key(key=None, key_hash="cur") + entry = entry.model_copy( + update={"previous_key_hash": "prev-hash", "previous_key_lookup": None} + ) + m._loaded_keys = [entry] + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "no-hit") + monkeypatch.setattr(m._key_rotator, "is_previous_key_active", lambda item: False) + seen: list[str] = [] + + def _verify(value: str, h: str) -> bool: + seen.append(h) + return True + + monkeypatch.setattr(manager_module, "verify_api_key", _verify) + assert m.authenticate("old-key") is None + assert "prev-hash" not in seen # inactive entry never verified + + def test_legacy_previous_scan_skips_indexed_entries( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + # An entry WITH a previous_key_lookup must be skipped by the legacy scan + # (it is handled by the indexed path), not re-verified here. + m = _build_manager() + entry = _key(key=None, key_hash="cur") + entry = entry.model_copy( + update={"previous_key_hash": "prev-hash", "previous_key_lookup": "has-lk"} + ) + m._loaded_keys = [entry] + monkeypatch.setattr(manager_module, "compute_key_lookup", lambda value: "no-hit") + monkeypatch.setattr(m._key_rotator, "is_previous_key_active", lambda item: True) + seen: list[str] = [] + + def _verify(value: str, h: str) -> bool: + seen.append(h) + return True + + monkeypatch.setattr(manager_module, "verify_api_key", _verify) + assert m.authenticate("old-key") is None + assert "prev-hash" not in seen # indexed entry skipped by the scan + + +# --------------------------------------------------------------------------- # +# Extra pins: legacy-key field population, sweep boundary, rate-limit boundary. +# --------------------------------------------------------------------------- # + + +class TestLegacyEnvKeyFields: + def test_constructed_key_populates_every_field(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.setenv("AGENTFLOW_API_KEYS", "thekey:TheName") + [k] = m._legacy_env_keys() + assert k.key == "thekey" + assert k.name == "TheName" + assert k.tenant == "default" + assert k.rate_limit_rpm == DEFAULT_RATE_LIMIT_RPM + assert k.allowed_entity_types is None + assert isinstance(k.created_at, date) + # key_id is generated from tenant+name slugs. + assert k.key_id is not None + assert k.key_id.startswith("default-thename-") + + def test_name_is_stripped_before_id_generation(self, monkeypatch: pytest.MonkeyPatch) -> None: + m = _build_manager() + monkeypatch.setenv("AGENTFLOW_API_KEYS", " spacedkey : Spaced Name ") + [k] = m._legacy_env_keys() + assert k.key == "spacedkey" + assert k.name == "Spaced Name" + assert k.key_id is not None + assert k.key_id.startswith("default-spaced-name-") + + +class TestSweepBoundary: + def test_rate_stamp_exactly_at_cutoff_is_dropped(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + # stamp == now - window == cutoff; kept only if stamp > cutoff -> dropped. + m._rate_windows["k"] = [1_000.0 - DEFAULT_RATE_LIMIT_WINDOW_SECONDS] + m._sweep_expired_windows() + assert "k" not in m._rate_windows + + def test_failed_auth_stamp_just_inside_window_is_kept(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + inside = 1_000.0 - FAILED_AUTH_WINDOW_SECONDS + 1.0 + m._failed_auth_windows["ip"] = [inside] + m._sweep_expired_windows() + assert m._failed_auth_windows["ip"] == [inside] + + def test_failed_auth_stamp_exactly_at_cutoff_is_dropped(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + # stamp == now - FAILED_AUTH_WINDOW == cutoff; kept only if > cutoff (not + # >=) -> the failed-auth sweep drops it, pinning the strict comparison. + m._failed_auth_windows["ip"] = [1_000.0 - FAILED_AUTH_WINDOW_SECONDS] + m._sweep_expired_windows() + assert "ip" not in m._failed_auth_windows + + +class TestRateLimitBoundary: + def test_is_rate_limited_allows_exactly_up_to_rpm(self) -> None: + clock = FrozenClock(1_000.0) + m = _build_manager(time_source=clock) + tenant_key = _key(rate_limit_rpm=3) + # len(window) >= rpm blocks; the 4th call (window already 3) trips. + assert m.is_rate_limited(tenant_key) is False + assert m.is_rate_limited(tenant_key) is False + assert m.is_rate_limited(tenant_key) is False + assert m.is_rate_limited(tenant_key) is True + + def test_shutdown_is_idempotent(self) -> None: + m = _build_manager() + m.shutdown() + m.shutdown() diff --git a/tests/unit/test_key_rotation_mutation.py b/tests/unit/test_key_rotation_mutation.py new file mode 100644 index 0000000..889de61 --- /dev/null +++ b/tests/unit/test_key_rotation_mutation.py @@ -0,0 +1,1122 @@ +"""Narrow, duckdb-free mutation test for the API-key rotation lifecycle +(src/serving/api/auth/key_rotation.py). + +This is the test the mutation gate runs against ``serving/api/auth/key_rotation.py`` +(see scripts/mutation_report.py MODULE_TARGETS). key_rotation.py is the +create / rotate / revoke / revoke-old surface plus the rotation-grace state +machine: a surviving mutant here is a key that fails to rotate, an old key that +outlives its grace window, a revoke that does not actually remove material, a +generated key persisted with its plaintext, or a key hashed with the wrong +cost/scheme. The same security-critical class the HTTP admin endpoints in +tests/integration/test_rotation.py drive end to end. + +Design rules, shared with test_auth_manager_mutation.py / +test_rate_limiter_mutation.py / test_sql_builder_mutation.py (see +fable_handoff.md cont.16-23): + +1. **duckdb-free.** Importing ``serving.api.auth.key_rotation`` runs the auth + package ``__init__`` (``import duckdb`` + the key_rotation / usage_table + import chain) and key_rotation itself does ``import duckdb`` + + ``from ...duckdb_connection import connect_duckdb``. Real duckdb's lazy + ``_duckdb._sqltypes`` import crashes mutmut's coverage-instrumented stats + pass (the same break ci.yml works around with ``coverage run``). A fake + top-level ``duckdb`` module satisfies the import chain; the three usage-stat + methods that actually *call* duckdb (``old_key_usage_by_key_id`` / + ``_usage_by_key`` / ``old_key_usage_last_hour``) are stubbed on the rotator in + the tests that need them, so their bodies stay uncovered and are NOT mutated. + Those duckdb-querying observability methods (not an auth boundary; their SQL + string literals are unkillable without a real duckdb to execute them) are + pinned instead by the real-duckdb retry tests in tests/unit/test_key_rotation.py. + +2. **The two ``while True`` uniqueness loops (``generate_key`` / + ``generate_key_id``) are deliberately left uncovered.** mutmut flips + ``not in`` -> ``in`` (node_mutation._keyword_mapping), which turns + ``while True: candidate = ...; if candidate not in seen: return`` into an + infinite loop -> a *timeout* mutant, and scripts/mutation_report.py counts a + timeout as a hard violation, not a survivor. There is no per-line mutmut + ignore in this version, and refactoring security-relevant key generation just + to satisfy the tool is the wrong trade. So both generators are stubbed in the + create / rotate / ensure_key_ids tests (their bodies never execute -> never + mutated), and their slug + 256-bit entropy + collision behaviour is pinned by + tests/unit/test_key_rotation.py (``af-prod-`` prefix, real ``token_urlsafe``). + The stubs RECORD their args so a mutant that passes the wrong tenant/name/ + existing-id set to a generator is still killed. To keep ``AuthManager.load()`` + -> ``ensure_key_ids`` from calling the real ``generate_key_id`` (which would + re-cover the loop), every seed key carries a ``key_id``. + +3. **No fixtures for the subject -- inline construction + direct method calls.** + With ``mutate_only_covered_lines = true`` a fixture-built object leaves method + lines uncovered (only ``__init__`` mutated). The rotator methods under test + are called directly so coverage attributes every line. + +4. **Workspace discrimination by top-level ``serving``.** mutmut's mutants/ + workspace copies src/serving to a TOP-LEVEL ``serving`` package; ordinary + pytest has no top-level ``serving`` (only src.serving). Gate the harness stubs + on ``find_spec("serving")`` -- NOT ``import src``, which stays importable via + the editable install even inside the workspace. + +5. **No real timers.** ``threading.Timer`` is replaced with a record-only fake so + ``schedule_rotation_cleanup`` (reached via ``load()`` after a rotation) never + spawns a background thread across the hundreds of per-mutant runs, and so the + timer wiring (delay / callback / args) can be asserted. + +Residual survivors are genuine EQUIVALENT mutants, documented so the honest 0.90 +threshold is defensible: strict-vs-nonstrict comparisons against the live +``datetime.now(UTC)`` wall clock (``is_previous_key_active`` ``>`` vs ``>=``, +``cleanup_expired_rotations`` / ``schedule_rotation_cleanup`` ``<=`` boundaries -- +only differ at an unreachable exact-equality instant); ``datetime.now(UTC)`` vs +``datetime.now(None)`` (identical date on a UTC runner); the ``revoke_key`` +runtime-cache prune clauses (subsumed by ``load()``'s live-hash reprune); the +``updated_item`` ``"key"`` field in ``rotate_key`` (stripped by ``_storage_payload`` +on persist and overwritten by the final ``model_copy`` on return); and +``write_text`` ``encoding`` / ``newline`` kwargs (no observable change on a +UTF-8 / ``\\n`` platform). +""" + +from __future__ import annotations + +import sys +import types +from datetime import UTC, date, datetime, timedelta + + +def _in_mutation_workspace() -> bool: + # mutmut's mutants/ workspace copies src/serving to a TOP-LEVEL `serving` + # package (scripts/mutation_report.py prepare_workspace); ordinary pytest has + # no top-level `serving` (only src.serving), so its presence cleanly marks the + # harness. `import src` does NOT discriminate: the editable install keeps the + # real `src` importable even inside the workspace. + import importlib.util + + try: + return importlib.util.find_spec("serving") is not None + except (ImportError, ValueError): + return False + + +def _install_harness_stubs() -> None: + # The auth package import chain (auth/__init__.py `import duckdb`; + # key_rotation `import duckdb` + `from ...duckdb_connection import + # connect_duckdb`) pulls duckdb at import time. Replace the whole `duckdb` + # module with a fake supplying the names referenced at import / type-check + # time. The methods that genuinely execute a duckdb call are stubbed per-test + # (see module docstring), so the fake changes no mutated logic; it only keeps + # the import chain off real duckdb's coverage-breaking native extension. + # Force-overwrite (not setdefault) so a duckdb already pulled in by the + # coverage harness can't shadow the fake. + duckdb_stub = types.ModuleType("duckdb") + + class _DuckDBError(Exception): + pass + + duckdb_stub.Error = _DuckDBError + duckdb_stub.IOException = type("IOException", (_DuckDBError,), {}) + duckdb_stub.DuckDBPyConnection = object + + def _connect(*_args: object, **_kwargs: object) -> object: + raise _DuckDBError("duckdb is stubbed out in the mutation harness") + + duckdb_stub.connect = _connect + sys.modules["duckdb"] = duckdb_stub + + +if _in_mutation_workspace(): + _install_harness_stubs() + +try: # mutation-harness workspace exposes these as a top-level package + from serving.api.auth import key_rotation as kr + from serving.api.auth.manager import ApiKeysConfig, AuthManager, KeyCreateRequest, TenantKey +except ImportError: # ordinary pytest sees them under the src package + from src.serving.api.auth import key_rotation as kr + from src.serving.api.auth.manager import ApiKeysConfig, AuthManager, KeyCreateRequest, TenantKey + +import pytest + +rotate_all_keys = kr.rotate_all_keys + +SEED_KEY_YAML = ( + "keys:\n" + ' - key_id: "acme-rotation-agent-aaaa1111"\n' + ' key: "rotation-acme-key"\n' + ' name: "Rotation Agent"\n' + ' tenant: "acme"\n' + " rate_limit_rpm: 100\n" + ' created_at: "2026-04-10"\n' +) +SEED_KEY_ID = "acme-rotation-agent-aaaa1111" + + +# --------------------------------------------------------------------------- # +# Record-only Timer + fast, recording hash/lookup/generator fakes. +# --------------------------------------------------------------------------- # + + +class _FakeTimer: + def __init__(self, delay: float, function: object, args: object = ()) -> None: + self.delay = delay + self.function = function + self.args = args + self.daemon = False + self.started = False + self.cancelled = False + + def start(self) -> None: + self.started = True + + def cancel(self) -> None: + self.cancelled = True + + +def _rec_hash(calls: list) -> object: + # Fast, deterministic stand-in for hash_api_key that RECORDS (value, rounds, + # scheme). Encoding the value lets the hash track the key actually passed; + # recording rounds/scheme kills a mutant that drops/nulls the cost factor or + # hashing scheme (a real downgrade of the at-rest key protection). + def _h(value: str, rounds: object = None, scheme: object = None) -> str: + calls.append((value, rounds, scheme)) + return f"hash::{value}" + + return _h + + +def _rec_lookup(calls: list | None = None) -> object: + def _l(value: str) -> str: + if calls is not None: + calls.append(value) + return f"lk::{value}" + + return _l + + +def _rec_gen(calls: list, ret: str) -> object: + def _g(tenant: str, name: str) -> str: + calls.append((tenant, name)) + return ret + + return _g + + +def _rec_gen_id(calls: list, rets: list[str]) -> object: + it = iter(rets) + + def _g(tenant: str, name: str, existing_ids: object = None) -> str: + snapshot = set(existing_ids) if existing_ids is not None else None + calls.append((tenant, name, snapshot)) + return next(it) + + return _g + + +def _build_manager( + tmp_path: object, + monkeypatch: pytest.MonkeyPatch, + *, + grace: int = 300, + seed: str = SEED_KEY_YAML, +) -> AuthManager: + monkeypatch.setattr(kr.threading, "Timer", _FakeTimer) + monkeypatch.delenv("AGENTFLOW_API_KEYS", raising=False) + api_keys_path = tmp_path / "config" / "api_keys.yaml" # type: ignore[operator] + api_keys_path.parent.mkdir(parents=True, exist_ok=True) + api_keys_path.write_text(seed, encoding="utf-8", newline="\n") + manager = AuthManager( + api_keys_path=api_keys_path, + db_path=tmp_path / "usage.duckdb", # type: ignore[operator] + admin_key="admin-secret", + ) + manager.security_policy.bcrypt_rounds = 4 + manager.rotation_grace_period_seconds = grace + manager.load() + return manager + + +def _tk(**overrides: object) -> TenantKey: + base: dict[str, object] = { + "key": "plain-key", + "name": "n", + "tenant": "acme", + "created_at": date(2026, 1, 1), + } + base.update(overrides) + return TenantKey(**base) # type: ignore[arg-type] + + +def _assert_hash_uses_policy(calls: list, manager: AuthManager) -> None: + # Every hash_api_key call must carry the configured cost + scheme; a dropped + # or nulled rounds/scheme kwarg is a real downgrade. + assert calls + for _value, rounds, scheme in calls: + assert rounds == manager.security_policy.bcrypt_rounds + assert scheme == manager.security_policy.key_hashing + + +# --------------------------------------------------------------------------- # +# create_key +# --------------------------------------------------------------------------- # + + +class TestCreateKey: + def test_create_key_populates_every_field_and_persists_without_plaintext( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + gen_calls: list = [] + id_calls: list = [] + hash_calls: list = [] + lookup_calls: list = [] + new_key = "af-prod-beta-reporting-" + "z" * 43 + + monkeypatch.setattr(rotator, "generate_key", _rec_gen(gen_calls, new_key)) + monkeypatch.setattr( + rotator, "generate_key_id", _rec_gen_id(id_calls, ["beta-reporting-dead"]) + ) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash(hash_calls)) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup(lookup_calls)) + + created = rotator.create_key( + KeyCreateRequest( + name="Reporting", tenant="beta", rate_limit_rpm=50, allowed_entity_types=["order"] + ) + ) + + # generate_key / generate_key_id received the request's tenant+name, and + # generate_key_id received the existing-id set (the seed's id), not None. + assert gen_calls == [("beta", "Reporting")] + assert id_calls == [("beta", "Reporting", {SEED_KEY_ID})] + # hashed/looked-up the NEWLY generated key with the configured cost/scheme. + assert hash_calls == [ + (new_key, manager.security_policy.bcrypt_rounds, manager.security_policy.key_hashing) + ] + assert lookup_calls == [new_key] + # Every field flows from the request / generated material. + assert created.key == new_key + assert created.key_id == "beta-reporting-dead" + assert created.key_hash == f"hash::{new_key}" + assert created.key_lookup == f"lk::{new_key}" + assert created.name == "Reporting" + assert created.tenant == "beta" + assert created.rate_limit_rpm == 50 + assert created.allowed_entity_types == ["order"] + assert created.created_at == datetime.now(UTC).date() + # Cached plaintext binding keyed by the new hash. + assert manager._runtime_plaintext_by_hash[created.key_hash] == new_key + # Persisted (reload indexed it) and the plaintext was stripped on disk. + assert created.key_id in manager._keys_by_id + import yaml + + on_disk = yaml.safe_load(manager.api_keys_path.read_text(encoding="utf-8")) + entry = next(k for k in on_disk["keys"] if k["key_id"] == "beta-reporting-dead") + assert entry["key_hash"] == f"hash::{new_key}" + assert "key" not in entry # never persist plaintext for a hashed key + + def test_create_key_validates_generated_key_length( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # create_key must run validate_generated_key on the generated key; a + # mutant that validates None instead would let a too-short key through. + rotator = manager._key_rotator + manager.security_policy.min_key_length = 100 + monkeypatch.setattr(rotator, "generate_key", lambda tenant, name: "too-short") + monkeypatch.setattr(rotator, "generate_key_id", lambda *a, **k: "beta-x-0001") + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + with pytest.raises(ValueError, match="below min_key_length"): + rotator.create_key(KeyCreateRequest(name="X", tenant="beta")) + + +# --------------------------------------------------------------------------- # +# rotate_key +# --------------------------------------------------------------------------- # + + +class TestRotateKey: + def test_rotate_plaintext_key_hashes_old_material_and_opens_grace( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + gen_calls: list = [] + hash_calls: list = [] + new_key = "af-prod-acme-rotation-agent-" + "y" * 43 + monkeypatch.setattr(rotator, "generate_key", _rec_gen(gen_calls, new_key)) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash(hash_calls)) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + + before = datetime.now(UTC) + rotated, expires_at = manager.rotate_key(SEED_KEY_ID) + after = datetime.now(UTC) + + assert rotated.key == new_key + # generate_key got the rotated key's own tenant/name. + assert gen_calls == [("acme", "Rotation Agent")] + # Both the new-key hash and the on-the-fly old-key hash carry the policy + # cost + scheme (the seed is PLAINTEXT-only, so the old material is hashed + # via the `old_key_hash is None` fallback branch). + _assert_hash_uses_policy(hash_calls, manager) + assert {c[0] for c in hash_calls} == {new_key, "rotation-acme-key"} + stored = manager._keys_by_id[SEED_KEY_ID] + assert stored.key_hash == f"hash::{new_key}" + assert stored.key_lookup == f"lk::{new_key}" + assert stored.previous_key_hash == "hash::rotation-acme-key" + assert stored.previous_key_lookup == "lk::rotation-acme-key" + # expires_at == now + grace (300s); pins the timedelta sign + seconds. + assert before + timedelta(seconds=300) <= expires_at <= after + timedelta(seconds=300) + # The new plaintext is cached under the new hash (survives load()'s reprune). + assert manager._runtime_plaintext_by_hash[f"hash::{new_key}"] == new_key + assert rotator.is_previous_key_active(stored) is True + assert rotator.rotation_phase(stored) == "grace_period" + + def test_rotate_existing_hash_keeps_stored_hash_as_previous( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + # A key that already carries a hash takes the non-fallback branch: + # old_key_hash = item.key_hash directly (no re-hash of plaintext). + seed = ( + "keys:\n" + ' - key_id: "acme-hashed-agent-bbbb2222"\n' + ' key_hash: "stored-current-hash"\n' + ' key_lookup: "stored-current-lk"\n' + ' name: "Hashed Agent"\n' + ' tenant: "acme"\n' + " rate_limit_rpm: 100\n" + ' created_at: "2026-04-10"\n' + ) + manager = _build_manager(tmp_path, monkeypatch, seed=seed) + try: + rotator = manager._key_rotator + new_key = "af-prod-acme-hashed-agent-" + "x" * 43 + monkeypatch.setattr(rotator, "generate_key", lambda tenant, name: new_key) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + + manager.rotate_key("acme-hashed-agent-bbbb2222") + + stored = manager._keys_by_id["acme-hashed-agent-bbbb2222"] + assert stored.previous_key_hash == "stored-current-hash" # not re-hashed + assert stored.previous_key_lookup == "stored-current-lk" + finally: + manager.shutdown() + + def test_rotate_validates_generated_key_length( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # rotate_key must validate the freshly generated key; a mutant validating + # None would let a too-short rotated key through. + rotator = manager._key_rotator + manager.security_policy.min_key_length = 100 + monkeypatch.setattr(rotator, "generate_key", lambda tenant, name: "too-short") + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + with pytest.raises(ValueError, match="below min_key_length"): + manager.rotate_key(SEED_KEY_ID) + + def test_rotate_unknown_key_id_raises_with_that_id( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + monkeypatch.setattr(manager._key_rotator, "generate_key", lambda tenant, name: "x" * 50) + # The KeyError must name the missing id (a KeyError(None) mutant is killed). + with pytest.raises(KeyError, match="does-not-exist"): + manager.rotate_key("does-not-exist") + + def test_rotate_twice_rejects_overlap( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + monkeypatch.setattr( + rotator, "generate_key", lambda tenant, name: "af-prod-acme-rotation-agent-" + "y" * 43 + ) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + manager.rotate_key(SEED_KEY_ID) + # Anchored so an XX-wrapped / re-cased message mutant is killed. + with pytest.raises(ValueError, match=r"^Rotation already in progress\.$"): + manager.rotate_key(SEED_KEY_ID) + + +# --------------------------------------------------------------------------- # +# revoke_key +# --------------------------------------------------------------------------- # + + +class TestRevokeKey: + def test_revoke_known_plaintext_removes_it( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + assert manager.revoke_key("rotation-acme-key") is True + assert "rotation-acme-key" not in manager.keys_by_value + assert manager.configured_key_count == 0 + + def test_revoke_unknown_returns_false( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # remaining == all keys -> nothing matched -> False, config untouched. + assert manager.revoke_key("not-a-real-key") is False + assert manager.configured_key_count == 1 + + def test_revoke_clears_cached_material_for_the_removed_key( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # Revoking a key must not leave its plaintext lingering in the runtime + # cache. (revoke_key's own value/hash prune is belt-and-suspenders that + # load()'s live-hash reprune subsumes -- those filter clauses are + # equivalent mutants; this pins the observable contract.) + manager._runtime_plaintext_by_hash = {"some-hash": "rotation-acme-key"} + assert manager.revoke_key("rotation-acme-key") is True + assert manager.configured_key_count == 0 + assert manager._runtime_plaintext_by_hash == {} + + def test_revoke_cancels_the_removed_keys_cleanup_timer( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # A revoked key with a scheduled rotation-cleanup timer must have that + # timer cancelled by id (kills the `is None` branch flip and the + # cancel(None) arg mutant). + timer = _FakeTimer(300, None) + manager._rotation_cleanup_timers[SEED_KEY_ID] = timer + assert manager.revoke_key("rotation-acme-key") is True + assert timer.cancelled is True + assert SEED_KEY_ID not in manager._rotation_cleanup_timers + + +# --------------------------------------------------------------------------- # +# revoke_old_key +# --------------------------------------------------------------------------- # + + +class TestRevokeOldKey: + def test_revoke_old_key_ends_grace_and_cancels_timer( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + monkeypatch.setattr( + rotator, "generate_key", lambda tenant, name: "af-prod-acme-rotation-agent-" + "y" * 43 + ) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + manager.rotate_key(SEED_KEY_ID) + # Plant a cleanup timer so the cancel-by-id call is observable. + timer = _FakeTimer(300, None) + manager._rotation_cleanup_timers[SEED_KEY_ID] = timer + + assert manager.revoke_old_key(SEED_KEY_ID) is True + stored = manager._keys_by_id[SEED_KEY_ID] + assert stored.previous_key_hash is None + assert rotator.rotation_phase(stored) == "idle" + assert timer.cancelled is True # cancel(key_id), not cancel(None) + + def test_revoke_old_key_without_rotation_returns_false( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # previous_key_hash is None -> the early `return False` branch. + assert manager.revoke_old_key(SEED_KEY_ID) is False + + def test_revoke_old_key_unknown_raises_with_that_id( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + with pytest.raises(KeyError, match="does-not-exist"): + manager.revoke_old_key("does-not-exist") + + +# --------------------------------------------------------------------------- # +# get_rotation_status +# --------------------------------------------------------------------------- # + + +class TestGetRotationStatus: + def test_status_idle_uses_key_id_for_usage( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # old_key_usage_last_hour must be called with the requested key_id (a + # mutant passing None is killed because the stub keys off the id). + monkeypatch.setattr( + manager._key_rotator, + "old_key_usage_last_hour", + lambda key_id: 7 if key_id == SEED_KEY_ID else -1, + ) + status = manager.get_rotation_status(SEED_KEY_ID) + assert status["phase"] == "idle" + assert status["old_key_active_until"] is None + assert status["requests_on_old_key_last_hour"] == 7 + + def test_status_grace_period_serialises_active_until( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + monkeypatch.setattr( + rotator, "generate_key", lambda tenant, name: "af-prod-acme-rotation-agent-" + "y" * 43 + ) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + monkeypatch.setattr(rotator, "old_key_usage_last_hour", lambda key_id: 0) + _, expires_at = manager.rotate_key(SEED_KEY_ID) + + status = manager.get_rotation_status(SEED_KEY_ID) + assert status["phase"] == "grace_period" + assert status["old_key_active_until"] == expires_at.isoformat() + + def test_status_unknown_raises_with_that_id( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + with pytest.raises(KeyError, match="does-not-exist"): + manager.get_rotation_status("does-not-exist") + + +# --------------------------------------------------------------------------- # +# list_keys_with_usage (usage queries stubbed -> their duckdb bodies uncovered) +# --------------------------------------------------------------------------- # + + +class TestListKeysWithUsage: + _TWO_KEY_YAML = ( + "keys:\n" + ' - key_id: "acme-zzz-agent-11112222"\n' + ' key: "zzz-plain"\n' + ' name: "Zzz Agent"\n' + ' tenant: "acme"\n' + " rate_limit_rpm: 100\n" + ' created_at: "2026-04-10"\n' + ' - key_id: "acme-aaa-agent-33334444"\n' + ' key_hash: "aaa-hash"\n' + ' key_lookup: "aaa-lk"\n' + ' name: "Aaa Agent"\n' + ' tenant: "acme"\n' + " rate_limit_rpm: 50\n" + ' allowed_entity_types: ["order"]\n' + ' created_at: "2026-04-11"\n' + ) + + def test_listing_is_sorted_maps_fields_and_omits_plaintext( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch, seed=self._TWO_KEY_YAML) + try: + rotator = manager._key_rotator + monkeypatch.setattr(rotator, "_usage_by_key", lambda: {("acme", "Zzz Agent"): 12}) + monkeypatch.setattr( + rotator, "old_key_usage_by_key_id", lambda: {"acme-aaa-agent-33334444": 3} + ) + + items = manager.list_keys_with_usage() + + # sorted by (tenant, name): "Aaa Agent" before "Zzz Agent". + assert [it["name"] for it in items] == ["Aaa Agent", "Zzz Agent"] + aaa, zzz = items + # No plaintext key field anywhere. + assert "key" not in aaa + assert "key" not in zzz + # Every documented field is present under its exact name. + assert aaa["key_id"] == "acme-aaa-agent-33334444" + assert zzz["key_id"] == "acme-zzz-agent-11112222" + assert aaa["key_hash_present"] is True # hashed entry + assert zzz["key_hash_present"] is False # plaintext-only entry + assert aaa["allowed_entity_types"] == ["order"] + assert zzz["allowed_entity_types"] is None + # 24h usage keyed by (tenant, name); old-key usage keyed by key_id. + assert zzz["requests_last_24h"] == 12 + assert aaa["requests_last_24h"] == 0 + assert aaa["requests_on_old_key_last_hour"] == 3 + assert zzz["requests_on_old_key_last_hour"] == 0 + # Field mapping is faithful. + assert zzz["tenant"] == "acme" + assert zzz["rate_limit_rpm"] == 100 + assert aaa["rate_limit_rpm"] == 50 + assert zzz["rotation_phase"] == "idle" + assert zzz["created_at"] == "2026-04-10" + finally: + manager.shutdown() + + def test_listing_falls_back_to_keys_by_value_when_loaded_keys_empty( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # When _loaded_keys is falsy the source is list(keys_by_value.values()); + # a mutant that calls list(None) raises here (kills the fallback mutant). + rotator = manager._key_rotator + monkeypatch.setattr(rotator, "_usage_by_key", lambda: {}) + monkeypatch.setattr(rotator, "old_key_usage_by_key_id", lambda: {}) + manager._loaded_keys = [] + manager.keys_by_value = {"rotation-acme-key": _tk(key="rotation-acme-key", name="Solo")} + items = manager.list_keys_with_usage() + assert [it["name"] for it in items] == ["Solo"] + + +# --------------------------------------------------------------------------- # +# rotation-grace state helpers +# --------------------------------------------------------------------------- # + + +class TestPreviousKeyActive: + def test_inactive_when_no_previous_hash( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk(previous_key_active_until=datetime.now(UTC) + timedelta(hours=1)) + assert manager._key_rotator.is_previous_key_active(item) is False + + def test_inactive_when_no_active_until( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk(previous_key_hash="prev") + assert manager._key_rotator.is_previous_key_active(item) is False + + def test_inactive_when_expired(self, tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk( + previous_key_hash="prev", + previous_key_active_until=datetime.now(UTC) - timedelta(hours=1), + ) + assert manager._key_rotator.is_previous_key_active(item) is False + + def test_active_when_hash_and_future_until( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk( + previous_key_hash="prev", + previous_key_active_until=datetime.now(UTC) + timedelta(hours=1), + ) + assert manager._key_rotator.is_previous_key_active(item) is True + + def test_rotation_phase_reflects_active_state( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + active = _tk( + previous_key_hash="prev", + previous_key_active_until=datetime.now(UTC) + timedelta(hours=1), + ) + idle = _tk(previous_key_hash=None) + assert manager._key_rotator.rotation_phase(active) == "grace_period" + assert manager._key_rotator.rotation_phase(idle) == "idle" + + +class TestClearPreviousKey: + def test_clears_all_three_previous_fields( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk( + key_hash="cur", + previous_key_hash="prev", + previous_key_lookup="prev-lk", + previous_key_active_until=datetime.now(UTC) + timedelta(hours=1), + ) + cleared = manager._key_rotator.clear_previous_key(item) + assert cleared.previous_key_hash is None + assert cleared.previous_key_lookup is None + assert cleared.previous_key_active_until is None + # unrelated fields preserved + assert cleared.key_hash == "cur" + + +class TestCleanupExpiredRotations: + def test_clears_only_expired_previous_keys( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + expired = _tk( + key_id="x-1", + key_hash="c1", + previous_key_hash="p1", + previous_key_active_until=datetime.now(UTC) - timedelta(hours=1), + ) + live = _tk( + key_id="x-2", + key_hash="c2", + previous_key_hash="p2", + previous_key_active_until=datetime.now(UTC) + timedelta(hours=1), + ) + config = ApiKeysConfig(keys=[expired, live]) + changed = manager._key_rotator.cleanup_expired_rotations(config) + assert changed is True + assert config.keys[0].previous_key_hash is None # expired cleared + assert config.keys[1].previous_key_hash == "p2" # live untouched + + def test_returns_false_when_nothing_expired( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + live = _tk( + key_id="x-2", + key_hash="c2", + previous_key_hash="p2", + previous_key_active_until=datetime.now(UTC) + timedelta(hours=1), + ) + config = ApiKeysConfig(keys=[live]) + assert manager._key_rotator.cleanup_expired_rotations(config) is False + + +# --------------------------------------------------------------------------- # +# index helpers +# --------------------------------------------------------------------------- # + + +class TestEnsureKeyIds: + def test_assigns_ids_to_idless_entries_accumulating_existing( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # generate_key_id is stubbed (its real `while True` loop must stay + # uncovered) but RECORDS args: each id-less entry is generated with its + # own tenant/name and an existing-id set that grows as ids are assigned. + id_calls: list = [] + monkeypatch.setattr( + manager._key_rotator, "generate_key_id", _rec_gen_id(id_calls, ["gen-1", "gen-2"]) + ) + keyed = _tk(key_id="keep-id", key_hash="h0") + first = _tk(key_id=None, key_hash="h1", tenant="t1", name="n1") + second = _tk(key_id=None, key_hash="h2", tenant="t2", name="n2") + config = ApiKeysConfig(keys=[keyed, first, second]) + + changed = manager._key_rotator.ensure_key_ids(config) + + assert changed is True + assert config.keys[0].key_id == "keep-id" # untouched + assert config.keys[1].key_id == "gen-1" + assert config.keys[2].key_id == "gen-2" + # tenant/name forwarded correctly; existing-id set seeded with keep-id and + # then accumulates the freshly assigned gen-1. + assert id_calls[0] == ("t1", "n1", {"keep-id"}) + assert id_calls[1] == ("t2", "n2", {"keep-id", "gen-1"}) + + def test_returns_false_when_all_have_ids( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + called = {"n": 0} + + def _boom(*_a: object, **_k: object) -> str: + called["n"] += 1 + return "never" + + monkeypatch.setattr(manager._key_rotator, "generate_key_id", _boom) + config = ApiKeysConfig(keys=[_tk(key_id="a", key_hash="h"), _tk(key_id="b", key_hash="h")]) + assert manager._key_rotator.ensure_key_ids(config) is False + assert called["n"] == 0 # no id generated when every entry has one + + +class TestFindKeyIndex: + def test_returns_matching_index( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + config = ApiKeysConfig(keys=[_tk(key_id="a", key_hash="h"), _tk(key_id="b", key_hash="h")]) + assert manager._key_rotator.find_key_index(config, "b") == 1 + + def test_returns_none_when_absent( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + config = ApiKeysConfig(keys=[_tk(key_id="a", key_hash="h")]) + assert manager._key_rotator.find_key_index(config, "missing") is None + + +# --------------------------------------------------------------------------- # +# cleanup-timer scheduling / cancellation (Timer is the record-only fake) +# --------------------------------------------------------------------------- # + + +class TestScheduleRotationCleanup: + def test_noop_without_key_id(self, tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk( + key_id=None, + key_hash="h", + previous_key_active_until=datetime.now(UTC) + timedelta(seconds=300), + ) + manager._key_rotator.schedule_rotation_cleanup(item) + assert manager._rotation_cleanup_timers == {} + + def test_noop_without_active_until( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk(key_id="k-1", key_hash="h", previous_key_active_until=None) + manager._key_rotator.schedule_rotation_cleanup(item) + assert manager._rotation_cleanup_timers == {} + + def test_noop_when_already_expired( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk( + key_id="k-1", + key_hash="h", + previous_key_active_until=datetime.now(UTC) - timedelta(seconds=1), + ) + manager._key_rotator.schedule_rotation_cleanup(item) + assert manager._rotation_cleanup_timers == {} + + def test_schedules_timer_wired_to_expire_with_key_id_arg( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk( + key_id="k-1", + key_hash="h", + previous_key_active_until=datetime.now(UTC) + timedelta(seconds=300), + ) + manager._key_rotator.schedule_rotation_cleanup(item) + timer = manager._rotation_cleanup_timers["k-1"] + assert isinstance(timer, _FakeTimer) + assert timer.started is True + assert timer.daemon is True + assert 0 < timer.delay <= 300 + # callback + args wiring pinned (kills function=None / args=None / dropped). + assert timer.function == manager._key_rotator.expire_previous_key + assert timer.args == ("k-1",) + + def test_reschedule_cancels_only_the_same_keys_prior_timer( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # schedule_rotation_cleanup cancels the existing timer for THIS key BEFORE + # scheduling the new one -- cancel(item.key_id), not cancel(None). A + # cancel(None) mutant would fall through to the cancel-ALL branch and also + # kill an unrelated sibling timer, so the sibling must survive untouched. + old = _FakeTimer(300, None) + sibling = _FakeTimer(300, None) + manager._rotation_cleanup_timers["k-1"] = old + manager._rotation_cleanup_timers["k-2"] = sibling + item = _tk( + key_id="k-1", + key_hash="h", + previous_key_active_until=datetime.now(UTC) + timedelta(seconds=300), + ) + manager._key_rotator.schedule_rotation_cleanup(item) + assert old.cancelled is True + assert manager._rotation_cleanup_timers["k-1"] is not old + assert sibling.cancelled is False # cancel(key_id), not cancel-all + assert manager._rotation_cleanup_timers["k-2"] is sibling + + +class TestCancelRotationCleanupTimers: + def test_cancels_single_named_timer( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + t1, t2 = _FakeTimer(1, None), _FakeTimer(1, None) + manager._rotation_cleanup_timers = {"a": t1, "b": t2} + manager._key_rotator.cancel_rotation_cleanup_timers("a") + assert t1.cancelled is True + assert "a" not in manager._rotation_cleanup_timers + assert "b" in manager._rotation_cleanup_timers # untouched + assert t2.cancelled is False + + def test_unknown_named_timer_is_noop( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + manager._rotation_cleanup_timers = {} + manager._key_rotator.cancel_rotation_cleanup_timers("missing") # pop(None) must not raise + + def test_cancels_all_when_no_id( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + t1, t2 = _FakeTimer(1, None), _FakeTimer(1, None) + manager._rotation_cleanup_timers = {"a": t1, "b": t2} + manager._key_rotator.cancel_rotation_cleanup_timers() + assert t1.cancelled is True + assert t2.cancelled is True + assert manager._rotation_cleanup_timers == {} + + +# --------------------------------------------------------------------------- # +# expire_previous_key +# --------------------------------------------------------------------------- # + + +class TestExpirePreviousKey: + def test_unknown_id_is_silently_swallowed( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # revoke_old_key raises KeyError for an unknown id; expire swallows it. + manager._key_rotator.expire_previous_key("does-not-exist") + + def test_revokes_old_after_rotation( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + monkeypatch.setattr( + rotator, "generate_key", lambda tenant, name: "af-prod-acme-rotation-agent-" + "y" * 43 + ) + monkeypatch.setattr(kr, "hash_api_key", _rec_hash([])) + monkeypatch.setattr(kr, "compute_key_lookup", _rec_lookup()) + manager.rotate_key(SEED_KEY_ID) + + rotator.expire_previous_key(SEED_KEY_ID) + assert rotator.rotation_phase(manager._keys_by_id[SEED_KEY_ID]) == "idle" + + def test_unexpected_error_is_logged_not_raised( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + rotator = manager._key_rotator + + def _boom(_key_id: str) -> bool: + raise RuntimeError("disk gone") + + monkeypatch.setattr(rotator, "revoke_old_key", _boom) + import src.serving.api.auth as auth_pkg + + warnings: list[tuple[str, dict]] = [] + monkeypatch.setattr( + auth_pkg.logger, "warning", lambda event, **kw: warnings.append((event, kw)) + ) + # A non-KeyError must be caught and logged, never propagated. + rotator.expire_previous_key("k-1") + assert warnings + event, kwargs = warnings[0] + assert event == "api_key_rotation_cleanup_failed" + assert kwargs["key_id"] == "k-1" + assert "disk gone" in kwargs["error"] + + +# --------------------------------------------------------------------------- # +# validate_generated_key +# --------------------------------------------------------------------------- # + + +class TestValidateGeneratedKey: + def test_none_does_not_raise(self, tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + manager = _build_manager(tmp_path, monkeypatch) + manager._key_rotator.validate_generated_key(None) + + def test_short_key_raises(self, tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + manager = _build_manager(tmp_path, monkeypatch) + manager.security_policy.min_key_length = 20 + with pytest.raises(ValueError, match="below min_key_length"): + manager._key_rotator.validate_generated_key("x" * 19) + + def test_exactly_min_length_is_allowed( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + # len < min raises; len == min must pass (pins the strict `<`). + manager.security_policy.min_key_length = 20 + manager._key_rotator.validate_generated_key("x" * 20) + + +# --------------------------------------------------------------------------- # +# _storage_payload +# --------------------------------------------------------------------------- # + + +class TestStoragePayload: + def test_hashed_entry_drops_plaintext_and_json_serialises_dates( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk(key="plain", key_hash="the-hash") + payload = manager._key_rotator._storage_payload(item) + assert payload["key_hash"] == "the-hash" + assert "key" not in payload # plaintext stripped when a hash is present + # mode="json" -> the date is serialised to an ISO string, not a date + # object (kills mode=None / dropped-mode / re-cased-mode mutants). + assert payload["created_at"] == "2026-01-01" + assert isinstance(payload["created_at"], str) + + def test_plaintext_only_entry_keeps_key( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + item = _tk(key="plain", key_hash=None) + payload = manager._key_rotator._storage_payload(item) + assert payload["key"] == "plain" + assert "key_hash" not in payload # exclude_none drops the absent hash + + +# --------------------------------------------------------------------------- # +# write_config +# --------------------------------------------------------------------------- # + + +class TestWriteConfig: + def test_without_path_raises(self, tmp_path: object, monkeypatch: pytest.MonkeyPatch) -> None: + manager = _build_manager(tmp_path, monkeypatch) + manager.api_keys_path = None + # Anchored so an XX-wrapped / re-cased message mutant is killed. + with pytest.raises( + RuntimeError, + match=r"^AGENTFLOW_API_KEYS_FILE must be configured for key management\.$", + ): + manager._key_rotator.write_config(ApiKeysConfig(keys=[])) + + def test_writes_block_yaml_in_field_order_without_plaintext( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + import yaml + + config = ApiKeysConfig(keys=[_tk(key="plain", key_hash="h", key_id="id-1")]) + manager._key_rotator.write_config(config) + text = manager.api_keys_path.read_text(encoding="utf-8") + # Block-style YAML, not JSON (kills the `yaml is not None` flip that would + # fall through to json.dumps -- which safe_load would still parse). + assert not text.lstrip().startswith("{") + # sort_keys=False -> insertion (model field) order, so key_id precedes the + # alphabetically-earlier created_at (kills sort_keys=True/None/dropped). + assert text.index("key_id") < text.index("created_at") + data = yaml.safe_load(text) + [entry] = data["keys"] + assert entry["key_id"] == "id-1" + assert entry["key_hash"] == "h" + assert "key" not in entry + + def test_creates_missing_parent_directories( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + # parents=True must create the full chain; a parents=False/dropped mutant + # raises FileNotFoundError on the missing intermediate dir. + manager = _build_manager(tmp_path, monkeypatch) + deep = tmp_path / "newa" / "newb" / "api_keys.yaml" # type: ignore[operator] + manager.api_keys_path = deep + manager._key_rotator.write_config(ApiKeysConfig(keys=[_tk(key="p", key_id="id-1")])) + assert deep.exists() + + +# --------------------------------------------------------------------------- # +# rotate_all_keys (module-level) +# --------------------------------------------------------------------------- # + + +class TestRotateAllKeys: + def test_rotates_every_keyed_entry_and_skips_idless( + self, tmp_path: object, monkeypatch: pytest.MonkeyPatch + ) -> None: + manager = _build_manager(tmp_path, monkeypatch) + monkeypatch.setattr( + manager, + "list_keys_with_usage", + lambda: [{"key_id": "a"}, {"key_id": None}, {"key_id": "b"}], + ) + rotated_ids: list[str] = [] + + def _rotate(key_id: str) -> tuple[TenantKey, datetime]: + rotated_ids.append(key_id) + return _tk(key_hash="h"), datetime.now(UTC) + + monkeypatch.setattr(manager, "rotate_key", _rotate) + + rotated = rotate_all_keys(manager) + assert rotated_ids == ["a", "b"] # None key_id skipped + assert len(rotated) == 2 diff --git a/tests/unit/test_mutmut_policy.py b/tests/unit/test_mutmut_policy.py index 4d26f56..c54eeaa 100644 --- a/tests/unit/test_mutmut_policy.py +++ b/tests/unit/test_mutmut_policy.py @@ -18,12 +18,12 @@ # assembled here. # NOTE: these are the *declared* targets (intent). Actual mutation execution is # gated by scripts/mutation_report.py (MODULE_TARGETS), which now runs retry.py, -# sql_guard.py, masking.py, rate_limiter.py, sql_builder.py AND nl_queries.py live -# (the serving modules via duckdb-free narrow tests, mutated as a top-level -# `serving` package so mutmut's trampoline accepts them). The remaining declared -# serving surface -- auth manager / key_rotation -- stays declared-only until it -# gets duckdb-free unit tests of its own. These assertions guard the declared -# policy, not live coverage. +# sql_guard.py, masking.py, rate_limiter.py, sql_builder.py, nl_queries.py AND +# auth/manager.py live (the serving modules via duckdb-free narrow tests, mutated +# as a top-level `serving` package so mutmut's trampoline accepts them). The only +# remaining declared serving surface -- auth/key_rotation -- stays declared-only +# until it gets a duckdb-free unit test of its own. These assertions guard the +# declared policy, not live coverage. REQUIRED_MUTATION_TARGETS = { "src/serving/semantic_layer/sql_guard.py", "src/serving/api/auth/manager.py",