diff --git a/notebook/scripts/planning/load-table-from-tascomi-API-endpoint.ipynb b/notebook/scripts/planning/load-table-from-tascomi-API-endpoint.ipynb
index bc9ce2711..1d1648ffd 100644
--- a/notebook/scripts/planning/load-table-from-tascomi-API-endpoint.ipynb
+++ b/notebook/scripts/planning/load-table-from-tascomi-API-endpoint.ipynb
@@ -58,7 +58,7 @@
     "private_key = PRIVATE_KEY.encode('utf-8')\n",
     "\n",
     "table_to_read = \"\"\n",
-    "request_uri = f'https://hackney-planning.tascomi.com/rest/v1/{table_to_read}'\n",
+    "request_uri = f'https://hackney-planning.idoxcloud.com/rest/v1/{table_to_read}'\n",
     "request_method = 'GET'"
    ]
   },
diff --git a/scripts/jobs/planning/tascomi_api_ingestion.py b/scripts/jobs/planning/tascomi_api_ingestion.py
index 2d77c2c8e..e5c1352a9 100644
--- a/scripts/jobs/planning/tascomi_api_ingestion.py
+++ b/scripts/jobs/planning/tascomi_api_ingestion.py
@@ -3,33 +3,37 @@
 from pyspark.sql.functions import udf, col, explode, lit
 from pyspark.sql.types import StructType, StructField, StringType, ArrayType
 from pyspark.sql import Row
-import hmac, hashlib;
-import base64;
+import hmac
+import hashlib
+import base64
 from datetime import datetime, timedelta
 from dateutil import tz
-from awsglue.transforms import *
 from awsglue.utils import getResolvedOptions
 from pyspark.context import SparkContext
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
 from awsglue.job import Job
 from math import ceil
-from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, table_exists_in_catalog
+from scripts.helpers.helpers import get_glue_env_var, get_secret, add_import_time_columns, PARTITION_KEYS, \
+    table_exists_in_catalog
 import json
 from distutils.util import strtobool
 
+
 def authenticate_tascomi(headers, public_key, private_key):
     auth_hash = calculate_auth_hash(public_key, private_key)
     headers['X-Public'] = public_key
     headers['X-Hash'] = auth_hash
     return headers
 
+
 def not_today(date_str):
     if not date_str:
         return True
     date = datetime.strptime(date_str[:19], "%Y-%m-%d %H:%M:%S")
     return date.date() != datetime.now().date()
 
+
 def get_tascomi_resource(page_number, url, body):
     global public_key
     global private_key
@@ -46,20 +50,23 @@ def get_tascomi_resource(page_number, url, body):
     res = {}
     try:
         res = requests.get(url, data=body, headers=headers)
-        if not res.text or json.loads(res.text) == None:
+        if not res.text or json.loads(res.text) is None:
             print(f"Null data response, with status code {res.status_code} for page {page_number}")
             return ([""], url, res.status_code, "Null data response.")
 
         records = json.loads(res.text)
-        serialized_records = [json.dumps(remove_gis_image(record)) for record in records if not_today(record['last_updated']) ]
+        serialized_records = [json.dumps(remove_gis_image(record)) for record in records if
+                              not_today(record['last_updated'])]
 
         return (serialized_records, url, res.status_code, "")
 
     except Exception as e:
         exception = str(e)
-        print(f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
+        print(
+            f"ERROR: {exception} when getting page {page_number}. Status code {res.status_code}, response text {res.text}")
         return ([""], url, res.status_code, exception)
 
+
 def calculate_auth_hash(public_key, private_key):
     tz_ldn = tz.gettz('Europe/London')
     now = datetime.now(tz_ldn)
@@ -68,7 +75,8 @@ def calculate_auth_hash(public_key, private_key):
     token = crypt.hexdigest().encode('utf-8')
     return base64.b64encode(hmac.new(private_key.encode('utf-8'), token, hashlib.sha256).hexdigest().encode('utf-8'))
 
-def get_number_of_pages(resource, query = ""):
+
+def get_number_of_pages(resource, query=""):
     global public_key
     global private_key
 
@@ -79,45 +87,53 @@ def get_number_of_pages(resource, query = ""):
 
     headers = authenticate_tascomi(headers, public_key, private_key)
 
-    url = f'https://hackney-planning.tascomi.com/rest/v1/{resource}{query}'
+    url = f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}{query}'
     res = requests.get(url, data="", headers=headers)
 
    if res.status_code == 202:
         logger.info(f"received status code 202, whilst getting number of pages for {resource}, with query {query}")
