From baf1581c5e6325939806257dd43f400ebc13b467 Mon Sep 17 00:00:00 2001
From: AGibson <4319494+annajgibson@users.noreply.github.com>
Date: Mon, 15 Sep 2025 20:49:05 +0100
Subject: [PATCH] Add metadata table to GX DQ test Glue job

---
 .../housing_nec_migration_gx_dq_inputs.py     |  1 +
 ...housing_nec_migration_apply_gx_dq_tests.py | 61 ++++++++++++++++---
 ...housing-nec-migration-apply-gx-dq-tests.tf |  6 +-
 3 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/scripts/helpers/housing_nec_migration_gx_dq_inputs.py b/scripts/helpers/housing_nec_migration_gx_dq_inputs.py
index 1c9d76c61..22a9ecb34 100644
--- a/scripts/helpers/housing_nec_migration_gx_dq_inputs.py
+++ b/scripts/helpers/housing_nec_migration_gx_dq_inputs.py
@@ -9,6 +9,7 @@
     "properties_1c",
     "properties_1d",
     "properties_1e",
+    "properties_2a",
     "properties_4a",
     "properties_4c",
 ]
diff --git a/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py b/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
index 73f17c565..6bca3cb83 100644
--- a/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
+++ b/scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
@@ -27,6 +27,9 @@
     "s3_staging_location",
     "target_database",
     "target_table",
+    "target_table_metadata",
+    "s3_target_location_metadata",
+    "s3_target_location_results",
 ]
 args = getResolvedOptions(sys.argv, arg_keys)
 locals().update(args)
@@ -50,6 +53,7 @@ def main():
     context = gx.get_context(mode="file", project_root_dir=s3_target_location)
 
     table_results_df_list = []
+    df_all_suite_list = []
 
     for data_load in data_load_list:
         logger.info(f"{data_load} loading...")
@@ -75,6 +79,7 @@ def main():
 
         # get expectation suite for dataset
         suite = context.suites.get(name=f"{data_load}_data_load_suite")
+        expectations = suite.expectations
 
         validation_definition = gx.ValidationDefinition(
             data=batch_definition,
@@ -125,14 +130,45 @@ def main():
                     list(df[id_field].iloc[row["result.unexpected_index_list"]])
                 )
 
+        # drop columns not needed in metadata
+        cols_to_drop_meta = [
+            "notes",
+            "result_format",
+            "catch_exceptions",
+            "rendered_content",
+            "windows",
+        ]
+
+        suite_df = pd.DataFrame()
+        for i in expectations:
+            temp_i = i
+            temp_df = pd.json_normalize(dict(temp_i))
+            temp_df["expectation_type"] = temp_i.expectation_type
+            temp_df["dataset_name"] = table
+            temp_df = temp_df.drop(columns=cols_to_drop_meta)
+            suite_df = pd.concat([suite_df, temp_df])
+
+        df_all_suite_list.append(suite_df)
+
     results_df = pd.concat(table_results_df_list)
+    metadata_df = pd.concat(df_all_suite_list)
+
+    # add expectation_id
+    metadata_df["expectation_id"] = (
+        metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
+    )
+    metadata_df["import_date"] = datetime.today().strftime("%Y%m%d")
+
+    # set dtypes for Athena, defaulting all columns to string
+    dict_values = ["string" for _ in range(len(metadata_df.columns))]
+    dtype_dict_metadata = dict(zip(metadata_df.columns, dict_values))
 
     # add clean dataset name
     results_df["dataset_name"] = results_df["expectation_config.kwargs.batch_id"].map(
         lambda x: x.removeprefix("pandas-").removesuffix("_df_asset")
     )
 
-    # add composite key for each specific test (so can be tracked over time)
+    # add composite key for each test (so it can be tracked over time)
     results_df.insert(
         loc=0,
         column="expectation_key",
@@ -146,8 +182,8 @@ def main():
     )
     results_df["import_date"] = datetime.today().strftime("%Y%m%d")
 
-    # set dtypes for Athena
-    dtype_dict = {
+    # set dtypes for Athena
+    dtype_dict_results = {
         "expectation_config.type": "string",
"expectation_config.kwargs.batch_id": "string", "expectation_config.kwargs.column": "string", @@ -165,20 +201,27 @@ def main(): "import_date": "string", } - # write to s3 + # TODO for df_vars in [[results_df, dtype_dict_results, target_table], [metadata_df, dtype_dict_metadata, target_metadata_table]]: + # will loop the writing of these tables + wr.s3.to_parquet( df=results_df, - path=s3_target_location, + path=s3_target_location_results, dataset=True, database=target_database, table=target_table, mode="overwrite", - dtype=dtype_dict, - schema_evolution=True, + dtype=dtype_dict_results, ) - logger.info( - f"Data Quality test results for NEC data loads written to {s3_target_location}" + wr.s3.to_parquet( + df=metadata_df, + path=s3_target_location_metadata, + dataset=True, + database=target_database, + table=target_table_metadata, + mode="overwrite", + dtype=dtype_dict_metadata, ) diff --git a/terraform/etl/54-aws-glue-housing-nec-migration-apply-gx-dq-tests.tf b/terraform/etl/54-aws-glue-housing-nec-migration-apply-gx-dq-tests.tf index 9cb795b5d..6ad60f43c 100644 --- a/terraform/etl/54-aws-glue-housing-nec-migration-apply-gx-dq-tests.tf +++ b/terraform/etl/54-aws-glue-housing-nec-migration-apply-gx-dq-tests.tf @@ -16,7 +16,7 @@ module "housing_nec_migration_apply_gx_dq_tests" { trigger_enabled = local.is_production_environment number_of_workers_for_glue_job = 2 schedule = "cron(0 10 ? * MON-FRI *)" - job_parameters = { + job_parameters = { "--job-bookmark-option" = "job-bookmark-enable" "--enable-glue-datacatalog" = "true" "--enable-continuous-cloudwatch-log" = "true" @@ -24,9 +24,13 @@ module "housing_nec_migration_apply_gx_dq_tests" { "--region_name" = data.aws_region.current.name "--s3_endpoint" = "https://s3.${data.aws_region.current.name}.amazonaws.com" "--s3_target_location" = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/" + "--s3_target_location_metadata" = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/metadata/" + "--s3_target_location_results" = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/results/" "--s3_staging_location" = "s3://${module.athena_storage_data_source.bucket_id}/housing/nec-migration-data-quality-tests/" "--target_database" = "housing_nec_migration" "--target_table" = "housing_nec_data_loads_dq_tests" + "--target_table_metadata" = "housing_nec_data_loads_dq_metadata" + } script_name = "housing_nec_migration_apply_gx_dq_tests"