diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
index 916a3f746..ae9f8bd10 100644
--- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py
+++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
@@ -83,13 +83,13 @@ def loadIncrementsSinceDate(
 ) -> DataFrame:
     """
     Loads increments from the specified catalog table starting from a given date.
-    If the provided date is None, it defaults to 30 days ago.
+    If the provided date is None, it defaults to 60 days ago.
 
     Returns:
         DataFrame: A Spark DataFrame containing the loaded increments.
     """
     if date is None:
-        date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d")  # default date
+        date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d")  # default date
     increment_ddf = glueContext.create_dynamic_frame.from_catalog(
         name_space=name_space,
         table_name=increment_table_name,
@@ -149,6 +149,32 @@ def loadIncrementsSinceDate(
     "enforcement_breach_details": {"unique": ["id"]},
 }
 
+
+def purge_today_partition(
+    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
+) -> None:
+    """
+    Purges (deletes) only today's partition under the given target destination.
+
+    Parameters:
+        glueContext: GlueContext instance.
+        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
+        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
+
+    Returns:
+        None.
+    """
+    now = datetime.now()
+    snapshot_year = str(now.year)
+    snapshot_month = str(now.month).zfill(2)
+    snapshot_day = str(now.day).zfill(2)
+    snapshot_date = snapshot_year + snapshot_month + snapshot_day
+
+    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"
+
+    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
+
+
 if __name__ == "__main__":
     args = getResolvedOptions(sys.argv, ["JOB_NAME"])
 
@@ -208,14 +234,18 @@ def loadIncrementsSinceDate(
         # snapshot table in glue catalogue
         else:
             pushDownPredicate = create_pushdown_predicate(
-                partitionDateColumn="snapshot_date", daysBuffer=3
+                partitionDateColumn="snapshot_date", daysBuffer=60
             )
             # load latest snapshot
             snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
                 name_space=source_catalog_database,
                 table_name=snapshot_table_name,
-                # push_down_predicate=pushDownPredicate
+                push_down_predicate=pushDownPredicate,
             )
+            if snapshot_ddf.count() == 0:
+                logger.error(
+                    f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}."
" + ) snapshot_df = snapshot_ddf.toDF() snapshot_df = get_latest_snapshot(snapshot_df) last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0] @@ -248,6 +278,7 @@ def loadIncrementsSinceDate( # apply COU logger.info(f"Applying increment {increment_table_name}") snapshot_df = apply_increments(snapshot_df, increment_df) + else: logger.info( f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday" @@ -337,6 +368,9 @@ def loadIncrementsSinceDate( snapshot_df, glueContext, "resultDataFrame" ) target_destination = s3_bucket_target + table_name + + # Clean up today's partition before writing + purge_today_partition(glueContext, target_destination) parquetData = glueContext.write_dynamic_frame.from_options( frame=resultDataFrame, connection_type="s3", @@ -346,6 +380,7 @@ def loadIncrementsSinceDate( "partitionKeys": PARTITION_KEYS, }, ) + job.commit() finally: if len(dq_errors) > 0: diff --git a/terraform/etl/24-aws-glue-tascomi-data.tf b/terraform/etl/24-aws-glue-tascomi-data.tf index 64de561be..32bbc2cfa 100644 --- a/terraform/etl/24-aws-glue-tascomi-data.tf +++ b/terraform/etl/24-aws-glue-tascomi-data.tf @@ -53,7 +53,7 @@ locals { "complaint_sources", "file_closure_reasons", "enforcement_case_statuses" -] + ] table_list = join(",", concat(local.tascomi_table_names, local.tascomi_static_tables)) @@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" { job_name = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning" glue_version = "2.0" glue_job_worker_type = "G.2X" - number_of_workers_for_glue_job = 12 + number_of_workers_for_glue_job = 6 helper_module_key = data.aws_s3_object.helpers.key pydeequ_zip_key = data.aws_s3_object.pydeequ.key spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id