-        return { 'success': True, 'number_of_pages': 0 }
+        return {'success': True, 'number_of_pages': 0}
 
     if res.status_code == 200:
         number_of_results = res.headers['X-Number-Of-Results']
         results_per_page = res.headers['X-Results-Per-Page']
-        return { 'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page)) }
+        return {'success': True, 'number_of_pages': ceil(int(number_of_results) / int(results_per_page))}
 
     error_message = f"Recieved status code {res.status_code} whilst trying to get number of pages for {resource}, {query}"
     logger.info(error_message)
-    return { 'success': False, 'error_message': error_message }
+    return {'success': False, 'error_message': error_message}
+
 
 def get_days_since_last_import(last_import_date):
     yesterday = datetime.now()
     last_import_datetime = datetime.strptime(last_import_date, "%Y%m%d")
     number_days_to_query = (yesterday - last_import_datetime).days
-    days = [ datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in range(1, number_days_to_query + 1)]
+    days = [datetime.strftime(yesterday - timedelta(days=day), "%Y-%m-%d") for day in
+            range(1, number_days_to_query + 1)]
     days.sort()
     return days
 
-def get_last_import_date(glue_context, database, resource):
+def get_last_import_date(glue_context, database, resource):
     if not table_exists_in_catalog(glue_context, f"api_response_{resource}", database):
         logger.info(f"Couldn't find table api_response_{resource} in database {database}.")
         return None
     logger.info(f"found table for {resource} api response in {database}")
-    return glue_context.sql(f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(1)[0].max_import_date
+    return glue_context.sql(
+        f"SELECT max(import_date) as max_import_date FROM `{database}`.api_response_{resource} where import_api_status_code = '200'").take(
+        1)[0].max_import_date
+
 
 def throw_if_unsuccessful(success_state, message):
     if not success_state:
         raise Exception(message)
 
+
 def get_failures_from_last_import(database, resource, last_import_date):
-    requests_df = glue_context.sql(f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
-    return { "requests": requests_df, "count": requests_df.count() }
-
+    requests_df = glue_context.sql(
+        f"SELECT page_number, import_api_url_requested as url, '' as body from `{database}`.api_response_{resource} where import_api_status_code != '200' and import_date={last_import_date}")
+    return {"requests": requests_df, "count": requests_df.count()}
+
+
 def get_requests_since_last_import(resource, last_import_date):
     requests_list = []
     for day in get_days_since_last_import(last_import_date):
@@ -127,13 +143,16 @@ def get_requests_since_last_import(resource, last_import_date):
         number_of_pages = number_of_pages_reponse["number_of_pages"]
         logger.info(f"Number of pages to retrieve for {day}: {number_of_pages}")
-        requests_list += [ RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}&last_updated={day}', "") for page_number in range(1, number_of_pages + 1)]
+        requests_list += [RequestRow(page_number,
+                                     f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}&last_updated={day}',
+                                     "") for page_number in range(1, number_of_pages + 1)]
 
     number_of_requests = len(requests_list)
     if number_of_requests == 0:
-        return { "requests": [], "count": 0 }
+        return {"requests": [], "count": 0}
     requests_list = sc.parallelize(requests_list)
     requests_list = glue_context.createDataFrame(requests_list)
-    return { "requests": requests_list, "count": number_of_requests }
+    return {"requests": requests_list, "count": number_of_requests}
+
 
 def get_requests_for_full_load(resource):
     number_of_pages_reponse = get_number_of_pages(resource)
@@ -142,11 +161,13 @@ def get_requests_for_full_load(resource):
     number_of_pages = number_of_pages_reponse["number_of_pages"]
     logger.info(f"Number of pages to retrieve: {number_of_pages}")
 
-    requests_list = [RequestRow(page_number, f'https://hackney-planning.tascomi.com/rest/v1/{resource}?page={page_number}', "") for page_number in range(1, number_of_pages + 1)]
+    requests_list = [
+        RequestRow(page_number, f'https://hackney-planning.idoxcloud.com/rest/v1/{resource}?page={page_number}', "") for
+        page_number in range(1, number_of_pages + 1)]
     number_of_requests = len(requests_list)
     requests_list = sc.parallelize(requests_list)
     requests_list = glue_context.createDataFrame(requests_list)
-    return { "requests": requests_list, "count": number_of_requests }
+    return {"requests": requests_list, "count": number_of_requests}
 
 
 def get_requests(last_import_date, resource, database):
@@ -154,10 +175,11 @@
     try:
         retry_failures = strtobool(retry_arg_value)
     except ValueError:
-        raise Exception(f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")
+        raise Exception(
+            f"--retry_failure_from_previous_import value must be recognised as a bool, received: {retry_arg_value}.")
 
     if not last_import_date:
-        logger.info(f"Retrieving full load of data")
+        logger.info("Retrieving full load of data")
         return get_requests_for_full_load(resource)
     if retry_failures:
         logger.info(f"Getting failed requests from import on date {last_import_date}")
@@ -175,20 +197,22 @@ def calculate_number_of_partitions(number_of_requests, number_of_workers):
     else:
         return max_partitions
 
+
 def remove_gis_image(records):
     records.pop("gis_map_image_base64", None)
     return records
 
+
 def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list, partitions):
     request_df = requests_list.repartition(partitions)
-    response_df = request_df.withColumn("response", get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))
+    response_df = request_df.withColumn("response",
+                                        get_tascomi_resource_udf(col("page_number"), col("url"), col("body")))
 
-    tascomi_responses_df = response_df.select( \
-        col("page_number"),
-        explode(col("response.response_data")).alias(f"{resource}"), \
-        col("response.import_api_url_requested").alias("import_api_url_requested"), \
-        col("response.import_api_status_code").alias("import_api_status_code"), \
-        col("response.import_exception_thrown").alias("import_exception_thrown"))
+    tascomi_responses_df = response_df.select(col("page_number"),
+                                              explode(col("response.response_data")).alias(f"{resource}"),
+                                              col("response.import_api_url_requested").alias("import_api_url_requested"),
+                                              col("response.import_api_status_code").alias("import_api_status_code"),
+                                              col("response.import_exception_thrown").alias("import_exception_thrown"))
 
     tascomi_responses_df = add_import_time_columns(tascomi_responses_df)
 
@@ -201,10 +225,13 @@ def retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, reque
 
     return tascomi_responses_df
 
+
 def get_failed_requests(data_frame):
-    return data_frame\
-        .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202'))\
-        .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"), lit("").alias("body"))
+    return data_frame \
+        .where((data_frame.import_api_status_code != '200') & (data_frame.import_api_status_code != '202')) \
+        .select(data_frame.page_number.alias("page_number"), data_frame.import_api_url_requested.alias("url"),
+                lit("").alias("body"))
+
 
 if __name__ == "__main__":
     args = getResolvedOptions(sys.argv, ['JOB_NAME'])
@@ -247,7 +274,8 @@ def get_failed_requests(data_frame):
         partitions = calculate_number_of_partitions(requests_list["count"], number_of_workers)
         logger.info(f"Using {partitions} partitions to repartition the RDD.")
 
-        tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource, requests_list["requests"], partitions)
+        tascomi_responses = retrieve_and_write_tascomi_data(glue_context, s3_target_url, resource,
+                                                            requests_list["requests"], partitions)
 
     else:
         logger.info("No requests, exiting job")
