From b45dcf48076923c6657bff4bb68610f9af03b900 Mon Sep 17 00:00:00 2001 From: Huu Do Date: Wed, 22 Jan 2025 17:16:32 +0000 Subject: [PATCH 1/6] Adding new iCaseworks ingest --- .../icaseworks_ingest_to_raw.py | 283 ++++++++++++++++++ .../etl/55-aws-glue-icaseworks_ingest_etl.tf | 35 +++ 2 files changed, 318 insertions(+) create mode 100644 scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py create mode 100644 terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py new file mode 100644 index 000000000..01cca5b02 --- /dev/null +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -0,0 +1,283 @@ +# flake8: noqa: F821 + +import awswrangler as wr +from datetime import datetime +import logging +import sys + +from awsglue.utils import getResolvedOptions +from scripts.helpers.helpers import get_max_date_partition_value_from_glue_catalogue +import pandas as pd +from pyathena import connect + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database', + 'target_table', 'secret_name'] +partition_keys = ['import_year', 'import_month', 'import_day', 'import_date'] + +import boto3 + +args = getResolvedOptions(sys.argv, arg_keys) +locals().update(args) + +### iCaseworks Loads + +import base64 +from datetime import datetime +from datetime import date +import json +import hashlib +import hmac +import re +import requests +import string +import time +import os +import glob +from dateutil.relativedelta import * + + +### Functions ### + +def remove_illegal_characters(string): + """Removes illegal characters from string""" + regex_list = [['=', ""], ['\/', "_"], ['+', "-"]] + for r in regex_list: + clean_string = re.sub(string=string, + pattern="[{}]".format(r[0]), + repl=r[1]) + return clean_string + + +def encode_json(json_string): + """Encode JSON string""" + json_string = json_string.encode() + json_string = base64.b64encode(json_string) + json_string = json_string.decode("utf-8") + return json_string + + +def create_signature(header, payload, secret): + """Encode JSON string""" + # hashed header, hashed payload, string secret + unsigned_token = header + '.' + payload + # secret_access_key = base64.b64decode(unsigned_token) #TODO is this used anywhere?? + key_bytes = bytes(secret, 'utf-8') + string_to_sign_bytes = bytes(unsigned_token, 'utf-8') + signature_hash = hmac.new(key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256).digest() + encoded_signature = base64.b64encode(signature_hash) + encoded_signature = encoded_signature.decode('utf-8') + encoded_signature = remove_illegal_characters(encoded_signature) + return encoded_signature + + +def get_token(url, encoded_header, encoded_payload, signature, headers): + """Get token""" + assertion = encoded_header + "." + encoded_payload + "." + signature + data = f'assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer' + print(f'Data : {data}') + response = requests.request("POST", url, headers=headers, data=data) + return response + + +def get_icaseworks_report_from(report_id, fromdate, auth_headers, auth_payload): + report_url = "https://hackneyreports.icasework.com/getreport?" + request_url = f'{report_url}ReportId={report_id}&Format=json&From={fromdate}' + print(f'Request url: {request_url}') + r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload) + print(f'Status Code: {r.status_code}') + return r + + +def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload): + report_url = "https://hackneyreports.icasework.com/getreport?" + + now = str(datetime.now()) + timetouse = now[:19] + print(f'TimeNow: {timetouse}') + + request_url = f'{report_url}ReportId={report_id}&Format=json&FromTime={timestamp_to_call}&UntilTime={timetouse}' + print(f'Request url: {request_url}') + r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload) + print(f'Status Code: {r.status_code}') + return r + + +def dump_dataframe(response, location, filename): + df = pd.DataFrame.from_dict(response.json(), orient='columns') + + df['import_year'] = datetime.today().year + df['import_month'] = datetime.today().month + df['import_day'] = datetime.today().day + df['import_date'] = datetime.today().strftime('%Y%m%d') + + print(f'Database: {target_database}') + print(f'Table: {target_table}') + + # write to s3 + wr.s3.to_parquet( + df=df, + path=s3_target_location, + dataset=True, + database=target_database, + table=target_table, + mode="overwrite_partitions", + partition_cols=partition_keys + ) + print(f'Dumped Dataframe {df.shape} to {s3_target_location}') + logger.info(f'Dumped Dataframe {df.shape} to {s3_target_location}') + + +def get_latest_timestamp(table_dict): + # try: + print(f'Getting max timestamp') + # 2025-01-05T15:06:16 + + sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"' + + conn = connect(s3_staging_dir=s3_staging_location, + region_name=region_name) + + df = pd.read_sql_query(sql_query, conn) + latest_date = df.iloc[0, 0] + latest_date = latest_date.replace("T", " ") + + print(f'dataframe outputting') + print(f'Time Found: {latest_date}') + + return latest_date + + +# except: +# date_to_return = "2025-01-16 00:00:00" +# print(f'No Data Found. Will use {date_to_return}') +# return date_to_return + +def authenticate_icaseworks(api_key, secret): + url = "https://hackney.icasework.com/token" + + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + + header_object = {"alg": "HS256", "typ": "JWT"} + + # Create Header + header_object = str(header_object).replace("'", '"').replace(" ", "") + header = encode_json(header_object) + print(f'Header: {header}') + + # Create payload + current_unix_time = int(time.time()) + str_time = str(current_unix_time) + payload_object = { + "iss": api_key, + "aud": url, + "iat": str_time + } + payload_object = str(payload_object).replace("'", '"').replace(" ", + "") # can we do a dict-to-string function for this and the header + + payload = encode_json(str(payload_object)) + print(f'Created Payload: {payload}') + + # Create Signature + signature = create_signature(header, payload, secret) + print(f'Created Signature: {signature}') + + # Get assertion + assertion = header + "." + payload + "." + signature + print(f'assertion: {assertion}') + + # Get response + response = get_token(url=url, encoded_header=header, encoded_payload=payload, signature=signature, headers=headers) + print(response) + + # Get token + auth_token = response.json().get('access_token') + print(f'auth token: {auth_token}') + + # Create auth header for API Calls and auth payload + + authorization = f'Bearer {auth_token}' + print(authorization) + + auth_payload = [] + + # Note: I don't know how to generate the below cookie. That is extracted using postman. Not sure how to recreate this at all + auth_headers = { + 'Authorization': authorization + } + print(f'') + return auth_payload, auth_headers + + +def get_data(table_dict, date_to_call, auth_headers, auth_payload): + dict_to_call = table_dict + + print(f'Pulling report for {dict_to_call["name"]}') + case_id_report_id = dict_to_call["reportid"] + case_id_list = get_report_fromtime(case_id_report_id, date_to_call, auth_headers, auth_payload) + # print(f'Type of case_id_list {type(case_id_list)}') + + dict_to_call["DF"] = case_id_list # This will append the response to the DF column in the dictionary + + dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date_to_call) + + +def get_data_from(table_dict, date_to_call, auth_headers, auth_payload): + dict_to_call = table_dict + + print(f'Pulling report for {dict_to_call["name"]}') + case_id_report_id = dict_to_call["reportid"] + case_id_list = get_icaseworks_report_from(case_id_report_id, date_to_call, auth_headers, auth_payload) + # print(f'Type of case_id_list {type(case_id_list)}') + + dict_to_call["DF"] = case_id_list # This will append the response to the DF column in the dictionary + + dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date.today()) + + +def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name): + response = secrets_manager_client.get_secret_value( + SecretId=secret_name, + ) + return response + + +### main function ## + +def main(): + secrets_manager_client = boto3.client('secretsmanager') + api_credentials_response = retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name) + api_credentials = json.loads(api_credentials_response['SecretString']) + api_key = api_credentials.get("api_key") + print(f'Api_key: {api_key}') + secret = api_credentials.get("secret") + print(f'Secret: {secret}') + auth_payload, auth_headers = authenticate_icaseworks(api_key, secret) + + list_of_datadictionaries = [ + # {"name":"Corrective Actions", "reportid":188769, "full_ingestion":False, "location":"/content/drive/MyDrive/iCaseworks/Corrective_Actions/"}, + # {"name":"Classifications", "reportid":188041, "full_ingestion":True, "location":"/content/drive/MyDrive/iCaseworks/classifications/"}, + {"name": "FOI Requests", "reportid": 199549, "full_ingestion": False, + "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/"} + ] + + for data_dict in list_of_datadictionaries: + location = data_dict["location"] + + if data_dict['full_ingestion'] == False: + + date_to_track_from = get_latest_timestamp(data_dict) + print(f'Starting calls from {date_to_track_from}') + + get_data(data_dict, date_to_track_from, auth_headers, auth_payload) + else: + get_data_from(data_dict, "2020-09-01", auth_headers, auth_payload) + + +if __name__ == '__main__': + main() diff --git a/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf b/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf new file mode 100644 index 000000000..6ab3ef0e7 --- /dev/null +++ b/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf @@ -0,0 +1,35 @@ +module "icaseworks_ingest_to_raw" { + source = "../modules/aws-glue-job" + is_production_environment = local.is_production_environment + is_live_environment = local.is_live_environment + + count = local.is_live_environment ? 0 : 0 + + department = module.department_housing_data_source + job_name = "${local.short_identifier_prefix}icaseworks_ingest_to_raw" + glue_scripts_bucket_id = module.glue_scripts_data_source.bucket_id + glue_temp_bucket_id = module.glue_temp_storage_data_source.bucket_id + glue_job_timeout = 360 + helper_module_key = data.aws_s3_object.helpers.key + pydeequ_zip_key = data.aws_s3_object.pydeequ.key + spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id + trigger_enabled = local.is_production_environment + number_of_workers_for_glue_job = 2 + schedule = "cron(0 10 ? * MON-FRI *)" + job_parameters = { + "--job-bookmark-option" = "job-bookmark-enable" + "--enable-glue-datacatalog" = "true" + "--enable-continuous-cloudwatch-log" = "true" + "--additional-python-modules" = "PyAthena,numpy==1.26.1,awswrangler==3.10.0" + "--region_name" = data.aws_region.current.name + "--s3_endpoint" = "https://s3.${data.aws_region.current.name}.amazonaws.com" + "--s3_target_location" = "s3://${module.raw_zone_data_source.bucket_id}/data-and-insight/icaseworks/FOI/" + "--s3_staging_location" = "s3://${module.athena_storage_data_source.bucket_id}/data-and-insight/icaseworks/FOI/" + "--target_database" = "data-and-insight-raw-zone" + "--target_table" = "icaseworks_foi" + "--secret_name" = "/data-and-insight/icaseworks_key" + + } + + script_name = "housing_apply_gx_dq_tests" +} \ No newline at end of file From 79799fcbbeab7ef7ea753c69233966fca5736126 Mon Sep 17 00:00:00 2001 From: timburke-hackit Date: Tue, 28 Jan 2025 15:54:36 +0000 Subject: [PATCH 2/6] rework replace_illegal_characters function --- .../icaseworks_ingest_to_raw.py | 184 ++++++++++-------- 1 file changed, 106 insertions(+), 78 deletions(-) diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py index 01cca5b02..7330b6c3c 100644 --- a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -13,9 +13,16 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database', - 'target_table', 'secret_name'] -partition_keys = ['import_year', 'import_month', 'import_day', 'import_date'] +arg_keys = [ + "region_name", + "s3_endpoint", + "s3_target_location", + "s3_staging_location", + "target_database", + "target_table", + "secret_name", +] +partition_keys = ["import_year", "import_month", "import_day", "import_date"] import boto3 @@ -41,14 +48,20 @@ ### Functions ### + def remove_illegal_characters(string): - """Removes illegal characters from string""" - regex_list = [['=', ""], ['\/', "_"], ['+', "-"]] - for r in regex_list: - clean_string = re.sub(string=string, - pattern="[{}]".format(r[0]), - repl=r[1]) - return clean_string + """Removes illegal characters from string by replacing: + = with empty string + / with _ + + with - + """ + replacements = {"=": "", "/": "_", "+": "-"} + + result = string + for old, new in replacements.items(): + result = result.replace(old, new) + + return result def encode_json(json_string): @@ -62,13 +75,15 @@ def encode_json(json_string): def create_signature(header, payload, secret): """Encode JSON string""" # hashed header, hashed payload, string secret - unsigned_token = header + '.' + payload + unsigned_token = header + "." + payload # secret_access_key = base64.b64decode(unsigned_token) #TODO is this used anywhere?? - key_bytes = bytes(secret, 'utf-8') - string_to_sign_bytes = bytes(unsigned_token, 'utf-8') - signature_hash = hmac.new(key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256).digest() + key_bytes = bytes(secret, "utf-8") + string_to_sign_bytes = bytes(unsigned_token, "utf-8") + signature_hash = hmac.new( + key_bytes, string_to_sign_bytes, digestmod=hashlib.sha256 + ).digest() encoded_signature = base64.b64encode(signature_hash) - encoded_signature = encoded_signature.decode('utf-8') + encoded_signature = encoded_signature.decode("utf-8") encoded_signature = remove_illegal_characters(encoded_signature) return encoded_signature @@ -76,18 +91,18 @@ def create_signature(header, payload, secret): def get_token(url, encoded_header, encoded_payload, signature, headers): """Get token""" assertion = encoded_header + "." + encoded_payload + "." + signature - data = f'assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer' - print(f'Data : {data}') + data = f"assertion={assertion}&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer" + print(f"Data : {data}") response = requests.request("POST", url, headers=headers, data=data) return response def get_icaseworks_report_from(report_id, fromdate, auth_headers, auth_payload): report_url = "https://hackneyreports.icasework.com/getreport?" - request_url = f'{report_url}ReportId={report_id}&Format=json&From={fromdate}' - print(f'Request url: {request_url}') + request_url = f"{report_url}ReportId={report_id}&Format=json&From={fromdate}" + print(f"Request url: {request_url}") r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload) - print(f'Status Code: {r.status_code}') + print(f"Status Code: {r.status_code}") return r @@ -96,25 +111,25 @@ def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload now = str(datetime.now()) timetouse = now[:19] - print(f'TimeNow: {timetouse}') + print(f"TimeNow: {timetouse}") - request_url = f'{report_url}ReportId={report_id}&Format=json&FromTime={timestamp_to_call}&UntilTime={timetouse}' - print(f'Request url: {request_url}') + request_url = f"{report_url}ReportId={report_id}&Format=json&FromTime={timestamp_to_call}&UntilTime={timetouse}" + print(f"Request url: {request_url}") r = requests.request("GET", request_url, headers=auth_headers, data=auth_payload) - print(f'Status Code: {r.status_code}') + print(f"Status Code: {r.status_code}") return r def dump_dataframe(response, location, filename): - df = pd.DataFrame.from_dict(response.json(), orient='columns') + df = pd.DataFrame.from_dict(response.json(), orient="columns") - df['import_year'] = datetime.today().year - df['import_month'] = datetime.today().month - df['import_day'] = datetime.today().day - df['import_date'] = datetime.today().strftime('%Y%m%d') + df["import_year"] = datetime.today().year + df["import_month"] = datetime.today().month + df["import_day"] = datetime.today().day + df["import_date"] = datetime.today().strftime("%Y%m%d") - print(f'Database: {target_database}') - print(f'Table: {target_table}') + print(f"Database: {target_database}") + print(f"Table: {target_table}") # write to s3 wr.s3.to_parquet( @@ -124,28 +139,27 @@ def dump_dataframe(response, location, filename): database=target_database, table=target_table, mode="overwrite_partitions", - partition_cols=partition_keys + partition_cols=partition_keys, ) - print(f'Dumped Dataframe {df.shape} to {s3_target_location}') - logger.info(f'Dumped Dataframe {df.shape} to {s3_target_location}') + print(f"Dumped Dataframe {df.shape} to {s3_target_location}") + logger.info(f"Dumped Dataframe {df.shape} to {s3_target_location}") def get_latest_timestamp(table_dict): # try: - print(f'Getting max timestamp') + print(f"Getting max timestamp") # 2025-01-05T15:06:16 sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"' - conn = connect(s3_staging_dir=s3_staging_location, - region_name=region_name) + conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name) df = pd.read_sql_query(sql_query, conn) latest_date = df.iloc[0, 0] latest_date = latest_date.replace("T", " ") - print(f'dataframe outputting') - print(f'Time Found: {latest_date}') + print(f"dataframe outputting") + print(f"Time Found: {latest_date}") return latest_date @@ -155,74 +169,78 @@ def get_latest_timestamp(table_dict): # print(f'No Data Found. Will use {date_to_return}') # return date_to_return + def authenticate_icaseworks(api_key, secret): url = "https://hackney.icasework.com/token" - headers = { - 'Content-Type': 'application/x-www-form-urlencoded' - } + headers = {"Content-Type": "application/x-www-form-urlencoded"} header_object = {"alg": "HS256", "typ": "JWT"} # Create Header header_object = str(header_object).replace("'", '"').replace(" ", "") header = encode_json(header_object) - print(f'Header: {header}') + print(f"Header: {header}") # Create payload current_unix_time = int(time.time()) str_time = str(current_unix_time) - payload_object = { - "iss": api_key, - "aud": url, - "iat": str_time - } - payload_object = str(payload_object).replace("'", '"').replace(" ", - "") # can we do a dict-to-string function for this and the header + payload_object = {"iss": api_key, "aud": url, "iat": str_time} + payload_object = ( + str(payload_object).replace("'", '"').replace(" ", "") + ) # can we do a dict-to-string function for this and the header payload = encode_json(str(payload_object)) - print(f'Created Payload: {payload}') + print(f"Created Payload: {payload}") # Create Signature signature = create_signature(header, payload, secret) - print(f'Created Signature: {signature}') + print(f"Created Signature: {signature}") # Get assertion assertion = header + "." + payload + "." + signature - print(f'assertion: {assertion}') + print(f"assertion: {assertion}") # Get response - response = get_token(url=url, encoded_header=header, encoded_payload=payload, signature=signature, headers=headers) + response = get_token( + url=url, + encoded_header=header, + encoded_payload=payload, + signature=signature, + headers=headers, + ) print(response) # Get token - auth_token = response.json().get('access_token') - print(f'auth token: {auth_token}') + auth_token = response.json().get("access_token") + print(f"auth token: {auth_token}") # Create auth header for API Calls and auth payload - authorization = f'Bearer {auth_token}' + authorization = f"Bearer {auth_token}" print(authorization) auth_payload = [] # Note: I don't know how to generate the below cookie. That is extracted using postman. Not sure how to recreate this at all - auth_headers = { - 'Authorization': authorization - } - print(f'') + auth_headers = {"Authorization": authorization} + print(f"") return auth_payload, auth_headers def get_data(table_dict, date_to_call, auth_headers, auth_payload): dict_to_call = table_dict - print(f'Pulling report for {dict_to_call["name"]}') + print(f"Pulling report for {dict_to_call['name']}") case_id_report_id = dict_to_call["reportid"] - case_id_list = get_report_fromtime(case_id_report_id, date_to_call, auth_headers, auth_payload) + case_id_list = get_report_fromtime( + case_id_report_id, date_to_call, auth_headers, auth_payload + ) # print(f'Type of case_id_list {type(case_id_list)}') - dict_to_call["DF"] = case_id_list # This will append the response to the DF column in the dictionary + dict_to_call["DF"] = ( + case_id_list # This will append the response to the DF column in the dictionary + ) dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date_to_call) @@ -230,12 +248,16 @@ def get_data(table_dict, date_to_call, auth_headers, auth_payload): def get_data_from(table_dict, date_to_call, auth_headers, auth_payload): dict_to_call = table_dict - print(f'Pulling report for {dict_to_call["name"]}') + print(f"Pulling report for {dict_to_call['name']}") case_id_report_id = dict_to_call["reportid"] - case_id_list = get_icaseworks_report_from(case_id_report_id, date_to_call, auth_headers, auth_payload) + case_id_list = get_icaseworks_report_from( + case_id_report_id, date_to_call, auth_headers, auth_payload + ) # print(f'Type of case_id_list {type(case_id_list)}') - dict_to_call["DF"] = case_id_list # This will append the response to the DF column in the dictionary + dict_to_call["DF"] = ( + case_id_list # This will append the response to the DF column in the dictionary + ) dump_dataframe(dict_to_call["DF"], dict_to_call["location"], date.today()) @@ -249,35 +271,41 @@ def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_nam ### main function ## + def main(): - secrets_manager_client = boto3.client('secretsmanager') - api_credentials_response = retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_name) - api_credentials = json.loads(api_credentials_response['SecretString']) + secrets_manager_client = boto3.client("secretsmanager") + api_credentials_response = retrieve_credentials_from_secrets_manager( + secrets_manager_client, secret_name + ) + api_credentials = json.loads(api_credentials_response["SecretString"]) api_key = api_credentials.get("api_key") - print(f'Api_key: {api_key}') + print(f"Api_key: {api_key}") secret = api_credentials.get("secret") - print(f'Secret: {secret}') + print(f"Secret: {secret}") auth_payload, auth_headers = authenticate_icaseworks(api_key, secret) list_of_datadictionaries = [ # {"name":"Corrective Actions", "reportid":188769, "full_ingestion":False, "location":"/content/drive/MyDrive/iCaseworks/Corrective_Actions/"}, # {"name":"Classifications", "reportid":188041, "full_ingestion":True, "location":"/content/drive/MyDrive/iCaseworks/classifications/"}, - {"name": "FOI Requests", "reportid": 199549, "full_ingestion": False, - "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/"} + { + "name": "FOI Requests", + "reportid": 199549, + "full_ingestion": False, + "location": "s3://dataplatform-stg-raw-zone/data-and-insight/icaseworks_foi/", + } ] for data_dict in list_of_datadictionaries: location = data_dict["location"] - if data_dict['full_ingestion'] == False: - + if data_dict["full_ingestion"] == False: date_to_track_from = get_latest_timestamp(data_dict) - print(f'Starting calls from {date_to_track_from}') + print(f"Starting calls from {date_to_track_from}") get_data(data_dict, date_to_track_from, auth_headers, auth_payload) else: get_data_from(data_dict, "2020-09-01", auth_headers, auth_payload) -if __name__ == '__main__': +if __name__ == "__main__": main() From d3974059b328eb049dafa858b480260e6b3a8ffe Mon Sep 17 00:00:00 2001 From: timburke-hackit Date: Tue, 28 Jan 2025 15:58:16 +0000 Subject: [PATCH 3/6] organise imports --- .../icaseworks_ingest_to_raw.py | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py index 7330b6c3c..d5757bd89 100644 --- a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -1,15 +1,23 @@ # flake8: noqa: F821 -import awswrangler as wr -from datetime import datetime +import base64 +import hashlib +import hmac +import json import logging import sys +import time +from datetime import date, datetime -from awsglue.utils import getResolvedOptions -from scripts.helpers.helpers import get_max_date_partition_value_from_glue_catalogue +import awswrangler as wr +import boto3 import pandas as pd +import requests +from awsglue.utils import getResolvedOptions +from dateutil.relativedelta import * from pyathena import connect + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -24,27 +32,12 @@ ] partition_keys = ["import_year", "import_month", "import_day", "import_date"] -import boto3 args = getResolvedOptions(sys.argv, arg_keys) locals().update(args) ### iCaseworks Loads -import base64 -from datetime import datetime -from datetime import date -import json -import hashlib -import hmac -import re -import requests -import string -import time -import os -import glob -from dateutil.relativedelta import * - ### Functions ### From 13dfdd64f59d97e3ae4e0ca9bb9723cd6317aebf Mon Sep 17 00:00:00 2001 From: timburke-hackit Date: Tue, 28 Jan 2025 16:06:59 +0000 Subject: [PATCH 4/6] assign variables explicitly --- .../data_and_insight/icaseworks_ingest_to_raw.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py index d5757bd89..f3bde372e 100644 --- a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -32,14 +32,15 @@ ] partition_keys = ["import_year", "import_month", "import_day", "import_date"] - args = getResolvedOptions(sys.argv, arg_keys) -locals().update(args) - -### iCaseworks Loads - -### Functions ### +region_name = args["region_name"] +s3_endpoint = args["s3_endpoint"] +s3_target_location = args["s3_target_location"] +s3_staging_location = args["s3_staging_location"] +target_database = args["target_database"] +target_table = args["target_table"] +secret_name = args["secret_name"] def remove_illegal_characters(string): From 1a71ea2f8385cb11dd7f03b0e941f00a4d8af1db Mon Sep 17 00:00:00 2001 From: timburke-hackit Date: Tue, 28 Jan 2025 16:16:11 +0000 Subject: [PATCH 5/6] tidy up comments and prints --- .../jobs/data_and_insight/icaseworks_ingest_to_raw.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py index f3bde372e..4a2e06b98 100644 --- a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -140,10 +140,12 @@ def dump_dataframe(response, location, filename): def get_latest_timestamp(table_dict): + # TODO: reintroduce try except # try: print(f"Getting max timestamp") # 2025-01-05T15:06:16 + # TODO: needs refactoring to allow for different tables sql_query = 'select max("casestatustouchtimes_lastupdatedtime") as latest from "data-and-insight-raw-zone"."icaseworks_foi"' conn = connect(s3_staging_dir=s3_staging_location, region_name=region_name) @@ -152,7 +154,7 @@ def get_latest_timestamp(table_dict): latest_date = df.iloc[0, 0] latest_date = latest_date.replace("T", " ") - print(f"dataframe outputting") + print("dataframe outputting") print(f"Time Found: {latest_date}") return latest_date @@ -203,20 +205,16 @@ def authenticate_icaseworks(api_key, secret): signature=signature, headers=headers, ) - print(response) # Get token auth_token = response.json().get("access_token") - print(f"auth token: {auth_token}") # Create auth header for API Calls and auth payload authorization = f"Bearer {auth_token}" - print(authorization) auth_payload = [] - # Note: I don't know how to generate the below cookie. That is extracted using postman. Not sure how to recreate this at all auth_headers = {"Authorization": authorization} print(f"") return auth_payload, auth_headers @@ -230,7 +228,6 @@ def get_data(table_dict, date_to_call, auth_headers, auth_payload): case_id_list = get_report_fromtime( case_id_report_id, date_to_call, auth_headers, auth_payload ) - # print(f'Type of case_id_list {type(case_id_list)}') dict_to_call["DF"] = ( case_id_list # This will append the response to the DF column in the dictionary From 77f99e9992264702be767bfe5bc3e0e7003f68b0 Mon Sep 17 00:00:00 2001 From: Huu Do Date: Wed, 29 Jan 2025 14:47:34 +0000 Subject: [PATCH 6/6] Committing Suggestions by Tim --- .../jobs/data_and_insight/icaseworks_ingest_to_raw.py | 4 ++-- terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py index 4a2e06b98..59a7f1eb5 100644 --- a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -270,9 +270,9 @@ def main(): ) api_credentials = json.loads(api_credentials_response["SecretString"]) api_key = api_credentials.get("api_key") - print(f"Api_key: {api_key}") + secret = api_credentials.get("secret") - print(f"Secret: {secret}") + auth_payload, auth_headers = authenticate_icaseworks(api_key, secret) list_of_datadictionaries = [ diff --git a/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf b/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf index 6ab3ef0e7..6e4e89a77 100644 --- a/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf +++ b/terraform/etl/55-aws-glue-icaseworks_ingest_etl.tf @@ -3,9 +3,11 @@ module "icaseworks_ingest_to_raw" { is_production_environment = local.is_production_environment is_live_environment = local.is_live_environment - count = local.is_live_environment ? 0 : 0 + count = !local.is_production_environment && local.is_live_environment ? 1 : 0 + # count = local.is_live_environment ? 1 : 0 + # Bottom one is for Prod - department = module.department_housing_data_source + department = module.department_data_and_insight_data_source job_name = "${local.short_identifier_prefix}icaseworks_ingest_to_raw" glue_scripts_bucket_id = module.glue_scripts_data_source.bucket_id glue_temp_bucket_id = module.glue_temp_storage_data_source.bucket_id @@ -15,7 +17,7 @@ module "icaseworks_ingest_to_raw" { spark_ui_output_storage_id = module.spark_ui_output_storage_data_source.bucket_id trigger_enabled = local.is_production_environment number_of_workers_for_glue_job = 2 - schedule = "cron(0 10 ? * MON-FRI *)" + schedule = "cron(0 3 ? * * *)" job_parameters = { "--job-bookmark-option" = "job-bookmark-enable" "--enable-glue-datacatalog" = "true" @@ -31,5 +33,5 @@ module "icaseworks_ingest_to_raw" { } - script_name = "housing_apply_gx_dq_tests" + script_name = "icaseworks_ingest_to_raw" } \ No newline at end of file