From 9fcd13183b65b30a01bef082838ea6493237803b Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:14:57 +0100 Subject: [PATCH 1/6] further deduplicate the incremental rows --- .../planning/tascomi_create_daily_snapshot.py | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 90aa3bc30..cc5af84a8 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -44,6 +44,24 @@ def add_snapshot_date_columns(data_frame): return data_frame +def deduplicate_by_id_and_last_updated(df): + """ + Deduplicates rows with the same (id, last_updated) combination by keeping the one with the latest import_date. + To resolve: spotted duplicated rows with same id and last_updated timestamp in some incremental tables (e.g. documents) + """ + window_spec = Window.partitionBy("id", "last_updated").orderBy( + F.col("import_date").desc() + ) + + deduplicated_df = ( + df.withColumn("row_num", F.row_number().over(window_spec)) + .filter(F.col("row_num") == 1) + .drop("row_num") + ) + + return deduplicated_df + + def prepare_increments(increment_df): # In case there are several days worth of increments: only keep the latest version of a record id_partition = Window.partitionBy("id") @@ -62,6 +80,15 @@ def prepare_increments(increment_df): .where(F.col("last_updated_nonull") == F.col("latest")) .drop("latest", "last_updated_nonull") ) + + # Check for residual duplicates - print and further de-duplicate + duplicate_ids = increment_df.groupBy("id").count().filter("count > 1") + if duplicate_ids.count() > 0: + duplicate_ids.join(increment_df, "id").show(truncate=False) + increment_df = deduplicate_by_id_and_last_updated(increment_df) + else: + logger.info("No duplicated rows after initial deduplication.") + return increment_df @@ -356,7 +383,7 @@ def purge_today_partition( job.commit() finally: if len(dq_errors) > 0: - logger.error(f"DQ Errors: {dq_errors}") - raise Exception(f"Data quality check failed: {'; '.join(dq_errors)}") + logger.error(f"Errors: {dq_errors}") + raise Exception(f"Job Failed: {'; '.join(dq_errors)}") spark.sparkContext._gateway.close() spark.stop() From 0edc08cbd6688dae0d895d1b173d67a27c0cceba Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:27:26 +0100 Subject: [PATCH 2/6] pass the logger to the function --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 6 +++--- terraform/etl/24-aws-glue-tascomi-data.tf | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index cc5af84a8..a455ced10 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -62,7 +62,7 @@ def deduplicate_by_id_and_last_updated(df): return deduplicated_df -def prepare_increments(increment_df): +def prepare_increments(increment_df, logger): # In case there are several days worth of increments: only keep the latest version of a record id_partition = Window.partitionBy("id") # preparation step: create a temporary column to replace NULL last_updated values with 01/01/2020 @@ -233,7 +233,7 @@ def purge_today_partition( ) continue # create first snapshot - increment_df = prepare_increments(increment_df) + increment_df = prepare_increments(increment_df, logger) snapshot_df = increment_df # snapshot table in glue catalogue @@ -274,7 +274,7 @@ def purge_today_partition( ) else: # prepare COU - increment_df = prepare_increments(increment_df) + increment_df = prepare_increments(increment_df, logger) increment_df = add_snapshot_date_columns(increment_df) # apply COU logger.info(f"Applying increment {increment_table_name}") diff --git a/terraform/etl/24-aws-glue-tascomi-data.tf b/terraform/etl/24-aws-glue-tascomi-data.tf index ca68a1ef6..9589780aa 100644 --- a/terraform/etl/24-aws-glue-tascomi-data.tf +++ b/terraform/etl/24-aws-glue-tascomi-data.tf @@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" { job_name = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning" glue_version = "2.0" glue_job_worker_type = "G.2X" - number_of_workers_for_glue_job = 12 + number_of_workers_for_glue_job = 30 helper_module_key = data.aws_s3_object.helpers.key pydeequ_zip_key = data.aws_s3_object.pydeequ.key spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id From f4550e5fc8faa8668db1c94f67f45709d9ee4203 Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:38:11 +0100 Subject: [PATCH 3/6] revert the workers to 12 - it must less than 16 based on the terraform linter --- terraform/etl/24-aws-glue-tascomi-data.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/etl/24-aws-glue-tascomi-data.tf b/terraform/etl/24-aws-glue-tascomi-data.tf index 9589780aa..ca68a1ef6 100644 --- a/terraform/etl/24-aws-glue-tascomi-data.tf +++ b/terraform/etl/24-aws-glue-tascomi-data.tf @@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" { job_name = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning" glue_version = "2.0" glue_job_worker_type = "G.2X" - number_of_workers_for_glue_job = 30 + number_of_workers_for_glue_job = 12 helper_module_key = data.aws_s3_object.helpers.key pydeequ_zip_key = data.aws_s3_object.pydeequ.key spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id From 8d2a09d926965237f45ed03ac8706ae1441daafa Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:46:45 +0100 Subject: [PATCH 4/6] use standard logger --- .../jobs/planning/tascomi_create_daily_snapshot.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index a455ced10..81bc593d4 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -1,3 +1,4 @@ +import logging import sys from datetime import datetime @@ -25,6 +26,11 @@ table_exists_in_catalog, ) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + def get_latest_snapshot(dfa): dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0]) @@ -62,7 +68,7 @@ def deduplicate_by_id_and_last_updated(df): return deduplicated_df -def prepare_increments(increment_df, logger): +def prepare_increments(increment_df): # In case there are several days worth of increments: only keep the latest version of a record id_partition = Window.partitionBy("id") # preparation step: create a temporary column to replace NULL last_updated values with 01/01/2020 @@ -192,7 +198,6 @@ def purge_today_partition( sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = SparkSession(sc) - logger = glueContext.get_logger() job = Job(glueContext) job.init(args["JOB_NAME"], args) @@ -233,7 +238,7 @@ def purge_today_partition( ) continue # create first snapshot - increment_df = prepare_increments(increment_df, logger) + increment_df = prepare_increments(increment_df) snapshot_df = increment_df # snapshot table in glue catalogue @@ -274,7 +279,7 @@ def purge_today_partition( ) else: # prepare COU - increment_df = prepare_increments(increment_df, logger) + increment_df = prepare_increments(increment_df) increment_df = add_snapshot_date_columns(increment_df) # apply COU logger.info(f"Applying increment {increment_table_name}") From d6c58bc9bd25a7ef6469bb6ccc1c8a6ab9c1f06b Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:53:18 +0100 Subject: [PATCH 5/6] stop the spark quickly in finally --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 81bc593d4..75cc862c0 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -389,6 +389,7 @@ def purge_today_partition( finally: if len(dq_errors) > 0: logger.error(f"Errors: {dq_errors}") - raise Exception(f"Job Failed: {'; '.join(dq_errors)}") + spark.stop() + raise SystemExit(f"Failed: {'; '.join(dq_errors)}") spark.sparkContext._gateway.close() spark.stop() From d879684d09ddf9ea791cca83e947f5e6d27b92bb Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Tue, 15 Apr 2025 09:24:35 +0100 Subject: [PATCH 6/6] small tweak on shuffle and small files --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 75cc862c0..c36e7bdb7 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -89,7 +89,7 @@ def prepare_increments(increment_df): # Check for residual duplicates - print and further de-duplicate duplicate_ids = increment_df.groupBy("id").count().filter("count > 1") - if duplicate_ids.count() > 0: + if duplicate_ids.limit(1).count() > 0: duplicate_ids.join(increment_df, "id").show(truncate=False) increment_df = deduplicate_by_id_and_last_updated(increment_df) else: @@ -368,7 +368,7 @@ def purge_today_partition( verificationSuite.saveOrAppendResult(resultKey).run() # if data quality tests succeed, write to S3 - + snapshot_df = snapshot_df.coalesce(300) resultDataFrame = DynamicFrame.fromDF( snapshot_df, glueContext, "resultDataFrame" )