Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 60 additions & 46 deletions lambdas/icaseworks_api_ingestion/main.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
import sys

sys.path.append('./lib/')

import pybase64
import json
import datetime
import hashlib
import hmac
import json
import logging
import re
import requests
import sys
import time
from os import getenv

import boto3
import pybase64
import requests
from dotenv import load_dotenv
from os import getenv
import datetime
import logging


sys.path.append("./lib/")

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def remove_illegal_characters(string):
    """Strip padding from a base64 string and swap in URL-safe characters.

    Every ``=`` is removed, every ``/`` becomes ``_`` and every ``+``
    becomes ``-``. All other characters are left untouched.

    Args:
        string: Text to clean, e.g. the output of a standard base64 encoder.

    Returns:
        The cleaned string.
    """
    # A single translation pass replaces three regex substitutions and avoids
    # the invalid "\/" escape sequence the regex version needed a noqa for.
    return string.translate(str.maketrans({"=": None, "/": "_", "+": "-"}))


Expand All @@ -44,37 +43,39 @@ def dictionary_to_string(dictionary):
def create_signature(header, payload, secret):
    """Return the HMAC-SHA256 signature of ``header + "." + payload``.

    Args:
        header: Encoded header segment of the token (str).
        payload: Encoded payload segment of the token (str).
        secret: Shared secret used as the HMAC key (str).

    Returns:
        The digest, base64-encoded and passed through
        ``remove_illegal_characters``, as a str.
    """
    unsigned_token = header + "." + payload
    signature_hash = hmac.new(
        bytes(secret, "utf-8"),
        bytes(unsigned_token, "utf-8"),
        digestmod=hashlib.sha256,
    ).digest()
    encoded_signature = pybase64.b64encode(signature_hash).decode("utf-8")
    return remove_illegal_characters(encoded_signature)


def get_token(url, encoded_header, encoded_payload, signature, headers, timeout=30):
    """Exchange a signed JWT assertion for an access token.

    Args:
        url: Token endpoint to POST to.
        encoded_header: Encoded header segment of the assertion.
        encoded_payload: Encoded payload segment of the assertion.
        signature: Signature segment of the assertion.
        headers: HTTP headers to send with the request.
        timeout: Seconds to wait for the endpoint before ``requests`` raises,
            so a stalled connection cannot hang the caller indefinitely.

    Returns:
        The ``access_token`` value from the JSON response, or ``None`` when
        the response does not contain one.
    """
    assertion = encoded_header + "." + encoded_payload + "." + signature
    data = f"assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer"
    response = requests.post(url, headers=headers, data=data, timeout=timeout)
    auth_token = response.json().get("access_token")
    if auth_token is None:
        # Surface the failure here; otherwise the caller would go on to send
        # "Bearer None" and the root cause would be hard to trace.
        logger.error(
            "Token request to %s returned status %s without an access_token",
            url,
            response.status_code,
        )
    return auth_token


def get_icaseworks_report_from(
    report_id, from_date, auth_headers, auth_payload, timeout=(10, 300)
):
    """Download an iCasework report as JSON, starting from a given date.

    Args:
        report_id: Value sent as the ``ReportId`` query parameter.
        from_date: Value sent as the ``From`` query parameter.
        auth_headers: HTTP headers carrying the authorisation for the request.
        auth_payload: Request body passed to ``requests.get`` as ``data``.
        timeout: ``(connect, read)`` timeout in seconds handed to ``requests``
            so a stalled connection cannot hang the caller indefinitely.

    Returns:
        The raw response body as bytes, whatever the HTTP status code was.
    """
    report_url = "https://hackneyreports.icasework.com/getreport?"
    request_url = f"{report_url}ReportId={report_id}&Format=json&From={from_date}"
    logger.info("Request url: %s", request_url)
    response = requests.get(
        request_url, headers=auth_headers, data=auth_payload, timeout=timeout
    )
    logger.info("Status Code: %s", response.status_code)
    return response.content


def write_dataframe_to_s3(s3_client, data, s3_bucket, output_folder, filename):
filename = re.sub('[^a-zA-Z0-9]+', '-', filename).lower()
filename = re.sub("[^a-zA-Z0-9]+", "-", filename).lower()
current_date = datetime.datetime.now()
day = single_digit_to_zero_prefixed_string(current_date.day)
month = single_digit_to_zero_prefixed_string(current_date.month)
Expand All @@ -83,7 +84,7 @@ def write_dataframe_to_s3(s3_client, data, s3_bucket, output_folder, filename):
return s3_client.put_object(
Bucket=s3_bucket,
Body=data,
Key=f"{output_folder}/import_year={year}/import_month={month}/import_day={day}/import_date={date}/{filename}.json"
Key=f"{output_folder}/import_year={year}/import_month={month}/import_day={day}/import_date={date}/{filename}.json",
)


