Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 60 additions & 91 deletions scripts/jobs/parking/parking_foreign_vrm_pcns.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,72 @@
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
"""
Only need to change the table name and the query prototyped on the Athena UI
by replacing table_name and query_on_athena
Note: python file name should be the same as the table name
"""

from scripts.helpers.athena_helpers import create_update_table_with_partition
from scripts.helpers.helpers import get_glue_env_var
environment = get_glue_env_var("environment")


def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over DynamicFrames exposed as temp views.

    Args:
        glueContext: GlueContext whose Spark session runs the query and
            which wraps the result back into a DynamicFrame.
        query: SQL text referencing the aliases given in ``mapping``.
        mapping: dict of view alias -> DynamicFrame; each frame is
            registered as a temporary view under its alias.
        transformation_ctx: Glue transformation context name for the result.

    Returns:
        DynamicFrame containing the query result.
    """
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    # Use the session attached to the supplied GlueContext rather than the
    # module-level ``spark`` global: the original silently depended on a
    # global created later in the script, breaking the helper if imported
    # or called before that global exists.
    result = glueContext.spark_session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Amazon S3
AmazonS3_node1625732651466 = glueContext.create_dynamic_frame.from_catalog(
database="dataplatform-" + environment + "-liberator-refined-zone",
table_name="pcnfoidetails_pcn_foi_full",
transformation_ctx="AmazonS3_node1625732651466",
)
environment = get_glue_env_var("environment")

# Script generated for node Amazon S3
AmazonS3_node1646229922398 = glueContext.create_dynamic_frame.from_catalog(
database="dataplatform-" + environment + "-liberator-raw-zone",
table_name="liberator_pcn_tickets",
transformation_ctx="AmazonS3_node1646229922398",
)
# The target table in liberator refined zone
table_name = "Parking_Foreign_VRM_PCNs"

# Script generated for node ApplyMapping
SqlQuery0 = """
# The exact same query prototyped in pre-prod(stg) or prod Athena
query_on_athena = """
/*************************************************************************************************************************
Parking_Foreign_VRM_PCNs
Parking_Foreign_VRM_PCNs

This SQL creates the list of VRMs against the current valid Permits
This SQL creates the list of VRMs against the current valid Permits

02/03/2022 - Create Query
*************************************************************************************************************************/
/* Collect the Foreign PCNs from the raw PCN data */
02/03/2022 - Create Query
*************************************************************************************************************************/
With PCN_Raw as (
select * from liberator_pcn_tickets
Where Import_Date = (Select MAX(Import_Date)
from liberator_pcn_tickets) and lower(foreignvehiclecountry) = 'y')

SELECT
PCN, cast(pcnissuedatetime as timestamp) as pcnissuedatetime, pcn_canx_date, A.cancellationgroup,
A.cancellationreason, street_location,
whereonlocation, zone, usrn, A.contraventioncode, debttype, A.vrm, vehiclemake, vehiclemodel,
vehiclecolour, ceo, isremoval, A.progressionstage, lib_payment_received as payment_received,
whenpaid as PaymentDate, noderef, replace(replace(ticketnotes, '\r',''), '\n','') as ticketnotes,

current_timestamp() as ImportDateTime,

replace(cast(current_date() as string),'-','') as import_date,

-- Add the Import date
cast(Year(current_date) as string) as import_year,
cast(month(current_date) as string) as import_month,
cast(day(current_date) as string) as import_day


FROM pcnfoidetails_pcn_foi_full as A
INNER JOIN PCN_Raw as B ON A.pcn = B.ticketserialnumber
Where ImportDateTime = (Select MAX(ImportDateTime) from pcnfoidetails_pcn_foi_full) and warningflag = 0
Order By pcnissuedatetime
"""
ApplyMapping_node2 = sparkSqlQuery(
glueContext,
query=SqlQuery0,
mapping={
"pcnfoidetails_pcn_foi_full": AmazonS3_node1625732651466,
"liberator_pcn_tickets": AmazonS3_node1646229922398,
},
transformation_ctx="ApplyMapping_node2",
select *
from "dataplatform-prod-liberator-raw-zone".liberator_pcn_tickets
Where Import_Date = (
Select MAX(Import_Date)
from "dataplatform-prod-liberator-raw-zone".liberator_pcn_tickets
)
and lower(foreignvehiclecountry) = 'y'
)
SELECT PCN,
cast(pcnissuedatetime as timestamp) as pcnissuedatetime,
pcn_canx_date,
A.cancellationgroup,
A.cancellationreason,
street_location,
whereonlocation,
zone,
usrn,
A.contraventioncode,
debttype,
A.vrm,
vehiclemake,
vehiclemodel,
vehiclecolour,
ceo,
isremoval,
A.progressionstage,
lib_payment_received as payment_received,
whenpaid as PaymentDate,
noderef,
replace(replace(ticketnotes, '\r', ''), '\n', '') as ticketnotes,
cast(current_timestamp as timestamp) as ImportDateTime,
format_datetime(current_date, 'yyyy') AS import_year,
format_datetime(current_date, 'MM') AS import_month,
format_datetime(current_date, 'dd') AS import_day,
format_datetime(current_date, 'yyyyMMdd') AS import_date
FROM "dataplatform-prod-liberator-refined-zone".pcnfoidetails_pcn_foi_full as A
INNER JOIN PCN_Raw as B ON A.pcn = B.ticketserialnumber
Where ImportDateTime = (
Select MAX(ImportDateTime)
from "dataplatform-prod-liberator-refined-zone".pcnfoidetails_pcn_foi_full
)
and warningflag = 0
Order By pcnissuedatetime;
"""

# Script generated for node S3 bucket
S3bucket_node3 = glueContext.getSink(
path="s3://dataplatform-" + environment + "-refined-zone/parking/liberator/Parking_Foreign_VRM_PCNs/",
connection_type="s3",
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["import_year", "import_month", "import_day"],
enableUpdateCatalog=True,
transformation_ctx="S3bucket_node3",
)
S3bucket_node3.setCatalogInfo(
catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone",
catalogTableName="Parking_Foreign_VRM_PCNs",
create_update_table_with_partition(
environment=environment, query_on_athena=query_on_athena, table_name=table_name
)
S3bucket_node3.setFormat("glueparquet")
S3bucket_node3.writeFrame(ApplyMapping_node2)
job.commit()
2 changes: 1 addition & 1 deletion terraform/etl/38-aws-glue-job-parking.tf
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ module "parking_foreign_vrm_pcns" {
triggered_by_job = module.parking_pcn_denormalisation.job_name
  job_description             = "This job creates the list of PCNs issued against foreign (non-UK) VRMs"
workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow"
number_of_workers_for_glue_job = 10
number_of_workers_for_glue_job = 2
glue_job_worker_type = "G.1X"
glue_version = "4.0"
job_parameters = {
Expand Down
Loading