From 542c4476fbb8fa61dbd63aa27a33cfd97847b55f Mon Sep 17 00:00:00 2001 From: Tian Chen <38001883+Tian-2017@users.noreply.github.com> Date: Tue, 13 May 2025 15:16:14 +0100 Subject: [PATCH] remove the 7 tables from group 6 --- .../parking/parking_ceo_average_on_street.py | 185 ---------- ...ing_ceo_average_on_street_hrs_mins_secs.py | 215 ------------ scripts/jobs/parking/parking_ceo_on_street.py | 200 ----------- scripts/jobs/parking/parking_ceo_summary.py | 139 -------- .../parking_deployment_target_details.py | 327 ------------------ .../parking_percent_street_coverage.py | 103 ------ .../parking_percent_street_coverage_cpz.py | 107 ------ terraform/etl/38-aws-glue-job-parking.tf | 160 +-------- 8 files changed, 8 insertions(+), 1428 deletions(-) delete mode 100644 scripts/jobs/parking/parking_ceo_average_on_street.py delete mode 100644 scripts/jobs/parking/parking_ceo_average_on_street_hrs_mins_secs.py delete mode 100644 scripts/jobs/parking/parking_ceo_on_street.py delete mode 100644 scripts/jobs/parking/parking_ceo_summary.py delete mode 100644 scripts/jobs/parking/parking_deployment_target_details.py delete mode 100644 scripts/jobs/parking/parking_percent_street_coverage.py delete mode 100644 scripts/jobs/parking/parking_percent_street_coverage_cpz.py diff --git a/scripts/jobs/parking/parking_ceo_average_on_street.py b/scripts/jobs/parking/parking_ceo_average_on_street.py deleted file mode 100644 index 560b59dd1..000000000 --- a/scripts/jobs/parking/parking_ceo_average_on_street.py +++ /dev/null @@ -1,185 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import get_glue_env_var - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1628173244776 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_ceo_on_street", - transformation_ctx="AmazonS3_node1628173244776", -) - -# Script generated for node Amazon S3 -AmazonS3_node1632912445458 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_ceo_summary", - transformation_ctx="AmazonS3_node1632912445458", -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/****************************************************************************************************************************** -Parking_CEO_Average_On_Street - -This SQL creates the average CEO figures for time on street, breaks, etc - -15/11/2021 - create SQL - -******************************************************************************************************************************/ - -/****************************************************************************************************************************** -Obtain the Time On Street Average -******************************************************************************************************************************/ -with CEO_TimeOnStreet_Summary as ( - SELECT - CEO, patrol_date, - CAST(substr(cast(patrol_date as string), 1, 8)||'01' as date) as MonthYear, - SUM(timeonstreet_secs) as CEODailyBeatTime_secs - - FROM parking_ceo_on_street - WHERE import_Date = (Select MAX(import_date) from parking_ceo_on_street) and timeonstreet_secs > 0 - GROUP BY CEO, patrol_date - order by CEO, patrol_date), - -Monthly_AVG as ( - SELECT - MonthYear, avg(CEODailyBeatTime_secs) as AVG_Month, - CAST(from_unixtime(avg(CEODailyBeatTime_secs),'hh:mm:ss a') as string) as Format_Avg - - FROM CEO_TimeOnStreet_Summary - GROUP BY MonthYear - ORDER BY MonthYear), - -/****************************************************************************************************************************** -Get the time to the first Beat Street -******************************************************************************************************************************/ Start_Time as ( - SELECT - CEO, patrol_date, officer_patrolling_date StartTime - - FROM parking_ceo_on_street - WHERE import_Date = (Select MAX(import_date) from parking_ceo_on_street) AND beatname != '' - AND Status_flag IN ('SSS')), - -First_Street_Time as ( - SELECT - CEO, patrol_date, MIN(officer_patrolling_date) FirstBeatStreet - FROM parking_ceo_on_street - WHERE import_Date = (Select MAX(import_date) from parking_ceo_on_street) AND beatname != '' - AND Status_flag IN ('SS') - GROUP BY CEO, patrol_date), - -Time_To_Beat_Street as ( - SELECT - A.CEO, A.patrol_date, CAST(substr(cast(A.patrol_date as string), 1, 8)||'01' as date) as MonthYear, - StartTime, FirstBeatStreet, - SUBSTRING(cast(unix_timestamp(FirstBeatStreet)-unix_timestamp(StartTime) as timestamp), 11) as Secs_to_First_Beat_Street - - /*date_diff('second',StartTime, FirstBeatStreet) as Secs_to_First_Beat_Street*/ - - FROM Start_Time as A - LEFT JOIN First_Street_Time as B ON A.CEO = B.CEO AND A.patrol_date = B.patrol_date), - -AVG_Time_to_Beat as ( - SELECT - MonthYear, - CAST(from_unixtime(avg(Secs_to_First_Beat_Street),'hh:mm:ss a') as string) as Format_Total_Avg - - FROM Time_To_Beat_Street - GROUP BY MonthYear), - -/****************************************************************************************************************************** -Get the Average amount of time CEO on Break and the average amoun of time CEO onStreet -******************************************************************************************************************************/ -CEO_Break_Full as ( - SELECT - officer_shoulder_no, ceo_protrol_date, - CAST(substr(cast(ceo_protrol_date as string), 1, 8)||'01' as date) as MonthYear, - total_break, timeonstreet_secs - - FROM parking_ceo_summary - WHERE import_Date = (Select MAX(import_date) from parking_ceo_summary)), - -Monthly_Break_AVG as ( - SELECT - MonthYear, - avg(total_break) as AVG_Break_Month, - CAST(from_unixtime(avg(total_break),'hh:mm:ss a') as string) as Format_Break_Avg, - avg(timeonstreet_secs) as AVG_Total_Month, - CAST(from_unixtime(avg(timeonstreet_secs),'hh:mm:ss a') as string) as Format_Total_Avg - - FROM CEO_Break_Full - GROUP BY MonthYear - ORDER BY MonthYear) -/****************************************************************************************************************************** -Output the data -******************************************************************************************************************************/ -SELECT - A.MonthYear, - CAST(A.Format_Total_Avg as string) as Avg_Time_out, - CAST(Format_Avg as string) as Avg_Time_on_Beat, - CAST(Format_Break_Avg as string) as Avg_Break, - CAST(C.Format_Total_Avg as string) as Avg_Time_to_Beat, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date - -FROM Monthly_Break_AVG as A -LEFT JOIN Monthly_AVG as B ON A.MonthYear = B.MonthYear -LEFT JOIN AVG_Time_to_Beat as C ON A.MonthYear = C.MonthYear - -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "parking_ceo_on_street": AmazonS3_node1628173244776, - "parking_ceo_summary": AmazonS3_node1632912445458, - }, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_CEO_Average_On_Street/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_CEO_Average_On_Street", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_ceo_average_on_street_hrs_mins_secs.py b/scripts/jobs/parking/parking_ceo_average_on_street_hrs_mins_secs.py deleted file mode 100644 index ae7d06efc..000000000 --- a/scripts/jobs/parking/parking_ceo_average_on_street_hrs_mins_secs.py +++ /dev/null @@ -1,215 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import create_pushdown_predicate, get_glue_env_var - - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1628173244776 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_ceo_on_street", - transformation_ctx="AmazonS3_node1628173244776", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1638273151502 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_ceo_summary", - transformation_ctx="AmazonS3_node1638273151502", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/****************************************************************************************************************************** -Parking_CEO_Average_On_Street_hrs_mins_secs - -This SQL creates the average CEO figures for time on street, breaks, etc - -15/11/2021 - create SQL - -******************************************************************************************************************************/ -/*** Collect the parking_ceo_on_street data ***/ -With On_Street as ( - SELECT - ceo, patrol_date, officer_patrolling_date, street, cpzname, - status_flag, beat, beatname, timeonstreet_secs - FROM parking_ceo_on_street - WHERE import_Date = (Select MAX(import_date) from parking_ceo_on_street)), - -/*** Obtain the Time On Street Average ***/ -CEO_TimeOnStreet_Summary as ( - SELECT - CEO, patrol_date, - CAST(substr(cast(patrol_date as string), 1, 8)||'01' as date) as MonthYear, - SUM(timeonstreet_secs) as CEODailyBeatTime_secs - - FROM On_Street - WHERE timeonstreet_secs > 0 - GROUP BY CEO, patrol_date - order by CEO, patrol_date), - -/*** Calculate the monthly BEAT average ***/ -Monthly_AVG as ( - SELECT - MonthYear, avg(CEODailyBeatTime_secs) as AVG_Month, - /*CAST(time '00:00:00' + avg(CEODailyBeatTime_secs) * interval '1' second as varchar) as Format_Avg*/ - CAST(from_unixtime(avg(CEODailyBeatTime_secs),'hh:mm:ss') as string) as Format_Avg - - FROM CEO_TimeOnStreet_Summary - GROUP BY MonthYear - ORDER BY MonthYear), - -/****************************************************************************************************************************** -Get the time to the first Beat Street -******************************************************************************************************************************/ -/*** Get the first CEO record ***/ -Start_Time as ( - SELECT - CEO, patrol_date, officer_patrolling_date StartTime - - FROM On_Street - WHERE beatname != '' - AND Status_flag IN ('SSS')), - -/*** Get the first beat street start record ***/ -First_Street_Time as ( - SELECT - CEO, patrol_date, MIN(officer_patrolling_date) FirstBeatStreet - FROM On_Street - WHERE beatname != '' - AND Status_flag IN ('SS') - GROUP BY CEO, patrol_date), - -/*** Calculate the time difference ***/ -Time_To_Beat_Street as ( - SELECT - A.CEO, A.patrol_date, CAST(substr(cast(A.patrol_date as string), 1, 8)||'01' as date) as MonthYear, - StartTime, FirstBeatStreet, - unix_timestamp(FirstBeatStreet)-unix_timestamp(StartTime) as Secs_to_First_Beat_Street - - /*date_diff('second',StartTime, FirstBeatStreet) as Secs_to_First_Beat_Street*/ - FROM Start_Time as A - LEFT JOIN First_Street_Time as B ON A.CEO = B.CEO AND A.patrol_date = B.patrol_date), - -AVG_Time_to_Beat as ( - SELECT - MonthYear, - CASE - When avg(Secs_to_First_Beat_Street) >= 43200 Then - CAST(from_unixtime(avg(Secs_to_First_Beat_Street),'hh:mm:ss') as string) - Else CAST(from_unixtime(avg(Secs_to_First_Beat_Street),'mm:ss') as string) - END as Format_Total_Avg - - /*CAST(time '00:00:00' + avg(Secs_to_First_Beat_Street) * interval '1' second as varchar) as Format_Total_Avg*/ - - FROM Time_To_Beat_Street - GROUP BY MonthYear), - -/****************************************************************************************************************************** -Get the Average amount of time CEO on Break and the average amoun of time CEO onStreet -******************************************************************************************************************************/ -CEO_Break_Full as ( - SELECT - officer_shoulder_no, ceo_protrol_date, - CAST(substr(cast(ceo_protrol_date as string), 1, 8)||'01' as date) as MonthYear, - total_break, timeonstreet_secs - - FROM parking_ceo_summary - WHERE import_Date = (Select MAX(import_date) from parking_ceo_summary)), - -Monthly_Break_AVG as ( - SELECT - MonthYear, - avg(total_break) as AVG_Break_Month, - - CASE - When avg(total_break) >= 43200 Then - CAST(from_unixtime(avg(total_break),'hh:mm:ss') as string) - ELSE CAST(from_unixtime(avg(total_break),'mm:ss') as string) - END as Format_Break_Avg, - - avg(timeonstreet_secs) as AVG_Total_Month, - CAST(from_unixtime(avg(timeonstreet_secs),'hh:mm:ss') as string) as Format_Total_Avg - - /*MonthYear, - avg(total_break) as AVG_Break_Month, - CAST(time '00:00:00' + avg(total_break) * interval '1' second as varchar) as Format_Break_Avg, - avg(timeonstreet_secs) as AVG_Total_Month, - CAST(time '00:00:00' + avg(timeonstreet_secs) * interval '1' second as varchar) as Format_Total_Avg*/ - - FROM CEO_Break_Full - GROUP BY MonthYear - ORDER BY MonthYear) -/****************************************************************************************************************************** -Output the data -******************************************************************************************************************************/ -SELECT - A.MonthYear, - CAST(A.Format_Total_Avg as string) as Avg_Time_out, - CAST(Format_Avg as string) as Avg_Time_on_Beat, - CAST(Format_Break_Avg as string) as Avg_Break, - CAST(C.Format_Total_Avg as string) as Avg_Time_to_Beat, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date - -FROM Monthly_Break_AVG as A -LEFT JOIN Monthly_AVG as B ON A.MonthYear = B.MonthYear -LEFT JOIN AVG_Time_to_Beat as C ON A.MonthYear = C.MonthYear -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "parking_ceo_summary": AmazonS3_node1638273151502, - "parking_ceo_on_street": AmazonS3_node1628173244776, - }, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_CEO_Average_On_Street_hrs_mins_secs/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_CEO_Average_On_Street_hrs_mins_secs", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_ceo_on_street.py b/scripts/jobs/parking/parking_ceo_on_street.py deleted file mode 100644 index fff54924d..000000000 --- a/scripts/jobs/parking/parking_ceo_on_street.py +++ /dev/null @@ -1,200 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import get_glue_env_var - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1628173244776 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_pcn_ceo", - transformation_ctx="AmazonS3_node1628173244776", -) - -# Script generated for node Amazon S3 -AmazonS3_node1632912445458 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_pcn_cb", - transformation_ctx="AmazonS3_node1632912445458", -) - -# Script generated for node Amazon S3 -AmazonS3_node1636704737623 = glueContext.create_dynamic_frame.from_catalog( - database="parking-raw-zone", - table_name="ceo_beat_streets", - transformation_ctx="AmazonS3_node1636704737623", -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/*************************************************************************************************************************** -Parking_CEO_On_Street - -This SQL creates the On Street log, ONLY the start street/end street & PCN - -30/09/2021 - create SQL -09/11/2021 - Amend because PCN was against the ES street - remove filter for PCN -11/11/2021 - rewrite to add beat details and time on street for ES records -18/11/2021 - amend because Observation was against the ES street - remove filter for Observed -25/11/2021 - force the street names to lowercase -***************************************************************************************************************************/ -/*** Get the Beat streets ***/ -with BeatStreet as ( - SELECT - beatid, beatname, lower(streetname) as streetname - FROM ceo_beat_streets - WHERE import_Date = (Select MAX(import_date) from ceo_beat_streets)), - -/*** Collect the On Street data ***/ -CEO_OnStreet as ( - SELECT - row_number() over (partition by officer_shoulder_no, cast(substr(officer_patrolling_date, 1, 10) as date) - order by officer_shoulder_no,officer_patrolling_date) as Row_Num, - cast(substr(officer_patrolling_date, 1, 10) as date) as Patrol_Date, - officer_shoulder_no, - /** force the street name to lower ***/ - lower(officer_street_name) as officer_street_name, - Cast(officer_patrolling_date as timestamp) as officer_patrolling_date, - break_duration,ticket_number, observation_date,cpzname,vrm, - length(ltrim(Ticket_number)) as T_Len - FROM liberator_pcn_ceo - WHERE /*import_Date = (Select MAX(import_date) from liberator_pcn_ceo) AND*/ - officer_street_name != 'null' AND officer_street_name != '' - /*AND VRM != 'OBSERVED'*/ AND officer_street_name != 'CEO on break' - order by officer_shoulder_no, officer_patrolling_date), - -/*** Get the summerised list of CEO/Dates */ -CEO_Date as ( - Select officer_shoulder_no as CEO, cast(officer_patrolling_date as date) as Patrol_Date - From CEO_OnStreet - Group By officer_shoulder_no, cast(officer_patrolling_date as date)), - -/*** Identify the Street start/end and flag the PCN records ***/ -On_Street_Ident as ( - Select - A.*, CAST(B.officer_patrolling_date as timestamp) as officer_patrolling_date, B.officer_street_name as Street, B.ticket_number, B.vrm, - B.observation_date, B.cpzname, - /*** Flags SS - Start Street/ES - End Street/ PCN - PCN issued***/ - CASE - When B.Row_Num = 1 Then 'SSS' - When B.officer_street_name != C.officer_street_name AND B.officer_street_name != D.officer_street_name Then 'SS/ES' - When B.officer_street_name = C.officer_street_name AND B.officer_street_name != D.officer_street_name Then 'SS' - When B.officer_street_name = C.officer_street_name AND B.officer_street_name = D.officer_street_name - AND B.T_Len < 8 Then 'null' - When B.officer_street_name != C.officer_street_name AND B.officer_street_name != D.officer_street_name - AND B.T_Len < 8 Then 'null' - When B.officer_street_name != C.officer_street_name Then 'ES' - When B.T_Len >= 10 Then 'PCN' - When C.Row_Num is NULL Then 'ESS' - When C.Row_Num = 1 Then 'ESS' - Else 'null' - END as Status_Flag - From CEO_Date as A - inner join CEO_OnStreet as B ON A.CEO = B.officer_shoulder_no AND A.Patrol_Date = B.Patrol_Date - /** Rec AFTER ***/ - left join CEO_OnStreet as C ON A.CEO = C.officer_shoulder_no AND A.Patrol_Date = C.Patrol_Date AND C.Row_num = B.Row_Num+1 - /*** Rec BEFORE ***/ - left join CEO_OnStreet as D ON A.CEO = D.officer_shoulder_no AND A.Patrol_Date = D.Patrol_Date AND D.Row_num = B.Row_Num-1 - order by B.officer_shoulder_no, B.officer_patrolling_date), - -/*** Beat Info from APCOA ***/ -CEO_Beat as ( - SELECT activity_date, ceo_shoulder_no, beat, shift, beat_start_point, method_of_travel, travel_time, shift_start_time, shift_end_time - FROM liberator_pcn_cb - WHERE import_Date = (Select MAX(import_date) from liberator_pcn_cb)), - -/*** Get the start street and end street and create a record number ***/ -Order_Streets as ( - SELECT - CEO, Patrol_Date, officer_patrolling_date, street, status_flag, - row_number() over (partition by CEO, Patrol_Date - order by CEO, officer_patrolling_date) as Row_Num - FROM On_Street_Ident - WHERE Status_Flag IN ('SSS','SS','ES','ESS')), - -/*** using the data above get the amount of time that the CEO was on the street ***/ -Time_On_Street as ( - SELECT - A.*, - unix_timestamp(A.officer_patrolling_date)- unix_timestamp(D.officer_patrolling_date) as TimeOnStreet_Secs - - FROM Order_Streets as A - left join Order_Streets as D ON A.CEO = D.CEO AND A.Patrol_Date = D.Patrol_Date AND D.Row_num = A.Row_Num-1 - WHERE A.Status_flag IN ('ES','ESS')), - -/*** Merge Data ***/ -Display_Data as ( - SELECT - A.*, - B.beat, B.shift, B.beat_start_point, B.method_of_travel, B.travel_time, B.shift_start_time, B.shift_end_time, - C.beatname, C.streetname - - FROM On_Street_Ident as A - LEFT JOIN CEO_Beat as B ON A.CEO = B.ceo_shoulder_no AND A.Patrol_Date = B.activity_date - LEFT JOIN BeatStreet as C ON B.Beat = C.Beatname AND A.Street = C.streetname - WHERE Status_Flag != 'null' - Order by A.CEO, A.Officer_patrolling_date) - -SELECT A.*, B.TimeOnStreet_Secs, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date - -FROM Display_Data as A -LEFT JOIN Time_On_Street as B ON A.CEO = B.CEO and A.officer_patrolling_date = B.officer_patrolling_date - and A.Status_Flag = B.status_flag -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "ceo_beat_streets": AmazonS3_node1636704737623, - "liberator_pcn_ceo": AmazonS3_node1628173244776, - "liberator_pcn_cb": AmazonS3_node1632912445458, - }, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_CEO_On_Street/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_CEO_On_Street", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_ceo_summary.py b/scripts/jobs/parking/parking_ceo_summary.py deleted file mode 100644 index 66f74907c..000000000 --- a/scripts/jobs/parking/parking_ceo_summary.py +++ /dev/null @@ -1,139 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import get_glue_env_var - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1628173244776 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_pcn_ceo", - transformation_ctx="AmazonS3_node1628173244776", -) - -# Script generated for node Amazon S3 -AmazonS3_node1632912445458 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-raw-zone", - table_name="liberator_pcn_cb", - transformation_ctx="AmazonS3_node1632912445458", -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/*************************************************************************************************************************** -Parking_CEO_Summary - -This SQL summerises the CEO On-Street logs - -28/09/2021 - create SQL -***************************************************************************************************************************/ -WITH CEO_Summary as ( -select - officer_shoulder_no, cast(substr(officer_patrolling_date, 1, 10) as date) CEO_Protrol_Date, - Min(cast(officer_patrolling_date as timestamp)) CEO_Start_DateTime, - Max(cast(officer_patrolling_date as timestamp)) CEO_End_DateTime, - SUM(CASE When ticket_number like 'QZ%' Then 1 else 0 end) No_PCNs_Issued, - SUM(CASE When vrm = 'OBSERVED' Then 1 else 0 END) No_of_Observations, - SUM(CASE When officer_street_name = 'CEO on break' Then 1 Else 0 END) CEO_No_of_Breaks, - SUM(CASE When officer_street_name = 'CEO on break' Then - unix_timestamp(break_end)-unix_timestamp(break_start) else 0 end) as Total_Break - -from liberator_pcn_ceo as A -WHERE officer_street_name != 'null' -Group By officer_shoulder_no, cast(substr(officer_patrolling_date, 1, 10) as date)), - -PCN_CB as ( -Select ceo_shoulder_no,activity_date, - beat, shift, beat_start_point, shift_lunch, method_of_travel, - travel_time, - shift_start_time, - shift_end_time -FROM liberator_pcn_cb -WHERE import_Date = (Select MAX(import_date) from liberator_pcn_cb)), - -StreetList as ( -select officer_shoulder_no, cast(substr(officer_patrolling_date, 1, 10) as date) CEO_Protrol_Date, officer_street_name as Street -from liberator_pcn_ceo as A -WHERE officer_street_name != 'null' -Group By officer_shoulder_no, cast(substr(officer_patrolling_date, 1, 10) as date), officer_street_name), - -StreetSum as ( -Select officer_shoulder_no, CEO_Protrol_Date, count(*) as No_of_Streets_Visited -From StreetList -Group By officer_shoulder_no, CEO_Protrol_Date) - -SELECT - A.*, - unix_timestamp(CEO_End_DateTime)-unix_timestamp(CEO_Start_DateTime) as TimeOnStreet_Secs, - - SUBSTRING(cast(Total_Break as timestamp), 11) as Total_Break_HrMin, - SUBSTRING(cast(unix_timestamp(CEO_End_DateTime)-unix_timestamp(CEO_Start_DateTime) as timestamp), 11) as TimeOnStreet_HourMin, - - No_of_Streets_Visited, - - /*** Beat information ***/ - C.beat, C.shift, C.beat_start_point, C.shift_lunch, C.method_of_travel, - LTRIM(REPLACE(REPLACE(C.travel_time, 'Minutes',''),'mins','')) as travel_time, - C.shift_start_time, - C.shift_end_time, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date - -FROM CEO_Summary as A -LEFT JOIN StreetSum as B ON A.officer_shoulder_no = B.officer_shoulder_no AND A.CEO_Protrol_Date = B.CEO_Protrol_Date -LEFT JOIN PCN_CB as C ON A.officer_shoulder_no = C.ceo_shoulder_no AND A.CEO_Protrol_Date = C.activity_date -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "liberator_pcn_ceo": AmazonS3_node1628173244776, - "liberator_pcn_cb": AmazonS3_node1632912445458, - }, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_CEO_Summary/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_CEO_Summary", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_deployment_target_details.py b/scripts/jobs/parking/parking_deployment_target_details.py deleted file mode 100644 index 2a78def29..000000000 --- a/scripts/jobs/parking/parking_deployment_target_details.py +++ /dev/null @@ -1,327 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import create_pushdown_predicate, get_glue_env_var - - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1632912445458 = glueContext.create_dynamic_frame.from_catalog( - database="parking-raw-zone", - table_name="calendar", - transformation_ctx="AmazonS3_node1632912445458", -) - -# Script generated for node Amazon S3 -AmazonS3_node1633593610551 = glueContext.create_dynamic_frame.from_catalog( - database="parking-raw-zone", - table_name="ceo_visit_req_timings", - transformation_ctx="AmazonS3_node1633593610551", -) - -# Script generated for node Amazon S3 -AmazonS3_node1633593851886 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_ceo_on_street", - transformation_ctx="AmazonS3_node1633593851886", - push_down_predicate=create_pushdown_predicate("import_date", 7), -) - -# Script generated for node Amazon S3 -AmazonS3_node1633594330463 = glueContext.create_dynamic_frame.from_catalog( - database="parking-raw-zone", - table_name="ceo_beat_visit_requirements", - transformation_ctx="AmazonS3_node1633594330463", -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/*************************************************************************************************************************** -Parking_Deployment_Target_Details - -This SQL calculates the deployment details against agreed targets - -07/10/2021 - create SQL -***************************************************************************************************************************/ -/*** Format the date because Calendar data imported with different date formats!!! ***/ -With CalendarFormat as ( - SELECT - CAST(CASE - When date like '%/%'Then substr(date, 7, 4)||'-'||substr(date, 4,2)||'-'||substr(date, 1,2) - ELSE substr(date, 1, 10) - end as date) as date, - workingday, - holiday, - dow, - fin_year, - fin_year_startdate, - fin_year_enddate, - ROW_NUMBER() OVER ( PARTITION BY date - ORDER BY date, import_date DESC) row_num - FROM calendar), - -/*** Continue formatting the Calendar data, get first and last date of the month, etc ***/ -cteCalendarBefore as ( -select distinct - date_format(cast(date as timestamp), "M") as Month, - MIN(date) as MonthStart, - MAX(date) as MonthEnd, - SUM(CASE When workingday = '1' AND holiday = '0' Then 1 ELSE 0 END) as TotWorkingDays, - SUM(CASE When workingday = '0' AND holiday = '0' Then 1 ELSE 0 END) as NonWorkingDays -FROM CalendarFormat -WHERE date >= (Select cast(substr(fin_year_startdate, 1, 10) as date) FROM CalendarFormat - Where date = current_date AND row_num = 1) -group by month(cast(date as timestamp)),year(cast(date as timestamp)),date_format(cast(date as timestamp), "M")), - -CalendarFULL as ( - Select - date, - cast(substr(cast(date as string), 1, 8)||'01' as date) As FinMonthStartDate, - workingday, - dow, - holiday, - fin_year, - fin_year_startdate, - fin_year_enddate -FROM CalendarFormat), - -/*************************************************************************************************************************************** -Populate the Deployment timings table -***************************************************************************************************************************************/ -DepTimings as ( -select full_cpz, - CASE When mfam != '' Then substr(mfam, 1, 5)||':00' else cast(null as string) end as mfam_B, - CASE When mfam != '' Then substr(mfam, 7, 5)||':00' else cast(null as string) end as mfam_E, - /***********************************************************************************************/ - CASE When mfpm != '' Then substr(mfpm, 1, 5)||':00' else cast(null as string) end as mfpm_B, - CASE When mfpm != '' Then substr(mfpm, 7, 5)||':00' else cast(null as string) end as mfpm_E, - /***********************************************************************************************/ - CASE When mfev != '' Then substr(mfev, 1, 5)||':00' else cast(null as string) end as mfev_B, - CASE When mfev != '' Then substr(mfev, 7, 5)||':00' else cast(null as string) end as mfev_E, - /***********************************************************************************************/ - CASE When mfat != '' Then substr(mfat, 1, 5)||':00' else cast(null as string) end as mfat_B, - CASE When mfat != '' Then substr(mfat, 7, 5)||':00' else cast(null as string) end as mfat_E, - /***********************************************************************************************/ - CASE When satam != '' Then substr(satam, 1, 5)||':00' else cast(null as string) end as satam_B, - CASE When satam != '' Then substr(satam, 7, 5)||':00' else cast(null as string) end as satam_E, - /***********************************************************************************************/ - CASE When satpm != '' Then substr(satpm, 1, 5)||':00' else cast(null as string) end as satpm_B, - CASE When satpm != '' Then substr(satpm, 7, 5)||':00' else cast(null as string) end as satpm_E, - /***********************************************************************************************/ - CASE When satev != '' Then substr(satev, 1, 5)||':00' else cast(null as string) end as satev_B, - CASE When satev != '' Then substr(satev, 7, 5)||':00' else cast(null as string) end as satev_E -FROM ceo_visit_req_timings -WHERE import_Date = (Select MAX(import_date) from ceo_visit_req_timings) AND full_cpz != 'X Parking Zone'), - -Visit_Req as ( - SELECT * - from ceo_beat_visit_requirements - WHERE import_Date = (Select MAX(import_date) from ceo_visit_req_timings)), - -/*************************************************************************************************************************************** -Populate the Deployment Targets table -***************************************************************************************************************************************/ -cteCalendarSummary as ( -SELECT - MonthStart, - - lower(CASE - When street_name = 'Kingsland Road ©' Then 'Kingsland Road (C)' - ELSE Street_Name - END) as Street_Name, - - CASE - When CPZ = 'EST' Then 'Estates' - When CPZ = 'CP' Then 'Car Parks' - ELSE CPZ - END as CPZ, - - cast((MFAM) as int) * TotWorkingDays MFAM, - cast((MFPM) as int) * TotWorkingDays MFPM, - cast((MFEV) as int) * TotWorkingDays MFEV, - cast((MFAT) as int) * TotWorkingDays MFAT, - cast((SATAM) as int) * NonWorkingDays/2 SATAM, - cast((SATPM) as int) * NonWorkingDays/2 SATPM, - cast((SATEV) as int) * NonWorkingDays/2 SATEV - -FROM cteCalendarBefore cross join Visit_Req -WHERE MonthEnd <= current_date AND cpz != 'X'), - -/*************************************************************************************************************************************** -Create the on-street list, and add time zone -***************************************************************************************************************************************/ -On_Street_Breakdown as ( - SELECT - CEO, - patrol_date, - FinMonthStartDate, - officer_patrolling_date as patrol_time, - lower(Street) as Street, CPZname, - CASE - When dow IN ('Monday','Tuesday','Wednesday','Thursday','Friday') Then - CASE - When officer_patrolling_date between - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||mfam_B as timestamp) AND - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||mfam_E as timestamp) Then 'MFAM' - When officer_patrolling_date between - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||mfpm_B as timestamp) AND - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||mfpm_E as timestamp) Then 'MFPM' - When officer_patrolling_date between - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||mfev_B as timestamp) AND - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||mfev_E as timestamp) Then 'MFEV' - ELSE 'MFAT' - END - When dow IN ('Saturday') Then - CASE - When officer_patrolling_date between - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||satam_B as timestamp) AND - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||satam_E as timestamp) Then 'SATAM' - When officer_patrolling_date between - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||satpm_B as timestamp) AND - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||satpm_E as timestamp) Then 'SATPM' - When officer_patrolling_date between - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||satev_B as timestamp) AND - CAST(substr(cast(officer_patrolling_date as string), 1, 10)||' '||satev_E as timestamp) Then 'SATEV' - ELSE 'SATAT' - END - ELSE dow - END as TimeZone - FROM parking_ceo_on_street as A - - LEFT JOIN CalendarFULL as C ON A.patrol_date = C.date - LEFT JOIN DepTimings as D ON A.cpzname = D.full_cpz - - WHERE import_Date = (Select MAX(import_date) from parking_ceo_on_street) AND status_flag IN ('SS/ES', 'SS', 'SSS') - AND A.street != 'CEO In transit'), - -/*************************************************************************************************************************************** -Summerise the on-street & time zone to date/street/zone & timezone -***************************************************************************************************************************************/ -On_Street_Summary as ( - SELECT - cpzname, street, - FinMonthStartDate, TimeZone, - count(*) as TotalNoOfVisits - FROM On_Street_Breakdown - WHERE TimeZone != 'Sunday' - GROUP BY cpzname, street,FinMonthStartDate, TimeZone), - -/*************************************************************************************************************************************** -Bring the Summary and Target records togather -***************************************************************************************************************************************/ -Display_Data as ( - SELECT - MonthStart as Month, Street_Name, CPZ, B.cpzname, - /** Collect the Target values ***/ - MFAM, MFPM, MFEV, MFAT, SATAM, SATPM, SATEV, - - /** Collect the actual number of times the road was visited ***/ - coalesce(B.TotalNoOfVisits,0) as Act_MFAM, - coalesce(C.TotalNoOfVisits,0) as Act_MFPM, - coalesce(D.TotalNoOfVisits,0) as Act_MFEV, - coalesce(E.TotalNoOfVisits,0) as Act_MFAT, - - coalesce(F.TotalNoOfVisits,0) as Act_SATAM, - coalesce(G.TotalNoOfVisits,0) as Act_SATPM, - coalesce(H.TotalNoOfVisits,0) as Act_SATEV, - - - /** try and trap duplicates **/ - ROW_NUMBER() OVER ( PARTITION BY MonthStart, Street_Name - ORDER BY MonthStart, Street_Name DESC) row_num - - FROM cteCalendarSummary as A - LEFT JOIN On_Street_Summary as B ON A.MonthStart = B.FinMonthStartDate - AND A.Street_Name = B.street AND B.TimeZone = 'MFAM' - LEFT JOIN On_Street_Summary as C ON A.MonthStart = C.FinMonthStartDate - AND A.Street_Name = C.street AND C.TimeZone = 'MFPM' - LEFT JOIN On_Street_Summary as D ON A.MonthStart = D.FinMonthStartDate - AND A.Street_Name = D.street AND D.TimeZone = 'MFEV' - LEFT JOIN On_Street_Summary as E ON A.MonthStart = E.FinMonthStartDate - AND A.Street_Name = E.street AND E.TimeZone = 'MFAT' - /** Saturday **/ - - LEFT JOIN On_Street_Summary as F ON A.MonthStart = F.FinMonthStartDate - AND A.Street_Name = F.street AND F.TimeZone = 'SATAM' - LEFT JOIN On_Street_Summary as G ON A.MonthStart = G.FinMonthStartDate - AND A.Street_Name = G.street AND G.TimeZone = 'SATPM' - LEFT JOIN On_Street_Summary as H ON A.MonthStart = H.FinMonthStartDate - AND A.Street_Name = H.street AND H.TimeZone = 'SATEV' - ORDER BY A.MonthStart) - -/****************************************************************************** -Output the data -*******************************************************************************/ -SELECT - Month, Street_Name, CPZ, cpzname, - MFAM, - MFPM, - MFEV, - MFAT, - SATAM, - SATPM, - SATEV, - Act_MFAM, Act_MFPM, Act_MFEV, Act_MFAT, Act_SATAM, Act_SATPM, Act_SATEV, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date - -FROM Display_Data -WHERE row_num = 1 - -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={ - "parking_ceo_on_street": AmazonS3_node1633593851886, - "ceo_beat_visit_requirements": AmazonS3_node1633594330463, - "ceo_visit_req_timings": AmazonS3_node1633593610551, - "calendar": AmazonS3_node1632912445458, - }, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_Deployment_Target_Details/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_Deployment_Target_Details", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_percent_street_coverage.py b/scripts/jobs/parking/parking_percent_street_coverage.py deleted file mode 100644 index b026a982b..000000000 --- a/scripts/jobs/parking/parking_percent_street_coverage.py +++ /dev/null @@ -1,103 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import get_glue_env_var - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1628173244776 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_deployment_target_details", - transformation_ctx="AmazonS3_node1628173244776", -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/****************************************************************************************************************************** -Parking_Percent_Street_Coverage - -This SQL creates the street % coverage by CEO - -26/11/2021 - create SQL -******************************************************************************************************************************/ -WITH Target_Actual as ( - SELECT - month, cpz, - cast(SUM(mfam+mfpm+mfev+mfat) as double) as Mon_Fri, - cast(cast(SUM(satam+satpm+satev) as int) as double) as Sat, - cast(SUM(act_mfam+act_mfpm+act_mfev+act_mfat) as double) as Act_Mon_Fri, - cast(SUM(act_satam+act_satpm+act_satev) as double) as Act_Sat - FROM parking_deployment_target_details - WHERE import_date = (Select MAX(import_date) from parking_deployment_target_details) - AND cpz != 'Estates' - GROUP BY month, cpz), -/*** Summerise the Mon-Fri & Sat totals into a Single Monthly Target ***/ -Monthly_Totals as ( - SELECT - month, - SUM(Mon_Fri+Sat) as zone_Target, - SUM(Act_Mon_Fri +Act_Sat) as zone_actual - FROM Target_Actual - GROUP BY month -order by month) -/*** Calculate the coverage percentage ***/ -SELECT - month, - Zone_Target as Monthly_NoStreets_Target, - zone_actual as Monthly_NoStreets_Actual, - round(((zone_actual - zone_target) / zone_target)*100, 2)+100 as Percentage_Coverage, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date -FROM Monthly_Totals -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={"parking_deployment_target_details": AmazonS3_node1628173244776}, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_Percent_Street_Coverage/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_Percent_Street_Coverage", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/scripts/jobs/parking/parking_percent_street_coverage_cpz.py b/scripts/jobs/parking/parking_percent_street_coverage_cpz.py deleted file mode 100644 index 6a40509be..000000000 --- a/scripts/jobs/parking/parking_percent_street_coverage_cpz.py +++ /dev/null @@ -1,107 +0,0 @@ -import sys - -from awsglue import DynamicFrame -from awsglue.context import GlueContext -from awsglue.job import Job -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext - -from scripts.helpers.helpers import get_glue_env_var - -environment = get_glue_env_var("environment") - - -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: - for alias, frame in mapping.items(): - frame.toDF().createOrReplaceTempView(alias) - result = spark.sql(query) - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) - - -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session -job = Job(glueContext) -job.init(args["JOB_NAME"], args) - -# Script generated for node Amazon S3 -AmazonS3_node1628173244776 = glueContext.create_dynamic_frame.from_catalog( - database="dataplatform-" + environment + "-liberator-refined-zone", - table_name="parking_deployment_target_details", - transformation_ctx="AmazonS3_node1628173244776", -) - -# Script generated for node ApplyMapping -SqlQuery0 = """ -/****************************************************************************************************************************** -Parking_Percent_Street_Coverage_CPZ - -This SQL creates the street % coverage by Month & CPZ - -08/12/2021 - create SQL -******************************************************************************************************************************/ -WITH Target_Actual as ( - SELECT - month, cpz, - cast(SUM(mfam+mfpm+mfev+mfat) as double) as Mon_Fri, - cast(cast(SUM(satam+satpm+satev) as int) as double) as Sat, - cast(SUM(act_mfam+act_mfpm+act_mfev+act_mfat) as double) as Act_Mon_Fri, - cast(SUM(act_satam+act_satpm+act_satev) as double) as Act_Sat - FROM parking_deployment_target_details - WHERE import_date = (Select MAX(import_date) from parking_deployment_target_details) AND cpz != 'Estates' - GROUP BY month, cpz), -/*** Summerise the Mon-Fri & Sat totals into a Single Monthly Target ***/ -Monthly_Totals as ( - SELECT - month, cpz, - SUM(Mon_Fri+Sat) as zone_Target, - SUM(Act_Mon_Fri +Act_Sat) as zone_actual - FROM Target_Actual - GROUP BY month, cpz - order by month, cpz) -/*** Calculate the coverage percentage ***/ -SELECT - month, - cpz, - Zone_Target as Monthly_NoStreets_Target, - zone_actual as Monthly_NoStreets_Actual, - round(CASE - When round(((zone_actual - zone_target) / zone_target)*100, 2)+100 > 100 Then 100.00 - ELSE round(((zone_actual - zone_target) / zone_target)*100, 2)+100 - END, 2) as Percentage_Coverage, - - current_timestamp() as ImportDateTime, - date_format(current_date, 'yyyy') AS import_year, - date_format(current_date, 'MM') AS import_month, - date_format(current_date, 'dd') AS import_day, - date_format(current_date, 'yyyyMMdd') AS import_date -FROM Monthly_Totals -order by month, cpz -""" -ApplyMapping_node2 = sparkSqlQuery( - glueContext, - query=SqlQuery0, - mapping={"parking_deployment_target_details": AmazonS3_node1628173244776}, - transformation_ctx="ApplyMapping_node2", -) - -# Script generated for node S3 bucket -S3bucket_node3 = glueContext.getSink( - path="s3://dataplatform-" - + environment - + "-refined-zone/parking/liberator/Parking_Percent_Street_Coverage_CPZ/", - connection_type="s3", - updateBehavior="UPDATE_IN_DATABASE", - partitionKeys=["import_year", "import_month", "import_day", "import_date"], - enableUpdateCatalog=True, - transformation_ctx="S3bucket_node3", -) -S3bucket_node3.setCatalogInfo( - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", - catalogTableName="Parking_Percent_Street_Coverage_CPZ", -) -S3bucket_node3.setFormat("glueparquet") -S3bucket_node3.writeFrame(ApplyMapping_node2) -job.commit() diff --git a/terraform/etl/38-aws-glue-job-parking.tf b/terraform/etl/38-aws-glue-job-parking.tf index a675a2b5c..f5fb10dce 100644 --- a/terraform/etl/38-aws-glue-job-parking.tf +++ b/terraform/etl/38-aws-glue-job-parking.tf @@ -215,159 +215,15 @@ module "parking_permit_de_normalisation" { } -module "parking_ceo_on_street" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_ceo_on_street" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_ceo_on_street" - triggered_by_job = "${local.short_identifier_prefix}Copy parking Liberator landing zone to raw" - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} - -module "parking_ceo_summary" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_ceo_summary" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_ceo_summary" - triggered_by_job = "${local.short_identifier_prefix}Copy parking Liberator landing zone to raw" - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} - -module "parking_deployment_target_details" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_deployment_target_details" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_deployment_target_details" - triggered_by_job = module.parking_ceo_on_street.job_name - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} - -module "parking_ceo_average_on_street" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_ceo_average_on_street" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_ceo_average_on_street" - triggered_by_job = module.parking_ceo_on_street.job_name - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-enable" - "--environment" = var.environment - } -} - -module "parking_percent_street_coverage" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_percent_street_coverage" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_percent_street_coverage" - triggered_by_job = module.parking_deployment_target_details.job_name - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} +# The airflow has the latest version of these 7 tables +# removed Parking_Deployment_Target_Details +# removed parking_ceo_average_on_street +# removed parking_ceo_on_street +# removed parking_ceo_summary +# removed parking_ceo_average_on_street_hrs_mins_secs +# removed parking_percent_street_coverage +# removed parking_percent_street_coverage_cpz -module "parking_ceo_average_on_street_hrs_mins_secs" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_ceo_average_on_street_hrs_mins_secs" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_ceo_average_on_street_hrs_mins_secs" - triggered_by_job = module.parking_ceo_on_street.job_name - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} - -module "parking_percent_street_coverage_cpz" { - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - department = module.department_parking_data_source - job_name = "${local.short_identifier_prefix}parking_percent_street_coverage_cpz" - helper_module_key = data.aws_s3_object.helpers.key - pydeequ_zip_key = data.aws_s3_object.pydeequ.key - spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id - script_name = "parking_percent_street_coverage_cpz" - triggered_by_job = module.parking_deployment_target_details.job_name - job_description = "This job creates the Permit de-normalised data" - workflow_name = "${local.short_identifier_prefix}parking-liberator-data-workflow" - number_of_workers_for_glue_job = 10 - glue_job_worker_type = "G.1X" - glue_version = "4.0" - job_parameters = { - "--job-bookmark-option" = "job-bookmark-disable" - "--environment" = var.environment - } -} module "parking_foreign_vrm_pcns" { source = "../modules/aws-glue-job"