diff --git a/scripts/jobs/academy_data/load_all_academy_data_into_redshift.py b/scripts/jobs/academy_data/load_all_academy_data_into_redshift.py deleted file mode 100644 index 1dbc577c5..000000000 --- a/scripts/jobs/academy_data/load_all_academy_data_into_redshift.py +++ /dev/null @@ -1,196 +0,0 @@ -import time -from datetime import datetime -from typing import List, Dict, Any, Optional -import boto3 -from scripts.helpers.helpers import get_secret_dict, get_glue_env_var -import redshift_connector - -environment = get_glue_env_var("environment") -role_arn = get_glue_env_var("role_arn") -base_s3_url = get_glue_env_var("base_s3_url") - -def rs_command(query: str, fetch_results: bool = False, allow_commit: bool = True, database_name: str = 'academy') -> Optional[List[Dict]]: - """Executes a SQL query against a Redshift database, optionally fetching results. - - Args: - query (str): The SQL query to execute. - fetch_results (bool): Whether to fetch and return the query results (default False). - allow_commit (bool): Whether to allow committing the transaction (default True). - database_name: Name of the database to connect to, defaults to 'academy'. - - Returns: - Optional[List[Dict]]: A list of dictionaries representing rows returned by the query if fetch_results is True; otherwise None. - """ - creds = get_secret_dict('/data-and-insight/redshift-serverless-connection', 'eu-west-2') - conn = None - cursor = None - try: - # Connects to Redshift cluster using AWS credentials - conn = redshift_connector.connect( - host=creds['host'], - database=database_name, - user=creds['username'], - password=creds['password'] - ) - - # autocommit is off by default. - if allow_commit: - # Add this line to handle commands like CREATE EXTERNAL TABLE - conn.autocommit = True - - cursor = conn.cursor() - - # Execute the query - cursor.execute(query) - - # Fetch the results if required - if fetch_results: - columns = [desc[0] for desc in cursor.description] # Get column names - result = cursor.fetchall() - return [dict(zip(columns, row)) for row in result] if result else [] - elif allow_commit: - # Commit the transaction only if allowed and needed - conn.commit() - - except redshift_connector.Error as e: - raise e - finally: - if cursor: - cursor.close() - if conn: - conn.close() - return None # Return None if fetch_results is False or if there's an error - -def get_all_tables(glue_client: Any, database_name: str, pattern: str = '') -> List[Dict]: - """Retrieve all table metadata from Glue catalog for a specific database.""" - tables = [] - paginator = glue_client.get_paginator('get_tables') # Without paginator, only get 100 tables - for page in paginator.paginate(DatabaseName=database_name): - for table in page['TableList']: - if pattern in table['Name']: - tables.append(table) - return tables - -def truncate_table(schema: str, table_name: str) -> str: - """Generate SQL to truncate a table.""" - return f"TRUNCATE TABLE {schema}.{table_name};" - -def copy_command(schema: str, table_name: str, s3_location: str, iam_role: str) -> str: - """Generate SQL for the COPY command.""" - return f""" - COPY {schema}.{table_name} - FROM '{s3_location}' - IAM_ROLE '{iam_role}' - FORMAT AS PARQUET; - """ - -def create_table_sql(schema: str, table_name: str, columns: List[Dict[str, str]]) -> str: - """Generate SQL to create a Redshift table matching the Parquet file structure.""" - type_mapping = { - 'int': 'INTEGER', - 'string': 'VARCHAR(65535)', - 'decimal(3,0)': 'DECIMAL(3,0)', - 'timestamp': 'TIMESTAMP', - 'double': 'DOUBLE PRECISION' - } - column_definitions = ', '.join( - f"{col['Name']} {type_mapping.get(col['Type'], col['Type'])}" - for col in columns - ) - return f"CREATE TABLE {schema}.{table_name} ({column_definitions});" - -def get_s3_location(glue_table_name: str, base_s3_url: str) -> str: - """Generate S3 location for today based on the table name.""" - today = datetime.today() - year, month, day = today.year, str(today.month).zfill(2), str(today.day).zfill(2) - return f"{base_s3_url}{glue_table_name}/import_year={year}/import_month={month}/import_day={day}/import_date={year}{month}{day}/" - -def process_load_tables(schema: str, catalog: str, table_mapping: Dict[str, str], iam_role: str, base_s3_url: str) -> None: - """Main function to process and load tables.""" - creds = get_secret_dict('/data-and-insight/database-migration-access-id-and-secret-key', 'eu-west-2') - glue_client = boto3.client('glue', region_name='eu-west-2', aws_access_key_id=creds['accessKeyId'], aws_secret_access_key=creds['secretAccessKey']) - - for original_pattern, new_prefix in table_mapping.items(): - for table in get_all_tables(glue_client, catalog, original_pattern): - new_table_name = table['Name'].replace(original_pattern, new_prefix) - create_sql = create_table_sql(schema, new_table_name, table['StorageDescriptor']['Columns']) - # print(create_sql) # print statements are for debugging - - try: - rs_command(create_sql, fetch_results=False, allow_commit=True) - print(f"Table {schema}.{new_table_name} created successfully.") - except Exception as e: - if 'already exists' in str(e): - print(f"Table {new_table_name} already exists.") - else: - print(f"Failed to create table {new_table_name}: {e}") - raise e - - truncate_sql = truncate_table(schema, new_table_name) - rs_command(truncate_sql, fetch_results=False, allow_commit=True) - print(f"Table {schema}.{new_table_name} truncated successfully.") - - s3_location = get_s3_location(table['Name'], base_s3_url) - copy_sql = copy_command(schema, new_table_name, s3_location, iam_role) - # print(copy_sql) # print statements are for debugging - try: - rs_command(copy_sql, fetch_results=False, allow_commit=True) - print(f"Data for {datetime.today().strftime('%Y%m%d')} copied successfully into table {new_table_name}.") - except Exception as e: - print(f"Failed to copy data into table {schema}.{new_table_name}: {e}") - - time.sleep(1) # Sleep to mitigate risk of overwhelming the cluster - -def main(): - # for all tables under ctax - process_load_tables( - schema='ctax', # Redshift schema - catalog='revenues-raw-zone', # Glue catalog database - # map the table names in Glue catalog to names in Redshift - table_mapping= { - 'lbhaliverbviews_core_ct': 'ct', - 'lbhaliverbviews_core_sy': 'sy' - }, - iam_role=role_arn, - base_s3_url = base_s3_url - ) - - # for all tables under nndr - process_load_tables( - schema='nndr', - catalog='revenues-raw-zone', - table_mapping= { - 'lbhaliverbviews_core_nr': 'nr', - 'lbhaliverbviews_core_sy': 'sy' - }, - iam_role=role_arn, - base_s3_url = base_s3_url - ) - - # for all tables under hben - process_load_tables( - schema='hben', - catalog='revenues-raw-zone', - table_mapping= { - 'lbhaliverbviews_core_sy': 'sy', - 'lbhaliverbviews_core_ctaccount': 'ctaccount', - 'lbhaliverbviews_core_ctnotice': 'ctnotice', - 'lbhaliverbviews_core_ctoccupation': 'ctoccupation', - 'lbhaliverbviews_core_ctproperty': 'ctproperty', - 'lbhaliverbviews_core_cttransaction': 'cttransaction', - }, - iam_role=role_arn, - base_s3_url = base_s3_url - ) - process_load_tables( - schema='hben', - catalog='bens-housing-needs-raw-zone', - table_mapping= { - 'lbhaliverbviews_core_hb': 'hb', - }, - iam_role=role_arn, - base_s3_url = f"s3://dataplatform-{environment}-raw-zone/benefits-housing-needs/" # note the path change - ) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/terraform/core/12-aws-s3-scripts.tf b/terraform/core/12-aws-s3-scripts.tf index d3cf5a2ba..f60bdb634 100644 --- a/terraform/core/12-aws-s3-scripts.tf +++ b/terraform/core/12-aws-s3-scripts.tf @@ -93,12 +93,3 @@ resource "aws_s3_object" "parking_copy_ringgo_sftp_data_to_raw" { source = "../../scripts/jobs/parking/parking_copy_ringgo_sftp_data_to_raw.py" source_hash = filemd5("../../scripts/jobs/parking/parking_copy_ringgo_sftp_data_to_raw.py") } - -resource "aws_s3_object" "load_all_academy_data_into_redshift" { - bucket = module.glue_scripts.bucket_id # this is glue_scripts_data_source in etl folder - key = "scripts/load_all_academy_data_into_redshift.py" - acl = "private" - source = "../../scripts/jobs/academy_data/load_all_academy_data_into_redshift.py" - source_hash = filemd5("../../scripts/jobs/academy_data/load_all_academy_data_into_redshift.py") -} - diff --git a/terraform/core/45-database-migration-iam.tf b/terraform/core/45-database-migration-iam.tf deleted file mode 100644 index 734ff63bf..000000000 --- a/terraform/core/45-database-migration-iam.tf +++ /dev/null @@ -1,78 +0,0 @@ -resource "aws_iam_user" "database_migration" { - name = "${local.short_identifier_prefix}database_migration" - force_destroy = !local.is_live_environment - tags = module.tags.values -} - -resource "aws_iam_user_policy" "database_migration_user_policy" { - name = "${local.short_identifier_prefix}database_migration_user_policy" - user = aws_iam_user.database_migration.name - policy = data.aws_iam_policy_document.database_migration_custom_policy.json -} - -data "aws_iam_policy_document" "database_migration_custom_policy" { - statement { - effect = "Allow" - actions = [ - "s3:GetBucketLocation", - "s3:ListBucket", - "s3:ListAllMyBuckets" - ] - resources = [ - "*" - ] - } - - statement { - effect = "Allow" - actions = [ - "s3:GetObject", - "s3:GetObjectVersion", - ] - resources = [ - "${module.raw_zone.bucket_arn}/*", - "${module.refined_zone.bucket_arn}/*", - "${module.trusted_zone.bucket_arn}/*" - ] - } - - statement { - effect = "Allow" - actions = [ - "kms:Encrypt", - "kms:Decrypt", - # "kms:GenerateDataKey", - ] - resources = [ - module.raw_zone.kms_key_arn, - module.refined_zone.kms_key_arn, - module.trusted_zone.kms_key_arn - ] - } - - // Add new statement for full Redshift access. - statement { - effect = "Allow" - actions = [ - "redshift:*", - ] - resources = [ - "*" // will adjust this based on specific Redshift resource ARNs later - ] - } - - // only allow to get secret value with name starting from "/data-and-insight/*" - statement { - effect = "Allow" - actions = ["secretsmanager:GetSecretValue"] - resources = ["arn:aws:secretsmanager:${var.aws_deploy_region}:${var.aws_deploy_account_id}:secret:/${module.department_data_and_insight.identifier}/*"] - } - - // Need extract glue data cataglue info for database migration, - // but grant it full access so far (glue itself has very low risk). - statement { - effect = "Allow" - actions = ["glue:*"] - resources = ["*"] - } -} diff --git a/terraform/core/51-load-all-academy-data-into-redshift-serverless.tf b/terraform/core/51-load-all-academy-data-into-redshift-serverless.tf index e286561ef..14697aaf1 100644 --- a/terraform/core/51-load-all-academy-data-into-redshift-serverless.tf +++ b/terraform/core/51-load-all-academy-data-into-redshift-serverless.tf @@ -26,19 +26,19 @@ locals { # option 2: tailored for this module resource "aws_glue_connection" "database_ingestion_via_jdbc_connection" { - count = local.is_live_environment && !local.is_production_environment ? 1 : 0 - name = "${local.short_identifier_prefix}redshift-serverless-connection-${data.aws_subnet.network[local.instance_subnet_id].availability_zone}" + count = local.is_live_environment && !local.is_production_environment ? 1 : 0 + name = "${local.short_identifier_prefix}redshift-serverless-connection-${data.aws_subnet.network[local.instance_subnet_id].availability_zone}" description = "JDBC connection for Redshift Serverless" connection_properties = { - JDBC_CONNECTION_URL = "jdbc:redshift://${local.redshift_serverless_credentials["host"]}:${local.redshift_serverless_credentials["port"]}/${local.redshift_serverless_credentials["database_name"]}" - PASSWORD = local.redshift_serverless_credentials["password"] - USERNAME = local.redshift_serverless_credentials["username"] + JDBC_CONNECTION_URL = "jdbc:redshift://${local.redshift_serverless_credentials["host"]}:${local.redshift_serverless_credentials["port"]}/${local.redshift_serverless_credentials["database_name"]}" + PASSWORD = local.redshift_serverless_credentials["password"] + USERNAME = local.redshift_serverless_credentials["username"] } physical_connection_requirements { - availability_zone = data.aws_subnet.network[local.instance_subnet_id].availability_zone - security_group_id_list = [aws_security_group.ingestion_database_connection.id] - subnet_id = data.aws_subnet.network[local.instance_subnet_id].id + availability_zone = data.aws_subnet.network[local.instance_subnet_id].availability_zone + security_group_id_list = [aws_security_group.ingestion_database_connection.id] + subnet_id = data.aws_subnet.network[local.instance_subnet_id].id } } @@ -69,37 +69,3 @@ resource "aws_security_group_rule" "ingestion_database_connection_allow_tcp_egre ipv6_cidr_blocks = ["::/0"] security_group_id = aws_security_group.ingestion_database_connection.id } - - -module "load_all_academy_data_into_redshift" { - count = local.is_live_environment && !local.is_production_environment ? 1 : 0 - tags = module.tags.values - source = "../modules/aws-glue-job" - is_live_environment = local.is_live_environment - is_production_environment = local.is_production_environment - job_name = "${local.short_identifier_prefix}load_all_academy_data_into_redshift" - script_s3_object_key = aws_s3_object.load_all_academy_data_into_redshift.key - pydeequ_zip_key = aws_s3_object.pydeequ.key - helper_module_key = aws_s3_object.helpers.key - glue_role_arn = aws_iam_role.glue_role.arn - glue_temp_bucket_id = module.glue_temp_storage.bucket_id - glue_scripts_bucket_id = module.glue_scripts.bucket_id - spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id - glue_version = "4.0" - glue_job_worker_type = "G.1X" - number_of_workers_for_glue_job = 2 - glue_job_timeout = 220 - schedule = "cron(15 7 ? * MON-FRI *)" - # jdbc_connections = [module.database_ingestion_via_jdbc_connection[0].name] - jdbc_connections = [aws_glue_connection.database_ingestion_via_jdbc_connection[0].name] - job_parameters = { - "--additional-python-modules" = "botocore==1.27.59, redshift_connector==2.1.0" - "--environment" = var.environment - # This is the ARN of the IAM role used by Redshift Serverless. We have count in redshift-serverless module so index 0 is to get the ARN. - "--role_arn" = try(module.redshift_serverless[0].redshift_serverless_role_arn, "") - "--enable-auto-scaling" = "false" - "--job-bookmark-option" = "job-bookmark-disable" - "--base_s3_url" = "${module.raw_zone.bucket_url}/revenues/" - "--conf" = "spark.sql.legacy.timeParserPolicy=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=LEGACY --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=LEGACY" - } -} \ No newline at end of file