diff --git a/lambdas/icaseworks_api_ingestion/main.py b/lambdas/icaseworks_api_ingestion/main.py
index 0abef851a..ab6f978f6 100644
--- a/lambdas/icaseworks_api_ingestion/main.py
+++ b/lambdas/icaseworks_api_ingestion/main.py
@@ -1,19 +1,20 @@
-import sys
-
-sys.path.append('./lib/')
-
-import pybase64
-import json
+import datetime
 import hashlib
 import hmac
+import json
+import logging
 import re
-import requests
+import sys
 import time
+from os import getenv
+
 import boto3
+import pybase64
+import requests
 from dotenv import load_dotenv
-from os import getenv
-import datetime
-import logging
+
+
+sys.path.append("./lib/")
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
@@ -21,11 +22,9 @@
 
 def remove_illegal_characters(string):
     """Removes illegal characters from string"""
-    regex_list = [['=', ""], ['\/', "_"], ['+', "-"]]
+    regex_list = [["=", ""], ["\/", "_"], ["+", "-"]]  # noqa: W605
     for r in regex_list:
-        string = re.sub(string=string,
-                        pattern="[{}]".format(r[0]),
-                        repl=r[1])
+        string = re.sub(string=string, pattern="[{}]".format(r[0]), repl=r[1])
     return string
 
 
@@ -44,12 +43,14 @@ def dictionary_to_string(dictionary):
 def create_signature(header, payload, secret):
     """Encode JSON string"""
     # hashed header, hashed payload, string secret
-    unsigned_token = header + '.' + payload
-    key_bytes = bytes(secret, 'utf-8')
-    string_to_sign_bytes = bytes(unsigned_token, 'utf-8')
-    signature_hash = hmac.new(key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256).digest()
+    unsigned_token = header + "." + payload
+    key_bytes = bytes(secret, "utf-8")
+    string_to_sign_bytes = bytes(unsigned_token, "utf-8")
+    signature_hash = hmac.new(
+        key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256
+    ).digest()
     encoded_signature = pybase64.b64encode(signature_hash)
-    encoded_signature = encoded_signature.decode('utf-8')
+    encoded_signature = encoded_signature.decode("utf-8")
     encoded_signature = remove_illegal_characters(encoded_signature)
     return encoded_signature
 
@@ -57,24 +58,24 @@ def create_signature(header, payload, secret):
 def get_token(url, encoded_header, encoded_payload, signature, headers):
     """Get token"""
     assertion = encoded_header + "." + encoded_payload + "." + signature
-    data = f'assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer'
+    data = f"assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer"
     response = requests.post(url, headers=headers, data=data)
     response_json = response.json()
-    auth_token = response_json.get('access_token')
+    auth_token = response_json.get("access_token")
    return auth_token
 
 
 def get_icaseworks_report_from(report_id, from_date, auth_headers, auth_payload):
     report_url = "https://hackneyreports.icasework.com/getreport?"
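+    # ReportId, Format=json and the From= date are passed as query parameters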
-    request_url = f'{report_url}ReportId={report_id}&Format=json&From={from_date}'
-    logger.info(f'Request url: {request_url}')
+    request_url = f"{report_url}ReportId={report_id}&Format=json&From={from_date}"
+    logger.info(f"Request url: {request_url}")
     response = requests.get(request_url, headers=auth_headers, data=auth_payload)
-    logger.info(f'Status Code: {response.status_code}')
+    logger.info(f"Status Code: {response.status_code}")
     return response.content
 
 
 def write_dataframe_to_s3(s3_client, data, s3_bucket, output_folder, filename):
-    filename = re.sub('[^a-zA-Z0-9]+', '-', filename).lower()
+    filename = re.sub("[^a-zA-Z0-9]+", "-", filename).lower()
     current_date = datetime.datetime.now()
     day = single_digit_to_zero_prefixed_string(current_date.day)
     month = single_digit_to_zero_prefixed_string(current_date.month)
@@ -83,7 +84,7 @@ def write_dataframe_to_s3(s3_client, data, s3_bucket, output_folder, filename):
     return s3_client.put_object(
         Bucket=s3_bucket,
         Body=data,
-        Key=f"{output_folder}/import_year={year}/import_month={month}/import_day={day}/import_date={date}/{filename}.json"
+        Key=f"{output_folder}/import_year={year}/import_month={month}/import_day={day}/import_date={date}/{filename}.json",
     )
 
 
@@ -102,14 +103,16 @@
     url = "https://hackney.icasework.com/token"
 
     headers = {
-        'Content-Type': 'application/x-www-form-urlencoded',
+        "Content-Type": "application/x-www-form-urlencoded",
     }
 
     # Get api api credentials from secrets manager
     secret_name = getenv("SECRET_NAME")
-    secrets_manager_client = boto3.client('secretsmanager')
-    api_credentials_response = retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name)
-    api_credentials = json.loads(api_credentials_response['SecretString'])
+    secrets_manager_client = boto3.client("secretsmanager")
+    api_credentials_response = retrieve_credentials_from_secrets_manager(
+        secrets_manager_client, secret_name
+    )
+    api_credentials = json.loads(api_credentials_response["SecretString"])
 
     api_key = api_credentials.get("api_key")
     secret = api_credentials.get("secret")
@@ -122,11 +125,7 @@ def lambda_handler(event, lambda_context):
     # Create payload
     current_unix_time = int(time.time())
     str_time = str(current_unix_time)
-    payload_object = {
-        "iss": api_key,
-        "aud": url,
-        "iat": str_time
-    }
+    payload_object = {"iss": api_key, "aud": url, "iat": str_time}
 
     payload_object = dictionary_to_string(payload_object)
 
@@ -136,16 +135,21 @@ def lambda_handler(event, lambda_context):
     signature = create_signature(header, payload, secret)
 
     # Get token from response
-    auth_token = get_token(url=url, encoded_header=header, encoded_payload=payload, signature=signature,
-                           headers=headers)
+    auth_token = get_token(
+        url=url,
+        encoded_header=header,
+        encoded_payload=payload,
+        signature=signature,
+        headers=headers,
+    )
 
     # Create auth header for API Calls and auth payload
-    authorization = f'Bearer {auth_token}'
+    authorization = f"Bearer {auth_token}"
 
     auth_payload = {}
 
     auth_headers = {
-        'Authorization': authorization,
+        "Authorization": authorization,
     }
 
     report_tables = [
@@ -164,22 +168,32 @@ def lambda_handler(event, lambda_context):
     date_to_track_from = today - datetime.timedelta(days=1)
     logger.info(f"Date to track from: {date_to_track_from}")
 
-    s3_client = boto3.client('s3')
+    s3_client = boto3.client("s3")
 
     for report_details in report_tables:
-        logger.info(f'Pulling report for {report_details["name"]}')
+        logger.info(f"Pulling report for {report_details['name']}")
         case_id_report_id = report_details["id"]
-        case_id_list = get_icaseworks_report_from(case_id_report_id, date_to_track_from, auth_headers, auth_payload)
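+        # raw JSON bytes of the report, covering cases updated since date_to_track_from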
+        case_id_list = get_icaseworks_report_from(
+            case_id_report_id, date_to_track_from, auth_headers, auth_payload
+        )
         report_details["data"] = case_id_list
-        write_dataframe_to_s3(s3_client, report_details["data"], s3_bucket, output_folder_name, report_details["name"])
-        logger.info(f'Finished writing report for {report_details["name"]} to S3')
+        write_dataframe_to_s3(
+            s3_client,
+            report_details["data"],
+            s3_bucket,
+            output_folder_name,
+            report_details["name"],
+        )
+        logger.info(f"Finished writing report for {report_details['name']} to S3")
 
     # Trigger glue job to copy from landing to raw and convert to parquet
-    glue_client = boto3.client('glue')
+    glue_client = boto3.client("glue")
     start_glue_trigger(glue_client, glue_trigger_name)
 
+
 def single_digit_to_zero_prefixed_string(value):
-    return str(value) if value > 9 else '0' + str(value)
+    return str(value) if value > 9 else "0" + str(value)
+
 
 def start_glue_trigger(glue_client, trigger_name):
     trigger_details = glue_client.start_trigger(Name=trigger_name)
diff --git a/lambdas/sftp_to_s3/index.js b/lambdas/sftp_to_s3/index.js
index 0c2b7d18a..759f2dd8a 100644
--- a/lambdas/sftp_to_s3/index.js
+++ b/lambdas/sftp_to_s3/index.js
@@ -16,7 +16,7 @@ const config = {
   host: process.env.SFTP_HOST,
   username: process.env.SFTP_USERNAME,
   password: process.env.SFTP_PASSWORD,
-  port: 22
+  port: 22,
 };
 
 let fileNamePattern = "";
@@ -34,20 +34,20 @@ async function getImportFilenamePattern(manualOverrideDateString) {
     Settings.throwOnInvalid = true;
     DateTime.fromISO(manualOverrideDateString);
 
-    let parts = manualOverrideDateString.split('-');
+    let parts = manualOverrideDateString.split("-");
     dateToImport = new Date(parts[0], parts[1] - 1, parts[2]);
   }
 
   year = dateToImport.getFullYear().toString();
-  month = (dateToImport.getMonth() + 1).toString().padStart(2, '0');
-  day = dateToImport.getDate().toString().padStart(2, '0');
-  date = `${year}-${month}-${day}`
+  month = (dateToImport.getMonth() + 1).toString().padStart(2, "0");
+  day = dateToImport.getDate().toString().padStart(2, "0");
+  date = `${year}-${month}-${day}`;
 
   fileNamePattern = `${sftpSourceFilePrefix}${date}`;
 }
 
 async function findFiles(sftpConn) {
-  console.log(`filepath on server: ${sftpFilePath}`)
+  console.log(`filepath on server: ${sftpFilePath}`);
 
   const validPath = await sftpConn.exists(sftpFilePath);
 
@@ -55,7 +55,7 @@ async function findFiles(sftpConn) {
     return {
       success: false,
       message: `Path ${sftpFilePath} doesn't exist on SFTP server`,
-      fileNames: []
+      fileNames: [],
     };
   }
 
@@ -68,7 +68,7 @@ async function findFiles(sftpConn) {
     function filterByFileNamePattern(file) {
       let name = file.name.toLowerCase();
       return name.includes(fileNamePattern.toLowerCase());
-    }
+    },
   );
 
   if (fileList.length === 0) {
@@ -79,12 +79,14 @@ async function findFiles(sftpConn) {
     };
   }
 
-  const fileNames = fileList.filter(file => file.type != 'd').map(file => file.name);
+  const fileNames = fileList
+    .filter((file) => file.type != "d")
+    .map((file) => file.name);
   console.log(fileNames);
   return {
     success: true,
     fileNames,
-    message: ""
+    message: "",
   };
 }
 
@@ -92,13 +94,15 @@ async function checkS3ForFile() {
   const s3Client = new AWS.S3({ region: AWS_REGION });
   const params = {
     Bucket: s3Bucket,
-    Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`
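+    // same partitioned key that putFile() writes, so headObject detects an existing upload for today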
+    Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`,
   };
   try {
     await s3Client.headObject(params).promise();
     return true;
   } catch (error) {
-    console.log(`Today's ${s3TargetFolder} file not yet present in S3 bucket, retrieving file from SFTP`)
+    console.log(
+      `Today's ${s3TargetFolder} file not yet present in S3 bucket, retrieving file from SFTP`,
+    );
     return false;
   }
 }
@@ -109,7 +113,7 @@ function putFile() {
   const params = {
     Bucket: s3Bucket,
     Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`,
-    Body: stream
+    Body: stream,
   };
 
   const upload = s3Client.upload(params);
@@ -129,8 +133,7 @@ async function streamFileFromSftpToS3(sftp, fileName) {
 }
 
 exports.handler = async (event) => {
-
-  let manualOverrideDateString = event['DateToImport'];
+  let manualOverrideDateString = event["DateToImport"];
 
   console.log(`Manual override date: ${manualOverrideDateString}`);
 
@@ -139,8 +142,13 @@ exports.handler = async (event) => {
   const sftp = new sftpClient();
 
   if (await checkS3ForFile()) {
-    console.log(`Today's ${s3TargetFolder} file is already present in S3 bucket!`);
-    return { success: true, message: `File already found in s3, no further action taken` };
+    console.log(
+      `Today's ${s3TargetFolder} file is already present in S3 bucket!`,
+    );
+    return {
+      success: true,
+      message: `File already found in s3, no further action taken`,
+    };
   }
 
   await sftp.connect(config);
@@ -153,26 +161,33 @@ exports.handler = async (event) => {
       console.log(findFilesResponse);
       return {
         success: findFilesResponse.success,
-        message: findFilesResponse.message
+        message: findFilesResponse.message,
       };
     }
 
-    await Promise.all(findFilesResponse.fileNames.map(file => streamFileFromSftpToS3(sftp, file)));
+    await Promise.all(
+      findFilesResponse.fileNames.map((file) =>
+        streamFileFromSftpToS3(sftp, file),
+      ),
+    );
 
     //start trigger
-    const glue = new AWS.Glue({ apiVersion: '2017-03-31' });
+    const glue = new AWS.Glue({ apiVersion: "2017-03-31" });
     const params = {
-      Name: trigger_to_run
+      Name: trigger_to_run,
     };
-    console.log("Starting trigger with params", params)
+    console.log("Starting trigger with params", params);
     await glue.startTrigger(params).promise();
     console.log("Success!");
-    return { success: true, message: `Successfully uploaded ${findFilesResponse.fileNames.length} file(s) to s3` };
+    return {
+      success: true,
+      message: `Successfully uploaded ${findFilesResponse.fileNames.length} file(s) to s3`,
+    };
   } catch (error) {
     console.error(error.message);
   } finally {
     await sftp.end();
   }
-}
+};
diff --git a/terraform/modules/api-ingestion-lambda/10-lambda.tf b/terraform/modules/api-ingestion-lambda/10-lambda.tf
index 6a350550d..c28c53f3b 100644
--- a/terraform/modules/api-ingestion-lambda/10-lambda.tf
+++ b/terraform/modules/api-ingestion-lambda/10-lambda.tf
@@ -129,7 +129,7 @@ resource "aws_s3_object" "lambda" {
   source_hash = null_resource.run_install_requirements.triggers["dir_sha1"]
   depends_on  = [data.archive_file.lambda]
   metadata = {
-    last_updated = data.archive_file.lambda.output_base64sha256
+    last_updated = null_resource.run_install_requirements.triggers.dir_sha1
   }
 }