diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
index c36e7bdb7..b2ce45a15 100644
--- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py
+++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
@@ -244,7 +244,7 @@ def purge_today_partition(
     # snapshot table in glue catalogue
     else:
         pushDownPredicate = create_pushdown_predicate(
-            partitionDateColumn="snapshot_date", daysBuffer=60
+            partitionDateColumn="snapshot_date", daysBuffer=14
         )
     # load latest snpashot
     snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
@@ -318,13 +318,13 @@ def purge_today_partition(
         check = check.hasUniqueness(
             dq_params[snapshot_table_name]["unique"],
             lambda x: x == 1,
-            f'{dq_params[snapshot_table_name]["unique"]} are not unique',
+            f"{dq_params[snapshot_table_name]['unique']} are not unique",
         )
     if dq_params.get(snapshot_table_name, {}).get("complete"):
         check = check.hasCompleteness(
             dq_params[snapshot_table_name]["complete"],
             lambda x: x >= 0.99,
-            f'{dq_params[snapshot_table_name]["complete"]} has missing values',
+            f"{dq_params[snapshot_table_name]['complete']} has missing values",
         )
 
     verificationSuite = (
@@ -335,7 +335,6 @@ def purge_today_partition(
     )
 
     try:
-
         verificationRun = verificationSuite.run()
 
         # check if any errors and raise exception if true
@@ -368,7 +367,8 @@ def purge_today_partition(
         verificationSuite.saveOrAppendResult(resultKey).run()
 
         # if data quality tests succeed, write to S3
-        snapshot_df = snapshot_df.coalesce(300)
+        snapshot_df = snapshot_df.repartition(200)
+
         resultDataFrame = DynamicFrame.fromDF(
             snapshot_df, glueContext, "resultDataFrame"
         )