Expand All @@ -102,14 +103,16 @@ def lambda_handler(event, lambda_context):
url = "https://hackney.icasework.com/token"

headers = {
'Content-Type': 'application/x-www-form-urlencoded',
"Content-Type": "application/x-www-form-urlencoded",
}

# Get api api credentials from secrets manager
secret_name = getenv("SECRET_NAME")
secrets_manager_client = boto3.client('secretsmanager')
api_credentials_response = retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name)
api_credentials = json.loads(api_credentials_response['SecretString'])
secrets_manager_client = boto3.client("secretsmanager")
api_credentials_response = retrieve_credentials_from_secrets_manager(
secrets_manager_client, secret_name
)
api_credentials = json.loads(api_credentials_response["SecretString"])
api_key = api_credentials.get("api_key")
secret = api_credentials.get("secret")

Expand All @@ -122,11 +125,7 @@ def lambda_handler(event, lambda_context):
# Create payload
current_unix_time = int(time.time())
str_time = str(current_unix_time)
payload_object = {
"iss": api_key,
"aud": url,
"iat": str_time
}
payload_object = {"iss": api_key, "aud": url, "iat": str_time}

payload_object = dictionary_to_string(payload_object)

Expand All @@ -136,16 +135,21 @@ def lambda_handler(event, lambda_context):
signature = create_signature(header, payload, secret)

# Get token from response
auth_token = get_token(url=url, encoded_header=header, encoded_payload=payload, signature=signature,
headers=headers)
auth_token = get_token(
url=url,
encoded_header=header,
encoded_payload=payload,
signature=signature,
headers=headers,
)

# Create auth header for API Calls and auth payload
authorization = f'Bearer {auth_token}'
authorization = f"Bearer {auth_token}"

auth_payload = {}

auth_headers = {
'Authorization': authorization,
"Authorization": authorization,
}

report_tables = [
Expand All @@ -164,22 +168,32 @@ def lambda_handler(event, lambda_context):
date_to_track_from = today - datetime.timedelta(days=1)
logger.info(f"Date to track from: {date_to_track_from}")

s3_client = boto3.client('s3')
s3_client = boto3.client("s3")

for report_details in report_tables:
logger.info(f'Pulling report for {report_details["name"]}')
logger.info(f"Pulling report for {report_details['name']}")
case_id_report_id = report_details["id"]
case_id_list = get_icaseworks_report_from(case_id_report_id, date_to_track_from, auth_headers, auth_payload)
case_id_list = get_icaseworks_report_from(
case_id_report_id, date_to_track_from, auth_headers, auth_payload
)
report_details["data"] = case_id_list
write_dataframe_to_s3(s3_client, report_details["data"], s3_bucket, output_folder_name, report_details["name"])
logger.info(f'Finished writing report for {report_details["name"]} to S3')
write_dataframe_to_s3(
s3_client,
report_details["data"],
s3_bucket,
output_folder_name,
report_details["name"],
)
logger.info(f"Finished writing report for {report_details['name']} to S3")

# Trigger glue job to copy from landing to raw and convert to parquet
glue_client = boto3.client('glue')
glue_client = boto3.client("glue")
start_glue_trigger(glue_client, glue_trigger_name)


def single_digit_to_zero_prefixed_string(value):
    """Return ``value`` as a string, left-padded with zeros to two characters.

    Args:
        value: Number to format, e.g. a day or month.

    Returns:
        ``"05"`` for ``5`` and ``"12"`` for ``12``; values that already need
        two or more characters are returned unpadded.
    """
    return str(value).zfill(2)


def start_glue_trigger(glue_client, trigger_name):
trigger_details = glue_client.start_trigger(Name=trigger_name)
Expand Down
63 changes: 39 additions & 24 deletions lambdas/sftp_to_s3/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const config = {
host: process.env.SFTP_HOST,
username: process.env.SFTP_USERNAME,
password: process.env.SFTP_PASSWORD,
port: 22
port: 22,
};

let fileNamePattern = "";
Expand All @@ -34,28 +34,28 @@ async function getImportFilenamePattern(manualOverrideDateString) {
Settings.throwOnInvalid = true;
DateTime.fromISO(manualOverrideDateString);

let parts = manualOverrideDateString.split('-');
let parts = manualOverrideDateString.split("-");
dateToImport = new Date(parts[0], parts[1] - 1, parts[2]);
}

year = dateToImport.getFullYear().toString();
month = (dateToImport.getMonth() + 1).toString().padStart(2, '0');
day = dateToImport.getDate().toString().padStart(2, '0');
date = `${year}-${month}-${day}`
month = (dateToImport.getMonth() + 1).toString().padStart(2, "0");
day = dateToImport.getDate().toString().padStart(2, "0");
date = `${year}-${month}-${day}`;

fileNamePattern = `${sftpSourceFilePrefix}${date}`;
}

