Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 56 additions & 111 deletions scripts/jobs/parking/parking_permit_denormalised_gds_street_llpg.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,19 @@
import sys
from calendar import c

from awsglue import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

from scripts.helpers.helpers import (
PARTITION_KEYS,
create_pushdown_predicate,
get_glue_env_var,
get_latest_partitions,
)


def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over DynamicFrames and return a DynamicFrame.

    Args:
        glueContext: GlueContext whose Spark session executes the query and
            which is passed on to DynamicFrame.fromDF.
        query: SQL text; it may reference every key of ``mapping`` as a table.
        mapping: Dict of view alias -> DynamicFrame. Each frame is converted
            with toDF() and registered as a temporary view under its alias,
            replacing any existing view of that name.
        transformation_ctx: Transformation context passed to
            DynamicFrame.fromDF.

    Returns:
        DynamicFrame built from the query's result DataFrame.
    """
    # Use the session of the context we were given instead of the module-level
    # ``spark`` global, so the function does not depend on a name that is
    # defined elsewhere (and later) in the script.
    session = glueContext.spark_session
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
"""
To adapt this job, change only the target table name and the query prototyped
in the Athena UI, i.e. the values of table_name and query_on_athena.
"""

from scripts.helpers.athena_helpers import create_update_table_with_partition
from scripts.helpers.helpers import get_glue_env_var

# Job bootstrap: resolve the JOB_NAME argument from the command line, create
# the Spark and Glue contexts, and initialise the Glue job with those arguments.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
# Spark session owned by the Glue context; used below to run SQL.
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Value is interpolated into catalog database names and S3 paths below.
# NOTE(review): presumably a deployment stage name such as "stg" or "prod" - confirm.
environment = get_glue_env_var("environment")

# Script generated for node S3 bucket - refined - parking_permit_denormalised_data
# Loads parking_permit_denormalised_data from the environment's liberator
# refined-zone catalog database. No pushdown predicate is applied (see below).
S3bucketrefinedparking_permit_denormalised_data_node1 = glueContext.create_dynamic_frame.from_catalog(
database="dataplatform-" + environment + "-liberator-refined-zone",
table_name="parking_permit_denormalised_data",
transformation_ctx="S3bucketrefinedparking_permit_denormalised_data_node1",
# Temporarily removed while the table partitions are being fixed; restore the
# predicate below once they are.
# push_down_predicate=create_pushdown_predicate("import_date", 7),
)

# Script generated for node Amazon S3 - raw - liberator_permit_llpg
# Loads liberator_permit_llpg from the environment's liberator raw-zone
# catalog database, filtered by a pushdown predicate on import_date.
# NOTE(review): the argument 7 presumably means "last 7 days of partitions" -
# confirm against create_pushdown_predicate.
AmazonS3rawliberator_permit_llpg_node1657535904691 = (
glueContext.create_dynamic_frame.from_catalog(
database="dataplatform-" + environment + "-liberator-raw-zone",
table_name="liberator_permit_llpg",
transformation_ctx="AmazonS3rawliberator_permit_llpg_node1657535904691",
push_down_predicate=create_pushdown_predicate("import_date", 7),
)
)

# Script generated for node Amazon S3 - unrestricted_address_api_dbo_hackney_address
# Loads the Hackney address table from the environment's unrestricted
# address API raw-zone catalog database, filtered by a pushdown predicate on
# import_date.
# NOTE(review): the argument 30 is presumably a wider look-back window than
# the 7 used for liberator_permit_llpg - confirm the unit with
# create_pushdown_predicate.
AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004 = glueContext.create_dynamic_frame.from_catalog(
database="dataplatform-" + environment + "-raw-zone-unrestricted-address-api",
table_name="unrestricted_address_api_dbo_hackney_address",
transformation_ctx="AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004",
push_down_predicate=create_pushdown_predicate("import_date", 30),
)

# Script generated for node Amazon S3 - parking raw - ltn_london_fields
# Loads ltn_london_fields from the "parking-raw-zone" catalog database.
# Unlike the other sources, the database name is not environment-specific
# and no pushdown predicate is passed.
AmazonS3parkingrawltn_london_fields_node1657536241729 = (
glueContext.create_dynamic_frame.from_catalog(
database="parking-raw-zone",
table_name="ltn_london_fields",
transformation_ctx="AmazonS3parkingrawltn_london_fields_node1657536241729",
)
)
# The target table in liberator refined zone
table_name = "parking_permit_denormalised_gds_street_llpg"

# Script generated for node Apply Mapping - New
SqlQuery0 = """
/*08/04/2022 - added ,case when latest_permit_status in('Approved','Renewed','Created','ORDER_APPROVED','PENDING_VRM_CHANGE','RENEW_EVID','PENDING_ADDR_CHANGE') and live_permit_flag = 1 then 1 else 0 end as live_flag */
# The exact same query prototyped in pre-prod(stg) or prod Athena
query_on_athena = """
-- Updated 2025-01-29 "unrestricted-raw-zone"."geolive_llpg_llpg_address" replaces "dataplatform-prod-raw-zone-unrestricted-address-api"."unrestricted_address_api_dbo_hackney_address"
with street as (select
UPRN as SR_UPRN,
ADDRESS1 as SR_ADDRESS1,
Expand Down Expand Up @@ -136,18 +80,47 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
,import_month
,import_day
,import_date
FROM liberator_permit_llpg
where (ADDRESS1 like 'Street Record' or ADDRESS1 like 'STREET RECORD') and liberator_permit_llpg.import_date = (SELECT MAX(liberator_permit_llpg.import_date) FROM liberator_permit_llpg)
FROM "dataplatform-prod-liberator-raw-zone".liberator_permit_llpg
where (ADDRESS1 like 'Street Record' or ADDRESS1 like 'STREET RECORD') and liberator_permit_llpg.import_date = (SELECT MAX(liberator_permit_llpg.import_date) FROM "dataplatform-prod-liberator-raw-zone".liberator_permit_llpg)
)
, llpg as (
SELECT * FROM unrestricted_address_api_dbo_hackney_address where unrestricted_address_api_dbo_hackney_address.import_date = (SELECT max(unrestricted_address_api_dbo_hackney_address.import_date) FROM unrestricted_address_api_dbo_hackney_address) and lpi_logical_status like 'Approved Preferred'
SELECT
-- conversion of new column names and data-types to support the original query...
uprn, -- bigint -- Confirmed: was previously converted from double to bigint to prevent subsequent varchar casts being broken.
usrn, -- int
street_description, -- string
isparent AS property_shell, -- boolean
blpu_class, -- string
usage_primary, -- string
usage_description, -- string
planning_use_class, -- string
CAST(longitude AS DOUBLE) AS longitude, -- converts string to double
CAST(latitude AS DOUBLE) AS latitude, -- converts string to double
lpi_logical_status_code, -- int BS7666 code
lpi_logical_status, --string logical description as per previous API version
import_date -- string
FROM "unrestricted-raw-zone"."geolive_llpg_llpg_address"
-- replaces "dataplatform-prod-raw-zone-unrestricted-address-api"."unrestricted_address_api_dbo_hackney_address"
where import_date = (SELECT max(import_date) FROM "unrestricted-raw-zone"."geolive_llpg_llpg_address")
and lpi_logical_status_code = 1 -- like 'Approved Preferred'
)
SELECT street.usrn as sr_usrn, SR_ADDRESS1, SR_ADDRESS2, llpg.street_description, SR_WARD_CODE, SR_ward_name, llpg.property_shell, llpg.blpu_class, llpg.usage_primary, llpg.usage_description
,concat(cast(street.usrn as string),' - ', llpg.street_description) as street, concat(cast(llpg.uprn as string),' - ',llpg.usage_description) as add_type , concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class,
SELECT
street.usrn as sr_usrn,
SR_ADDRESS1, SR_ADDRESS2,
llpg.street_description,
SR_WARD_CODE,
SR_ward_name,
llpg.property_shell,
llpg.blpu_class,
llpg.usage_primary,
llpg.usage_description,
concat(cast(street.usrn as varchar),' - ', llpg.street_description) as street,
concat(cast(llpg.uprn as varchar),' - ',llpg.usage_description) as add_type,
concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class,

case when cpz !='' and cpz_name != '' then concat(cpz,' - ', cpz_name)
when cpz !='' and cpz_name = '' then concat(cpz)
when cpz ='' and cpz_name != '' then concat(cpz_name)
when cpz !='' and cpz_name = '' then concat(cpz,'-')
when cpz ='' and cpz_name != '' then concat(cpz_name,'-')
else 'NONE'
end as zone_name,
case
Expand Down Expand Up @@ -219,49 +192,21 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
, Case when ltn_london_fields.uprn !='' then 'LTN London Fields' else 'NOT LTN London Fields' end as flag_name_ltn_london_fields


, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM parking_permit_denormalised_data
, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM "dataplatform-prod-liberator-refined-zone".parking_permit_denormalised_data

left join llpg on cast(llpg.uprn as string) = cast(parking_permit_denormalised_data.uprn as string) /*and
cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as string
left join llpg on cast(llpg.uprn as varchar) = cast(parking_permit_denormalised_data.uprn as varchar) /*and
cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as varchar
) = llpg.import_date*/

left join street on cast(street.usrn as string) = cast(llpg.usrn as string)
left join street on cast(street.usrn as varchar) = cast(llpg.usrn as varchar)

left join ltn_london_fields on ltn_london_fields.uprn = parking_permit_denormalised_data.uprn


where parking_permit_denormalised_data.import_date = (SELECT max(parking_permit_denormalised_data.import_date) FROM parking_permit_denormalised_data)
left join "parking-raw-zone".ltn_london_fields on ltn_london_fields.uprn = parking_permit_denormalised_data.uprn


where parking_permit_denormalised_data.import_date = (SELECT max(parking_permit_denormalised_data.import_date) FROM "dataplatform-prod-liberator-refined-zone".parking_permit_denormalised_data)
;
"""
# Runs SqlQuery0 through sparkSqlQuery. Each mapping key is the table name the
# SQL text refers to; its value is the DynamicFrame registered under that name.
ApplyMappingNew_node1657710041310 = sparkSqlQuery(
glueContext,
query=SqlQuery0,
mapping={
"parking_permit_denormalised_data": S3bucketrefinedparking_permit_denormalised_data_node1,
"liberator_permit_llpg": AmazonS3rawliberator_permit_llpg_node1657535904691,
"unrestricted_address_api_dbo_hackney_address": AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004,
"ltn_london_fields": AmazonS3parkingrawltn_london_fields_node1657536241729,
},
transformation_ctx="ApplyMappingNew_node1657710041310",
)

# Script generated for node Amazon S3
# Configures an S3 sink under the environment's refined-zone bucket. Output is
# partitioned by the four import_* columns, compressed with snappy, and the
# sink is set to update the catalog (enableUpdateCatalog=True,
# updateBehavior="UPDATE_IN_DATABASE").
AmazonS3_node1657709892303 = glueContext.getSink(
path="s3://dataplatform-"
+ environment
+ "-refined-zone/parking/liberator/parking_permit_denormalised_gds_street_llpg/",
connection_type="s3",
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["import_year", "import_month", "import_day", "import_date"],
compression="snappy",
enableUpdateCatalog=True,
transformation_ctx="AmazonS3_node1657709892303",
)
AmazonS3_node1657709892303.setCatalogInfo(
catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone",
catalogTableName="parking_permit_denormalised_gds_street_llpg",
create_update_table_with_partition(
environment=environment, query_on_athena=query_on_athena, table_name=table_name
)
AmazonS3_node1657709892303.setFormat("glueparquet")
AmazonS3_node1657709892303.writeFrame(ApplyMappingNew_node1657710041310)
job.commit()