56 changes: 34 additions & 22 deletions scripts/helpers/housing_nec_migration_gx_dq_inputs.py
@@ -22,28 +22,40 @@

table_list = {
"properties": [
"properties_1a",
"properties_1b",
"properties_1c",
"properties_1d",
"properties_1e",
"properties_2a",
"properties_3a",
"properties_4a",
"properties_4b",
"properties_4c",
"properties_7a",
# "properties_1a",

SonarCloud Code Analysis check warning on line 25 in scripts/helpers/housing_nec_migration_gx_dq_inputs.py: Remove this commented out code.
# "properties_1b",
# "properties_1c",
# "properties_1d",
# "properties_1e",
# "properties_2a",
# "properties_3a",
# "properties_4a",
# "properties_4b",
# "properties_4c",
# "properties_7a",
"properties_all_tranches",
],
"tenancies": [
"tenancies_1a",
"tenancies_1c",
"tenancies_2a",
# "tenancies_1a",

SonarCloud Code Analysis check warning on line 39 in scripts/helpers/housing_nec_migration_gx_dq_inputs.py: Remove this commented out code.
# "tenancies_1c",
# "tenancies_2a",
# "tenancies_other",
"tenancies_all",
"tenancies_other",
],
"people": ["people_1a", "people_1b", "people_1c", "people_2a", "people_all"],
"contacts": ["contacts_1a", "contacts_1b", "contacts_2a", "contacts_all"],
"people": [
# "people_1a",
# "people_1b",
# "people_1c",
# "people_2a",
"people_all"
],
"contacts": [
# "contacts_1a",

SonarCloud Code Analysis check warning on line 53 in scripts/helpers/housing_nec_migration_gx_dq_inputs.py: Remove this commented out code.
# "contacts_1b",
# "contacts_1c",
# "contacts_2a",
"contacts_all",
],
"arrears_actions": [
"arrears_actions_1a",
"arrears_actions_1c",
@@ -57,13 +69,13 @@
"revenue_accounts_other",
],
"transactions": [
"transactions_1a",
"transactions_1c",
"transactions_2a",
"transactions_other",
# "transactions_1a",

SonarCloud Code Analysis check warning on line 72 in scripts/helpers/housing_nec_migration_gx_dq_inputs.py: Remove this commented out code.
# "transactions_1c",
# "transactions_2a",
# "transactions_other",
"transactions_all",
],
"addresses": ["addresses_1a", "addresses_2a"],
"addresses": ["addresses_1a", "addresses_1b", "addresses_2a"],
}

partition_keys = ["import_date"]
128 changes: 74 additions & 54 deletions scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
@@ -21,6 +21,7 @@
import scripts.jobs.housing.housing_nec_migration_contacts_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_arrears_actions_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_revenue_accounts_data_load_gx_suite

# import scripts.jobs.housing.housing_nec_migration_transactions_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_addresses_data_load_gx_suite

@@ -66,49 +67,52 @@ def main():
logger.info(f"{data_load} loading...")

for table in table_list.get(data_load):
logger.info(f"{table} loading...")
logger.info(f"{table} processing...")

try:
sql_query, id_field = get_sql_query(
sql_config=sql_config, data_load=data_load, table=table
)
conn = connect(
s3_staging_dir=s3_staging_location, region_name=region_name
)
df = pd.read_sql_query(sql_query, conn)
except Exception as e:
logger.error(f"SQL/Connection error for {table}: {e}. Skipping.")
continue # Skip to next table

conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name)

try:
df = pd.read_sql_query(sql_query, conn)
except Exception as e:
logger.error(f"SQL Read Problem found with {table}: {e}, skipping table.")
continue

# set up batch
# --- STEP 2: GX ASSET SETUP ---
try:
data_source = context.data_sources.add_pandas(f"{table}_pandas")
data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe(
"Athena batch definition"
)
batch_parameters = {"dataframe": df}
except Exception as e:
logger.error(f"GX Asset Setup error for {table}: {e}. Skipping.")
continue
try:
suite = context.suites.get(name=f"{data_load}_data_load_suite")
expectations = suite.expectations # Get this now to ensure it exists
except Exception as e:
logger.error(
f"GX Suite retrieval error for {data_load}: {e}. Skipping."
)
continue

# get expectation suite for dataset
try:
suite = context.suites.get(name=f"{data_load}_data_load_suite")
except Exception as e:
logger.error(f"GX Suite Problem found with {data_load}: {e}, skipping suite.")
continue
else:
expectations = suite.expectations

# VALIDATION & CHECKPOINT
try:
validation_definition = gx.ValidationDefinition(
data=batch_definition,
suite=suite,
name=f"validation_definition_{table}",
)

# Use add_or_update to avoid duplicates
validation_definition = context.validation_definitions.add_or_update(
validation_definition
)

# create and start checking data with checkpoints
checkpoint = context.checkpoints.add_or_update(
gx.checkpoint.checkpoint.Checkpoint(
name=f"{table}_checkpoint",
@@ -120,13 +124,25 @@
},
)
)

checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
except Exception as e:
logger.error(f"Checkpoint Run error for {table}: {e}. Skipping.")
continue

# Logic to handle results
results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
# PROCESS RESULTS
try:
results_dict = list(checkpoint_result.run_results.values())[
0
].to_json_dict()
table_results_df = pd.json_normalize(results_dict["results"])

# Guard clause: If table passed perfectly, results might be empty or structure different
if table_results_df.empty:
logger.info(
f"No results to process for {table} (possibly empty batch)."
)
continue

cols_not_needed = ["result.unexpected_list", "result.observed_value"]
cols_to_drop = [
c
@@ -137,64 +153,68 @@
table_results_df = table_results_df.drop(columns=cols_to_drop)
table_results_df_list.append(table_results_df)

# generate id lists for each unexpected result set
# Filter for rows that actually have unexpected indices
query_df = table_results_df.loc[
(~table_results_df["result.unexpected_index_list"].isna())
& (table_results_df["result.unexpected_index_list"].values != "[]")
]
& (
table_results_df["result.unexpected_index_list"].astype(str)
!= "[]"
)
]

table_results_df["unexpected_id_list"] = pd.Series(dtype="object")

for i, row in query_df.iterrows():
try:
# check this
list(df[id_field].iloc[row["result.unexpected_index_list"]])
except Exception as e:
logger.warning(
f"Problem mapping IDs for {table}: {e}. Proceeding without ID list."
)
continue
else:
table_results_df.loc[i, "unexpected_id_list"] = str(
list(df[id_field].iloc[row["result.unexpected_index_list"]])
indices = row["result.unexpected_index_list"]
# Safety check: Ensure indices are integers
if isinstance(indices, list):
mapped_ids = df[id_field].iloc[indices].tolist()
table_results_df.at[i, "unexpected_id_list"] = str(
mapped_ids
)
except KeyError:
logger.error(
f"ID Field '{id_field}' not found in DataFrame for {table}."
)
except IndexError:
logger.error(f"Indices out of bounds for {table}.")
except Exception as e:
logger.warning(f"Mapping error for {table} row {i}: {e}")

# drop columns not needed in metadata
# METADATA PROCESSING
cols_to_drop_meta = [
"notes",
"result_format",
"catch_exceptions",
"rendered_content",
"windows",
]

suite_df = pd.DataFrame()
for i in expectations:
temp_i = i
temp_df = pd.json_normalize(dict(temp_i))
temp_df["expectation_type"] = temp_i.expectation_type

# 'expectations' variable is guaranteed to exist from Step 3
for exp in expectations:
temp_df = pd.json_normalize(dict(exp))
temp_df["expectation_type"] = exp.expectation_type
temp_df["dataset_name"] = table
temp_df["expectation_id_full"] = temp_i.expectation_type + '_' + table
temp_df = temp_df.drop(columns=cols_to_drop_meta, errors='ignore') # errors='ignore' is safer
temp_df["expectation_id_full"] = f"{exp.expectation_type}_{table}"
temp_df = temp_df.drop(columns=cols_to_drop_meta, errors="ignore")
suite_df = pd.concat([suite_df, temp_df])

df_all_suite_list.append(suite_df)

except Exception as e:
logger.error(f"CRITICAL ERROR processing table '{table}': {str(e)}")
logger.error("Skipping this table and moving to the next.")
logger.error(f"Result Processing Error for {table}: {e}")
continue

# END LOOPS
if not table_results_df_list:
logger.error("No tables were processed successfully. Exiting.")
return

results_df = pd.concat(table_results_df_list)
metadata_df = pd.concat(df_all_suite_list)

# add expectation_id
# metadata_df["expectation_id"] = (
# metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
# )
metadata_df["import_date"] = datetime.today().strftime("%Y%m%d")

# set dtypes for Athena with default of string
@@ -213,10 +233,10 @@
value=results_df.set_index(
["expectation_config.type", "dataset_name"]
).index.factorize()[0]
+ 1,
+ 1,
)
results_df["expectation_id"] = (
results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
results_df["expectation_config.type"] + "_" + results_df["dataset_name"]
)
results_df["import_date"] = datetime.today().strftime("%Y%m%d")

@@ -230,7 +250,7 @@
"result.element_count": "bigint",
"result.unexpected_count": "bigint",
"result.missing_count": "bigint",
"result.details_mismatched": 'string',
"result.details_mismatched": "string",
"result.partial_unexpected_list": "array<string>",
"result.unexpected_index_list": "array<bigint>",
"result.unexpected_index_query": "string",
@@ -20,7 +20,7 @@ class ContactsExpectValueColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):


class ContactsExpectContactTypeCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
column: str = "LCDE_FRV_CME_CODE,"
column: str = "LCDE_FRV_CME_CODE"
value_set: list = ["WORKTEL", "MOBILETEL", "HOMETEL", "EMAIL", "OTHER"]
description: str = "Expect contact type code to be one of the set"

@@ -34,7 +34,7 @@ class PeopleExpectTitleToBeInSet(gxe.ExpectColumnValuesToBeInSet):
"RABBI",
"REVEREND",
"SIR",
None,
"",
]
description: str = "Expect title to be one of the set"

@@ -69,7 +69,7 @@ class PeopleExpectPeopleColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrderedList):
"LPAR_PER_NI_NO",
"LPAR_PER_FRV_HGO_CODE",
"LPAR_PER_FRV_FNL_CODE",
"LPAR_PER_OTHER_NAME",
"LPAR_PER_OTHER_NAME"
]
description: str = "Expect people load columns to match ordered list exactly"

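For orientation: the expectation classes in these suite modules carry their configuration (column, value_set, column_list) as class-level defaults, so a suite can register them with no arguments. The snippet below is a hypothetical sketch, not the PR's actual wiring. It assumes the GX 1.x fluent API already used in housing_nec_migration_apply_gx_dq_tests.py and builds a suite under the "{data_load}_data_load_suite" naming pattern that the main job retrieves via context.suites.get; it would live in (or import from) the people suite module.

import great_expectations as gx

# Sketch only: assumes the GX 1.x fluent API used elsewhere in this PR.
context = gx.get_context()

# Register the suite under the name the main job later looks up.
suite = context.suites.add(gx.ExpectationSuite(name="people_data_load_suite"))

# The classes defined above ship their column/value_set/column_list as
# class-level defaults, so they can be added without arguments.
suite.add_expectation(PeopleExpectTitleToBeInSet())
suite.add_expectation(PeopleExpectPeopleColumnsToMatchOrderedList())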