-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_data_audit.py
More file actions
146 lines (121 loc) · 5.24 KB
/
run_data_audit.py
File metadata and controls
146 lines (121 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
"""
Data Integrity Audit Runner

Issue: #184 — [Hardening 2/7] Automated Data Integrity Audits (Nightly)

Connects to Supabase via REST API, runs the master audit function,
stores results in audit_results table, and generates a JSON report.

Exit code:
    0 = no critical findings
    1 = critical findings detected (requires review)
    2 = missing configuration or audit RPC call failed

Environment variables:
    SUPABASE_URL          - Supabase project URL (required)
    SUPABASE_SERVICE_KEY  - Service role key (required, bypasses RLS)
"""
import json
import os
import sys
import uuid
from datetime import UTC, datetime
import requests
def get_env(name: str) -> str:
    """Return the value of a mandatory environment variable.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The non-empty value of the variable.

    Raises:
        SystemExit: With status 2 when the variable is unset or empty;
            an error message is written to stderr first.
    """
    setting = os.environ.get(name)
    if setting:
        return setting

    # Unset and empty are treated the same: both are unusable configuration.
    print(f"ERROR: {name} environment variable is required", file=sys.stderr)
    sys.exit(2)
def supabase_headers(service_key: str) -> dict:
    """Assemble the HTTP headers sent with each Supabase REST request.

    Args:
        service_key: Supabase service role key; it is sent both as the
            ``apikey`` header and as the bearer token.

    Returns:
        A new dict with the ``apikey``, ``Authorization``,
        ``Content-Type`` and ``Prefer`` headers.
    """
    headers: dict = {}
    headers["apikey"] = service_key
    headers["Authorization"] = f"Bearer {service_key}"
    headers["Content-Type"] = "application/json"
    headers["Prefer"] = "return=minimal"
    return headers
# Upper bound on critical findings echoed to the console; the JSON report
# always contains the complete list.
_MAX_PRINTED_CRITICAL = 20


def _project_ref(url: str) -> str:
    """Return the first host label of *url*; used only for the log line."""
    # split("//", 1)[-1] tolerates a URL without a scheme, where indexing
    # the second element of split("//") would raise IndexError.
    host = url.split("//", 1)[-1].split("/", 1)[0]
    return host.split(".", 1)[0]


def _fetch_findings(base_url: str, headers: dict) -> list[dict]:
    """Call the ``run_full_data_audit`` RPC and return its findings.

    Transport errors, a non-200 status, or an unexpected payload are
    reported on stderr and end the process with exit status 2, so they are
    never confused with exit status 1 (critical findings detected).
    """
    rpc_url = f"{base_url}/rest/v1/rpc/run_full_data_audit"
    try:
        resp = requests.post(rpc_url, headers=headers, json={}, timeout=120)
    except requests.RequestException as exc:
        print(f"ERROR: RPC call failed: {exc}", file=sys.stderr)
        sys.exit(2)

    if resp.status_code != 200:
        print(f"ERROR: RPC call failed with status {resp.status_code}", file=sys.stderr)
        print(resp.text, file=sys.stderr)
        sys.exit(2)

    try:
        # A null/empty body means "no findings".
        payload = resp.json() or []
    except ValueError:
        print("ERROR: RPC response is not valid JSON", file=sys.stderr)
        print(resp.text, file=sys.stderr)
        sys.exit(2)

    if not isinstance(payload, list) or not all(isinstance(item, dict) for item in payload):
        print("ERROR: RPC response is not a list of finding objects", file=sys.stderr)
        sys.exit(2)
    return payload


def _store_findings(
    base_url: str, headers: dict, run_id: str, timestamp: str, findings: list[dict]
) -> None:
    """Insert the findings into ``audit_results``; failures only warn."""
    if not findings:
        return

    rows = [
        {
            "run_id": run_id,
            "run_timestamp": timestamp,
            "check_name": item.get("check_name"),
            "severity": item.get("severity"),
            "product_id": item.get("product_id"),
            "product_name": item.get("product_name"),
            "ean": item.get("ean"),
            "details": item.get("details"),
        }
        for item in findings
    ]
    insert_url = f"{base_url}/rest/v1/audit_results"
    # Persisting is best-effort: the report and the exit code must still be
    # produced when the insert cannot be performed.
    try:
        store_resp = requests.post(insert_url, headers=headers, json=rows, timeout=60)
    except requests.RequestException as exc:
        print(f"WARNING: Failed to store results ({exc})", file=sys.stderr)
        return
    if store_resp.status_code not in (200, 201):
        print(
            f"WARNING: Failed to store results (status {store_resp.status_code})",
            file=sys.stderr,
        )
        print(store_resp.text, file=sys.stderr)


def _write_report(report: dict, timestamp: str) -> str | None:
    """Write *report* as JSON under ``audit-reports/``; return its path or None."""
    date_str = timestamp[:10]  # leading YYYY-MM-DD part of the ISO timestamp
    report_path = f"audit-reports/audit_{date_str}.json"
    try:
        os.makedirs("audit-reports", exist_ok=True)
        with open(report_path, "w", encoding="utf-8") as report_file:
            json.dump(report, report_file, indent=2, default=str)
    except OSError as exc:
        print(f"WARNING: Failed to write report {report_path} ({exc})", file=sys.stderr)
        return None
    return report_path


def _print_summary(
    run_id: str,
    timestamp: str,
    findings: list[dict],
    critical: list[dict],
    warnings: list[dict],
    infos: list[dict],
    report_path: str | None,
) -> None:
    """Print the human-readable audit summary to stdout."""
    sep = "=" * 60
    print(f"\n{sep}")
    print(f"DATA INTEGRITY AUDIT — {timestamp}")
    print(sep)
    print(f"Run ID: {run_id}")
    print(f"Total findings: {len(findings)}")
    print(f" Critical: {len(critical)}")
    print(f" Warning: {len(warnings)}")
    print(f" Info: {len(infos)}")

    if critical:
        print(f"\n{'—' * 40}")
        print("CRITICAL FINDINGS:")
        for finding in critical[:_MAX_PRINTED_CRITICAL]:
            # "or" (not a .get default) so that JSON nulls also show as N/A.
            check = finding.get("check_name") or "N/A"
            name = finding.get("product_name") or "N/A"
            ean_val = finding.get("ean") or "N/A"
            print(f" * {check}: {name} (EAN: {ean_val})")
            print(f" Details: {json.dumps(finding.get('details', {}), indent=4)}")
        hidden = len(critical) - _MAX_PRINTED_CRITICAL
        if hidden > 0:
            print(f" ... and {hidden} more (see report)")

    if report_path is not None:
        print(f"\nReport saved to: {report_path}")


def run_audit() -> None:
    """Execute the full data integrity audit.

    Reads ``SUPABASE_URL`` and ``SUPABASE_SERVICE_KEY`` from the
    environment, calls the ``run_full_data_audit`` RPC, stores the findings
    in the ``audit_results`` table (best-effort), writes a JSON report to
    ``audit-reports/audit_<date>.json`` and prints a summary.

    This function never returns normally.

    Raises:
        SystemExit: Status 0 when there are no critical findings, 1 when
            critical findings were detected, 2 when configuration is
            missing or the audit RPC could not be executed or parsed.
    """
    # A trailing slash on the configured URL would yield "//rest/..." paths.
    url = get_env("SUPABASE_URL").rstrip("/")
    key = get_env("SUPABASE_SERVICE_KEY")
    headers = supabase_headers(key)
    run_id = str(uuid.uuid4())
    timestamp = datetime.now(UTC).isoformat()

    # ── Execute master audit via RPC ──────────────────────────────────────
    print(f"Connecting to {_project_ref(url)}...")
    findings = _fetch_findings(url, headers)

    # ── Classify findings ─────────────────────────────────────────────────
    # .get() keeps a finding without a severity from crashing the run; it is
    # still counted in the total and included in the report.
    critical = [item for item in findings if item.get("severity") == "critical"]
    warnings = [item for item in findings if item.get("severity") == "warning"]
    infos = [item for item in findings if item.get("severity") == "info"]

    # ── Store results in audit_results table ──────────────────────────────
    _store_findings(url, headers, run_id, timestamp, findings)

    # ── Generate JSON report ──────────────────────────────────────────────
    report = {
        "run_id": run_id,
        "timestamp": timestamp,
        "summary": {
            "total_findings": len(findings),
            "critical": len(critical),
            "warnings": len(warnings),
            "info": len(infos),
        },
        "findings": findings,
    }
    report_path = _write_report(report, timestamp)

    # ── Print summary ─────────────────────────────────────────────────────
    _print_summary(run_id, timestamp, findings, critical, warnings, infos, report_path)

    # ── Exit code ─────────────────────────────────────────────────────────
    if critical:
        print(f"\n {len(critical)} CRITICAL findings — review required!")
        sys.exit(1)
    print("\n No critical findings.")
    sys.exit(0)
# Script entry point: run the audit only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_audit()