From 27222b07468ed21f8330fd7ce74636cf37ac4eef Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Tue, 11 Feb 2025 16:40:48 +0000 Subject: [PATCH] fix the table related to llpg using steve sql script --- ...ing_permit_denormalised_gds_street_llpg.py | 167 ++++++------------ 1 file changed, 56 insertions(+), 111 deletions(-) diff --git a/scripts/jobs/parking/parking_permit_denormalised_gds_street_llpg.py b/scripts/jobs/parking/parking_permit_denormalised_gds_street_llpg.py index 873716f20..ea472fe00 100644 --- a/scripts/jobs/parking/parking_permit_denormalised_gds_street_llpg.py +++ b/scripts/jobs/parking/parking_permit_denormalised_gds_street_llpg.py @@ -1,75 +1,19 @@ -import sys -from calendar import c - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import ( - PARTITION_KEYS, - create_pushdown_predicate, - get_glue_env_var, - get_latest_partitions, -) - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) +""" +Only need to change the table name and the query prototyped on the Athena UI +by replacing table_name and query_on_athena +""" +from scripts.helpers.athena_helpers import create_update_table_with_partition +from scripts.helpers.helpers import get_glue_env_var -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) environment = get_glue_env_var("environment") -# Script generated for node S3 bucket - refined - parking_permit_denormalised_data -S3bucketrefinedparking_permit_denormalised_data_node1 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_permit_denormalised_data", - transformation_ctx="S3bucketrefinedparking_permit_denormalised_data_node1", - # teporarily removed while table partitions are fixed - # push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 - raw - liberator_permit_llpg -AmazonS3rawliberator_permit_llpg_node1657535904691 = ( - glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_llpg", - transformation_ctx="AmazonS3rawliberator_permit_llpg_node1657535904691", - push_down_predicate=create_pushdown_predicate("import_date", 7), - ) -) - -# Script generated for node Amazon S3 - unrestricted_address_api_dbo_hackney_address -AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-raw-zone-unrestricted-address-api", - table_name="unrestricted_address_api_dbo_hackney_address", - transformation_ctx="AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004", - push_down_predicate=create_pushdown_predicate("import_date", 30), -) - -# Script generated for node Amazon S3 - parking raw - ltn_london_fields -AmazonS3parkingrawltn_london_fields_node1657536241729 = ( - glueContext.create_dynamic_frame.from_catalog( - database="parking-raw-zone", - table_name="ltn_london_fields", - transformation_ctx="AmazonS3parkingrawltn_london_fields_node1657536241729", - ) -) +# The target table in liberator refined zone +table_name = "parking_permit_denormalised_gds_street_llpg" -# Script generated for node Apply Mapping - New -SqlQuery0 = """ -/*08/04/2022 - added ,case when latest_permit_status in('Approved','Renewed','Created','ORDER_APPROVED','PENDING_VRM_CHANGE','RENEW_EVID','PENDING_ADDR_CHANGE') and live_permit_flag = 1 then 1 else 0 end as live_flag */ +# The exact same query prototyped in pre-prod(stg) or prod Athena +query_on_athena = """ +-- Updated 2025-01-29 "unrestricted-raw-zone"."geolive_llpg_llpg_address" replaces "dataplatform-prod-raw-zone-unrestricted-address-api"."unrestricted_address_api_dbo_hackney_address" with street as (select UPRN as SR_UPRN, ADDRESS1 as SR_ADDRESS1, @@ -136,18 +80,47 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra ,import_month ,import_day ,import_date -FROM liberator_permit_llpg -where (ADDRESS1 like 'Street Record' or ADDRESS1 like 'STREET RECORD') and liberator_permit_llpg.import_date = (SELECT MAX(liberator_permit_llpg.import_date) FROM liberator_permit_llpg) +FROM "dataplatform-prod-liberator-raw-zone".liberator_permit_llpg +where (ADDRESS1 like 'Street Record' or ADDRESS1 like 'STREET RECORD') and liberator_permit_llpg.import_date = (SELECT MAX(liberator_permit_llpg.import_date) FROM "dataplatform-prod-liberator-raw-zone".liberator_permit_llpg) ) , llpg as ( - SELECT * FROM unrestricted_address_api_dbo_hackney_address where unrestricted_address_api_dbo_hackney_address.import_date = (SELECT max(unrestricted_address_api_dbo_hackney_address.import_date) FROM unrestricted_address_api_dbo_hackney_address) and lpi_logical_status like 'Approved Preferred' + SELECT + -- conversion of new column names and data-types to support the original query... + uprn, -- bigint -- Confirmed: was previously converted from double to bigint to prevent subsequent varchar casts being broken. + usrn, -- int + street_description, -- string + isparent AS property_shell, -- boolean + blpu_class, -- string + usage_primary, -- string + usage_description, -- string + planning_use_class, -- string + CAST(longitude AS DOUBLE) AS longitude, -- converts string to double + CAST(latitude AS DOUBLE) AS latitude, -- converts string to double + lpi_logical_status_code, -- int BS7666 code + lpi_logical_status, --string logical description as per previous API version + import_date -- string + FROM "unrestricted-raw-zone"."geolive_llpg_llpg_address" + -- replaces "dataplatform-prod-raw-zone-unrestricted-address-api"."unrestricted_address_api_dbo_hackney_address" + where import_date = (SELECT max(import_date) FROM "unrestricted-raw-zone"."geolive_llpg_llpg_address") + and lpi_logical_status_code = 1 -- like 'Approved Preferred' ) -SELECT street.usrn as sr_usrn, SR_ADDRESS1, SR_ADDRESS2, llpg.street_description, SR_WARD_CODE, SR_ward_name, llpg.property_shell, llpg.blpu_class, llpg.usage_primary, llpg.usage_description -,concat(cast(street.usrn as string),' - ', llpg.street_description) as street, concat(cast(llpg.uprn as string),' - ',llpg.usage_description) as add_type , concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class, +SELECT + street.usrn as sr_usrn, + SR_ADDRESS1, SR_ADDRESS2, + llpg.street_description, + SR_WARD_CODE, + SR_ward_name, + llpg.property_shell, + llpg.blpu_class, + llpg.usage_primary, + llpg.usage_description, + concat(cast(street.usrn as varchar),' - ', llpg.street_description) as street, + concat(cast(llpg.uprn as varchar),' - ',llpg.usage_description) as add_type, + concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class, case when cpz !='' and cpz_name != '' then concat(cpz,' - ', cpz_name) -when cpz !='' and cpz_name = '' then concat(cpz) -when cpz ='' and cpz_name != '' then concat(cpz_name) +when cpz !='' and cpz_name = '' then concat(cpz,'-') +when cpz ='' and cpz_name != '' then concat(cpz_name,'-') else 'NONE' end as zone_name, case @@ -219,49 +192,21 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra , Case when ltn_london_fields.uprn !='' then 'LTN London Fields' else 'NOT LTN London Fields' end as flag_name_ltn_london_fields -, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM parking_permit_denormalised_data +, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM "dataplatform-prod-liberator-refined-zone".parking_permit_denormalised_data -left join llpg on cast(llpg.uprn as string) = cast(parking_permit_denormalised_data.uprn as string) /*and -cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as string +left join llpg on cast(llpg.uprn as varchar) = cast(parking_permit_denormalised_data.uprn as varchar) /*and +cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as varchar ) = llpg.import_date*/ -left join street on cast(street.usrn as string) = cast(llpg.usrn as string) +left join street on cast(street.usrn as varchar) = cast(llpg.usrn as varchar) -left join ltn_london_fields on ltn_london_fields.uprn = parking_permit_denormalised_data.uprn - - -where parking_permit_denormalised_data.import_date = (SELECT max(parking_permit_denormalised_data.import_date) FROM parking_permit_denormalised_data) +left join "parking-raw-zone".ltn_london_fields on ltn_london_fields.uprn = parking_permit_denormalised_data.uprn +where parking_permit_denormalised_data.import_date = (SELECT max(parking_permit_denormalised_data.import_date) FROM "dataplatform-prod-liberator-refined-zone".parking_permit_denormalised_data) +; """ -ApplyMappingNew_node1657710041310 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "parking_permit_denormalised_data": S3bucketrefinedparking_permit_denormalised_data_node1, - "liberator_permit_llpg": AmazonS3rawliberator_permit_llpg_node1657535904691, - "unrestricted_address_api_dbo_hackney_address": AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004, - "ltn_london_fields": AmazonS3parkingrawltn_london_fields_node1657536241729, - }, - transformation_ctx="ApplyMappingNew_node1657710041310", -) -# Script generated for node Amazon S3 -AmazonS3_node1657709892303 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/parking_permit_denormalised_gds_street_llpg/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - compression="snappy", - enableUpdateCatalog=True, - transformation_ctx="AmazonS3_node1657709892303", -) -AmazonS3_node1657709892303.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="parking_permit_denormalised_gds_street_llpg", +create_update_table_with_partition( + environment=environment, query_on_athena=query_on_athena, table_name=table_name ) -AmazonS3_node1657709892303.setFormat("glueparquet") -AmazonS3_node1657709892303.writeFrame(ApplyMappingNew_node1657710041310) -job.commit()