Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions scripts/helpers/housing_nec_migration_gx_dq_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,22 @@
"tenancies": {"id_field": "LTCY_ALT_REF"},
"people": {"id_field": "LPAR_PER_ALT_REF"},
"contacts": {"id_field": "LCDE_LEGACY_REF"},
"arrears_actions": {"id_field": "LACA_PAY_REF"},
"revenue_accounts": {"id_field": "LRAC_PAY_REF"},
"transactions": {"id_field": "LTRN_ALT_REF"},
"addresses": {"id_field": "LAUS_LEGACY_REF"}
}

data_load_list = ["properties", "tenancies", "people", "contacts", "arrears_actions"]
data_load_list = [
"properties",
"tenancies",
"people",
"contacts",
"arrears_actions",
"revenue_accounts",
"transactions",
"addresses",
]

table_list = {
"properties": [
Expand All @@ -31,7 +44,26 @@
],
"people": ["people_1a", "people_1b", "people_1c", "people_2a", "people_all"],
"contacts": ["contacts_1a", "contacts_1b", "contacts_2a", "contacts_all"],
"arrears_actions": ["arrears_actions_1a", "arrears_actions_1c", "arrears_actions_2a"],
"arrears_actions": [
"arrears_actions_1a",
"arrears_actions_1c",
"arrears_actions_2a",
],
"revenue_accounts": [
"revenue_accounts_1a",
"revenue_accounts_1b_sc",
"revenue_accounts_1c",
"revenue_accounts_2a",
"revenue_accounts_other",
],
"transactions": [
"transactions_1a",
"transactions_1c",
"transactions_2a",
"transactions_other",
"transactions_all",
],
"addresses": ["addresses_1a", "addresses_2a"],
}

