diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 9fb88fb6f..916a3f746 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -1,5 +1,6 @@ import sys -from datetime import datetime +from datetime import datetime, timedelta +from typing import Optional import pyspark.sql.functions as F from awsglue.context import GlueContext @@ -77,9 +78,18 @@ def apply_increments(snapshot_df, increment_df): return snapshot_df -def loadIncrementsSinceDate(increment_table_name, name_space, date): +def loadIncrementsSinceDate( + increment_table_name: str, name_space: str, date: Optional[str] = None +) -> DataFrame: + """ + Loads increments from the specified catalog table starting from a given date. + If the provided date is None, it defaults to 30 days ago. + + Returns: + DataFrame: A Spark DataFrame containing the loaded increments. + """ if date is None: - date = "20210101" # default date + date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d") # default date increment_ddf = glueContext.create_dynamic_frame.from_catalog( name_space=name_space, table_name=increment_table_name, @@ -180,10 +190,11 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date): f"No snapshot and no increment for {increment_table_name}, going to the next table" ) continue + + # Load increments from default date increment_df = loadIncrementsSinceDate( increment_table_name=increment_table_name, name_space=source_catalog_database, - date="20210101", ) if increment_df.rdd.isEmpty(): logger.info(