-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathvalidate_soa.py
More file actions
139 lines (117 loc) · 4.5 KB
/
validate_soa.py
File metadata and controls
139 lines (117 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
"""Validate normalized SoA artifacts.
Checks implemented:
1. Imaging schedule: baseline + expected Week 6, 12, 18 imaging occurrences.
2. Interval consistency: approximate midpoints of windows between imaging visits should align with expected ~6 week spacing (tolerance configurable).
Exit code: 0 if all checks pass, 1 if any fail.
"""
from __future__ import annotations
import argparse
import math
import os
import re
import sys
import pandas as pd
# Lowercase substrings that mark an activity as imaging; find_imaging_activity_ids
# tests them against the lowercased activity name.
# NOTE(review): "imaging" already matches every name that "imaging (ct/mri"
# matches, so the first entry looks redundant - confirm before removing.
IMAGING_KEYWORDS = ["imaging (ct/mri", "imaging"]
# Minimum number of required imaging visits that validate_imaging accepts.
EXPECTED_IMAGING_COUNT = 4 # baseline + week6 + week12 + week18
# Expected spacing between consecutive imaging visits, in weeks.
EXPECTED_INTERVAL_WEEKS = 6
# Multiplier used to convert the weekly interval into days.
WEEK_TO_DAYS = 7
# Default for --tolerance-days: allowed deviation of an imaging interval, in days.
DEFAULT_TOLERANCE_DAYS = 10 # allow variability due to windows
def load_normalized(dir_path: str):
    """Read the three normalized SoA tables from *dir_path*.

    Args:
        dir_path: Directory holding ``visits.csv``, ``activities.csv`` and
            ``visit_activities.csv``.

    Returns:
        A ``(visits, activities, visit_activities)`` tuple of DataFrames,
        in that order.
    """
    table_files = ("visits.csv", "activities.csv", "visit_activities.csv")
    # The generator is consumed left to right, so files are read in the
    # order listed above.
    visits, activities, va = (
        pd.read_csv(os.path.join(dir_path, filename)) for filename in table_files
    )
    return visits, activities, va
def find_imaging_activity_ids(activities: pd.DataFrame, keywords=None):
    """Return the ids of activities whose name contains an imaging keyword.

    Args:
        activities: Frame with ``activity_id`` and ``activity_name`` columns.
        keywords: Optional iterable of substrings to search for. Matching is
            case-insensitive. Defaults to the module-level
            ``IMAGING_KEYWORDS``.

    Returns:
        A list of ``activity_id`` values, in row order, for every activity
        whose name contains at least one keyword. Rows whose name is missing
        or not a string (e.g. an empty CSV cell read as NaN) are skipped
        rather than raising.
    """
    if keywords is None:
        keywords = IMAGING_KEYWORDS
    needles = [str(keyword).lower() for keyword in keywords]

    matches = []
    for activity_id, raw_name in zip(
        activities["activity_id"], activities["activity_name"]
    ):
        # Empty cells come back from pandas as float NaN, which has no .lower().
        if not isinstance(raw_name, str):
            continue
        lowered = raw_name.lower()
        if any(needle in lowered for needle in needles):
            matches.append(activity_id)
    return matches
# Patterns for visit labels such as "Week 6" or "Day 15"; only an unsigned
# integer directly after the keyword (optional whitespace) is captured.
WEEK_RE = re.compile(r"week\s*(\d+)", re.IGNORECASE)
DAY_RE = re.compile(r"day\s*(\d+)", re.IGNORECASE)


def _finite_or_none(value):
    """Return *value* as a finite float, or None if it is missing or unusable."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def derive_nominal_day(row) -> float:
    """Attempt to infer a nominal study day for a visit.

    The visit name is inspected first, then the label; within each text a
    "Week N" match is tried before a "Day N" match.

    Priority:
    1. Week N -> N*7
    2. Day N -> N
    3. "screening" in the visit name -> 0 (treated as the baseline reference)
    4. Midpoint of window_lower/window_upper if both are finite numbers
    5. Fallback to sequence_index * 7 (coarse)

    Args:
        row: Mapping-like visit record (pandas Series or dict) supporting
            ``.get`` and item access. Only ``sequence_index`` is mandatory,
            and only when no earlier rule applies.

    Returns:
        The nominal study day as a float.
    """
    name = str(row.get("visit_name", "")).lower()
    header = str(row.get("label", "")).lower()
    for text in (name, header):
        week_match = WEEK_RE.search(text)
        if week_match:
            return float(int(week_match.group(1)) * 7)
        day_match = DAY_RE.search(text)
        if day_match:
            return float(int(day_match.group(1)))
    if "screening" in name:
        return 0.0
    # A missing column yields None and an empty cell yields NaN; neither may
    # reach math.isfinite unchecked, so normalise both bounds first.
    lower = _finite_or_none(row.get("window_lower"))
    upper = _finite_or_none(row.get("window_upper"))
    if lower is not None and upper is not None:
        return (lower + upper) / 2.0
    # Coarse fallback: assume each sequence index is spaced a week apart.
    return float(row["sequence_index"]) * 7.0
def validate_imaging(
    visits: pd.DataFrame, va: pd.DataFrame, imaging_ids: list[int], tolerance_days: int
):
    """Check the count and spacing of required imaging visits.

    Args:
        visits: Visit table; must provide ``visit_id`` and ``sequence_index``
            (other columns are consumed by ``derive_nominal_day``).
        va: Visit/activity link table with ``visit_id``, ``activity_id`` and
            ``required_flag`` columns.
        imaging_ids: Activity ids regarded as imaging.
        tolerance_days: Allowed absolute deviation, in days, of each interval
            between consecutive imaging visits from the expected spacing.

    Returns:
        ``(ok, errors)`` where ``ok`` is True only when ``errors`` is empty
        and ``errors`` is a list of human-readable failure messages.
    """
    if not imaging_ids:
        return False, ["No imaging activity detected."]

    imaging_va = va[va["activity_id"].isin(imaging_ids) & (va["required_flag"] == 1)]
    if imaging_va.empty:
        return False, ["No required imaging entries found."]

    errors = []

    # Links to visits absent from the visit table would merge into all-NaN
    # rows whose NaN interval silently passes the tolerance comparison, so
    # report them explicitly and keep them out of the schedule.
    known = imaging_va["visit_id"].isin(visits["visit_id"])
    if not known.all():
        orphan_ids = imaging_va.loc[~known, "visit_id"].drop_duplicates().tolist()
        errors.append(
            f"Required imaging entries reference visit_id(s) missing from visits: {orphan_ids}."
        )

    # Several imaging activities may be scheduled on the same visit; collapse
    # to one row per visit so neither the count nor the intervals are skewed.
    imaging_visits = (
        imaging_va[known]
        .merge(visits, on="visit_id", how="left")
        .drop_duplicates(subset="visit_id")
        .sort_values("sequence_index")
    )

    if len(imaging_visits) < EXPECTED_IMAGING_COUNT:
        errors.append(
            f"Expected >= {EXPECTED_IMAGING_COUNT} required imaging visits; found {len(imaging_visits)}."
        )

    # Compare each consecutive pair of imaging visits with the expected spacing.
    expected_days = EXPECTED_INTERVAL_WEEKS * WEEK_TO_DAYS
    schedule = [
        (row.get("visit_name"), derive_nominal_day(row))
        for _, row in imaging_visits.iterrows()
    ]
    for (prev_name, prev_day), (name, day) in zip(schedule, schedule[1:]):
        delta = day - prev_day
        if abs(delta - expected_days) > tolerance_days:
            errors.append(
                f"Imaging interval between {prev_name} and {name} is ~{delta:.1f}d (expected ~{expected_days}d ±{tolerance_days}d)."
            )
    return not errors, errors
def main():
    """Command-line entry point: load the tables, validate, report, and exit.

    Prints "VALIDATION PASSED" and exits with status 0 when the imaging
    checks succeed; otherwise prints "VALIDATION FAILED" followed by the
    individual error messages and exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Validate normalized SoA schedule")
    parser.add_argument(
        "--dir", required=True, help="Directory containing normalized CSV outputs"
    )
    parser.add_argument(
        "--tolerance-days",
        type=int,
        default=DEFAULT_TOLERANCE_DAYS,
        help="Tolerance for imaging interval deviations",
    )
    options = parser.parse_args()

    visits, activities, va = load_normalized(options.dir)
    imaging_ok, imaging_errors = validate_imaging(
        visits, va, find_imaging_activity_ids(activities), options.tolerance_days
    )

    # Imaging is currently the only check, so its result is the overall result.
    if imaging_ok:
        print("VALIDATION PASSED")
        sys.exit(0)

    print("VALIDATION FAILED")
    if imaging_errors:
        print("Imaging checks:")
        for message in imaging_errors:
            print(f" - {message}")
    sys.exit(1)


if __name__ == "__main__":
    main()