diff --git a/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py b/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
index a19ae7b3b..1d489a1dd 100644
--- a/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
+++ b/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
@@ -1,9 +1,9 @@
 """
 Script to call the GovNotify API to retrieve data from the Housing LBH Communal Repairs account
 and write to S3.
-Retrieved data is written to S3 Landing as a json string and parquet file.
+Retrieved data is written to S3 Landing as a json string.
 Data is then normalised and written to s3 Raw for use by analysts.
-Both zones are crawled so that data is exposed in the Glue data catalog.
+Raw zone is crawled so that data is exposed in the Glue data catalog.
 """
 
 from datetime import datetime
@@ -11,6 +11,7 @@
 import json
 import logging
 from os import getenv
+import re
 
 from botocore.exceptions import ClientError
 import boto3
@@ -52,7 +53,21 @@ def initialise_notification_client(api_key):
 
     Returns: GovNotify Python API client instance
     """
-    return NotificationsAPIClient(api_key)
+    return NotificationsAPIClientAllJobs(api_key)
+
+
+class NotificationsAPIClientAllJobs(NotificationsAPIClient):
+
+    def get_all_notifications_iterator_all_jobs(self, status=None, template_type=None, reference=None, older_than=None,
+                                                include_jobs=None):
+        result = self.get_all_notifications(status, template_type, reference, older_than, include_jobs)
+        notifications = result.get("notifications")
+        while notifications:
+            yield from notifications
+            next_link = result["links"].get("next")
+            notification_id = re.search("[0-F]{8}-[0-F]{4}-[0-F]{4}-[0-F]{4}-[0-F]{12}", next_link, re.I).group(0)
+            result = self.get_all_notifications(status, template_type, reference, notification_id, include_jobs)
+            notifications = result.get("notifications")
 
 
 def get_response(query):
@@ -66,6 +81,10 @@
     return response
 
 
+def prepare_json(response):
+    return json.dumps(response).encode('utf-8')
+
+
 def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
     """
     Upload file content to AWS S3.
@@ -86,23 +105,6 @@
         logger.error(f"Error uploading {file_name} to S3: {str(e)}")
 
 
-def json_to_parquet(response, label):
-    """
-
-    Args:
-        response (dict): dict containing response from API
-        label (str): Name of the api endpoint 'table' retrieved.
-
-    Returns:
-        parquet buffer object
-
-    """
-    df = pd.DataFrame.from_dict(response[label])
-    parquet_buffer = BytesIO()
-    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
-    return parquet_buffer
-
-
 def json_to_parquet_normalised(response, label):
     """
     Args:
@@ -111,17 +113,12 @@
 
     return: parquet buffer object
     """
-    data = json.loads(response)
-    df = pd.json_normalize(data[label], max_level=1)
+    df = pd.json_normalize(response)
     parquet_buffer = BytesIO()
     df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
     return parquet_buffer
 
 
-def prepare_json(response):
-    return json.dumps(response).encode('utf-8')
-
-
 def add_date_partition_key_to_s3_prefix(s3_prefix):
     t = datetime.today()
     partition_key = f"import_year={t.strftime('%Y')}/import_month={t.strftime('%m')}/import_day={t.strftime('%d')}/import_date={t.strftime('%Y%m%d')}/" # noqa
@@ -135,11 +132,11 @@
 def lambda_handler(event, context):
     api_secret_name = getenv("API_SECRET_NAME")
     region_name = getenv("AWS_REGION")
+    file_name = 'notifications'
 
     output_s3_bucket_landing = getenv("TARGET_S3_BUCKET_LANDING")
     output_s3_bucket_raw = getenv("TARGET_S3_BUCKET_RAW")
     output_folder = getenv("TARGET_S3_FOLDER")
-    crawler_landing = getenv("CRAWLER_NAME_LANDING")
     crawler_raw = getenv("CRAWLER_NAME_RAW")
 
     logger.info("Get API secret...")
@@ -148,40 +145,28 @@
     api_key = api_secret_json.get("api_key_live")
     client = initialise_notification_client(api_key)
 
-    # GovNotify queries to retrieve
-    api_queries = ['notifications']
-    api_queries_dict = {
-        'notifications': {'query': client.get_all_notifications(include_jobs=True),
-                          'file_name': 'notifications'}
-    }
-
-    logger.info("Get API responses...")
-    for api_query in api_queries:
-        query = api_queries_dict.get(api_query).get('query')
-        response = get_response(query)
-        file_name = api_queries_dict.get(api_query).get('file_name')
-
-        output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
-        output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')
-
-        # convert response to json formatted string
-        json_str = prepare_json(response=response)
-
-        # Upload the json string to landing only
-        upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')
-
-        # Upload parquet buffer to both S3 landing and raw; run crawler
-        parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
-        parquet_buffer_landing.seek(0)
-        s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
-                                 f'{output_folder_parquet}{file_name}.parquet')
-        glue_client.start_crawler(Name=f'{crawler_landing} {file_name}')
-
-        parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
-        parquet_buffer_raw.seek(0)
-        s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
-                                 f'{output_folder_parquet}{file_name}.parquet')
-        glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')
+    logger.info("Get all notifications through iterator...")
+
+    response = client.get_all_notifications_iterator_all_jobs(template_type='sms', include_jobs=True)
+
+    # write iterator items to a list as items will be finite
+    response_list = list(response)
+
+    output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
+    output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')
+
+    # convert response to json formatted string
+    json_str = prepare_json(response=response_list)
+
+    # Upload the json string to landing only
+    upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')
+
+    # Upload parquet buffer to S3 raw; run crawler
+    parquet_buffer_raw = json_to_parquet_normalised(response=response_list, label=file_name)
+    parquet_buffer_raw.seek(0)
+    s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
+                             f'{output_folder_parquet}{file_name}.parquet')
+    glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')
 
     logger.info("Job finished")
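Reviewer note: the NotificationsAPIClientAllJobs subclass re-implements the client's pagination loop so that include_jobs is forwarded on every page request. Below is a minimal, self-contained sketch of the cursor contract that loop depends on, runnable without credentials. FakeClient and its two-page payload are hypothetical stand-ins for the GovNotify API; only the "notifications"/"links.next" response shape and the older_than UUID cursor mirror the real service.

import re


class FakeClient:
    # Hypothetical stand-in for NotificationsAPIClient: serves two pages keyed
    # on the older_than cursor, the way the GovNotify v2 endpoint paginates.
    PAGES = {
        None: {
            "notifications": [{"id": "aaaaaaaa-1111-2222-3333-444444444444", "type": "sms"}],
            "links": {"next": "https://api.example/v2/notifications?older_than=aaaaaaaa-1111-2222-3333-444444444444"},
        },
        # empty page terminates the while loop
        "aaaaaaaa-1111-2222-3333-444444444444": {"notifications": [], "links": {}},
    }

    def get_all_notifications(self, status=None, template_type=None, reference=None,
                              older_than=None, include_jobs=None):
        return self.PAGES[older_than]


def iterate_all_notifications(client, template_type=None, include_jobs=None):
    # Same loop as get_all_notifications_iterator_all_jobs: yield a page, pull
    # the UUID cursor out of links.next, then request the page older than it.
    result = client.get_all_notifications(template_type=template_type, include_jobs=include_jobs)
    notifications = result.get("notifications")
    while notifications:
        yield from notifications
        next_link = result["links"].get("next")
        cursor = re.search("[0-F]{8}-[0-F]{4}-[0-F]{4}-[0-F]{4}-[0-F]{12}", next_link, re.I).group(0)
        result = client.get_all_notifications(template_type=template_type,
                                              older_than=cursor, include_jobs=include_jobs)
        notifications = result.get("notifications")


print(list(iterate_all_notifications(FakeClient(), template_type="sms", include_jobs=True)))
# [{'id': 'aaaaaaaa-1111-2222-3333-444444444444', 'type': 'sms'}]

Under these assumptions the terminating condition is an empty page. If the real API ever returned a non-empty page without a links.next entry, next_link would be None and the re.search(...).group(0) chain would raise, so a guard there may be worth considering in review.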