diff --git a/scripts/jobs/parking/parking_all_suspensions_processed_review.py b/scripts/jobs/parking/parking_all_suspensions_processed_review.py deleted file mode 100644 index aa300c3d6..000000000 --- a/scripts/jobs/parking/parking_all_suspensions_processed_review.py +++ /dev/null @@ -1,173 +0,0 @@ -""" -Only need to change the table name and the query prototyped on the Athena UI -by replacing table_name and query_on_athena -""" - -from scripts.helpers.athena_helpers import create_update_table_with_partition -from scripts.helpers.helpers import get_glue_env_var - -environment = get_glue_env_var("environment") - -# The target table in liberator refined zone -table_name = "parking_all_suspensions_processed_review" - -# The exact same query prototyped in pre-prod(stg) orprod Athena -query_on_athena = """ -/********************************************************************************* -parking_all_suspensions_processed_review - -SQL TO create the ALL of the unique Suspension activities (accept/amend/reject/etc.) -that have been processed - -20/02/2024 - Create Query -*********************************************************************************/ -/** Obtain the Suspension activity **/ -With Sus_Activity_Approved as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece, activity_by, cast(activity_date as date) - ORDER BY permit_referece, activity_by, activity_date DESC) R1 - FROM "dataplatform-stg-liberator-raw-zone".liberator_permit_activity - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd') - AND permit_referece like 'HYS%' - AND (lower(activity) like '%approved%' OR lower(activity) like '%amend%') - AND activity_by != 'system' and activity_by not like '%@%'), - -Sus_Activity_rejected as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece, activity_by, cast(activity_date as date) - ORDER BY permit_referece, activity_by, activity_date DESC) R2 - FROM "dataplatform-stg-liberator-raw-zone".liberator_permit_activity - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd') - AND permit_referece like 'HYS%' - AND lower(activity) like '%reject%' - AND activity_by != 'system' and activity_by not like '%@%'), - -Sus_Activity_cancelled as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece, activity_by, cast(activity_date as date) - ORDER BY permit_referece, activity_by, activity_date DESC) R3 - FROM "dataplatform-stg-liberator-raw-zone".liberator_permit_activity - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd') - AND permit_referece like 'HYS%' - AND lower(activity) like '%cancelled%' - AND activity_by != 'system' and activity_by not like '%@%'), - -Sus_Activity_additional as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece, activity_by, cast(activity_date as date) - ORDER BY permit_referece, activity_by, activity_date DESC) R4 - FROM "dataplatform-stg-liberator-raw-zone".liberator_permit_activity - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd') - AND permit_referece like 'HYS%' - AND lower(activity) like '%additional%' - AND activity_by != 'system' and activity_by not like '%@%'), - -Sus_Activity_duration as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece, activity_by, cast(activity_date as date) - ORDER BY permit_referece, activity_by, activity_date DESC) R5 - FROM "dataplatform-stg-liberator-raw-zone".liberator_permit_activity - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd') - AND permit_referece like 'HYS%' - AND lower(activity) like '%suspension duration changed%' - AND activity_by != 'system' and activity_by not like '%@%'), - - Sus_Activity_refund as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece, activity_by, cast(activity_date as date) - ORDER BY permit_referece, activity_by, activity_date DESC) R6 - FROM "dataplatform-stg-liberator-raw-zone".liberator_permit_activity - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd') - AND permit_referece like 'HYS%' - AND lower(activity) like '%Manual refund processed%' - AND activity_by != 'system' and activity_by not like '%@%'), - -Sus_Activity as ( - SELECT * FROM Sus_Activity_Approved - WHERE R1 = 1 - union - SELECT * FROM Sus_Activity_rejected - WHERE R2 = 1 - union - SELECT * FROM Sus_Activity_cancelled - WHERE R3 = 1 - union - SELECT * FROM Sus_Activity_additional - WHERE R4 = 1 - union - SELECT * FROM Sus_Activity_duration - WHERE R5 = 1 - union - SELECT * FROM Sus_Activity_refund - WHERE R6 = 1), - -/*** Calendar import ***/ -Calendar as ( -Select - *, - CAST(CASE - When date like '%/%'Then substr(date, 7, 4)||'-'|| - substr(date, 4, 2)||'-'|| - substr(date, 1, 2) - ELSE substr(date, 1, 10) - END as date) as Format_date - From "parking-raw-zone".calendar - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd')), - -CalendarMAX as ( -Select MAX(fin_year) as Max_Fin_Year From calendar -WHERE import_date = (Select MAX(import_date) from calendar)), - -/*** Select the Suspension denormal data ***/ -Suspensions_basic as ( - SELECT - suspensions_reference, - applicationdate - FROM "dataplatform-stg-liberator-refined-zone".parking_suspension_denormalised_data - WHERE import_Date = format_datetime(current_date, 'yyyyMMdd')), - -Suspensions as ( - SELECT - B.suspensions_reference, - B.applicationdate, - A.activity_date, - date_diff('day',B.applicationdate,A.activity_date) as DateDiff, - A.activity, A.activity_by - FROM Sus_Activity as A - INNER JOIN Suspensions_basic as B - ON A.permit_referece = B.suspensions_reference) - -/** Output the data **/ -SELECT - A.*, - - /** Obtain the Fin year flag and fin year ***/ - CASE - When H.Fin_Year = (Select Max_Fin_Year From CalendarMAX) Then 'Current' - When H.Fin_Year = (Select CAST(Cast(Max_Fin_Year as int)-1 as varchar(4)) From CalendarMAX) Then 'Previous' - Else '' - END as Fin_Year_Flag, - - H.Fin_Year, - - format_datetime(CAST(CURRENT_TIMESTAMP AS timestamp), - 'yyyy-MM-dd HH:mm:ss') AS import_date_timestamp, - - format_datetime(current_date, 'yyyy') AS import_year, - format_datetime(current_date, 'MM') AS import_month, - format_datetime(current_date, 'dd') AS import_day, - format_datetime(current_date, 'yyyyMMdd') AS import_date -FROM Suspensions as A -LEFT JOIN Calendar as H - ON CAST(substr(cast(applicationdate as varchar),1, 10) as date) = cast(Format_date as date) -""" - -create_update_table_with_partition( - environment=environment, query_on_athena=query_on_athena, table_name=table_name -) diff --git a/scripts/jobs/parking/parking_suspension_de-normalised_data.py b/scripts/jobs/parking/parking_suspension_de-normalised_data.py deleted file mode 100644 index f5e844ac5..000000000 --- a/scripts/jobs/parking/parking_suspension_de-normalised_data.py +++ /dev/null @@ -1,278 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import create_pushdown_predicate, get_glue_env_var - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1627053246341 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_suspension_change", - transformation_ctx="AmazonS3_node1627053246341", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1627053109317 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_activity", - transformation_ctx="AmazonS3_node1627053109317", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1627053334221 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_suspension", - transformation_ctx="AmazonS3_node1627053334221", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1625732651466 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_status", - transformation_ctx="AmazonS3_node1625732651466", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/************************************************************************************************************************* -SuspensionDeNormalisation - -The SQL denormalises the Suspension data into a SINGLE row for each of the Suspension requests - -19/07/2021 - Create SQL -20/08/2021 - changed because I did not have an HYS filter? -26/09/2022 - Add an additional check for ONLY suspensions -*************************************************************************************************************************/ - -/************************************************************************************************************************ -Get the LATEST Suspension status -*************************************************************************************************************************/ -WITH SuspensionStatus as ( -SELECT - permit_referece, - CAST(status_date as Timestamp) as status_date, - status, - status_change_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) row_num -FROM liberator_permit_status -WHERE permit_referece like 'HYS%' AND - import_Date = (Select MAX(import_date) from liberator_permit_status)), - --- Get the various status -SusStatusCreated as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Created'), - -SusStatusReceived as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Received'), - -SusStatusExtnReq as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Extension Requested'), - -SusStatusNotRecd as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Not-Received'), - -SusStatusExtnApp as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Extension Approved'), - -SusStatusExtnRej as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Extension Rejected'), - -SusStatusSignUp as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Sign up'), - -SusStatusAppRej as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Approved' OR status = 'Rejected'), - -SusStatusCancelled as ( -SELECT *, ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, status_date DESC) RNum -FROM SuspensionStatus -WHERE status = 'Cancelled'), - -/*************************************************************************************************************************************** -Obtain the latest suspension approval records -****************************************************************************************************************************************/ SusApprovalPre as ( - -SELECT permit_referece as PermitReference, - CAST(status_date as timestamp) as ApprovalDate, - status as Approvaltype, - status_change_by as ApprovedBy - -FROM SuspensionStatus -WHERE Status LIKE '%Approved%' OR Status LIKE '%Rejected%' -UNION ALL -SELECT permit_referece, - CAST(activity_date as timestamp), - 'Additional evidence requested', - activity_by -FROM liberator_permit_activity as A -WHERE import_Date = (Select MAX(import_date) from liberator_permit_activity) AND activity like 'Additional evidence requested%' - AND A.permit_referece like 'HYS%'), - --- Find the latest approval date -SusApproval as ( -SELECT *, - ROW_NUMBER() OVER ( PARTITION BY PermitReference ORDER BY PermitReference, ApprovalDate DESC) row_num -FROM SusApprovalPre), - -/*************************************************************************************************************************************** -Get the Suspension change ecords and format the dates to allow a second stage to find the latest -****************************************************************************************************************************************/ -LibSusChange_Before as ( - SELECT - supension_reference, cast(supension_change_application_date as timestamp) as Change_App_Date, new_start_date, new_end_date, - supension_change_amount, supension_change_payment_date, supension_change_payment_status - -FROM liberator_permit_suspension_change -WHERE import_Date = (Select MAX(import_date) from liberator_permit_suspension_change)), - --- Format the data to find the very LATEST suspension change -LibSusChange as ( -SELECT - *, - ROW_NUMBER() OVER ( PARTITION BY supension_reference ORDER BY supension_reference, Change_App_Date DESC) row_num -FROM LibSusChange_Before), - -/*************************************************************************************************************************************** -Get the 'base' Suspension Data -****************************************************************************************************************************************/ -LibSusData as ( -SELECT suspensions_reference, - -- Cast the application date from string to date - CASE When application_date = '' Then cast(NULL as timestamp) - ELSE CAST(application_date as timestamp) END as ApplicationDate, - forename_of_applicant, surname_of_applicant, email_address_of_applicant, - -- CAST The Start Date from string to date - CASE When start_date = '' Then cast(NULL as timestamp) - ELSE CAST(start_date as timestamp) END as StartDate, - -- CAST The End Date from string to date - CASE When end_date = '' Then cast(NULL as timestamp) - ELSE CAST(end_date as timestamp) END as EndDate, - start_time, end_time, - -- CAST The amount paid from string to money/decimal - CASE When amount = '' Then cast(0 as decimal(11,2)) - ELSE CAST(amount as decimal(11,2)) END as Payment, - -- CAST The payment Date from string to date - CASE When payment_date = '' Then cast(NULL as timestamp) - ELSE CAST(payment_date as timestamp) END as PaymentDate, - payment_received as PaymentStatus, - permit_type, business_name, applicant_address, - -- CAST The number of bays from string to integer - CASE When number_of_bays = '' Then cast(0 as int) - ELSE CAST(number_of_bays as int) END as number_of_bays, - street_name, usrn, suspension_reason - -FROM liberator_permit_suspension -WHERE import_Date = (Select MAX(import_date) from liberator_permit_suspension)) - -/*************************************************************************************************************************************** -Combine the CTR data -****************************************************************************************************************************************/ -SELECT A.*, - Change_App_Date,new_start_date,new_end_date, - C.status_date as LatestStatusDate, - C.status as LatestStatus, - C.status_change_by as LatestStatusChangeBy, - D.status_date as Created_Date, - E.status_date as Received_Date, - F.status_date as Extension_Req_Date, - G.status_date as Not_Received_Date, - H.status_date as Extn_Approved_Date, - I.status_date as Extn_Rejected_Date, - J.status_date as Signs_Up_Date, - K.status_date as App_Reject_Date, - K.status as App_Reject_status, - L.status_date as Cancel_Date, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date - - -FROM LibSusData as A -LEFT JOIN LibSusChange as B ON A.suspensions_reference = B.supension_reference AND B.row_num = 1 -LEFT JOIN SuspensionStatus as C ON A.suspensions_reference = C.permit_referece AND C.row_num = 1 -LEFT JOIN SusStatusCreated as D ON A.suspensions_reference = D.permit_referece AND D.row_num = 1 -LEFT JOIN SusStatusReceived as E ON A.suspensions_reference = E.permit_referece AND E.row_num = 1 -LEFT JOIN SusStatusExtnReq as F ON A.suspensions_reference = F.permit_referece AND F.row_num = 1 -LEFT JOIN SusStatusNotRecd as G ON A.suspensions_reference = G.permit_referece AND G.row_num = 1 -LEFT JOIN SusStatusExtnApp as H ON A.suspensions_reference = H.permit_referece AND H.row_num = 1 -LEFT JOIN SusStatusExtnRej as I ON A.suspensions_reference = I.permit_referece AND I.row_num = 1 -LEFT JOIN SusStatusSignUp as J ON A.suspensions_reference = J.permit_referece AND J.row_num = 1 -LEFT JOIN SusStatusAppRej as K ON A.suspensions_reference = K.permit_referece AND K.row_num = 1 -LEFT JOIN SusStatusCancelled as L ON A.suspensions_reference = L.permit_referece AND L.row_num = 1 -WHERE lower(permit_type) = 'suspension' -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "liberator_permit_status": AmazonS3_node1625732651466, - "liberator_permit_activity": AmazonS3_node1627053109317, - "liberator_permit_suspension_change": AmazonS3_node1627053246341, - "liberator_permit_suspension": AmazonS3_node1627053334221, - }, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_Suspension_DeNormalised_Data/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_Suspension_DeNormalised_Data", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_suspensions_processed.py b/scripts/jobs/parking/parking_suspensions_processed.py deleted file mode 100644 index feedd8337..000000000 --- a/scripts/jobs/parking/parking_suspensions_processed.py +++ /dev/null @@ -1,125 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import ( - PARTITION_KEYS, - create_pushdown_predicate, - create_pushdown_predicate_for_max_date_partition_value, - get_glue_env_var, - get_latest_partitions, -) - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -environment = get_glue_env_var("environment") - -# Script generated for node Amazon S3 -AmazonS3_node1658997944648 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_activity", - transformation_ctx="AmazonS3_node1658997944648", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1661350417347 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_suspension_denormalised_data", - transformation_ctx="AmazonS3_node1661350417347", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node SQL -SqlQuery177 = """ -/********************************************************************************* -Parking_Suspensions_Processed - -SQL TO create the suspension processed, to identify the No. od days that a -Suspension takes to be processed to accept/amend/reject/etc. - -19/10/2022 - Create Query -*********************************************************************************/ -/** Obtain the Suspension activity **/ -With Sus_Activity as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, activity_date ASC) row_num - FROM liberator_permit_activity - WHERE import_date = (Select MAX(import_date) - from liberator_permit_activity) - AND permit_referece like 'HYS%' - AND (lower(activity) like '%approved%' OR lower(activity) like '%rejected%' - OR lower(activity) like '%amend%' OR lower(activity) like '%additional%')), - -/** Link the earliest activity to a suspension, obtain the days diff **/ -Suspensions as ( - select - suspensions_reference, - applicationdate, - activity_date, - datediff(activity_date,applicationdate) as DateDiff, - activity, activity_by - From parking_suspension_denormalised_data as A - INNER JOIN Sus_Activity as B ON A.suspensions_reference = B.permit_referece AND B.row_num = 1 - WHERE import_date = (Select MAX(import_date) from parking_suspension_denormalised_data) - ) - -/** Output the data **/ -SELECT - *, - - date_format(CAST(CURRENT_TIMESTAMP AS timestamp), 'yyyy-MM-dd HH:mm:ss') AS ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date -FROM Suspensions - -""" -SQL_node1658765472050 = sparkSqlQuery( - glueContext, - query=SqlQuery177, - mapping={ - "liberator_permit_activity": AmazonS3_node1658997944648, - "parking_suspension_denormalised_data": AmazonS3_node1661350417347, - }, - transformation_ctx="SQL_node1658765472050", -) - -# Script generated for node Amazon S3 -AmazonS3_node1658765590649 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_Suspensions_Processed/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=PARTITION_KEYS, - enableUpdateCatalog=True, - transformation_ctx="AmazonS3_node1658765590649", -) -AmazonS3_node1658765590649.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_Suspensions_Processed", -) -AmazonS3_node1658765590649.setFormat("glueparquet", compression="snappy") -AmazonS3_node1658765590649.writeFrame(SQL_node1658765472050) -job.commit() diff --git a/scripts/jobs/parking/parking_suspensions_processed_with_finyear.py b/scripts/jobs/parking/parking_suspensions_processed_with_finyear.py deleted file mode 100644 index f8aa67192..000000000 --- a/scripts/jobs/parking/parking_suspensions_processed_with_finyear.py +++ /dev/null @@ -1,160 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import ( - PARTITION_KEYS, - create_pushdown_predicate, - create_pushdown_predicate_for_max_date_partition_value, - get_glue_env_var, - get_latest_partitions, -) - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -environment = get_glue_env_var("environment") - -# Script generated for node Amazon S3 -AmazonS3_node1658997944648 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_permit_activity", - transformation_ctx="AmazonS3_node1658997944648", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1661350417347 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_suspension_denormalised_data", - transformation_ctx="AmazonS3_node1661350417347", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1702397632233 = glueContext.create_dynamic_frame.from_catalog( - database="parking-raw-zone", - table_name="calendar", - transformation_ctx="AmazonS3_node1702397632233", -) - -# Script generated for node SQL -SqlQuery200 = """ -/********************************************************************************* -Parking_Suspensions_Processed - -SQL TO create the suspension processed, to identify the No. od days that a -Suspension takes to be processed to accept/amend/reject/etc. - -19/10/2022 - Create Query -*********************************************************************************/ -/** Obtain the Suspension activity **/ -With Sus_Activity as ( - SELECT - permit_referece, activity_date, activity,activity_by, - ROW_NUMBER() OVER ( PARTITION BY permit_referece ORDER BY permit_referece, activity_date ASC) row_num - FROM liberator_permit_activity - WHERE import_date = (Select MAX(import_date) - from liberator_permit_activity) - AND permit_referece like 'HYS%' - AND (lower(activity) like '%approved%' OR lower(activity) like '%rejected%' - OR lower(activity) like '%amend%' OR lower(activity) like '%additional%')), - -/*** Calendar import ***/ -Calendar as ( -Select - *, - CAST(CASE - When date like '%/%'Then substr(date, 7, 4)||'-'|| - substr(date, 4, 2)||'-'|| - substr(date, 1, 2) - ELSE substr(date, 1, 10) - END as date) as Format_date -From calendar -WHERE import_date = (Select MAX(import_date) from calendar)), - -CalendarMAX as ( -Select MAX(fin_year) as Max_Fin_Year From calendar -WHERE import_date = (Select MAX(import_date) from calendar)), - -/** Link the earliest activity to a suspension, obtain the days diff **/ -Suspensions as ( - select - suspensions_reference, - applicationdate, - activity_date, - datediff(activity_date,applicationdate) as DateDiff, - activity, activity_by - From parking_suspension_denormalised_data as A - INNER JOIN Sus_Activity as B ON A.suspensions_reference = B.permit_referece AND B.row_num = 1 - WHERE import_date = (Select MAX(import_date) from parking_suspension_denormalised_data) - ) - -/** Output the data **/ -SELECT - A.*, - - /** Obtain the Fin year flag and fin year ***/ - CASE - When H.Fin_Year = (Select Max_Fin_Year From CalendarMAX) Then 'Current' - When H.Fin_Year = (Select CAST(Cast(Max_Fin_Year as int)-1 as varchar(4)) From CalendarMAX) Then 'Previous' - Else '' - END as Fin_Year_Flag, - - H.Fin_Year, - - date_format(CAST(CURRENT_TIMESTAMP AS timestamp), 'yyyy-MM-dd HH:mm:ss') AS ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date -FROM Suspensions as A -LEFT JOIN Calendar as H ON CAST(substr(cast(applicationdate as string),1, 10) as date) - = cast(Format_date as date) -""" -SQL_node1658765472050 = sparkSqlQuery( - glueContext, - query=SqlQuery200, - mapping={ - "liberator_permit_activity": AmazonS3_node1658997944648, - "parking_suspension_denormalised_data": AmazonS3_node1661350417347, - "calendar": AmazonS3_node1702397632233, - }, - transformation_ctx="SQL_node1658765472050", -) - -# Script generated for node Amazon S3 -AmazonS3_node1658765590649 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/parking_suspensions_processed_with_finyear/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=PARTITION_KEYS, - enableUpdateCatalog=True, - transformation_ctx="AmazonS3_node1658765590649", -) -AmazonS3_node1658765590649.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="parking_suspensions_processed_with_finyear", -) -AmazonS3_node1658765590649.setFormat("glueparquet", compression="snappy") -AmazonS3_node1658765590649.writeFrame(SQL_node1658765472050) -job.commit() diff --git a/terraform/etl/38-aws-glue-job-parking.tf b/terraform/etl/38-aws-glue-job-parking.tf index f5fb10dce..89aafb2b9 100644 --- a/terraform/etl/38-aws-glue-job-parking.tf +++ b/terraform/etl/38-aws-glue-job-parking.tf @@ -169,27 +169,9 @@ module "parking_pcn_ltn_report_summary" { } } -module "parking_suspension_de-normalised_data" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_suspension_de-normalised_data" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_suspension_de-normalised_data" - triggered_by_job = "${local.short_identifier_prefix}Copy parking Liberator landing zone to raw" - job_description = "This job creates the Suspension de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} +# migrated Parking_Suspension_DeNormalised_Data to airflow on 19/05/2025 + + module "parking_permit_de_normalisation" { @@ -562,53 +544,10 @@ module "parking_open_pcns_vrms_linked_cancelled_ringer" { } } # MRB 15-02-2024 job created -module "parking_suspensions_processed" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_suspensions_processed" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_suspensions_processed" - triggered_by_job = "${local.short_identifier_prefix}Copy parking Liberator landing zone to raw" - job_description = "format suspensions processed" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - trigger_enabled = local.is_production_environment - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - "--conf" = "spark.sql.legacy.timeParserPolicy=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=LEGACY" - } -} -# MRB 15-02-2024 job created -module "parking_suspensions_processed_with_finyear" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_suspensions_processed_with_finyear" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_suspensions_processed_with_finyear" - triggered_by_job = "${local.short_identifier_prefix}Copy parking Liberator landing zone to raw" - job_description = "format suspensions processed within financial year" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - trigger_enabled = local.is_production_environment - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - "--conf" = "spark.sql.legacy.timeParserPolicy=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=LEGACY" - } -} + +# migrated job "Parking_Suspensions_Processed" to dap-airflow on 19/05/2025 +# parking_suspensions_processed_with_finyear migrated to dap-airflow on 19/05/2025 + module "parking_pcn_dvla_response_no_address" { source = "../modules/aws-glue-job" @@ -664,26 +603,4 @@ module "parking_permit_street_cpz_stress_mc" { # Migrated job "parking_permit_denormalisation_mc" to dap-airflow om 01/05/2025 # MRB 20-08-2024 job created -module "parking_all_suspensions_processed_review" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_all_suspensions_processed_review" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_all_suspensions_processed_review" - triggered_by_job = module.parking_suspension_de-normalised_data.job_name - job_description = "Review of all Suspension records" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - trigger_enabled = local.is_production_environment - number_of_workers_for_glue_job = 2 # 2 minimum which is enough for this job - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - "--conf" = "spark.sql.legacy.timeParserPolicy=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=LEGACY" - } -} +# Migrated job "parking_all_suspensions_processed_review" to dap-airflow om 19/05/2025