Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 9 additions & 63 deletions scripts/jobs/planning/tascomi_create_daily_snapshot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys
from datetime import datetime, timedelta
from typing import Optional
from datetime import datetime

import pyspark.sql.functions as F
from awsglue.context import GlueContext
Expand All @@ -11,7 +10,7 @@
from pydeequ.repository import FileSystemMetricsRepository, ResultKey
from pydeequ.verification import VerificationResult, VerificationSuite
from pyspark import SparkContext
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, max

from scripts.helpers.data_quality_testing import (
Expand All @@ -27,15 +26,9 @@
)


def get_latest_snapshot(dfa: DataFrame) -> DataFrame:
    """Return only the rows that belong to the most recent snapshot_date.

    If the frame was loaded without its partition columns, they are added
    first so the max-date filter has something to work on.
    """
    if "snapshot_date" not in dfa.columns:
        logger.warn(
            "No snapshot_date column found in the dataframe, adding snapshot date columns"
        )
        dfa = add_snapshot_date_columns(dfa)
    # Find the newest snapshot_date, then keep only that partition's rows.
    latest = dfa.select(max("snapshot_date")).first()[0]
    return dfa.where(col("snapshot_date") == latest)
def get_latest_snapshot(dfa):
    # Compute the newest snapshot_date present, then keep only the rows
    # from that partition.
    latest_date = dfa.select(max("snapshot_date")).first()[0]
    return dfa.where(col("snapshot_date") == latest_date)


def add_snapshot_date_columns(data_frame):
Expand Down Expand Up @@ -78,22 +71,11 @@ def apply_increments(snapshot_df, increment_df):
return snapshot_df


def loadIncrementsSinceDate(
increment_table_name: str, name_space: str, date: Optional[str] = None
) -> DataFrame:
"""
Loads increments from the specified catalog table starting from a given date.
If the provided date is None, it defaults to 60 days ago.

Returns:
DataFrame: A Spark DataFrame containing the loaded increments.
"""
if date is None:
date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d") # default date
def loadIncrementsSinceDate(increment_table_name, name_space, date):
increment_ddf = glueContext.create_dynamic_frame.from_catalog(
name_space=name_space,
table_name=increment_table_name,
push_down_predicate=f"import_date>='{date}'",
push_down_predicate=f"import_date>={date}",
transformation_ctx=f"datasource_{increment_table_name}",
)
increment_df = increment_ddf.toDF()
Expand Down Expand Up @@ -149,32 +131,6 @@ def loadIncrementsSinceDate(
"enforcement_breach_details": {"unique": ["id"]},
}


def purge_today_partition(
    glueContext: "GlueContext", target_destination: str, retentionPeriod: int = 0
) -> str:
    """
    Purges (deletes) only today's partition under the given target destination.

    Parameters:
        glueContext: GlueContext instance.
        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
        retentionPeriod: Retention period in hours (default 0, meaning delete
            all files immediately).

    Returns:
        The S3 partition path that was purged.
    """
    now = datetime.now()
    snapshot_year = str(now.year)
    # Zero-pad month/day so the path matches the partition layout the
    # writer produces (e.g. snapshot_month=03, not snapshot_month=3).
    snapshot_month = str(now.month).zfill(2)
    snapshot_day = str(now.day).zfill(2)
    snapshot_date = snapshot_year + snapshot_month + snapshot_day

    partition_path = (
        f"{target_destination}/snapshot_year={snapshot_year}"
        f"/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}"
        f"/snapshot_date={snapshot_date}"
    )

    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})
    # Fix: the docstring documented the purged path as the return value,
    # but the function previously fell off the end and returned None.
    return partition_path


if __name__ == "__main__":
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

Expand Down Expand Up @@ -216,11 +172,10 @@ def purge_today_partition(
f"No snapshot and no increment for {increment_table_name}, going to the next table"
)
continue

# Load increments from default date
increment_df = loadIncrementsSinceDate(
increment_table_name=increment_table_name,
name_space=source_catalog_database,
date="20210101",
)
if increment_df.rdd.isEmpty():
logger.info(
Expand All @@ -234,18 +189,14 @@ def purge_today_partition(
# snapshot table in glue catalogue
else:
pushDownPredicate = create_pushdown_predicate(
partitionDateColumn="snapshot_date", daysBuffer=60
partitionDateColumn="snapshot_date", daysBuffer=3
)
# load latest snapshot
snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
name_space=source_catalog_database,
table_name=snapshot_table_name,
push_down_predicate=pushDownPredicate,
)
if snapshot_ddf.count() == 0:
logger.error(
f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}. "
)
snapshot_df = snapshot_ddf.toDF()
snapshot_df = get_latest_snapshot(snapshot_df)
last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0]
Expand Down Expand Up @@ -278,7 +229,6 @@ def purge_today_partition(
# apply COU
logger.info(f"Applying increment {increment_table_name}")
snapshot_df = apply_increments(snapshot_df, increment_df)

else:
logger.info(
f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday"
Expand Down Expand Up @@ -368,9 +318,6 @@ def purge_today_partition(
snapshot_df, glueContext, "resultDataFrame"
)
target_destination = s3_bucket_target + table_name

# Clean up today's partition before writing
purge_today_partition(glueContext, target_destination)
parquetData = glueContext.write_dynamic_frame.from_options(
frame=resultDataFrame,
connection_type="s3",
Expand All @@ -380,7 +327,6 @@ def purge_today_partition(
"partitionKeys": PARTITION_KEYS,
},
)

job.commit()
finally:
if len(dq_errors) > 0:
Expand Down
2 changes: 1 addition & 1 deletion terraform/etl/24-aws-glue-tascomi-data.tf
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" {
job_name = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning"
glue_version = "2.0"
glue_job_worker_type = "G.2X"
number_of_workers_for_glue_job = 6
number_of_workers_for_glue_job = 12
helper_module_key = data.aws_s3_object.helpers.key
pydeequ_zip_key = data.aws_s3_object.pydeequ.key
spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id
Expand Down