async function findFiles(sftpConn) {
console.log(`filepath on server: ${sftpFilePath}`)
console.log(`filepath on server: ${sftpFilePath}`);

const validPath = await sftpConn.exists(sftpFilePath);

if (!validPath) {
return {
success: false,
message: `Path ${sftpFilePath} doesn't exist on SFTP server`,
fileNames: []
fileNames: [],
};
}

Expand All @@ -68,7 +68,7 @@ async function findFiles(sftpConn) {
function filterByFileNamePattern(file) {
let name = file.name.toLowerCase();
return name.includes(fileNamePattern.toLowerCase());
}
},
);

if (fileList.length === 0) {
Expand All @@ -79,26 +79,30 @@ async function findFiles(sftpConn) {
};
}

const fileNames = fileList.filter(file => file.type != 'd').map(file => file.name);
const fileNames = fileList
.filter((file) => file.type != "d")
.map((file) => file.name);
console.log(fileNames);
return {
success: true,
fileNames,
message: ""
message: "",
};
}

/**
 * Check whether the file for the import date is already in the S3 bucket.
 *
 * Sends a HeadObject request for the partitioned key built from the
 * module-level target folder, date parts, source file prefix and extension.
 *
 * @returns {Promise<boolean>} true when HeadObject succeeds, false when it
 *   rejects for any reason.
 */
async function checkS3ForFile() {
  const s3Client = new AWS.S3({ region: AWS_REGION });
  const params = {
    Bucket: s3Bucket,
    Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`,
  };
  try {
    await s3Client.headObject(params).promise();
    return true;
  } catch (error) {
    // Every failure still counts as "not present" so the import goes ahead,
    // but anything other than a plain 404 is logged so that permission or
    // network problems are not mistaken for a missing file.
    if (error.code !== "NotFound" && error.statusCode !== 404) {
      console.error(
        `HeadObject for ${params.Key} failed: ${error.code || error.message}`,
      );
    }
    console.log(
      `Today's ${s3TargetFolder} file not yet present in S3 bucket, retrieving file from SFTP`,
    );
    return false;
  }
}
Expand All @@ -109,7 +113,7 @@ function putFile() {
const params = {
Bucket: s3Bucket,
Key: `${s3TargetFolder}/import_year=${year}/import_month=${month}/import_day=${day}/import_date=${date}/${sftpSourceFilePrefix}${date}.${sftpSourceFileExtension}`,
Body: stream
Body: stream,
};

const upload = s3Client.upload(params);
Expand All @@ -129,8 +133,7 @@ async function streamFileFromSftpToS3(sftp, fileName) {
}

exports.handler = async (event) => {

let manualOverrideDateString = event['DateToImport'];
let manualOverrideDateString = event["DateToImport"];

console.log(`Manual override date: ${manualOverrideDateString}`);

Expand All @@ -139,8 +142,13 @@ exports.handler = async (event) => {
const sftp = new sftpClient();

if (await checkS3ForFile()) {
console.log(`Today's ${s3TargetFolder} file is already present in S3 bucket!`);
return { success: true, message: `File already found in s3, no further action taken` };
console.log(
`Today's ${s3TargetFolder} file is already present in S3 bucket!`,
);
return {
success: true,
message: `File already found in s3, no further action taken`,
};
}

await sftp.connect(config);
Expand All @@ -153,26 +161,33 @@ exports.handler = async (event) => {
console.log(findFilesResponse);
return {
success: findFilesResponse.success,
message: findFilesResponse.message
message: findFilesResponse.message,
};
}

await Promise.all(findFilesResponse.fileNames.map(file => streamFileFromSftpToS3(sftp, file)));
await Promise.all(
findFilesResponse.fileNames.map((file) =>
streamFileFromSftpToS3(sftp, file),
),
);

//start trigger
const glue = new AWS.Glue({ apiVersion: '2017-03-31' });
const glue = new AWS.Glue({ apiVersion: "2017-03-31" });
const params = {
Name: trigger_to_run
Name: trigger_to_run,
};
console.log("Starting trigger with params", params)
console.log("Starting trigger with params", params);

await glue.startTrigger(params).promise();

console.log("Success!");
return { success: true, message: `Successfully uploaded ${findFilesResponse.fileNames.length} file(s) to s3` };
return {
success: true,
message: `Successfully uploaded ${findFilesResponse.fileNames.length} file(s) to s3`,
};
} catch (error) {
console.error(error.message);
} finally {
await sftp.end();
}
}
};
2 changes: 1 addition & 1 deletion terraform/modules/api-ingestion-lambda/10-lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ resource "aws_s3_object" "lambda" {
source_hash = null_resource.run_install_requirements.triggers["dir_sha1"]
depends_on = [data.archive_file.lambda]
metadata = {
last_updated = data.archive_file.lambda.output_base64sha256
last_updated = null_resource.run_install_requirements.triggers.dir_sha1
}
}

Expand Down