|
| 1 | +import importlib |
| 2 | +import json |
| 3 | +import sys |
| 4 | +import types |
| 5 | +import pytest |
| 6 | + |
| 7 | + |
| 8 | +def setup_api(tmp_path, monkeypatch): |
| 9 | + monkeypatch.chdir(tmp_path) |
| 10 | + monkeypatch.setenv("RULES_PATH", str(tmp_path / "rules.json")) |
| 11 | + monkeypatch.setenv("PROPOSED_RULES_PATH", str(tmp_path / "rules_proposed.json")) |
| 12 | + api = importlib.import_module("src.api") |
| 13 | + importlib.reload(api) |
| 14 | + return api |
| 15 | + |
| 16 | + |
| 17 | +def test_policy_engine_env_and_enabled(): |
| 18 | + from src.policy.engine import PolicyEngine, Event, Rule |
| 19 | + |
| 20 | + rules = [ |
| 21 | + Rule(id="skip-env", target="table", selector="dbo.Env", action="block", reason="env", enabled=True), |
| 22 | + Rule(id="only-dev", target="table", selector="dbo.Dev", action="block", reason="env", enabled=True), |
| 23 | + Rule(id="disabled", target="table", selector="dbo.Off", action="block", reason="off", enabled=False), |
| 24 | + ] |
| 25 | + # Attach apply_in_envs dynamically to simulate Pydantic model fields |
| 26 | + setattr(rules[1], "apply_in_envs", ["dev"]) # only in dev |
| 27 | + |
| 28 | + pe = PolicyEngine(rules, environment="prod") |
| 29 | + # 'skip-env' not restricted by env, so it can match |
| 30 | + d1 = pe.decide(Event(None, None, None, "dbo.Env", None, None)) |
| 31 | + assert d1.rule_id == "skip-env" |
| 32 | + # 'only-dev' is restricted to dev and should be skipped in prod |
| 33 | + d2 = pe.decide(Event(None, None, None, "dbo.Dev", None, None)) |
| 34 | + assert d2.rule_id != "only-dev" |
| 35 | + # 'disabled' should never match |
| 36 | + d3 = pe.decide(Event(None, None, None, "dbo.Off", None, None)) |
| 37 | + assert d3.rule_id != "disabled" |
| 38 | + |
| 39 | + |
| 40 | +def test_metrics_prom_success_branch(tmp_path, monkeypatch): |
| 41 | + api = setup_api(tmp_path, monkeypatch) |
| 42 | + # Provide a fake prometheus_client with required symbols |
| 43 | + fake = types.SimpleNamespace( |
| 44 | + generate_latest=lambda: b"metric 1\n", |
| 45 | + CONTENT_TYPE_LATEST="text/plain; version=0.0.4; charset=utf-8", |
| 46 | + ) |
| 47 | + monkeypatch.setitem(sys.modules, "prometheus_client", fake) # type: ignore[arg-type] |
| 48 | + resp = api.metrics_prom() |
| 49 | + # In shim Response, .content is set |
| 50 | + content = getattr(resp, "content", b"") |
| 51 | + assert b"metric 1" in content or "metric 1" in str(content) |
| 52 | + |
| 53 | + |
| 54 | +def test_rules_delete_success(tmp_path, monkeypatch): |
| 55 | + api = setup_api(tmp_path, monkeypatch) |
| 56 | + r = api.Rule(id="del", target="table", selector="dbo.X", action="allow") |
| 57 | + api.add_rule(r) |
| 58 | + out = api.delete_rule("del") |
| 59 | + assert out.get("deleted") == "del" |
| 60 | + |
| 61 | + |
| 62 | +def test_proposed_duplicate_conflict(tmp_path, monkeypatch): |
| 63 | + api = setup_api(tmp_path, monkeypatch) |
| 64 | + r = api.Rule(id="dup", target="pattern", selector="INSERT", action="block") |
| 65 | + api.add_rule_proposed(r) |
| 66 | + with pytest.raises(api.HTTPException) as ei: |
| 67 | + api.add_rule_proposed(r) |
| 68 | + assert getattr(ei.value, "status_code", 0) == 409 |
| 69 | + |
| 70 | + |
| 71 | +def test_rules_diff_changed(tmp_path, monkeypatch): |
| 72 | + api = setup_api(tmp_path, monkeypatch) |
| 73 | + cur = [api.Rule(id="same", target="table", selector="dbo.T", action="allow")] |
| 74 | + prop = [api.Rule(id="same", target="table", selector="dbo.T2", action="allow")] |
| 75 | + (tmp_path / "rules.json").write_text(json.dumps([r.model_dump() for r in cur]), encoding="utf-8") |
| 76 | + (tmp_path / "rules_proposed.json").write_text(json.dumps([r.model_dump() for r in prop]), encoding="utf-8") |
| 77 | + diff = api.rules_diff() |
| 78 | + assert any(d.get("id") == "same" for d in diff.get("changed", [])) |
| 79 | + |
| 80 | + |
| 81 | +def test_rules_test_default_path(tmp_path, monkeypatch): |
| 82 | + api = setup_api(tmp_path, monkeypatch) |
| 83 | + # No rules -> should allow by default |
| 84 | + ev = api._TestEvent(table="dbo.Nope", column=None, value=None, sql_text=None) |
| 85 | + out = api.rules_test(ev) |
| 86 | + assert out.get("action") == "allow" |
0 commit comments