Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions scripts/helpers/housing_nec_migration_gx_dq_inputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
sql_config = {
"properties_1a": {
"sql": """ SELECT *
FROM "housing_nec_migration"."properties_1a" """,
"id_field": "LPRO_PROPREF",
},
}


table_list = ['properties_1a']

partition_keys = ['import_date']
142 changes: 142 additions & 0 deletions scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# flake8: noqa: F821

import awswrangler as wr
from datetime import datetime, date
import json
import logging
import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import pandas as pd
from pyathena import connect
from scripts.helpers.housing_nec_migration_gx_dq_inputs import sql_config, table_list, partition_keys
import scripts.jobs.housing.housing_nec_migration_properties_data_load_gx_suite

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Glue job parameters supplied by the Terraform job definition.
# At module level locals() is globals(), so locals().update(args) exposes
# each argument (region_name, s3_target_location, ...) as a module-level
# name — this is why the file carries "# flake8: noqa: F821" at the top.
arg_keys = ['region_name', 's3_endpoint', 's3_target_location', 's3_staging_location', 'target_database',
            'target_table']
args = getResolvedOptions(sys.argv, arg_keys)
locals().update(args)


def json_serial(obj):
    """Serialize datetime/date values for ``json.dumps(default=...)``.

    Returns the ISO-8601 string for datetime/date inputs; raises
    TypeError for anything else.
    """
    if not isinstance(obj, (datetime, date)):
        raise TypeError(f"Type {type(obj)} not serializable")
    return obj.isoformat()


