From ef6f571c7e2b4b90e2c75322dec0051e544f00d0 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Wed, 14 May 2025 15:10:51 +0100 Subject: [PATCH 1/3] repartition data before writing to shuffle data addresses failing executors assumed to be because of data skew --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index c36e7bdb7..157ad5e35 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -368,7 +368,9 @@ def purge_today_partition( verificationSuite.saveOrAppendResult(resultKey).run() # if data quality tests succeed, write to S3 - snapshot_df = snapshot_df.coalesce(300) + snapshot_df = snapshot_df.repartition(200) + + resultDataFrame = DynamicFrame.fromDF( snapshot_df, glueContext, "resultDataFrame" ) From 4ecd71fcfc6c7ea04f2b274b671e8512c4c46554 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Wed, 14 May 2025 15:12:08 +0100 Subject: [PATCH 2/3] reduce buffer in pushdown predicate --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 157ad5e35..064a46aac 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -244,7 +244,7 @@ def purge_today_partition( # snapshot table in glue catalogue else: pushDownPredicate = create_pushdown_predicate( - partitionDateColumn="snapshot_date", daysBuffer=60 + partitionDateColumn="snapshot_date", daysBuffer=14 ) # load latest snpashot snapshot_ddf = glueContext.create_dynamic_frame.from_catalog( From eeffad2bf61e5a4d4f0514af3982d350d8fa284c Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Wed, 14 May 2025 15:12:58 +0100 Subject: [PATCH 3/3] formatting --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 064a46aac..b2ce45a15 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -318,13 +318,13 @@ def purge_today_partition( check = check.hasUniqueness( dq_params[snapshot_table_name]["unique"], lambda x: x == 1, - f'{dq_params[snapshot_table_name]["unique"]} are not unique', + f"{dq_params[snapshot_table_name]['unique']} are not unique", ) if dq_params.get(snapshot_table_name, {}).get("complete"): check = check.hasCompleteness( dq_params[snapshot_table_name]["complete"], lambda x: x >= 0.99, - f'{dq_params[snapshot_table_name]["complete"]} has missing values', + f"{dq_params[snapshot_table_name]['complete']} has missing values", ) verificationSuite = ( @@ -335,7 +335,6 @@ def purge_today_partition( ) try: - verificationRun = verificationSuite.run() # check if any errors and raise exception if true @@ -369,8 +368,7 @@ def purge_today_partition( # if data quality tests succeed, write to S3 snapshot_df = snapshot_df.repartition(200) - - + resultDataFrame = DynamicFrame.fromDF( snapshot_df, glueContext, "resultDataFrame" )