Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions scripts/jobs/planning/tascomi_create_daily_snapshot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
from datetime import datetime
from datetime import datetime, timedelta
from typing import Optional

import pyspark.sql.functions as F
from awsglue.context import GlueContext
Expand Down Expand Up @@ -77,9 +78,18 @@ def apply_increments(snapshot_df, increment_df):
return snapshot_df


def loadIncrementsSinceDate(increment_table_name, name_space, date):
def loadIncrementsSinceDate(
increment_table_name: str, name_space: str, date: Optional[str] = None
) -> DataFrame:
"""
Loads increments from the specified catalog table starting from a given date.
If the provided date is None, it defaults to 30 days ago.

Returns:
DataFrame: A Spark DataFrame containing the loaded increments.
"""
if date is None:
date = "20210101" # default date
date = (datetime.now() - timedelta(days=30)).strftime("%Y%m%d") # default date
increment_ddf = glueContext.create_dynamic_frame.from_catalog(
name_space=name_space,
table_name=increment_table_name,
Expand Down Expand Up @@ -180,10 +190,11 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
f"No snapshot and no increment for {increment_table_name}, going to the next table"
)
continue

# Load increments from default date
increment_df = loadIncrementsSinceDate(
increment_table_name=increment_table_name,
name_space=source_catalog_database,
date="20210101",
)
if increment_df.rdd.isEmpty():
logger.info(
Expand Down