41 changes: 37 additions & 4 deletions scripts/jobs/planning/tascomi_create_daily_snapshot.py
@@ -1,3 +1,4 @@
import logging
import sys
from datetime import datetime

@@ -25,6 +26,11 @@
table_exists_in_catalog,
)

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def get_latest_snapshot(dfa):
dfa = dfa.where(col("snapshot_date") == dfa.select(max("snapshot_date")).first()[0])
@@ -44,6 +50,24 @@ def add_snapshot_date_columns(data_frame):
return data_frame


def deduplicate_by_id_and_last_updated(df):
"""
Deduplicates rows with the same (id, last_updated) combination by keeping the one with the latest import_date.
Added after duplicated rows with the same id and last_updated timestamp were spotted in some incremental tables (e.g. documents).
"""
window_spec = Window.partitionBy("id", "last_updated").orderBy(
F.col("import_date").desc()
)

deduplicated_df = (
df.withColumn("row_num", F.row_number().over(window_spec))
.filter(F.col("row_num") == 1)
.drop("row_num")
)

return deduplicated_df
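
For reviewers, a minimal standalone sketch of how this window dedup behaves; the local session and sample values below are illustrative only and not part of the job:

# Sketch: duplicate (id, last_updated) pairs collapse to the row with the latest import_date.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
rows = [
    (1, "2024-01-01 10:00:00", "20240102"),
    (1, "2024-01-01 10:00:00", "20240103"),  # same id + last_updated, newer import_date
    (2, "2024-01-01 11:00:00", "20240102"),
]
df = spark.createDataFrame(rows, ["id", "last_updated", "import_date"])
window_spec = Window.partitionBy("id", "last_updated").orderBy(F.col("import_date").desc())
deduped = (
    df.withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)
deduped.show()  # id 1 keeps only the 20240103 row; id 2 is unchanged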


def prepare_increments(increment_df):
# In case there are several days' worth of increments: only keep the latest version of each record
id_partition = Window.partitionBy("id")
@@ -62,6 +86,15 @@ def prepare_increments(increment_df):
.where(F.col("last_updated_nonull") == F.col("latest"))
.drop("latest", "last_updated_nonull")
)

# Check for residual duplicates - print and further de-duplicate
duplicate_ids = increment_df.groupBy("id").count().filter("count > 1")
if duplicate_ids.limit(1).count() > 0:
duplicate_ids.join(increment_df, "id").show(truncate=False)
increment_df = deduplicate_by_id_and_last_updated(increment_df)
else:
logger.info("No duplicated rows after initial deduplication.")

return increment_df
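
prepare_increments first keeps the latest version of each record per id before the extra (id, last_updated) pass above. A rough sketch of that keep-latest-per-id pattern, assuming a last_updated_nonull column has already been derived (the null handling itself is collapsed in this diff):

# Sketch only: keep the most recent row for each id.
id_partition = Window.partitionBy("id")
latest_per_id = (
    increment_df.withColumn("latest", F.max("last_updated_nonull").over(id_partition))
    .where(F.col("last_updated_nonull") == F.col("latest"))
    .drop("latest")
)

The residual check then uses duplicate_ids.limit(1).count() > 0 as an existence test rather than a full count of the duplicates DataFrame, so the extra dedup pass only runs when at least one offending id is found.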


@@ -165,7 +198,6 @@ def purge_today_partition(
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = SparkSession(sc)
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

@@ -336,7 +368,7 @@ def purge_today_partition(
verificationSuite.saveOrAppendResult(resultKey).run()

# if data quality tests succeed, write to S3

snapshot_df = snapshot_df.coalesce(300)
resultDataFrame = DynamicFrame.fromDF(
snapshot_df, glueContext, "resultDataFrame"
)
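
The new coalesce(300) caps the number of partitions (and therefore the number of files written to S3) without triggering a shuffle. A minimal illustration of the trade-off against repartition; the partition count here is just the value used above:

# coalesce narrows existing partitions without a shuffle - good for reducing output file count
fewer_files = snapshot_df.coalesce(300)
# repartition(300) would shuffle into evenly sized partitions, at extra cost
# evenly_sized = snapshot_df.repartition(300)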
@@ -356,7 +388,8 @@ def purge_today_partition(
job.commit()
finally:
if len(dq_errors) > 0:
logger.error(f"DQ Errors: {dq_errors}")
raise Exception(f"Data quality check failed: {'; '.join(dq_errors)}")
logger.error(f"Errors: {dq_errors}")
spark.stop()
raise SystemExit(f"Failed: {'; '.join(dq_errors)}")
spark.sparkContext._gateway.close()
spark.stop()