diff --git a/scripts/helpers/housing_nec_migration_gx_dq_inputs.py b/scripts/helpers/housing_nec_migration_gx_dq_inputs.py
index ae4620d04..ecfddfac4 100644
--- a/scripts/helpers/housing_nec_migration_gx_dq_inputs.py
+++ b/scripts/helpers/housing_nec_migration_gx_dq_inputs.py
@@ -22,28 +22,40 @@
 table_list = {
     "properties": [
-        "properties_1a",
-        "properties_1b",
-        "properties_1c",
-        "properties_1d",
-        "properties_1e",
-        "properties_2a",
-        "properties_3a",
-        "properties_4a",
-        "properties_4b",
-        "properties_4c",
-        "properties_7a",
+        # "properties_1a",
+        # "properties_1b",
+        # "properties_1c",
+        # "properties_1d",
+        # "properties_1e",
+        # "properties_2a",
+        # "properties_3a",
+        # "properties_4a",
+        # "properties_4b",
+        # "properties_4c",
+        # "properties_7a",
         "properties_all_tranches",
     ],
     "tenancies": [
-        "tenancies_1a",
-        "tenancies_1c",
-        "tenancies_2a",
+        # "tenancies_1a",
+        # "tenancies_1c",
+        # "tenancies_2a",
+        # "tenancies_other",
         "tenancies_all",
-        "tenancies_other",
     ],
-    "people": ["people_1a", "people_1b", "people_1c", "people_2a", "people_all"],
-    "contacts": ["contacts_1a", "contacts_1b", "contacts_2a", "contacts_all"],
+    "people": [
+        # "people_1a",
+        # "people_1b",
+        # "people_1c",
+        # "people_2a",
+        "people_all"
+    ],
+    "contacts": [
+        # "contacts_1a",
+        # "contacts_1b",
+        # "contacts_1c",
+        # "contacts_2a",
+        "contacts_all",
+    ],
     "arrears_actions": [
         "arrears_actions_1a",
         "arrears_actions_1c",
@@ -57,13 +69,13 @@
         "revenue_accounts_other",
     ],
     "transactions": [
-        "transactions_1a",
-        "transactions_1c",
-        "transactions_2a",
-        "transactions_other",
+        # "transactions_1a",
+        # "transactions_1c",
+        # "transactions_2a",
+        # "transactions_other",
         "transactions_all",
     ],
-    "addresses": ["addresses_1a", "addresses_2a"],
+    "addresses": ["addresses_1a", "addresses_1b", "addresses_2a"],
 }
 
 partition_keys = ["import_date"]
diff --git a/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py b/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
index 800f7c8db..9ae7fd01b 100644
--- a/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
+++ b/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
@@ -21,6 +21,7 @@
 import scripts.jobs.housing.housing_nec_migration_contacts_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_arrears_actions_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_revenue_accounts_data_load_gx_suite
+
 # import scripts.jobs.housing.housing_nec_migration_transactions_data_load_gx_suite
 import scripts.jobs.housing.housing_nec_migration_addresses_data_load_gx_suite
 
@@ -66,49 +67,52 @@ def main():
         logger.info(f"{data_load} loading...")
 
         for table in table_list.get(data_load):
-            logger.info(f"{table} loading...")
+            logger.info(f"{table} processing...")
             try:
                 sql_query, id_field = get_sql_query(
                     sql_config=sql_config, data_load=data_load, table=table
                 )
