From 28c4e07af8ce28fb13b0855221c49ac61355c1da Mon Sep 17 00:00:00 2001
From: Tian Chen <38001883+Tian-2017@users.noreply.github.com>
Date: Wed, 19 Mar 2025 11:08:16 +0000
Subject: [PATCH] tascomi snapshot - change the daysBuffer and clean the
 partition before writing the data

---
 .../planning/tascomi_create_daily_snapshot.py | 28 ++++++++++++++++++-
 1 file changed, 27 insertions(+), 1 deletion(-)

diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
index 7dbf58ece..46c79873a 100644
--- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py
+++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
@@ -82,6 +82,29 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
     return increment_df
 
 
+def purge_today_partition(
+    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
+) -> None:
+    """
+    Purges (deletes) only today's partition under the given target destination.
+    Parameters:
+        glueContext: GlueContext instance.
+        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
+        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
+    Returns:
+        None.
+    """
+    now = datetime.now()
+    snapshot_year = str(now.year)
+    snapshot_month = str(now.month).zfill(2)
+    snapshot_day = str(now.day).zfill(2)
+    snapshot_date = snapshot_year + snapshot_month + snapshot_day
+
+    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"
+
+    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
+
+
 # dict containing parameters for DQ checks
 dq_params = {
     "appeals": {"unique": ["id"]},
@@ -189,7 +212,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
         # snapshot table in glue catalogue
         else:
             pushDownPredicate = create_pushdown_predicate(
-                partitionDateColumn="snapshot_date", daysBuffer=3
+                partitionDateColumn="snapshot_date", daysBuffer=30
             )
             # load latest snpashot
             snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
@@ -318,6 +341,9 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
                 snapshot_df, glueContext, "resultDataFrame"
             )
             target_destination = s3_bucket_target + table_name
+
+            # Clean up today's partition before writing
+            purge_today_partition(glueContext, target_destination)
             parquetData = glueContext.write_dynamic_frame.from_options(
                 frame=resultDataFrame,
                 connection_type="s3",
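
Not part of the patch, for review context only: a minimal, self-contained sketch of the S3 prefix that the new purge_today_partition helper deletes before each snapshot write. It re-derives the same path layout used in the function body; the bucket and table name below are hypothetical, and the real helper additionally calls glueContext.purge_s3_path on that prefix and relies on datetime already being imported at the top of the job script.

# Illustrative sketch only (not part of the patch): shows which S3 prefix
# would be purged for a given run date and target destination.
from datetime import datetime


def todays_partition_path(target_destination: str, now: datetime) -> str:
    # Mirrors the path layout built inside purge_today_partition
    snapshot_year = str(now.year)
    snapshot_month = str(now.month).zfill(2)
    snapshot_day = str(now.day).zfill(2)
    snapshot_date = snapshot_year + snapshot_month + snapshot_day
    return (
        f"{target_destination}/snapshot_year={snapshot_year}"
        f"/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}"
        f"/snapshot_date={snapshot_date}"
    )


# Hypothetical bucket and table name; a run on 19 March 2025 would purge:
print(todays_partition_path("s3://example-bucket/tascomi/appeals", datetime(2025, 3, 19)))
# s3://example-bucket/tascomi/appeals/snapshot_year=2025/snapshot_month=03/snapshot_day=19/snapshot_date=20250319

Because the purge happens immediately before write_dynamic_frame writes to the same prefix, re-running the job on the same day replaces that day's partition instead of appending duplicate files alongside the earlier output.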