43 changes: 39 additions & 4 deletions scripts/jobs/planning/tascomi_create_daily_snapshot.py
@@ -83,13 +83,13 @@ def loadIncrementsSinceDate(
) -> DataFrame:
"""
Loads increments from the specified catalog table starting from a given date.
-    If the provided date is None, it defaults to 30 days ago.
+    If the provided date is None, it defaults to 60 days ago.

Returns:
DataFrame: A Spark DataFrame containing the loaded increments.
"""
if date is None:
-        date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d") # default date
+        date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d") # default date
increment_ddf = glueContext.create_dynamic_frame.from_catalog(
name_space=name_space,
table_name=increment_table_name,
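
The fallback above widens the default lookback from 30 to 60 days. A minimal standalone sketch of that computation, assuming only the standard library (default_increment_start is a hypothetical name, not part of this PR):

from datetime import datetime, timedelta

def default_increment_start(days: int = 60) -> str:
    # Inclusive lower bound for the increment load, formatted as yyyymmdd
    # to match the date strings used by the catalog partitions.
    return (datetime.now() - timedelta(days=days)).strftime("%Y%m%d")

# e.g. run on 2024-03-15 this returns "20240115"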
@@ -149,6 +149,32 @@ def loadIncrementsSinceDate(
"enforcement_breach_details": {"unique": ["id"]},
}


+def purge_today_partition(
+    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
+) -> None:
+    """
+    Purges (deletes) only today's partition under the given target destination.
+
+    Parameters:
+        glueContext: GlueContext instance.
+        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
+        retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately).
+
+    Returns:
+        None.
+    """
+    now = datetime.now()
+    snapshot_year = str(now.year)
+    snapshot_month = str(now.month).zfill(2)
+    snapshot_day = str(now.day).zfill(2)
+    snapshot_date = snapshot_year + snapshot_month + snapshot_day
+
+    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"
+
+    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
+
+
if __name__ == "__main__":
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
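
purge_today_partition derives the partition path from the current date. A sketch of how that construction could be factored into a pure helper for unit testing (today_partition_path is hypothetical, not part of this PR):

from datetime import datetime
from typing import Optional

def today_partition_path(target_destination: str, now: Optional[datetime] = None) -> str:
    # Same path layout as purge_today_partition above, minus the GlueContext,
    # so the string construction can be asserted in isolation.
    now = now or datetime.now()
    y = str(now.year)
    m = str(now.month).zfill(2)
    d = str(now.day).zfill(2)
    return (
        f"{target_destination}/snapshot_year={y}/snapshot_month={m}"
        f"/snapshot_day={d}/snapshot_date={y}{m}{d}"
    )

assert (
    today_partition_path("s3://bucket/planning", datetime(2024, 3, 5))
    == "s3://bucket/planning/snapshot_year=2024/snapshot_month=03/snapshot_day=05/snapshot_date=20240305"
)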

@@ -208,14 +234,18 @@ def loadIncrementsSinceDate(
# snapshot table in glue catalogue
else:
pushDownPredicate = create_pushdown_predicate(
-    partitionDateColumn="snapshot_date", daysBuffer=3
+    partitionDateColumn="snapshot_date", daysBuffer=60
)
# load latest snapshot
snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
name_space=source_catalog_database,
table_name=snapshot_table_name,
-    # push_down_predicate=pushDownPredicate
+    push_down_predicate=pushDownPredicate,
)
+if snapshot_ddf.count() == 0:
+    logger.error(
+        f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}."
+    )
snapshot_df = snapshot_ddf.toDF()
snapshot_df = get_latest_snapshot(snapshot_df)
last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0]
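
create_pushdown_predicate itself isn't shown in this diff; assuming the snapshot_date partitions hold yyyymmdd strings, the predicate it builds would look roughly like this (snapshot_predicate is a hypothetical stand-in):

from datetime import datetime, timedelta

def snapshot_predicate(days_buffer: int = 60, column: str = "snapshot_date") -> str:
    # A Spark SQL expression that Glue applies at read time to prune
    # partitions, so only the last days_buffer days are scanned.
    cutoff = (datetime.now() - timedelta(days=days_buffer)).strftime("%Y%m%d")
    return f"{column} >= '{cutoff}'"

Raising daysBuffer from 3 to 60 lets a rerun still find the latest snapshot after a multi-day outage, at the cost of scanning more partitions; the new count() check above logs an error when even the wider window comes back empty.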
@@ -248,6 +278,7 @@ def loadIncrementsSinceDate(
# apply COU
logger.info(f"Applying increment {increment_table_name}")
snapshot_df = apply_increments(snapshot_df, increment_df)

else:
logger.info(
f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday"
@@ -337,6 +368,9 @@ def loadIncrementsSinceDate(
snapshot_df, glueContext, "resultDataFrame"
)
target_destination = s3_bucket_target + table_name

+# Clean up today's partition before writing
+purge_today_partition(glueContext, target_destination)
parquetData = glueContext.write_dynamic_frame.from_options(
frame=resultDataFrame,
connection_type="s3",
@@ -346,6 +380,7 @@
"partitionKeys": PARTITION_KEYS,
},
)

job.commit()
finally:
if len(dq_errors) > 0:
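Taken together, the purge-then-write sequence makes the daily write idempotent: Glue's S3 writer adds new part files rather than overwriting existing ones, so without the purge a same-day rerun would leave duplicate rows in today's partition. Condensed sketch of the pattern (format="parquet" and the "path" option are inferred from context, since those lines are collapsed in the diff):

# Delete today's partition, then rewrite it, so reruns replace rather than append.
purge_today_partition(glueContext, target_destination)
glueContext.write_dynamic_frame.from_options(
    frame=resultDataFrame,
    connection_type="s3",
    format="parquet",
    connection_options={
        "path": target_destination,
        "partitionKeys": PARTITION_KEYS,
    },
)
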
4 changes: 2 additions & 2 deletions terraform/etl/24-aws-glue-tascomi-data.tf
@@ -53,7 +53,7 @@ locals {
"complaint_sources",
"file_closure_reasons",
"enforcement_case_statuses"
-]
+]

table_list = join(",", concat(local.tascomi_table_names, local.tascomi_static_tables))

@@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" {
job_name = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning"
glue_version = "2.0"
glue_job_worker_type = "G.2X"
-number_of_workers_for_glue_job = 12
+number_of_workers_for_glue_job = 6
helper_module_key = data.aws_s3_object.helpers.key
pydeequ_zip_key = data.aws_s3_object.pydeequ.key
spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id