diff --git a/scripts/tests/planning/test_tascomi_parse_tables.py b/scripts/tests/planning/test_tascomi_parse_tables.py
index ea745d4a7..fcaf294b4 100644
--- a/scripts/tests/planning/test_tascomi_parse_tables.py
+++ b/scripts/tests/planning/test_tascomi_parse_tables.py
@@ -7,39 +7,87 @@
 
 
 class TestTascomiParsingRefinement:
-
     def test_column_expansion(self, spark):
-        response = self.parse_json_into_dataframe(spark, 'contacts', [{'contacts': '{"id": "34607",'
-                                                                                    ' "creation_user_id": null,'
-                                                                                    ' "title_id": "4"}'}])
-        expected = ['id', 'creation_user_id', 'title_id',
-                    'page_number', 'import_api_url_requested', 'import_api_status_code',
-                    'import_exception_thrown', 'import_datetime', 'import_timestamp',
-                    'import_year', 'import_month', 'import_day', 'import_date']
+        response = self.parse_json_into_dataframe(
+            spark,
+            "contacts",
+            [
+                {
+                    "contacts": '{"id": "34607",'
+                    ' "creation_user_id": null,'
+                    ' "title_id": "4"}'
+                }
+            ],
+        )
+        expected = [
+            "id",
+            "creation_user_id",
+            "title_id",
+            "page_number",
+            "import_api_url_requested",
+            "import_api_status_code",
+            "import_exception_thrown",
+            "import_datetime",
+            "import_timestamp",
+            "import_year",
+            "import_month",
+            "import_day",
+            "import_date",
+        ]
         TestCase().assertCountEqual(list(response[0]), expected)
 
     def test_parsed_row_data(self, spark):
