From 5175683d400289d2aa5a258418d163e89bf6a7f4 Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:59:08 +0000 Subject: [PATCH 1/3] Roll back all the changes I made on the Tascomi snapshot job. Roll back all the changes: The below is the code of the previous version (Aug 10, 2023) before I made changes https://github.com/LBHackney-IT/Data-Platform/blob/04263eef9512467d9d38b11c46cd2ad48f0c280c/scripts/jobs/planning/tascomi_create_daily_snapshot.py --- .../planning/tascomi_create_daily_snapshot.py | 77 +++---------------- 1 file changed, 12 insertions(+), 65 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index ae9f8bd10..27c9a37f5 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -1,7 +1,7 @@ import sys -from datetime import datetime, timedelta -from typing import Optional +from datetime import datetime +import pydeequ import pyspark.sql.functions as F from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame @@ -11,7 +11,7 @@ from pydeequ.repository import FileSystemMetricsRepository, ResultKey from pydeequ.verification import VerificationResult, VerificationSuite from pyspark import SparkContext -from pyspark.sql import DataFrame, SparkSession, Window +from pyspark.sql import SparkSession, Window from pyspark.sql.functions import col, max from scripts.helpers.data_quality_testing import ( @@ -27,15 +27,9 @@ ) -def get_latest_snapshot(dfa: DataFrame) -> DataFrame: - if "snapshot_date" not in dfa.columns: - logger.warn( - "No snapshot_date column found in the dataframe, adding snapshot date columns" - ) - dfa = add_snapshot_date_columns(dfa) - max_date = dfa.select(max("snapshot_date")).first()[0] - df_latest_snapshot = dfa.where(col("snapshot_date") == max_date) - return df_latest_snapshot +def 
get_latest_snapshot(dfa): + dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0]) + return dfa def add_snapshot_date_columns(data_frame): @@ -78,22 +72,11 @@ def apply_increments(snapshot_df, increment_df): return snapshot_df -def loadIncrementsSinceDate( - increment_table_name: str, name_space: str, date: Optional[str] = None -) -> DataFrame: - """ - Loads increments from the specified catalog table starting from a given date. - If the provided date is None, it defaults to 60 days ago. - - Returns: - DataFrame: A Spark DataFrame containing the loaded increments. - """ - if date is None: - date = (datetime.now() - timedelta(days=60)).strftime("%Y%m%d") # default date +def loadIncrementsSinceDate(increment_table_name, name_space, date): increment_ddf = glueContext.create_dynamic_frame.from_catalog( name_space=name_space, table_name=increment_table_name, - push_down_predicate=f"import_date>='{date}'", + push_down_predicate=f"import_date>={date}", transformation_ctx=f"datasource_{increment_table_name}", ) increment_df = increment_ddf.toDF() @@ -149,32 +132,6 @@ def loadIncrementsSinceDate( "enforcement_breach_details": {"unique": ["id"]}, } - -def purge_today_partition( - glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0 -) -> None: - """ - Purges (delete) only today's partition under the given target destination. - - Parameters: - glueContext: GlueContext instance. - target_destination: Base S3 path (e.g., "s3://your-bucket/path"). - retentionPeriod: Retention period in hours (default 0, meaning delete all files immediately). - - Returns: - partition_path: The S3 partition path that was purged. 
- """ - now = datetime.now() - snapshot_year = str(now.year) - snapshot_month = str(now.month).zfill(2) - snapshot_day = str(now.day).zfill(2) - snapshot_date = snapshot_year + snapshot_month + snapshot_day - - partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}" - - glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod}) - - if __name__ == "__main__": args = getResolvedOptions(sys.argv, ["JOB_NAME"]) @@ -216,11 +173,10 @@ def purge_today_partition( f"No snapshot and no increment for {increment_table_name}, going to the next table" ) continue - - # Load increments from default date increment_df = loadIncrementsSinceDate( increment_table_name=increment_table_name, name_space=source_catalog_database, + date="20210101", ) if increment_df.rdd.isEmpty(): logger.info( @@ -234,7 +190,7 @@ def purge_today_partition( # snapshot table in glue catalogue else: pushDownPredicate = create_pushdown_predicate( - partitionDateColumn="snapshot_date", daysBuffer=60 + partitionDateColumn="snapshot_date", daysBuffer=3 ) # load latest snpashot snapshot_ddf = glueContext.create_dynamic_frame.from_catalog( @@ -242,10 +198,6 @@ def purge_today_partition( table_name=snapshot_table_name, push_down_predicate=pushDownPredicate, ) - if snapshot_ddf.count() == 0: - logger.error( - f"No data returned for table {snapshot_table_name} using push_down_predicate: {pushDownPredicate}. 
" - ) snapshot_df = snapshot_ddf.toDF() snapshot_df = get_latest_snapshot(snapshot_df) last_snapshot_date = snapshot_df.select(max("snapshot_date")).first()[0] @@ -278,7 +230,6 @@ def purge_today_partition( # apply COU logger.info(f"Applying increment {increment_table_name}") snapshot_df = apply_increments(snapshot_df, increment_df) - else: logger.info( f"Couldn't find table {increment_table_name} in database {source_catalog_database}, saving same snapshot as yesterday" @@ -347,7 +298,7 @@ def purge_today_partition( except Exception as verificationError: logger.info( - "Job cancelled due to data quality test failure, continuing to next table." + f"Job cancelled due to data quality test failure, continuing to next table." ) message = verificationError.args logger.info(f"{message[0]}") @@ -358,7 +309,7 @@ def purge_today_partition( else: logger.info( - "Data quality tests passed, appending data quality results to JSON and moving on to writing data" + f"Data quality tests passed, appending data quality results to JSON and moving on to writing data" ) verificationSuite.saveOrAppendResult(resultKey).run() @@ -368,9 +319,6 @@ def purge_today_partition( snapshot_df, glueContext, "resultDataFrame" ) target_destination = s3_bucket_target + table_name - - # Clean up today's partition before writing - purge_today_partition(glueContext, target_destination) parquetData = glueContext.write_dynamic_frame.from_options( frame=resultDataFrame, connection_type="s3", @@ -380,7 +328,6 @@ def purge_today_partition( "partitionKeys": PARTITION_KEYS, }, ) - job.commit() finally: if len(dq_errors) > 0: From 0874c27cfdc8fc56a658ac431af9ae24ef8f2e49 Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Thu, 13 Mar 2025 15:00:20 +0000 Subject: [PATCH 2/3] revert the workers --- terraform/etl/24-aws-glue-tascomi-data.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/etl/24-aws-glue-tascomi-data.tf 
b/terraform/etl/24-aws-glue-tascomi-data.tf index 32bbc2cfa..ca68a1ef6 100644 --- a/terraform/etl/24-aws-glue-tascomi-data.tf +++ b/terraform/etl/24-aws-glue-tascomi-data.tf @@ -264,7 +264,7 @@ module "tascomi_create_daily_snapshot" { job_name = "${local.short_identifier_prefix}tascomi_create_daily_snapshot_planning" glue_version = "2.0" glue_job_worker_type = "G.2X" - number_of_workers_for_glue_job = 6 + number_of_workers_for_glue_job = 12 helper_module_key = data.aws_s3_object.helpers.key pydeequ_zip_key = data.aws_s3_object.pydeequ.key spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id From 7c098a57f79fffef7325c90843da4cb7fbfddeda Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Thu, 13 Mar 2025 15:01:25 +0000 Subject: [PATCH 3/3] fix python linter --- scripts/jobs/planning/tascomi_create_daily_snapshot.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py index 27c9a37f5..7dbf58ece 100644 --- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py +++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py @@ -1,7 +1,6 @@ import sys from datetime import datetime -import pydeequ import pyspark.sql.functions as F from awsglue.context import GlueContext from awsglue.dynamicframe import DynamicFrame @@ -298,7 +297,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date): except Exception as verificationError: logger.info( - f"Job cancelled due to data quality test failure, continuing to next table." + "Job cancelled due to data quality test failure, continuing to next table." 
) message = verificationError.args logger.info(f"{message[0]}") @@ -309,7 +308,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date): else: logger.info( - f"Data quality tests passed, appending data quality results to JSON and moving on to writing data" + "Data quality tests passed, appending data quality results to JSON and moving on to writing data" ) verificationSuite.saveOrAppendResult(resultKey).run()