3 changes: 2 additions & 1 deletion pipeline/orchestrator.py
@@ -224,11 +224,12 @@ def run_step_2_preprocess(
input_path = input_dir / input_file
df_raw = preprocess.read_input(input_path)
mapped_df, column_mapping = preprocess.map_columns(df_raw)
df_filtered = preprocess.filter_columns(mapped_df)
df_filtered = preprocess.filter_columns(mapped_df, keep_columns=["AGE"])
df = preprocess.ensure_required_columns(df_filtered)

# Check that addresses are complete, return only complete rows
df = preprocess.check_addresses_complete(df)
df = preprocess.check_client_info_complete(df)

# Load configuration
vaccine_reference_path = preprocess.VACCINE_REFERENCE_PATH
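For context, a minimal sketch of how this updated step would run end to end, assuming the preprocess functions shown in this diff can be imported as pipeline.preprocess; the input path is illustrative:

from pathlib import Path
from pipeline import preprocess

input_path = Path("data/input") / "clients.csv"  # hypothetical location
df_raw = preprocess.read_input(input_path)
mapped_df, column_mapping = preprocess.map_columns(df_raw)
# AGE is kept alongside the required columns so the over-16 check can use it later
df = preprocess.filter_columns(mapped_df, keep_columns=["AGE"])
df = preprocess.ensure_required_columns(df)
df = preprocess.check_addresses_complete(df)       # keeps only rows with complete addresses
df = preprocess.check_client_info_complete(df)     # keeps only rows with complete client info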
97 changes: 92 additions & 5 deletions pipeline/preprocess.py
@@ -219,17 +219,89 @@ def check_addresses_complete(df: pd.DataFrame) -> pd.DataFrame:
"There are %d records with incomplete address information.",
incomplete_count,
)
print(
f"⚠️ There are {incomplete_count} records with incomplete address information."
)

incomplete_records = df.loc[~df["address_complete"]]

incomplete_path = Path("output/incomplete_addresses.csv")
incomplete_records.to_csv(incomplete_path, index=False)
LOG.info("Incomplete address records written to %s", incomplete_path)
print(f"Incomplete address records written to {incomplete_path}")

# Return only rows with complete addresses
return df.loc[df["address_complete"]].drop(columns=["address_complete"])


def check_client_info_complete(df: pd.DataFrame) -> pd.DataFrame:
"""
Check if client fields are complete in the DataFrame.

Adds a boolean 'client_info_complete' column based on presence of
first name, last name, DOB, school name, overdue disease, immunizations given, and client ID.

Check that client ID is valid (10 digits), and recover if in float format.
"""

df = df.copy()

# Normalize text fields: convert to string, strip whitespace, convert "" to NA
client_info_cols = [
"SCHOOL_NAME",
"CLIENT_ID",
"FIRST_NAME",
"LAST_NAME",
"DATE_OF_BIRTH",
"OVERDUE_DISEASE",
"IMMS_GIVEN",
]

for col in client_info_cols:
df[col] = df[col].astype(str).str.strip().replace({"": pd.NA, "nan": pd.NA})

# Check client ID formatting - expects 10-digit number, recover if in float format with '.0'
df["CLIENT_ID"] = df["CLIENT_ID"].str.replace(r"\.0$", "", regex=True)
df["client_id_valid"] = df["CLIENT_ID"].str.isdigit() & (
df["CLIENT_ID"].str.len() == 10
)
if len(df[~df["client_id_valid"]]) > 0:
print(f"Invalid client IDs: {len(df[~df['client_id_valid']])}")

# Check completeness
df["client_info_complete"] = (
df["FIRST_NAME"].notna()
& df["LAST_NAME"].notna()
& df["CLIENT_ID"].notna()
& df["DATE_OF_BIRTH"].notna()
& df["OVERDUE_DISEASE"].notna()
& df["IMMS_GIVEN"].notna()
& df["client_id_valid"]
)

df = df.drop(columns=["client_id_valid"])

if not df["client_info_complete"].all():
incomplete_count = (~df["client_info_complete"]).sum()
LOG.warning(
"There are %d records with incomplete/invalid client information.",
incomplete_count,
)
print(
f"⚠️ There are {incomplete_count} total records with incomplete/invalid client information."
)

incomplete_records = df.loc[~df["client_info_complete"]]

incomplete_path = Path("output/incomplete_clients.csv")
incomplete_records.to_csv(incomplete_path, index=False)
LOG.info("Incomplete client records written to %s", incomplete_path)
print(f"Incomplete client records written to {incomplete_path}")

# Return only rows with complete client information
return df.loc[df["client_info_complete"]].drop(columns=["client_info_complete"])
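A minimal usage sketch of the new check with made-up values; it assumes pandas is imported as pd and that the output/ directory used for the rejects file already exists:

import pandas as pd

sample = pd.DataFrame(
    {
        "SCHOOL_NAME": ["Oak PS", "Elm PS"],
        "CLIENT_ID": ["1234567890", "9876543210.0"],  # second ID arrives in float format
        "FIRST_NAME": ["Ana", "Ben"],
        "LAST_NAME": ["Lee", ""],  # blank last name -> incomplete
        "DATE_OF_BIRTH": ["2010-01-01", "2011-02-02"],
        "OVERDUE_DISEASE": ["Measles", "Mumps"],
        "IMMS_GIVEN": ["MMR", "DTaP"],
    }
)
clean = check_client_info_complete(sample)
# The trailing ".0" on the second CLIENT_ID is stripped before validation, but that row
# is still dropped for the blank last name and written to output/incomplete_clients.csv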


def convert_date_iso(date_str: str) -> str:
"""Convert a date from English display format to ISO format.

@@ -267,7 +339,11 @@ def over_16_check(date_of_birth, date_notice_delivery):
"""

birth_datetime = datetime.strptime(date_of_birth, "%Y-%m-%d")
delivery_datetime = datetime.strptime(date_notice_delivery, "%Y-%m-%d")

if isinstance(date_notice_delivery, datetime):
date_notice_delivery = str(date_notice_delivery.date())

delivery_datetime = datetime.strptime(str(date_notice_delivery), "%Y-%m-%d")

age = delivery_datetime.year - birth_datetime.year

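Two illustrative calls showing the widened input handling; the dates are made up, and the age comparison that follows is unchanged in the collapsed lines below:

from datetime import datetime

over_16_check("2005-03-14", "2024-09-01")           # ISO string, as before
over_16_check("2005-03-14", datetime(2024, 9, 1))   # datetime is now accepted and converted internally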
@@ -455,11 +531,20 @@ def map_columns(df: pd.DataFrame, required_columns=REQUIRED_COLUMNS):
if score >= THRESHOLD: # adjustable threshold
# Map the original column name, not the normalized one
actual_in_col = next(c for c in input_cols if normalize(c) == input_col)
col_map[actual_in_col] = best_match
# Print the column name and score for debugging
print(f"Matching '{input_col}' to '{best_match}' with score {score}")

# Only assign the mapping if this target column has not already been claimed
if best_match not in col_map.values():
print(f"'{best_match}' has no mapping yet - assigning it to '{actual_in_col}'.")
col_map[actual_in_col] = best_match

return df.rename(columns=col_map), col_map
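The effect of the new guard, sketched with a hypothetical mapping: the first input column that clears the threshold claims a canonical name, and later candidates for the same target are skipped:

col_map = {"Client Id": "CLIENT_ID"}
best_match = "CLIENT_ID"
# A second candidate ("ClientID#") for the same target is not added,
# because "CLIENT_ID" already appears among the mapped values
if best_match not in col_map.values():
    col_map["ClientID#"] = best_match
# col_map is still {"Client Id": "CLIENT_ID"}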


@@ -476,13 +561,15 @@ def filter_columns(


def filter_columns(
df: pd.DataFrame | None, required_columns: list[str] = REQUIRED_COLUMNS
df: pd.DataFrame | None,
required_columns: list[str] = REQUIRED_COLUMNS,
keep_columns: list[str] = [],
) -> pd.DataFrame | None:
"""Filter dataframe to only include required columns."""
if df is None or df.empty:
return df

return df[[col for col in df.columns if col in required_columns]]
return df[[col for col in df.columns if col in required_columns + keep_columns]]
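A small sketch of the new keep_columns parameter with an illustrative frame; the column names are made up and required_columns is passed explicitly so the example stands alone:

import pandas as pd

frame = pd.DataFrame({"CLIENT_ID": ["1234567890"], "AGE": [15], "NOTES": ["x"]})
# Without keep_columns, AGE would be dropped along with NOTES
filter_columns(frame, required_columns=["CLIENT_ID"], keep_columns=["AGE"])
# -> columns: CLIENT_ID, AGE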


def ensure_required_columns(df: pd.DataFrame) -> pd.DataFrame:
@@ -814,7 +901,7 @@ def build_preprocess_result(
if not pd.isna(row.AGE): # type: ignore[attr-defined]
over_16 = bool(row.AGE >= 16) # type: ignore[attr-defined]
elif dob_iso and date_notice_delivery:
over_16 = over_16_check(dob_iso, date_notice_delivery)
over_16 = over_16_check(dob_iso, datetime.now())
else:
over_16 = False
