diff --git a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py index 59a7f1eb5..43f9a2e84 100644 --- a/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py +++ b/scripts/jobs/data_and_insight/icaseworks_ingest_to_raw.py @@ -1,5 +1,3 @@ -# flake8: noqa: F821 - import base64 import hashlib import hmac @@ -14,10 +12,8 @@ import pandas as pd import requests from awsglue.utils import getResolvedOptions -from dateutil.relativedelta import * from pyathena import connect - logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -115,16 +111,22 @@ def get_report_fromtime(report_id, timestamp_to_call, auth_headers, auth_payload def dump_dataframe(response, location, filename): - df = pd.DataFrame.from_dict(response.json(), orient="columns") + df = pd.DataFrame.from_dict( + response.json(), + orient="columns", + ) - df["import_year"] = datetime.today().year - df["import_month"] = datetime.today().month - df["import_day"] = datetime.today().day + df["import_year"] = datetime.today().strftime("%Y") + df["import_month"] = datetime.today().strftime("%m") + df["import_day"] = datetime.today().strftime("%d") df["import_date"] = datetime.today().strftime("%Y%m%d") print(f"Database: {target_database}") print(f"Table: {target_table}") + dict_values = ["string" for _ in range(len(df.columns))] + dtype_dict = dict(zip(df.columns, dict_values)) + # write to s3 wr.s3.to_parquet( df=df, @@ -134,6 +136,7 @@ def dump_dataframe(response, location, filename): table=target_table, mode="overwrite_partitions", partition_cols=partition_keys, + dtype=dtype_dict, ) print(f"Dumped Dataframe {df.shape} to {s3_target_location}") logger.info(f"Dumped Dataframe {df.shape} to {s3_target_location}") @@ -142,7 +145,7 @@ def dump_dataframe(response, location, filename): def get_latest_timestamp(table_dict): # TODO: reintroduce try except # try: - print(f"Getting max timestamp") + print("Getting max timestamp") # 2025-01-05T15:06:16 # TODO: needs refactoring to allow for different tables @@ -216,7 +219,6 @@ def authenticate_icaseworks(api_key, secret): auth_payload = [] auth_headers = {"Authorization": authorization} - print(f"") return auth_payload, auth_headers @@ -260,9 +262,6 @@ def retrieve_credentials_from_secrets_manager(secrets_manager_client, secret_nam return response -### main function ## - - def main(): secrets_manager_client = boto3.client("secretsmanager") api_credentials_response = retrieve_credentials_from_secrets_manager( @@ -287,9 +286,7 @@ def main(): ] for data_dict in list_of_datadictionaries: - location = data_dict["location"] - - if data_dict["full_ingestion"] == False: + if data_dict["full_ingestion"] is False: date_to_track_from = get_latest_timestamp(data_dict) print(f"Starting calls from {date_to_track_from}")