1 change: 1 addition & 0 deletions scripts/helpers/housing_nec_migration_gx_dq_inputs.py
@@ -9,6 +9,7 @@
"properties_1c",
"properties_1d",
"properties_1e",
"properties_2a",
"properties_4a",
"properties_4c",
]
61 changes: 52 additions & 9 deletions scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
@@ -27,6 +27,9 @@
"s3_staging_location",
"target_database",
"target_table",
"target_table_metadata",
"s3_target_location_metadata",
"s3_target_location_results",
]
args = getResolvedOptions(sys.argv, arg_keys)
locals().update(args)
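# NB: getResolvedOptions returns a dict of the resolved job arguments; at module
# scope locals() is the module's globals, so this update exposes each argument
# (e.g. target_database, target_table) as a plain variable used below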
@@ -50,6 +53,7 @@ def main():
context = gx.get_context(mode="file", project_root_dir=s3_target_location)

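# accumulators: one results frame and one expectation-metadata frame per dataset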
table_results_df_list = []
df_all_suite_list = []

for data_load in data_load_list:
logger.info(f"{data_load} loading...")
@@ -75,6 +79,7 @@

# get expectation suite for dataset
suite = context.suites.get(name=f"{data_load}_data_load_suite")
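# keep the suite's individual expectations for the metadata build further down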
expectations = suite.expectations

validation_definition = gx.ValidationDefinition(
data=batch_definition,
@@ -125,14 +130,45 @@ def main():
list(df[id_field].iloc[row["result.unexpected_index_list"]])
)

# drop columns not needed in metadata
cols_to_drop_meta = [
"notes",
"result_format",
"catch_exceptions",
"rendered_content",
"windows",
]

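# flatten each expectation into a one-row frame of its kwargs, tagged with
# its type and the dataset it applies to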
suite_df = pd.DataFrame()
for expectation in expectations:
    temp_df = pd.json_normalize(dict(expectation))
    temp_df["expectation_type"] = expectation.expectation_type
    temp_df["dataset_name"] = table
    temp_df = temp_df.drop(columns=cols_to_drop_meta)
    suite_df = pd.concat([suite_df, temp_df])

df_all_suite_list.append(suite_df)

results_df = pd.concat(table_results_df_list)
metadata_df = pd.concat(df_all_suite_list)

# add expectation_id (expectation type + dataset name)
metadata_df["expectation_id"] = (
metadata_df["expectation_type"] + "_" + metadata_df["dataset_name"]
)
metadata_df["import_date"] = datetime.today().strftime("%Y%m%d")

# set dtypes for Athena with default of string
dtype_dict_metadata = {col: "string" for col in metadata_df.columns}

# add clean dataset name
results_df["dataset_name"] = results_df["expectation_config.kwargs.batch_id"].map(
lambda x: x.removeprefix("pandas-").removesuffix("_df_asset")
)

- # add composite key for each specific test (so can be tracked over time)
+ # add composite key for each test (so can be tracked over time)
results_df.insert(
loc=0,
column="expectation_key",
@@ ... @@
)
results_df["import_date"] = datetime.today().strftime("%Y%m%d")

# set dtypes for Athena
- dtype_dict = {
+ dtype_dict_results = {
"expectation_config.type": "string",
"expectation_config.kwargs.batch_id": "string",
"expectation_config.kwargs.column": "string",
@@ ... @@
"import_date": "string",
}

# write to s3
# TODO for df_vars in [[results_df, dtype_dict_results, target_table], [metadata_df, dtype_dict_metadata, target_table_metadata]]:
# will loop the writing of these tables
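# A possible shape for that refactor (untested sketch, using only the
# variables already defined above):
# for df, dtypes, table_name, path in [
#     (results_df, dtype_dict_results, target_table, s3_target_location_results),
#     (metadata_df, dtype_dict_metadata, target_table_metadata, s3_target_location_metadata),
# ]:
#     wr.s3.to_parquet(
#         df=df,
#         path=path,
#         dataset=True,
#         database=target_database,
#         table=table_name,
#         mode="overwrite",
#         dtype=dtypes,
#     )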

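# with dataset=True plus database/table, awswrangler writes parquet under the
# prefix and creates or updates the matching Glue Catalog table;
# mode="overwrite" replaces the output of previous runs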
wr.s3.to_parquet(
df=results_df,
- path=s3_target_location,
+ path=s3_target_location_results,
dataset=True,
database=target_database,
table=target_table,
mode="overwrite",
- dtype=dtype_dict,
+ schema_evolution=True,
+ dtype=dtype_dict_results,
)

- logger.info(
-     f"Data Quality test results for NEC data loads written to {s3_target_location}"
- )
+ wr.s3.to_parquet(
+     df=metadata_df,
+     path=s3_target_location_metadata,
+     dataset=True,
+     database=target_database,
+     table=target_table_metadata,
+     mode="overwrite",
+     dtype=dtype_dict_metadata,
+ )


@@ -16,17 +16,21 @@ module "housing_nec_migration_apply_gx_dq_tests" {
trigger_enabled = local.is_production_environment
number_of_workers_for_glue_job = 2
schedule = "cron(0 10 ? * MON-FRI *)"
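# 10:00 UTC, Monday to Friday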
job_parameters = {
"--job-bookmark-option" = "job-bookmark-enable"
"--enable-glue-datacatalog" = "true"
"--enable-continuous-cloudwatch-log" = "true"
"--additional-python-modules" = "great_expectations==1.5.8,PyAthena,numpy==1.26.1,awswrangler==3.10.0"
"--region_name" = data.aws_region.current.name
"--s3_endpoint" = "https://s3.${data.aws_region.current.name}.amazonaws.com"
"--s3_target_location" = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
"--s3_target_location_metadata" = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/metadata/"
"--s3_target_location_results" = "s3://${module.raw_zone_data_source.bucket_id}/housing/nec-migration-data-quality-tests/results/"
"--s3_staging_location" = "s3://${module.athena_storage_data_source.bucket_id}/housing/nec-migration-data-quality-tests/"
"--target_database" = "housing_nec_migration"
"--target_table" = "housing_nec_data_loads_dq_tests"
"--target_table_metadata" = "housing_nec_data_loads_dq_metadata"

}

script_name = "housing_nec_migration_apply_gx_dq_tests"