def main():
    """Run the GX data-quality checkpoints for each configured NEC table
    and write the flattened results to S3 as a partitioned Athena table.

    Relies on module-level job arguments injected via locals().update(args):
    region_name, s3_target_location, s3_staging_location, target_database,
    target_table.
    """
    # File-backed GX context rooted at the job's S3 target location.
    context = gx.get_context(mode="file", project_root_dir=s3_target_location)

    table_results_df_list = []

    for table in table_list:
        logger.info(f'{table} loading...')

        sql_query = sql_config.get(table).get('sql')

        conn = connect(s3_staging_dir=s3_staging_location,
                       region_name=region_name)

        df = pd.read_sql_query(sql_query, conn)

        # Set up a whole-dataframe batch for this table.
        # NOTE(review): the datasource name "pandas" is re-added on every
        # iteration; with more than one entry in table_list this may raise a
        # duplicate-name error in a file-backed context — confirm before
        # extending table_list.
        data_source = context.data_sources.add_pandas("pandas")
        data_asset = data_source.add_dataframe_asset(name=f'{table}_df_asset')
        batch_definition = data_asset.add_batch_definition_whole_dataframe("Athena batch definition")
        batch_parameters = {"dataframe": df}

        # Get the expectation suite for the dataset (created by the
        # companion suite-definition script).
        suite = context.suites.get(name='properties_data_load_suite')

        validation_definition = gx.ValidationDefinition(
            data=batch_definition,
            suite=suite,
            name=f'validation_definition_{table}')
        validation_definition = context.validation_definitions.add(validation_definition)

        # Create a checkpoint returning COMPLETE results so we get the full
        # unexpected_index_list per expectation.
        checkpoint = context.checkpoints.add(
            gx.checkpoint.checkpoint.Checkpoint(
                name=f'{table}_checkpoint',
                validation_definitions=[validation_definition],
                result_format={"result_format": "COMPLETE",
                               "return_unexpected_index_query": False,
                               "partial_unexpected_count": 0}
            )
        )

        checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)
        # One validation definition per checkpoint, so take the single
        # result set and flatten it to one row per expectation.
        results_dict = list(checkpoint_result.run_results.values())[0].to_json_dict()
        table_results_df = pd.json_normalize(results_dict['results'])
        # Drop bulky columns we never persist to Athena.
        cols_to_drop = [c for c in table_results_df.columns if c.startswith('exception_info')]
        cols_to_drop.append('result.unexpected_list')
        table_results_df = table_results_df.drop(columns=cols_to_drop)

        # Generate id lists for each unexpected result set.
        # NOTE(review): the "!= '[]'" clause only excludes values that are
        # literally the string '[]'; json-normalised values are typically
        # Python lists, so this clause may be a no-op — confirm intent.
        query_df = table_results_df.loc[(~table_results_df['result.unexpected_index_list'].isna()) & (
            table_results_df['result.unexpected_index_list'].values != '[]')]

        table_results_df['unexpected_id_list'] = pd.Series(dtype='object')
        for i, row in query_df.iterrows():
            # Map positional unexpected indexes back to the table's id column.
            table_results_df.loc[i, 'unexpected_id_list'] = str(
                list(df[sql_config.get(table).get('id_field')].iloc[row['result.unexpected_index_list']]))

        # Append only after unexpected_id_list is populated. (Previously the
        # frame was appended before the column was added and the code relied
        # on in-place mutation of the appended reference.)
        table_results_df_list.append(table_results_df)

    results_df = pd.concat(table_results_df_list)

    # Map DQ dimension type.
    # NOTE(review): dq_dimensions_map is neither defined in this module nor
    # imported (the F821 noqa masks it) — this line raises NameError at
    # runtime. It likely belongs in housing_nec_migration_gx_dq_inputs;
    # confirm and import it from there.
    results_df['dq_dimension_type'] = results_df['expectation_config.type'].map(dq_dimensions_map)

    # Add clean dataset name by stripping the datasource prefix and the
    # asset-name suffix from the batch id.
    results_df['dataset_name'] = results_df['expectation_config.kwargs.batch_id'].map(
        lambda x: x.removeprefix('pandas-').removesuffix('_df_asset'))

    # Add composite key for each specific test (so it can be tracked over
    # time across runs).
    results_df.insert(loc=0, column='expectation_key',
                      value=results_df.set_index(['expectation_config.type', 'dataset_name']).index.factorize()[0] + 1)
    results_df['expectation_id'] = results_df['expectation_config.type'] + "_" + results_df['dataset_name']
    results_df['import_date'] = datetime.today().strftime('%Y%m%d')

    # Explicit Athena dtypes so awswrangler registers stable column types.
    dtype_dict = {'expectation_config.type': 'string',
                  'expectation_config.kwargs.batch_id': 'string',
                  'expectation_config.kwargs.column': 'string',
                  'expectation_config.kwargs.min_value': 'string',
                  'expectation_config.kwargs.max_value': 'string',
                  'result.element_count': 'bigint',
                  'result.unexpected_count': 'bigint',
                  'result.missing_count': 'bigint',
                  'result.partial_unexpected_list': 'array<string>',
                  'result.unexpected_index_list': 'array<bigint>',
                  'result.unexpected_index_query': 'string',
                  'expectation_config.kwargs.regex': 'string',
                  'expectation_config.kwargs.value_set': 'string',
                  'expectation_config.kwargs.column_list': 'string',
                  'import_year': 'string',
                  'import_month': 'string',
                  'import_day': 'string',
                  'import_date': 'string'}

    # Write to S3 and register/update the Glue catalog table, replacing
    # only the partitions present in this run.
    wr.s3.to_parquet(
        df=results_df,
        path=s3_target_location,
        dataset=True,
        database=target_database,
        table=target_table,
        mode="overwrite_partitions",
        partition_cols=partition_keys,
        dtype=dtype_dict
    )

    logger.info(f'Data Quality test results for NEC data loads written to {s3_target_location}')


if __name__ == '__main__':
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# flake8: noqa: F821

import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import great_expectations.expectations as gxe


# Custom expectation: every LPRO_PROPREF value in the load must be unique.
# NOTE(review): the description string says "UPRN" but the column name is the
# NEC property reference (LPRO_PROPREF) — confirm they are the same identifier
# in this migration before relying on the wording.
class ExpectPropRefColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
    column: str = "LPRO_PROPREF"
    description: str = "Expect UPRN (LPRO_PROPREF) values to be unique"