+                conn = connect(
+                    s3_staging_dir=s3_staging_location, region_name=region_name
+                )
+                df = pd.read_sql_query(sql_query, conn)
+            except Exception as e:
+                logger.error(f"SQL/Connection error for {table}: {e}. Skipping.")
Skipping.") + continue # Skip to next table - conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name) - - try: - df = pd.read_sql_query(sql_query, conn) - except Exception as e: - logger.error(f"SQL Read Problem found with {table}: {e}, skipping table.") - continue - - # set up batch + # --- STEP 2: GX ASSET SETUP --- + try: data_source = context.data_sources.add_pandas(f"{table}_pandas") data_asset = data_source.add_dataframe_asset(name=f"{table}_df_asset") batch_definition = data_asset.add_batch_definition_whole_dataframe( "Athena batch definition" ) batch_parameters = {"dataframe": df} + except Exception as e: + logger.error(f"GX Asset Setup error for {table}: {e}. Skipping.") + continue + try: + suite = context.suites.get(name=f"{data_load}_data_load_suite") + expectations = suite.expectations # Get this now to ensure it exists + except Exception as e: + logger.error( + f"GX Suite retrieval error for {data_load}: {e}. Skipping." + ) + continue - # get expectation suite for dataset - try: - suite = context.suites.get(name=f"{data_load}_data_load_suite") - except Exception as e: - logger.error(f"GX Suite Problem found with {data_load}: {e}, skipping suite.") - continue - else: - expectations = suite.expectations - + # VALIDATION & CHECKPOINT + try: validation_definition = gx.ValidationDefinition( data=batch_definition, suite=suite, name=f"validation_definition_{table}", ) - + # Use add_or_update to avoid duplicates validation_definition = context.validation_definitions.add_or_update( validation_definition ) - # create and start checking data with checkpoints checkpoint = context.checkpoints.add_or_update( gx.checkpoint.checkpoint.Checkpoint( name=f"{table}_checkpoint", @@ -120,13 +124,25 @@ def main(): }, ) ) - checkpoint_result = checkpoint.run(batch_parameters=batch_parameters) + except Exception as e: + logger.error(f"Checkpoint Run error for {table}: {e}. Skipping.") + continue - # Logic to handle results - results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict() + # PROCESS RESULTS + try: + results_dict = list(checkpoint_result.run_results.values())[ + 0 + ].to_json_dict() table_results_df = pd.json_normalize(results_dict["results"]) + # Guard clause: If table passed perfectly, results might be empty or structure different + if table_results_df.empty: + logger.info( + f"No results to process for {table} (possibly empty batch)." + ) + continue + cols_not_needed = ["result.unexpected_list", "result.observed_value"] cols_to_drop = [ c @@ -137,28 +153,36 @@ def main(): table_results_df = table_results_df.drop(columns=cols_to_drop) table_results_df_list.append(table_results_df) - # generate id lists for each unexpected result set + # Filter for rows that actually have unexpected indices query_df = table_results_df.loc[ (~table_results_df["result.unexpected_index_list"].isna()) - & (table_results_df["result.unexpected_index_list"].values != "[]") - ] + & ( + table_results_df["result.unexpected_index_list"].astype(str) + != "[]" + ) + ] table_results_df["unexpected_id_list"] = pd.Series(dtype="object") + for i, row in query_df.iterrows(): try: - # check this - list(df[id_field].iloc[row["result.unexpected_index_list"]]) - except Exception as e: - logger.warning( - f"Problem mapping IDs for {table}: {e}. Proceeding without ID list." 
-                    )
-                    continue
-                else:
-                    table_results_df.loc[i, "unexpected_id_list"] = str(
-                        list(df[id_field].iloc[row["result.unexpected_index_list"]])
-                    )
+                        indices = row["result.unexpected_index_list"]
+                        # Safety check: ensure indices is a list before positional lookup
+                        if isinstance(indices, list):
+                            mapped_ids = df[id_field].iloc[indices].tolist()
+                            table_results_df.at[i, "unexpected_id_list"] = str(
+                                mapped_ids
+                            )
+                    except KeyError:
+                        logger.error(
+                            f"ID Field '{id_field}' not found in DataFrame for {table}."
+                        )
+                    except IndexError:
+                        logger.error(f"Indices out of bounds for {table}.")
+                    except Exception as e:
+                        logger.warning(f"Mapping error for {table} row {i}: {e}")
 
-            # drop columns not needed in metadata
+                # METADATA PROCESSING
                 cols_to_drop_meta = [
                     "notes",
                     "result_format",
@@ -166,24 +190,24 @@
                     "rendered_content",
                     "windows",
                 ]
-            suite_df = pd.DataFrame()
-            for i in expectations:
-                temp_i = i
-                temp_df = pd.json_normalize(dict(temp_i))
-                temp_df["expectation_type"] = temp_i.expectation_type
+                suite_df = pd.DataFrame()
+                # 'expectations' is guaranteed to exist from the suite retrieval step above
+                for exp in expectations:
+                    temp_df = pd.json_normalize(dict(exp))
+                    temp_df["expectation_type"] = exp.expectation_type
                     temp_df["dataset_name"] = table
