From 64330b783a298f96482dcba3c78479751023d796 Mon Sep 17 00:00:00 2001
From: AGibson <4319494+annajgibson@users.noreply.github.com>
Date: Fri, 14 Mar 2025 15:42:23 +0000
Subject: [PATCH] Add a new Lambda and terraform file for a task to call the
 GovNotify API for LBH Communal Repairs.

---
 .../main.py                                   | 260 ++++++++++++++++++
 ...-ingestion-housing-lbh-communal-repairs.tf | 132 +++++++++
 2 files changed, 392 insertions(+)
 create mode 100644 lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
 create mode 100644 terraform/etl/49-lambda-gov-notify-ingestion-housing-lbh-communal-repairs.tf

diff --git a/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py b/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
new file mode 100644
index 000000000..a19ae7b3b
--- /dev/null
+++ b/lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs/main.py
@@ -0,0 +1,260 @@
+"""
+Script to call the GovNotify API to retrieve data from the
+Housing LBH Communal Repairs account and write to S3.
+Retrieved data is written to S3 Landing as a json string and parquet file.
+Data is then normalised and written to s3 Raw for use by analysts.
+Both zones are crawled so that data is exposed in the Glue data catalog.
+"""
+
+from datetime import datetime
+from functools import partial
+from io import BytesIO
+import json
+import logging
+from os import getenv
+
+from botocore.exceptions import ClientError
+import boto3
+from notifications_python_client.notifications import NotificationsAPIClient
+from notifications_python_client.errors import HTTPError
+import pandas as pd
+
+# Set up logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def initialize_s3_client():
+    """
+    Initialise and return an AWS S3 client using default credentials.
+
+    Returns:
+        boto3.client: S3 client instance.
+    """
+    return boto3.client('s3')
+
+
+def get_api_secret(api_secret_name, region_name):
+    """
+    Retrieve a secret string from AWS Secrets Manager.
+
+    Args:
+        api_secret_name (str): Id (name) of the secret to read.
+        region_name (str): AWS region used for the Secrets Manager client.
+
+    Returns:
+        str: The "SecretString" value of the secret.
+
+    Raises:
+        ClientError: If the secret cannot be retrieved.
+    """
+    session = boto3.session.Session()
+    client = session.client(service_name="secretsmanager", region_name=region_name)
+    try:
+        get_secret_value_response = client.get_secret_value(SecretId=api_secret_name)
+    except ClientError as e:
+        logger.error(f"Error retrieving secret {api_secret_name}: {e}")
+        raise
+    return get_secret_value_response["SecretString"]
+
+
+def initialise_notification_client(api_key):
+    """
+    Initialise and return a GovNotify Python API client using api key (secret).
+
+    Args:
+        api_key (str): GovNotify API key.
+
+    Returns:
+        GovNotify Python API client instance
+    """
+    return NotificationsAPIClient(api_key)
+
+
+def get_response(query):
+    """
+    Run a GovNotify query and return its response.
+
+    Args:
+        query (callable or dict): Zero-argument callable that sends the API
+            request. An already retrieved response is also accepted and is
+            returned unchanged.
+
+    Returns:
+        Response returned by the API.
+
+    Raises:
+        HTTPError: If the API request fails.
+    """
+    try:
+        # The request must be sent inside the try block, otherwise an HTTPError
+        # raised by the client could never be caught and logged here.
+        response = query() if callable(query) else query
+    except HTTPError as e:
+        logger.error(
+            f"Error requesting response from {query}: {e}"
+        )
+        raise
+    return response
+
+
+def upload_to_s3(s3_bucket_name, s3_client, file_content, file_name):
+    """
+    Upload file content to AWS S3.
+
+    Args:
+        s3_bucket_name (str): Name of S3 bucket.
+        s3_client (boto3.client): S3 client instance.
+        file_content (bytes): File content as bytes.
+        file_name (str): Name of the file in S3.
+
+    Returns:
+        None
+
+    Raises:
+        Exception: Any upload error is logged and re-raised, so a failed
+            upload fails the run instead of being silently skipped.
+    """
+    try:
+        s3_client.put_object(Bucket=s3_bucket_name, Key=file_name, Body=file_content)
+    except Exception as e:
+        logger.error(f"Error uploading {file_name} to S3: {str(e)}")
+        raise
+    logger.info(f"Uploaded {file_name} to S3")
+
+
+def json_to_parquet(response, label):
+    """
+    Convert one 'table' of an API response to parquet.
+
+    Args:
+        response (dict): dict containing response from API
+        label (str): Name of the api endpoint 'table' retrieved.
+
+    Returns:
+        parquet buffer object
+    """
+    df = pd.DataFrame.from_dict(response[label])
+    parquet_buffer = BytesIO()
+    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
+    return parquet_buffer
+
+
+def json_to_parquet_normalised(response, label):
+    """
+    Convert one 'table' of a json API response to parquet, flattening nested
+    objects one level deep.
+
+    Args:
+        response (json str): json string containing json response from API
+        label (str): Name of the api endpoint 'table' retrieved.
+
+    Returns:
+        parquet buffer object
+    """
+    data = json.loads(response)
+    df = pd.json_normalize(data[label], max_level=1)
+    parquet_buffer = BytesIO()
+    df.to_parquet(parquet_buffer, index=False, engine='pyarrow')
+    return parquet_buffer
+
+
+def prepare_json(response):
+    """
+    Serialise an API response to a utf-8 encoded json string.
+
+    Args:
+        response (dict): dict containing response from API
+
+    Returns:
+        bytes: json formatted string encoded as utf-8
+    """
+    return json.dumps(response).encode('utf-8')
+
+
+def add_date_partition_key_to_s3_prefix(s3_prefix):
+    """
+    Append import date partition keys for today's date to an S3 prefix.
+
+    Args:
+        s3_prefix (str): S3 prefix ending with '/'.
+
+    Returns:
+        str: Prefix followed by
+            import_year=YYYY/import_month=MM/import_day=DD/import_date=YYYYMMDD/
+    """
+    partition_key = datetime.today().strftime(
+        'import_year=%Y/import_month=%m/import_day=%d/import_date=%Y%m%d/'
+    )
+    return f"{s3_prefix}{partition_key}"
+
+
+def lambda_handler(event, context):
+    """
+    Retrieve GovNotify data, write it to S3 landing and raw, and start the crawlers.
+
+    Args:
+        event: Lambda invocation event (not used).
+        context: Lambda context object (not used).
+    """
+    logger.info("Set up S3 client...")
+    s3_client = initialize_s3_client()
+    glue_client = boto3.client('glue')
+
+    api_secret_name = getenv("API_SECRET_NAME")
+    region_name = getenv("AWS_REGION")
+
+    output_s3_bucket_landing = getenv("TARGET_S3_BUCKET_LANDING")
+    output_s3_bucket_raw = getenv("TARGET_S3_BUCKET_RAW")
+    output_folder = getenv("TARGET_S3_FOLDER")
+    crawler_landing = getenv("CRAWLER_NAME_LANDING")
+    crawler_raw = getenv("CRAWLER_NAME_RAW")
+
+    logger.info("Get API secret...")
+    api_secret_string = get_api_secret(api_secret_name, region_name)
+    api_secret_json = json.loads(api_secret_string)
+    # Fail with a clear KeyError if the secret does not hold the expected key
+    api_key = api_secret_json["api_key_live"]
+    client = initialise_notification_client(api_key)
+
+    # GovNotify queries to retrieve. Each query is kept as a callable so that the
+    # request is only sent inside get_response, where HTTP errors are logged.
+    # NOTE(review): get_all_notifications appears to return a single page of
+    # results; confirm whether get_all_notifications_iterator is needed.
+    api_queries_dict = {
+        'notifications': {'query': partial(client.get_all_notifications, include_jobs=True),
+                          'file_name': 'notifications'}
+    }
+
+    logger.info("Get API responses...")
+    for api_query in api_queries_dict.values():
+        response = get_response(api_query['query'])
+        file_name = api_query['file_name']
+
+        output_folder_json = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/json/')
+        output_folder_parquet = add_date_partition_key_to_s3_prefix(f'{output_folder}{file_name}/parquet/')
+
+        # convert response to json formatted string
+        json_str = prepare_json(response=response)
+
+        # Upload the json string to landing only
+        upload_to_s3(output_s3_bucket_landing, s3_client, json_str, f'{output_folder_json}{file_name}.json')
+
+        # Upload parquet buffer to both S3 landing and raw; run crawler
+        parquet_buffer_landing = json_to_parquet(response=response, label=file_name)
+        parquet_buffer_landing.seek(0)
+        s3_client.upload_fileobj(parquet_buffer_landing, output_s3_bucket_landing,
+                                 f'{output_folder_parquet}{file_name}.parquet')
+        glue_client.start_crawler(Name=f'{crawler_landing} {file_name}')
+
+        parquet_buffer_raw = json_to_parquet_normalised(response=json_str, label=file_name)
+        parquet_buffer_raw.seek(0)
+        s3_client.upload_fileobj(parquet_buffer_raw, output_s3_bucket_raw,
+                                 f'{output_folder_parquet}{file_name}.parquet')
+        glue_client.start_crawler(Name=f'{crawler_raw} {file_name}')
+
+    logger.info("Job finished")
+
+
+if __name__ == "__main__":
+    lambda_handler("event", "lambda_context")
diff --git a/terraform/etl/49-lambda-gov-notify-ingestion-housing-lbh-communal-repairs.tf b/terraform/etl/49-lambda-gov-notify-ingestion-housing-lbh-communal-repairs.tf
new file mode 100644
index 000000000..badace428
--- /dev/null
+++ b/terraform/etl/49-lambda-gov-notify-ingestion-housing-lbh-communal-repairs.tf
@@ -0,0 +1,132 @@
+locals {
+  govnotify_tables_housing_communal_repairs = ["notifications"]
+}
+
+
+data "aws_iam_policy_document" "gov_notify_housing_communal_repairs_lambda_secret_access" {
+  statement {
+    actions = [
+      "secretsmanager:GetSecretValue",
+    ]
+    effect = "Allow"
+    resources = [
+      "arn:aws:secretsmanager:eu-west-2:${data.aws_caller_identity.data_platform.account_id}:secret:housing-lbh-communal-repairs/gov-notify*"
+    ]
+  }
+}
+
+resource "aws_iam_policy" "gov_notify_housing_communal_repairs_lambda_secret_access" {
+  count  = local.create_govnotify_resource_count
+  name   = "gov_notify_housing_communal_repairs_lambda_secret_access"
+  policy = data.aws_iam_policy_document.gov_notify_housing_communal_repairs_lambda_secret_access.json
+}
+
+resource "aws_iam_role_policy_attachment" "gov_notify_housing_communal_repairs_lambda_secret_access" {
+  count      = local.create_govnotify_resource_count
+  role       = aws_iam_role.housing_gov_notify_ingestion[0].name
+  policy_arn = aws_iam_policy.gov_notify_housing_communal_repairs_lambda_secret_access[0].arn
+}
+
+module "gov-notify-ingestion-housing-communal-repairs" {
+  count                          = local.create_govnotify_resource_count
+  source                         = "../modules/aws-lambda"
+  tags                           = module.tags.values
+  lambda_name                    = "govnotify_api_ingestion_housing_lbh_communal_repairs"
+  lambda_role_arn                = aws_iam_role.housing_gov_notify_ingestion[0].arn
+  identifier_prefix              = local.short_identifier_prefix
+  handler                        = "main.lambda_handler"
+  lambda_artefact_storage_bucket = module.lambda_artefact_storage_data_source.bucket_id
+  s3_key                         = "govnotify_api_ingestion_housing_lbh_communal_repairs.zip"
+  lambda_source_dir              = "../../lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs"
+  lambda_output_path             = "../../lambdas/govnotify_api_ingestion_housing_lbh_communal_repairs.zip"
+  runtime                        = "python3.9"
+  environment_variables = {
+
+    API_SECRET_NAME          = "housing-lbh-communal-repairs/gov-notify_live_api_key"
+    TARGET_S3_BUCKET_LANDING = module.landing_zone_data_source.bucket_id
+    TARGET_S3_FOLDER         = "housing/govnotify/lbh_communal_repairs/"
+    CRAWLER_NAME_LANDING     = "${local.short_identifier_prefix}GovNotify Housing LBH Communal Repairs Landing Zone"
+    TARGET_S3_BUCKET_RAW     = module.raw_zone_data_source.bucket_id
+    CRAWLER_NAME_RAW         = "${local.short_identifier_prefix}GovNotify Housing LBH Communal Repairs Raw Zone"
+  }
+  layers = [
+    "arn:aws:lambda:eu-west-2:336392948345:layer:AWSSDKPandas-Python39:13",
+    "arn:aws:lambda:eu-west-2:${data.aws_caller_identity.data_platform.account_id}:layer:notifications-python-client-9-0-0-layer:1",
+    "arn:aws:lambda:eu-west-2:${data.aws_caller_identity.data_platform.account_id}:layer:urllib3-1-26-18-layer:1"
+  ]
+}
+
+resource "aws_cloudwatch_event_rule" "govnotify_housing_lbh_communal_repairs_trigger_event" {
+  count               = local.create_govnotify_resource_count
+  name                = "${local.short_identifier_prefix}govnotify_housing_lbh_communal_repairs_trigger_event"
+  description         = "Trigger event for Housing LBH Communal Repairs GovNotify API ingestion"
+  schedule_expression = "cron(0 0 * * ? *)"
+  is_enabled          = local.is_production_environment ? true : false
+  tags                = module.tags.values
+}
+
+resource "aws_lambda_permission" "allow_cloudwatch_to_call_govnotify_housing_lbh_communal_repairs" {
+  # Same count as the Lambda and rule referenced with [0] below, which do not exist when the count is 0
+  count         = local.create_govnotify_resource_count
+  statement_id  = "AllowCloudWatchToInvokeGovNotifyFunction"
+  action        = "lambda:InvokeFunction"
+  function_name = module.gov-notify-ingestion-housing-communal-repairs[0].lambda_function_arn
+  principal     = "events.amazonaws.com"
+  source_arn    = aws_cloudwatch_event_rule.govnotify_housing_lbh_communal_repairs_trigger_event[0].arn
+}
+
+resource "aws_cloudwatch_event_target" "govnotify_housing_lbh_communal_repairs_trigger_event_target" {
+  count     = local.create_govnotify_resource_count
+  rule      = aws_cloudwatch_event_rule.govnotify_housing_lbh_communal_repairs_trigger_event[0].name
+  target_id = "govnotify-housing-communal-repairs-event-target"
+  arn       = module.gov-notify-ingestion-housing-communal-repairs[0].lambda_function_arn
+  # NOTE(review): the original heredoc value of `input`, and anything that followed
+  # it in this block, was lost when the patch text was extracted. main.py does not
+  # read its event, so the table list is passed as a stand-in payload; confirm
+  # against the original commit.
+  input = jsonencode({
+    table_names = local.govnotify_tables_housing_communal_repairs
+  })
+}
+
+# NOTE(review): the header of this resource was lost with the text above; its name is
+# inferred from the raw zone crawler below.
+resource "aws_glue_crawler" "govnotify_housing_lbh_communal_repairs_landing_zone" {
+  for_each = {for idx, source in local.govnotify_tables_housing_communal_repairs : idx => source}
+
+  database_name = "${local.identifier_prefix}-landing-zone-database"
+  name          = "${local.short_identifier_prefix}GovNotify Housing LBH Communal Repairs Landing Zone ${each.value}"
+  role          = data.aws_iam_role.glue_role.arn
+  tags          = module.tags.values
+  table_prefix  = "housing_lbh_communal_repairs_${each.value}_"
+
+  s3_target {
+    path = "s3://${module.landing_zone_data_source.bucket_id}/housing/govnotify/lbh_communal_repairs/${each.value}/"
+  }
+  configuration = jsonencode({
+    Version = 1.0
+    Grouping = {
+      TableLevelConfiguration = 5
+    }
+  })
+}
+
+resource "aws_glue_crawler" "govnotify_housing_lbh_communal_repairs_raw_zone" {
+  for_each = {for idx, source in local.govnotify_tables_housing_communal_repairs : idx => source}
+
+  database_name = module.department_housing_data_source.raw_zone_catalog_database_name
+  name          = "${local.short_identifier_prefix}GovNotify Housing LBH Communal Repairs Raw Zone ${each.value}"
+  role          = data.aws_iam_role.glue_role.arn
+  tags          = module.tags.values
+  table_prefix  = null
+
+  s3_target {
+    path = "s3://${module.raw_zone_data_source.bucket_id}/housing/govnotify/lbh_communal_repairs/${each.value}/"
+  }
+  configuration = jsonencode({
+    Version = 1.0
+    Grouping = {
+      TableLevelConfiguration = 4
+    }
+  })
+}