# Custom expectation: LPRO_HOU_PTV_CODE must be one of the known property
# type codes.
# NOTE(review): codes presumably map to NEC property types (e.g. FLT=flat,
# HOU=house, GAR=garage) — confirm against the NEC reference data before
# treating this list as exhaustive.
class ExpectPropTypeCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
    column: str = "LPRO_HOU_PTV_CODE"
    value_set: list = [
        "BUN",
        "CMC",
        "CMF",
        "COM",
        "CYC",
        "DUP",
        "FLT",
        "GAR",
        "HOU",
        "MAI",
        "PRA",
        "PSP",
        "ROM",
        "STD",
        "TRV",
    ]
    description: str = "Expect property type codes to contain one of the set"


# Glue job argument: S3 prefix where the file-backed GX project lives.
arg_key = ["s3_target_location"]
args = getResolvedOptions(sys.argv, arg_key)
# Module-level locals() is globals(), so this exposes s3_target_location as a
# module-level name (hence the "# flake8: noqa: F821" at the top of the file).
locals().update(args)

# add to GX context
context = gx.get_context(mode="file", project_root_dir=s3_target_location)

suite = gx.ExpectationSuite(name="properties_data_load_suite")

suite.add_expectation(ExpectPropRefColumnValuesToBeUnique())
suite.add_expectation(ExpectPropTypeCodeToBeInSet())
# Persist the suite so the DQ job can fetch it by name at run time.
suite = context.suites.add(suite)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Glue job that runs the Great Expectations data-quality tests for the
# housing NEC migration data loads
# (scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py).
module "housing_nec_migration_apply_gx_dq_tests" {
  source                    = "../modules/aws-glue-job"
  is_production_environment = local.is_production_environment
  is_live_environment       = local.is_live_environment

  # Only deployed in live environments.
  count = local.is_live_environment ? 1 : 0

  department             = module.department_housing_data_source
  job_name               = "${local.short_identifier_prefix}Housing NEC Migration GX Data Quality Testing"
  glue_scripts_bucket_id = module.glue_scripts_data_source.bucket_id
  glue_temp_bucket_id    = module.glue_temp_storage_data_source.bucket_id
  glue_job_timeout       = 360
  helper_module_key      = data.aws_s3_object.helpers.key
  # NOTE(review): pydeequ is referenced here but the job uses Great
  # Expectations — presumably a required input of the shared module; confirm.
  pydeequ_zip_key              = data.aws_s3_object.pydeequ.key
  spark_ui_output_storage_id   = module.spark_ui_output_storage_data_source.bucket_id
  # Scheduled runs are enabled in production only.
  trigger_enabled              = local.is_production_environment
  number_of_workers_for_glue_job = 2
  # Weekdays at 10:00 UTC.
  schedule = "cron(0 10 ? * MON-FRI *)"
  job_parameters = {
    # NOTE(review): bookmarks are enabled although the job overwrites
    # partitions on each run — confirm bookmarks are actually wanted here.
    "--job-bookmark-option"              = "job-bookmark-enable"
    "--enable-glue-datacatalog"          = "true"
    "--enable-continuous-cloudwatch-log" = "true"
    # Pinned GX/numpy/awswrangler versions the job script was written against.
    "--additional-python-modules" = "great_expectations==1.5.8,PyAthena,numpy==1.26.1,awswrangler==3.10.0"
    "--region_name"               = data.aws_region.current.name
    "--s3_endpoint"               = "https://s3.${data.aws_region.current.name}.amazonaws.com"
    "--s3_target_location"        = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
    "--s3_staging_location"       = "s3://${module.athena_storage_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
    "--target_database"           = "housing_nec_migration"
    "--target_table"              = "housing_nec_data_loads_dq_tests"
  }

  script_name = "housing_nec_migration_apply_gx_dq_tests"
}