-        response = self.parse_json_into_dataframe(spark, 'contacts', [{'contacts': '{"id": "34607",'
-                                                                                    ' "creation_user_id": null,'
-                                                                                    ' "title_id": "4"}'}])
-        expected = {'id': '34607', 'creation_user_id': None, 'title_id': '4', 'page_number': 691,
-                    'import_api_url_requested': 'https://hackney-planning.tascomi.com/rest/v1/contacts?page=691',
-                    'import_api_status_code': 200, 'import_exception_thrown': '',
-                    'import_datetime': datetime(2021, 9, 16, 13, 10), 'import_timestamp': '1631797859.247579',
-                    'import_year': '2021', 'import_month': '09', 'import_day': '16',
-                    'import_date': '20210916'}
+        response = self.parse_json_into_dataframe(
+            spark,
+            "contacts",
+            [
+                {
+                    "contacts": '{"id": "34607",'
+                    ' "creation_user_id": null,'
+                    ' "title_id": "4"}'
+                }
+            ],
+        )
+        expected = {
+            "id": "34607",
+            "creation_user_id": None,
+            "title_id": "4",
+            "page_number": 691,
+            "import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+            "import_api_status_code": 200,
+            "import_exception_thrown": "",
+            "import_datetime": datetime(2021, 9, 16, 13, 10),
+            "import_timestamp": "1631797859.247579",
+            "import_year": "2021",
+            "import_month": "09",
+            "import_day": "16",
+            "import_date": "20210916",
+        }
         assertions.dictionaryContains(response[0], expected)
 
     def parse_json_into_dataframe(self, spark, column, data):
-        data_with_imports = [{'page_number': 691,
-                              'import_api_url_requested': 'https://hackney-planning.tascomi.com/rest/v1/contacts?page=691',
-                              'import_api_status_code': 200, 'import_exception_thrown': '',
-                              'import_datetime': datetime(2021, 9, 16, 13, 10), 'import_timestamp': '1631797859.247579',
-                              'import_year': '2021', 'import_month': '09', 'import_day': '16',
-                              'import_date': '20210916', **i} for i in data]
+        data_with_imports = [
+            {
+                "page_number": 691,
+                "import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+                "import_api_status_code": 200,
+                "import_exception_thrown": "",
+                "import_datetime": datetime(2021, 9, 16, 13, 10),
+                "import_timestamp": "1631797859.247579",
+                "import_year": "2021",
+                "import_month": "09",
+                "import_day": "16",
+                "import_date": "20210916",
+                **i,
+            }
+            for i in data
+        ]
         query_data = spark.createDataFrame(
-            spark.sparkContext.parallelize(
-                [Row(**i) for i in data_with_imports]
-            )
+            spark.sparkContext.parallelize([Row(**i) for i in data_with_imports])
         )
-        return [row.asDict() for row in parse_json_into_dataframe(spark, column, query_data).rdd.collect()]
+        return [
+            row.asDict()
+            for row in parse_json_into_dataframe(
+                spark, column, query_data
+            ).rdd.collect()
+        ]