From 3f2b7122e134b4c42e2d4612196180a9d188dc72 Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Tue, 11 Mar 2025 14:16:56 +0000 Subject: [PATCH 1/2] fix tascomi create daily snapshot by changing push_down_predicate --- .../planning/tascomi_create_daily_snapshot.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 9fb88fb6f..360be0060 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -1,5 +1,6 @@ import sys -from datetime import datetime +from datetime import datetime, timedelta +from typing import Optional import pyspark.sql.functions as F from awsglue.context import GlueContext @@ -77,9 +78,18 @@ def apply_increments(snapshot_df, increment_df): return snapshot_df -def loadIncrementsSinceDate(increment_table_name, name_space, date): +def loadIncrementsSinceDate( + increment_table_name: str, name_space: str, date: Optional[str] = None +) -> DataFrame: + """ + Loads increments from the specified catalog table starting from a given date. + If the provided date is None, it defaults to 30 days ago. + + Returns: + DataFrame: A Spark DataFrame containing the loaded increments. 
+ """ if date is None: - date = "20210101" # default date + date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d") # default date increment_ddf = glueContext.create_dynamic_frame.from_catalog( name_space=name_space, table_name=increment_table_name, @@ -170,7 +180,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date): glueContext, snapshot_table_name, source_catalog_database ): logger.info( - f"Couldn't find table {snapshot_table_name} in database {source_catalog_database}, creating a snapshot from all the increments, starting from 20210101" + f"Couldn't find table {snapshot_table_name} in database {source_catalog_database}, creating a snapshot from all the increments, starting from 30 days ago" ) # Increment table does not exist in glue catalogue if not table_exists_in_catalog( @@ -183,7 +193,6 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date): increment_df = loadIncrementsSinceDate( increment_table_name=increment_table_name, name_space=source_catalog_database, - date="20210101", ) if increment_df.rdd.isEmpty(): logger.info( From fda0eeaec2b4693683c272452bd9e7108570fef3 Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Tue, 11 Mar 2025 15:12:34 +0000 Subject: [PATCH 2/2] comments --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 360be0060..916a3f746 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -180,7 +180,7 @@ def loadIncrementsSinceDate( glueContext, snapshot_table_name, source_catalog_database ): logger.info( - f"Couldn't find table {snapshot_table_name} in database {source_catalog_database}, creating a snapshot from all the increments, starting from 30 days ago" + f"Couldn't find table 
{snapshot_table_name} in database {source_catalog_database}, creating a snapshot from all the increments, starting from 20210101" ) # Increment table does not exist in glue catalogue if not table_exists_in_catalog( @@ -190,6 +190,8 @@ def loadIncrementsSinceDate( f"No snapshot and no increment for {increment_table_name}, going to the next table" ) continue + + # Load increments from default date increment_df = loadIncrementsSinceDate( increment_table_name=increment_table_name, name_space=source_catalog_database,