From 8a241dbff3daeaee9148fc05528ef74435aafb17 Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Thu, 22 May 2025 11:58:54 +0100 Subject: [PATCH] change Parking_Foreign_VRM_PCNs to athena sql --- .../jobs/parking/parking_foreign_vrm_pcns.py | 151 +++++++----------- terraform/etl/38-aws-glue-job-parking.tf | 2 +- 2 files changed, 61 insertions(+), 92 deletions(-) diff --git a/scripts/jobs/parking/parking_foreign_vrm_pcns.py b/scripts/jobs/parking/parking_foreign_vrm_pcns.py index 446b5553d..2af83b08f 100644 --- a/scripts/jobs/parking/parking_foreign_vrm_pcns.py +++ b/scripts/jobs/parking/parking_foreign_vrm_pcns.py @@ -1,103 +1,72 @@ -import sys -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue import DynamicFrame +""" +Only need to change the table name and the query prototyped on the Athena UI +by replacing table_name and query_on_athena +Note: python file name should be the same as the table name +""" +from scripts.helpers.athena_helpers import create_update_table_with_partition from scripts.helpers.helpers import get_glue_env_var -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) -# Script generated for node Amazon S3 -AmazonS3_node1625732651466 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="pcnfoidetails_pcn_foi_full", - transformation_ctx="AmazonS3_node1625732651466", -) +environment = get_glue_env_var("environment") -# Script generated for node Amazon S3 -AmazonS3_node1646229922398 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_pcn_tickets", - transformation_ctx="AmazonS3_node1646229922398", -) +# The target table in liberator refined zone +table_name = "Parking_Foreign_VRM_PCNs" -# Script generated for node ApplyMapping -SqlQuery0 = """ +# The exact same query prototyped in pre-prod(stg) or prod Athena +query_on_athena = """ /************************************************************************************************************************* -Parking_Foreign_VRM_PCNs + Parking_Foreign_VRM_PCNs -This SQL creates the list of VRMs against the current valid Permits + This SQL creates the list of VRMs against the current valid Permits -02/03/2022 - Create Query -*************************************************************************************************************************/ -/* Collect the Foreign PCNs from the raw PCN data */ + 02/03/2022 - Create Query + *************************************************************************************************************************/ With PCN_Raw as ( -select * from liberator_pcn_tickets -Where Import_Date = (Select MAX(Import_Date) - from liberator_pcn_tickets) and lower(foreignvehiclecountry) = 'y') - -SELECT - PCN, cast(pcnissuedatetime as timestamp) as pcnissuedatetime, pcn_canx_date, A.cancellationgroup, - A.cancellationreason, street_location, - whereonlocation, zone, usrn, A.contraventioncode, debttype, A.vrm, vehiclemake, vehiclemodel, - vehiclecolour, ceo, isremoval, A.progressionstage, lib_payment_received as payment_received, - whenpaid as PaymentDate, noderef, replace(replace(ticketnotes, '\r',''), '\n','') as ticketnotes, - - current_timestamp() as ImportDateTime, - - replace(cast(current_date() as string),'-','') as import_date, - - -- Add the Import date - cast(Year(current_date) as string) as import_year, - cast(month(current_date) as string) as import_month, - cast(day(current_date) as string) as import_day - - - FROM pcnfoidetails_pcn_foi_full as A - INNER JOIN PCN_Raw as B ON A.pcn = B.ticketserialnumber - Where ImportDateTime = (Select MAX(ImportDateTime) from pcnfoidetails_pcn_foi_full) and warningflag = 0 - Order By pcnissuedatetime -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "pcnfoidetails_pcn_foi_full": AmazonS3_node1625732651466, - "liberator_pcn_tickets": AmazonS3_node1646229922398, - }, - transformation_ctx="ApplyMapping_node2", + select * + from "dataplatform-prod-liberator-raw-zone".liberator_pcn_tickets + Where Import_Date = ( + Select MAX(Import_Date) + from "dataplatform-prod-liberator-raw-zone".liberator_pcn_tickets + ) + and lower(foreignvehiclecountry) = 'y' ) +SELECT PCN, + cast(pcnissuedatetime as timestamp) as pcnissuedatetime, + pcn_canx_date, + A.cancellationgroup, + A.cancellationreason, + street_location, + whereonlocation, + zone, + usrn, + A.contraventioncode, + debttype, + A.vrm, + vehiclemake, + vehiclemodel, + vehiclecolour, + ceo, + isremoval, + A.progressionstage, + lib_payment_received as payment_received, + whenpaid as PaymentDate, + noderef, + replace(replace(ticketnotes, '\r', ''), '\n', '') as ticketnotes, + cast(current_timestamp as timestamp) as ImportDateTime, + format_datetime(current_date, 'yyyy') AS import_year, + format_datetime(current_date, 'MM') AS import_month, + format_datetime(current_date, 'dd') AS import_day, + format_datetime(current_date, 'yyyyMMdd') AS import_date +FROM "dataplatform-prod-liberator-refined-zone".pcnfoidetails_pcn_foi_full as A + INNER JOIN PCN_Raw as B ON A.pcn = B.ticketserialnumber +Where ImportDateTime = ( + Select MAX(ImportDateTime) + from "dataplatform-prod-liberator-refined-zone".pcnfoidetails_pcn_foi_full + ) + and warningflag = 0 +Order By pcnissuedatetime; +""" -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" + environment + "-refined-zone/parking/liberator/Parking_Foreign_VRM_PCNs/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_Foreign_VRM_PCNs", +create_update_table_with_partition( + environment=environment, query_on_athena=query_on_athena, table_name=table_name ) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/terraform/etl/38-aws-glue-job-parking.tf b/terraform/etl/38-aws-glue-job-parking.tf index 821062b0e..5638ce788 100644 --- a/terraform/etl/38-aws-glue-job-parking.tf +++ b/terraform/etl/38-aws-glue-job-parking.tf @@ -200,7 +200,7 @@ module "parking_foreign_vrm_pcns" { triggered_by_job = module.parking_pcn_denormalisation.job_name job_description = "This job creates the LTN PCN count and Total paid" workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 + number_of_workers_for_glue_job = 2 glue_job_worker_type = "G.1X" glue_version = "4.0" job_parameters = {