diff --git a/scripts/jobs/planning/tascomi_create_daily_snapshot.py b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
index 90aa3bc30..c36e7bdb7 100644
--- a/scripts/jobs/planning/tascomi_create_daily_snapshot.py
+++ b/scripts/jobs/planning/tascomi_create_daily_snapshot.py
@@ -1,3 +1,4 @@
+import logging
 import sys
 from datetime import datetime
 
@@ -25,6 +26,11 @@
     table_exists_in_catalog,
 )
 
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 
 def get_latest_snapshot(dfa):
     dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0])
@@ -44,6 +50,24 @@ def add_snapshot_date_columns(data_frame):
     return data_frame
 
 
+def deduplicate_by_id_and_last_updated(df):
+    """
+    Deduplicates rows with the same (id, last_updated) combination by keeping the one with the latest import_date.
+    To resolve: spotted duplicated rows with same id and last_updated timestamp in some incremental tables (e.g. documents)
+    """
+    window_spec = Window.partitionBy("id", "last_updated").orderBy(
+        F.col("import_date").desc()
+    )
+
+    deduplicated_df = (
+        df.withColumn("row_num", F.row_number().over(window_spec))
+        .filter(F.col("row_num") == 1)
+        .drop("row_num")
+    )
+
+    return deduplicated_df
+
+
 def prepare_increments(increment_df):
     # In case there are several days worth of increments: only keep the latest version of a record
     id_partition = Window.partitionBy("id")
@@ -62,6 +86,15 @@ def prepare_increments(increment_df):
         .where(F.col("last_updated_nonull") == F.col("latest"))
         .drop("latest", "last_updated_nonull")
     )
+
+    # Check for residual duplicates - print and further de-duplicate
+    duplicate_ids = increment_df.groupBy("id").count().filter("count > 1")
+    if duplicate_ids.limit(1).count() > 0:
+        duplicate_ids.join(increment_df, "id").show(truncate=False)
+        increment_df = deduplicate_by_id_and_last_updated(increment_df)
+    else:
+        logger.info("No duplicated rows after initial deduplication.")
+
     return increment_df
 
 
@@ -165,7 +198,6 @@ def purge_today_partition(
 sc = SparkContext.getOrCreate()
 glueContext = GlueContext(sc)
 spark = SparkSession(sc)
-logger = glueContext.get_logger()
 job = Job(glueContext)
 job.init(args["JOB_NAME"], args)
 
@@ -336,7 +368,7 @@
         verificationSuite.saveOrAppendResult(resultKey).run()
 
         # if data quality tests succeed, write to S3
-
+        snapshot_df = snapshot_df.coalesce(300)
         resultDataFrame = DynamicFrame.fromDF(
             snapshot_df, glueContext, "resultDataFrame"
         )
@@ -356,7 +388,8 @@
     job.commit()
 finally:
     if len(dq_errors) > 0:
-        logger.error(f"DQ Errors: {dq_errors}")
-        raise Exception(f"Data quality check failed: {'; '.join(dq_errors)}")
+        logger.error(f"DQ Errors: {dq_errors}")
+        spark.stop()
+        raise SystemExit(f"Data quality check failed: {'; '.join(dq_errors)}")
     spark.sparkContext._gateway.close()
     spark.stop()