107 changes: 46 additions & 61 deletions lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
@@ -1,16 +1,17 @@
"""
Script to call the GovNotify API to retrieve data from the
Housing LBH Communal Repairs account and write to S3.
Retrieved data is written to S3 Landing as a json string and parquet file.
Retrieved data is written to S3 Landing as a json string.
Data is then normalised and written to s3 Raw for use by analysts.
Both zones are crawled so that data is exposed in the Glue data catalog.
Raw zone is crawled so that data is exposed in the Glue data catalog.
"""

from datetime import datetime
from io import BytesIO
import json
import logging
from os import getenv
import re

from botocore.exceptions import ClientError
import boto3
@@ -52,7 +53,21 @@ def initialise_notification_client(api_key):
Returns:
GovNotify Python API client instance
"""
return NotificationsAPIClient(api_key)
return NotificationsAPIClientAllJobs(api_key)


class NotificationsAPIClientAllJobs(NotificationsAPIClient):
    """GovNotify client extended with an iterator that pages through all
    notifications, including those sent as part of a job (bulk upload)."""

    def get_all_notifications_iterator_all_jobs(self, status=None, template_type=None, reference=None,
                                                older_than=None, include_jobs=None):
        result = self.get_all_notifications(status, template_type, reference, older_than, include_jobs)
        notifications = result.get("notifications")
        while notifications:
            yield from notifications
            next_link = result["links"].get("next")
            if not next_link:
                # defensive: stop if the API returns no "next" link
                break
            # the "next" link embeds the id of the oldest notification on this page;
            # pass it as older_than to request the following page
            notification_id = re.search("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
                                        next_link, re.I).group(0)
            result = self.get_all_notifications(status, template_type, reference, notification_id, include_jobs)
            notifications = result.get("notifications")
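
# Illustrative usage of the iterator above (a sketch; api_key stands in for a
# valid live key):
#
#   client = NotificationsAPIClientAllJobs(api_key)
#   for notification in client.get_all_notifications_iterator_all_jobs(
#           template_type='sms', include_jobs=True):
#       print(notification['id'], notification['status'])
#
# Each yielded item is a single notification dict; paging continues via the
# older_than id parsed from the "next" link until an empty page comes back.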


def get_response(query):
@@ -66,6 +81,10 @@ def get_response(query):
return response


def prepare_json(response):
    """Serialise the API response to a UTF-8 encoded JSON byte string."""
    return json.dumps(response).encode('utf-8')


def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
"""
Upload file content to AWS S3.
@@ -86,23 +105,6 @@ def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
logger.error(f"Error uploading {file_name} to S3: {str(e)}")


def json_to_parquet(response, label):
"""

Args:
response (dict): dict containing response from API
label (str): Name of the api endpoint 'table' retrieved.

Returns:
parquet buffer object

"""
df = pd.DataFrame.from_dict(response[label])
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
return parquet_buffer


def json_to_parquet_normalised(response, label):
"""
Args:
@@ -111,17 +113,12 @@
return:
parquet buffer object
"""
data = json.loads(response)
df = pd.json_normalize(data[label], max_level=1)
df = pd.json_normalize(response)
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
return parquet_buffer
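
# Worked illustration of the normalisation above (made-up values): a nested
# notification dict such as
#
#   {"id": "abc-123", "status": "delivered", "template": {"id": "t-1", "version": 2}}
#
# is flattened by pd.json_normalize into a single row with dotted column names:
#
#   id       status     template.id  template.version
#   abc-123  delivered  t-1          2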


def prepare_json(response):
return json.dumps(response).encode('utf-8')


def add_date_partition_key_to_s3_prefix(s3_prefix):
t = datetime.today()
partition_key = f"import_year={t.strftime('%Y')}/import_month={t.strftime('%m')}/import_day={t.strftime('%d')}/import_date={t.strftime('%Y%m%d')}/" # noqa
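
# Worked example (assuming the partition key is appended to the supplied prefix):
# on 15 January 2024,
#   add_date_partition_key_to_s3_prefix('notifications/json/')
# returns
#   'notifications/json/import_year=2024/import_month=01/import_day=15/import_date=20240115/'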
@@ -135,11 +132,11 @@ def lambda_handler(event, context):

api_secret_name = getenv("API_SECRET_NAME")
region_name = getenv("AWS_REGION")
file_name = 'notifications'

output_s3_bucket_landing = getenv("TARGET_S3_BUCKET_LANDING")
output_s3_bucket_raw = getenv("TARGET_S3_BUCKET_RAW")
output_folder = getenv("TARGET_S3_FOLDER")
crawler_landing = getenv("CRAWLER_NAME_LANDING")
crawler_raw = getenv("CRAWLER_NAME_RAW")

logger.info("Get API secret...")
@@ -148,40 +145,28 @@
api_key = api_secret_json.get("api_key_live")
client = initialise_notification_client(api_key)

# GovNotify queries to retrieve
api_queries = ['notifications']
api_queries_dict = {
'notifications': {'query': client.get_all_notifications(include_jobs=True),
'file_name': 'notifications'}
}

logger.info("Get API responses...")
for api_query in api_queries:
query = api_queries_dict.get(api_query).get('query')
response = get_response(query)
file_name = api_queries_dict.get(api_query).get('file_name')

output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')

# convert response to json formatted string
json_str = prepare_json(response=response)

# Upload the json string to landing only
upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')

# Upload parquet buffer to both S3 landing and raw; run crawler
parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
parquet_buffer_landing.seek(0)
s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
f'{output_folder_parquet}{file_name}.parquet')
glue_client.start_crawler(Name=f'{crawler_landing} {file_name}')

parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
parquet_buffer_raw.seek(0)
s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
f'{output_folder_parquet}{file_name}.parquet')
glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')
logger.info("Get all notifications through iterator...")

response = client.get_all_notifications_iterator_all_jobs(template_type='sms', include_jobs=True)

# materialise the iterator into a list (the notification set is finite)
response_list = list(response)

output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')

# convert the response list to a JSON-formatted string
json_str = prepare_json(response=response_list)

# Upload the json string to landing only
upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')

# Upload parquet buffer to S3 raw; run crawler
parquet_buffer_raw = json_to_parquet_normalised(response=response_list, label=file_name)
parquet_buffer_raw.seek(0)
s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
f'{output_folder_parquet}{file_name}.parquet')
glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')

logger.info("Job finished")