-                temp_df["expectation_id_full"] = temp_i.expectation_type + '_' + table
-                temp_df = temp_df.drop(columns=cols_to_drop_meta, errors='ignore') # errors='ignore' is safer
+                    temp_df["expectation_id_full"] = f"{exp.expectation_type}_{table}"
+                    temp_df = temp_df.drop(columns=cols_to_drop_meta, errors="ignore")
                     suite_df = pd.concat([suite_df, temp_df])
 
                 df_all_suite_list.append(suite_df)
 
             except Exception as e:
-                logger.error(f"CRITICAL ERROR processing table '{table}': {str(e)}")
-                logger.error("Skipping this table and moving to the next.")
+                logger.error(f"Result Processing Error for {table}: {e}")
                 continue
 
+    # END LOOPS
     if not table_results_df_list:
         logger.error("No tables were processed successfully. Exiting.")
Exiting.") return @@ -191,10 +215,6 @@ def main(): results_df = pd.concat(table_results_df_list) metadata_df = pd.concat(df_all_suite_list) - # add expectation_id - # metadata_df["expectation_id"] = ( - # metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"] - # ) metadata_df["import_date"] = datetime.today().strftime("%Y%m%d") # set dtypes for Athena with default of string @@ -213,10 +233,10 @@ def main(): value=results_df.set_index( ["expectation_config.type", "dataset_name"] ).index.factorize()[0] - + 1, + + 1, ) results_df["expectation_id"] = ( - results_df["expectation_config.type"] + "_" + results_df["dataset_name"] + results_df["expectation_config.type"] + "_" + results_df["dataset_name"] ) results_df["import_date"] = datetime.today().strftime("%Y%m%d") @@ -230,7 +250,7 @@ def main(): "result.element_count": "bigint", "result.unexpected_count": "bigint", "result.missing_count": "bigint", - "result.details_mismatched": 'string', + "result.details_mismatched": "string", "result.partial_unexpected_list": "array", "result.unexpected_index_list": "array", "result.unexpected_index_query": "string", diff --git a/scripts/jobs/housing/housing_nec_migration_contacts_data_load_gx_suite.py b/scripts/jobs/housing/housing_nec_migration_contacts_data_load_gx_suite.py index ef8bc55e8..32aeba6cb 100644 --- a/scripts/jobs/housing/housing_nec_migration_contacts_data_load_gx_suite.py +++ b/scripts/jobs/housing/housing_nec_migration_contacts_data_load_gx_suite.py @@ -20,7 +20,7 @@ class ContactsExpectValueColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNu class ContactsExpectContactTypeCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet): - column: str = "LCDE_FRV_CME_CODE," + column: str = "LCDE_FRV_CME_CODE" value_set: list = ["WORKTEL", "MOBILETEL", "HOMETEL", "EMAIL", "OTHER"] description: str = "Expect contact type code to be one of the set" diff --git a/scripts/jobs/housing/housing_nec_migration_people_data_load_gx_suite.py b/scripts/jobs/housing/housing_nec_migration_people_data_load_gx_suite.py index c563396f1..0bedec7fd 100644 --- a/scripts/jobs/housing/housing_nec_migration_people_data_load_gx_suite.py +++ b/scripts/jobs/housing/housing_nec_migration_people_data_load_gx_suite.py @@ -34,7 +34,7 @@ class PeopleExpectTitleToBeInSet(gxe.ExpectColumnValuesToBeInSet): "RABBI", "REVEREND", "SIR", - None, + "", ] description: str = "Expect title to be one of the set" @@ -69,7 +69,7 @@ class PeopleExpectPeopleColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchO "LPAR_PER_NI_NO", "LPAR_PER_FRV_HGO_CODE", "LPAR_PER_FRV_FNL_CODE", - "LPAR_PER_OTHER_NAME", + "LPAR_PER_OTHER_NAME" ] description: str = "Expect people load columns to match ordered list exactly"