partition_keys = ["import_date"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# flake8: noqa: F821

import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import great_expectations.expectations as gxe


# Uniqueness expectation pinned to the LAUS_LEGACY_REF column.
class ExpectPropRefColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
    column: str = "LAUS_LEGACY_REF"
    description: str = "Expect LAUS_LEGACY_REF values to be unique"

# Not-null expectation pinned to the LAUS_LEGACY_REF column.
class ExpectPropRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
    column: str = "LAUS_LEGACY_REF"
    description: str = "Expect LAUS_LEGACY_REF values to not be Null"

# Uniqueness expectation pinned to the LADR_UPRN column.
class ExpectUPRNColumnValuesToBeUnique(gxe.ExpectColumnValuesToBeUnique):
    column: str = "LADR_UPRN"
    description: str = "Expect UPRN values to be unique"

# Not-null expectation pinned to the LADR_UPRN column.
class ExpectUPRNColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
    column: str = "LADR_UPRN"
    description: str = "Expect UPRN values to not be Null"

# Table-level expectation: the column names must match this list, in this order.
class ExpectAddressColumnsToMatchOrderedList(gxe.ExpectTableColumnsToMatchOrderedList):
    column_list = [
        # LAUS_* columns
        "LAUS_LEGACY_REF",
        "LAUS_AUT_FAO_CODE",
        "LAUS_AUT_FAR_CODE",
        "LAUS_START_DATE",
        "LAUS_END_DATE",
        # LADR_* / LAEL_* columns
        "LADR_FLAT",
        "LADR_BUILDING",
        "LADR_STREET_NUMBER",
        "LAEL_STREET",
        "LAEL_SUB_STREET1",
        "LAEL_SUB_STREET2",
        "LAEL_SUB_STREET3",
        "LAEL_AREA",
        "LAEL_TOWN",
        "LAEL_COUNTY",
        "LAEL_COUNTRY",
        "LAEL_POSTCODE",
        "LAEL_LOCAL_IND",
        "LAEL_ABROAD_IND",
        # LADD_* columns and the remaining trailing columns
        "LADD_ADDL1",
        "LADD_ADDL2",
        "LADD_ADDL3",
        "LAEL_STREET_INDEX_CODE",
        "LAUS_CONTACT_NAME",
        "LADR_EASTINGS",
        "LADR_NORTHINGS",
        "LADR_UPRN",
    ]
    description: str = "Expect columns to match ordered list exactly"


# Job arguments: only the GX project root location is requested.
arg_key = ["s3_target_location"]
args = getResolvedOptions(sys.argv, arg_key)
# Publish every resolved argument as a name in this namespace; this is where
# `s3_target_location` used below comes from (presumably the reason for the
# file-level F821 waiver).
locals().update(args)

# File-mode GX context rooted at the supplied location.
context = gx.get_context(mode="file", project_root_dir=s3_target_location)

suite = gx.ExpectationSuite(name="addresses_data_load_suite")

# Expectations are instantiated and attached one by one, in this exact order.
for _expectation_cls in (
    ExpectPropRefColumnValuesToBeUnique,
    ExpectUPRNColumnValuesToBeUnique,
    ExpectAddressColumnsToMatchOrderedList,
    ExpectPropRefColumnValuesToNotBeNull,
    ExpectUPRNColumnValuesToNotBeNull,
):
    suite.add_expectation(_expectation_cls())

# Register the populated suite with the context and keep the returned object.
suite = context.suites.add(suite)
12 changes: 10 additions & 2 deletions scripts/jobs/housing/housing_nec_migration_apply_gx_dq_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import scripts.jobs.housing.housing_nec_migration_people_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_contacts_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_arrears_actions_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_revenue_accounts_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_transactions_data_load_gx_suite
import scripts.jobs.housing.housing_nec_migration_addresses_data_load_gx_suite

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -86,8 +89,13 @@ def main():
batch_parameters = {"dataframe": df}

# get expectation suite for dataset
suite = context.suites.get(name=f"{data_load}_data_load_suite")
expectations = suite.expectations
try:
suite = context.suites.get(name=f"{data_load}_data_load_suite")
except Exception as e:
logger.info(f"Problem found with {data_load}: GX suite {e}, skipping suite.")
continue
else:
expectations = suite.expectations

validation_definition = gx.ValidationDefinition(
data=batch_definition,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# flake8: noqa: F821

import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import great_expectations.expectations as gxe


# Not-null expectation pinned to the LRAC_PAY_REF (pay ref) column.
class ExpectPayRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
    column: str = "LRAC_PAY_REF"
    # The description previously named a non-existent "LLRAC_PAY_REF" column and
    # the contacts load; it now matches the column above and the revenue
    # accounts suite this expectation is registered in.
    description: str = (
        "Expect LRAC_PAY_REF (pay ref) values to not be Null in revenue accounts load"
    )


# Not-null expectation pinned to the LRAC_TCY_ALT_REF column.
class ExpectTenancyRefToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
    column: str = "LRAC_TCY_ALT_REF"
    description: str = "Expect LRAC_TCY_ALT_REF to not be Null"


# Set-membership expectation on the LACA_ARA_CODE column.
# NOTE(review): this class is registered in the revenue accounts suite, whose
# other checks use LRAC_* columns, while this one targets a LACA_* column. It
# looks carried over from the arrears actions suite; confirm the intended column.
class ExpectArrearCodeToBeInSet(gxe.ExpectColumnValuesToBeInSet):
    column: str = "LACA_ARA_CODE"
    # Accepted codes. "IRA1" and "NRA2" each appeared twice in the original
    # list; the repeats are dropped here, so the set of accepted values is the
    # same. Order of first occurrence is preserved.
    value_set: list = [
        "BAGF",
        "BAG1",
        "RTRN",
        "FRA2",
        "ADVR",
        "RDCN",
        "DWPR",
        "DWPN",
        "EDCL",
        "NOTE",
        "RHCN",
        "CORT",
        "IUPO",
        "IRA1",
        "NTQ",
        "CON3",
        "RREQ",
        "RRFN",
        "STAT",
        "SHB",
        "SNW",
        "ARRN",
        "POP",
        "FRA1",
        "BCOL",
        "DWPC",
        "DWPT",
        "COUT",
        "NFA",
        "NRA2",
        "FRET",
        "SCH",
        "SNP",
        "VISN",
        "WOA",
        "WOC",
        "WOH",
        "WON",
        "CDAT",
        "CNOK",
        "ADVC",
        "EVIC",
        "FINC",
        "HBN",
        "SRA1",
        "TRA1",
        "NRA1",
        "MRA1",
        "TELO",
        "RPAN",
        "RRHB",
        "RRF",
        "SAR",
        "SBA",
        "SCM",
        "SSA",
        "VISI",
        "WOF",
        "RCHN",
        "RDDN",
        "CDL",
        "FINI",
        "GRA1",
        "AGRL",
        "SRA2",
        "TRA2",
        "IRA2",
        "MRA2",
        "CWAL",
        "TELI",
        "RELI",
        "LREF",
        "NOSP",
        "INTV",
        "SUP",
        "UCC",
    ]
    description: str = "Expect arrear code to be one of the set"


# Table-level expectation: the column names must match this list, in this order.
# NOTE(review): every entry is a LACA_* column although this class is registered
# in the revenue accounts suite, and the final entry "LACA_DEL_" looks
# truncated. Both are reproduced unchanged; confirm against the real schema.
class ExpectArrearsActionsColumnsToMatchOrderedList(
    gxe.ExpectTableColumnsToMatchOrderedList
):
    column_list = [
        "LACA_BALANCE",
        "LACA_PAY_REF",
        "LACA_TYPE",
        "LACA_CREATED_BY",
        "LACA_CREATED_DATE",
        "LACA_ARREARS_DISPUTE_IND",
        "LACA_ARA_CODE",
        "LACA_STATUS",
        "LACA_HRV_ADL_CODE",
        "LACA_EAC_EPO_CODE",
        "LACA_EFFECTIVE_DATE",
        "LACA_EXPIRY_DATE",
        "LACA_NEXT_ACTION_DATE",
        "LACA_AUTH_DATE",
        "LACA_AUTH_USERNAME",
        "LACA_PRINT_DATE",
        "LACA_DEL_",
    ]
    description: str = "Expect columns to match ordered list exactly"


# Job arguments: only the GX project root location is requested.
arg_key = ["s3_target_location"]
args = getResolvedOptions(sys.argv, arg_key)
# Publish every resolved argument as a name in this namespace; this is where
# `s3_target_location` used below comes from (presumably the reason for the
# file-level F821 waiver).
locals().update(args)

# File-mode GX context rooted at the supplied location.
context = gx.get_context(mode="file", project_root_dir=s3_target_location)

suite = gx.ExpectationSuite(name="revenue_accounts_data_load_suite")

# Expectations are instantiated and attached one by one, in this exact order.
# NOTE(review): two of these are arrears-actions (LACA_*) checks attached to the
# revenue accounts suite; confirm that is intended.
for _expectation_cls in (
    ExpectArrearsActionsColumnsToMatchOrderedList,
    ExpectArrearCodeToBeInSet,
    ExpectPayRefColumnValuesToNotBeNull,
    ExpectTenancyRefToNotBeNull,
):
    suite.add_expectation(_expectation_cls())

# Register the populated suite with the context and keep the returned object.
suite = context.suites.add(suite)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# flake8: noqa: F821

import sys

from awsglue.utils import getResolvedOptions
import great_expectations as gx
import great_expectations.expectations as gxe


# Not-null expectation pinned to the LTRN_ALT_REF column.
class ExpectPayRefColumnValuesToNotBeNull(gxe.ExpectColumnValuesToNotBeNull):
    column: str = "LTRN_ALT_REF"
    # The description previously said "contacts load"; this expectation is
    # registered in the transactions suite, so the text now says so.
    description: str = "Expect LTRN_ALT_REF values to not be Null in transactions load"


# Job arguments: only the GX project root location is requested.
arg_key = ["s3_target_location"]
args = getResolvedOptions(sys.argv, arg_key)
# Publish every resolved argument as a name in this namespace; this is where
# `s3_target_location` used below comes from (presumably the reason for the
# file-level F821 waiver).
locals().update(args)

# Build a file-mode GX context rooted at the supplied location.
context = gx.get_context(mode="file", project_root_dir=s3_target_location)

# Create the suite under the name "transactions_data_load_suite".
suite = gx.ExpectationSuite(name="transactions_data_load_suite")

# Attach the single not-null check, then register the suite with the context
# and keep the returned object.
suite.add_expectation(ExpectPayRefColumnValuesToNotBeNull())
suite = context.suites.add(suite)