diff --git a/scripts/jobs/data_and_insight/person_matching_module.py b/scripts/jobs/data_and_insight/person_matching_module.py index 93bbce78e..5dcb5e6ae 100644 --- a/scripts/jobs/data_and_insight/person_matching_module.py +++ b/scripts/jobs/data_and_insight/person_matching_module.py @@ -11,33 +11,66 @@ from graphframes import GraphFrame from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel -from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator +from pyspark.ml.evaluation import ( + BinaryClassificationEvaluator, + MulticlassClassificationEvaluator, +) from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler from pyspark.ml.functions import vector_to_array from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel from pyspark.sql import DataFrame, SparkSession, Column -from pyspark.sql.functions import to_date, col, lit, length, broadcast, udf, when, substring, lower, concat_ws, soundex, \ - regexp_replace, trim, split, struct, arrays_zip, array, array_sort, current_date +from pyspark.sql.functions import ( + to_date, + col, + lit, + length, + broadcast, + udf, + when, + substring, + lower, + concat_ws, + soundex, + regexp_replace, + trim, + split, + struct, + arrays_zip, + array, + array_sort, + current_date, +) from pyspark.sql.pandas.functions import pandas_udf -from pyspark.sql.types import StructType, StructField, StringType, DateType, BooleanType, DoubleType - -extracted_name_schema = StructType([ - StructField("entity_type", StringType(), True), - StructField("title", StringType(), True), - StructField("first_name", StringType(), True), - StructField("middle_name", StringType(), True), - StructField("last_name", StringType(), True) -]) - -features_schema = StructType([ - StructField("first_name_similar", BooleanType(), True), - StructField("middle_name_similar", BooleanType(), True), - StructField("last_name_similar", BooleanType(), True), - StructField("name_similarity", DoubleType(), True), - StructField("address_line_1_similarity", DoubleType(), True), - StructField("address_line_2_similarity", DoubleType(), True), - StructField("full_address_similarity", DoubleType(), True), -]) +from pyspark.sql.types import ( + StructType, + StructField, + StringType, + DateType, + BooleanType, + DoubleType, +) + +extracted_name_schema = StructType( + [ + StructField("entity_type", StringType(), True), + StructField("title", StringType(), True), + StructField("first_name", StringType(), True), + StructField("middle_name", StringType(), True), + StructField("last_name", StringType(), True), + ] +) + +features_schema = StructType( + [ + StructField("first_name_similar", BooleanType(), True), + StructField("middle_name_similar", BooleanType(), True), + StructField("last_name_similar", BooleanType(), True), + StructField("name_similarity", DoubleType(), True), + StructField("address_line_1_similarity", DoubleType(), True), + StructField("address_line_2_similarity", DoubleType(), True), + StructField("full_address_similarity", DoubleType(), True), + ] +) @udf(returnType=extracted_name_schema) @@ -56,29 +89,159 @@ def extract_person_name(name: str) -> (str, str, str, str, str): name: combined name including first name, last name, title etc. Returns: A quadruple where each element represents title, first_name, middle_name, last_name """ - common_titles = ["mr", "mr.", "mrs", "mrs.", "ms", "ms.", "miss", "master", "exor", "exors", "executors", "of", - "rep", "per", "pers", "reps", "prep", "&prep", "pe", "personal", "repmr", "repmrs", "repsmr", - "repsmrs", "the", "reps.of", - "dr", "dr.", "prof", "profeessor", "rev", "lady", "dame", "sir", "lord"] - common_titles_subset_with_space = ["mr ", "mr. ", "mrs ", "mrs. ", "ms ", "ms. ", "miss ", "exor ", "exors "] - - common_business_types_small = ["ltd", "llp", "plc", "pvt", "&", "lbh", " inc,", "llc", "bv"] - common_business_types = ["limited", - "association", "housing", "trust", "home", "society", "estates", "properties", "property", - "group", "fund", "invest", "investment", "estate", "development", "board", - "letting", "agent", "accommodat", "occupier", "residential", "relocation", - "accomodation", "traveller", "living", "education", "residence", "resident", - "organisation", "management", "international", "national", "clinic", "solutions", - "service", "system", "security", "move", "store", "academy", "ventures", "rent", - "account", "building", "company", "congregation", "project", "residencial", "centre", - "sport", "assets", "developer", "asylum", "committee", "school", "apartment", - "chartered", "consultant", "enterprise", "corporate", "associates", "studios", - "consultancy", "borough", - "holdings", "agency", "propperties", "hotel", "lodge", "university", "proeprties", - "hackney", "empty", "void", "london", "council"] - - deceased_flags = ["decd", "dec'd", "d'cead", "desd", "d'ced", "de'd", "def'd", "dea's", "dece'd", "dec", "dec`d", - "deceased"] + common_titles = [ + "mr", + "mr.", + "mrs", + "mrs.", + "ms", + "ms.", + "miss", + "master", + "exor", + "exors", + "executors", + "of", + "rep", + "per", + "pers", + "reps", + "prep", + "&prep", + "pe", + "personal", + "repmr", + "repmrs", + "repsmr", + "repsmrs", + "the", + "reps.of", + "dr", + "dr.", + "prof", + "profeessor", + "rev", + "lady", + "dame", + "sir", + "lord", + ] + common_titles_subset_with_space = [ + "mr ", + "mr. ", + "mrs ", + "mrs. ", + "ms ", + "ms. ", + "miss ", + "exor ", + "exors ", + ] + + common_business_types_small = [ + "ltd", + "llp", + "plc", + "pvt", + "&", + "lbh", + " inc,", + "llc", + "bv", + ] + common_business_types = [ + "limited", + "association", + "housing", + "trust", + "home", + "society", + "estates", + "properties", + "property", + "group", + "fund", + "invest", + "investment", + "estate", + "development", + "board", + "letting", + "agent", + "accommodat", + "occupier", + "residential", + "relocation", + "accomodation", + "traveller", + "living", + "education", + "residence", + "resident", + "organisation", + "management", + "international", + "national", + "clinic", + "solutions", + "service", + "system", + "security", + "move", + "store", + "academy", + "ventures", + "rent", + "account", + "building", + "company", + "congregation", + "project", + "residencial", + "centre", + "sport", + "assets", + "developer", + "asylum", + "committee", + "school", + "apartment", + "chartered", + "consultant", + "enterprise", + "corporate", + "associates", + "studios", + "consultancy", + "borough", + "holdings", + "agency", + "propperties", + "hotel", + "lodge", + "university", + "proeprties", + "hackney", + "empty", + "void", + "london", + "council", + ] + + deceased_flags = [ + "decd", + "dec'd", + "d'cead", + "desd", + "d'ced", + "de'd", + "def'd", + "dea's", + "dece'd", + "dec", + "dec`d", + "deceased", + ] junk_data = ["test", "owner"] @@ -87,20 +250,27 @@ def extract_person_name(name: str) -> (str, str, str, str, str): return "Unknown", None, None, None, None if any(business in name.casefold() for business in common_business_types) or ( - any(business in name.casefold() for business in common_business_types_small) and - not any(t in name.casefold() for t in common_titles_subset_with_space)): + any(business in name.casefold() for business in common_business_types_small) + and not any(t in name.casefold() for t in common_titles_subset_with_space) + ): return "Business", None, None, None, None person_title, first_name, middle_name, last_name = None, None, None, None - deceased_title = "(Deceased)" if any(dec in name.casefold() for dec in deceased_flags) else None + deceased_title = ( + "(Deceased)" if any(dec in name.casefold() for dec in deceased_flags) else None + ) name_cleaned = re.sub(r"\([^()]*\)", " ", name) # removes parentheses name_list = [n.strip() for n in name_cleaned.split(",") if n.strip()] if len(name_list) == 1: - parts_of_name = [n for n in name_list[0].split() if n.casefold() not in deceased_flags] + parts_of_name = [ + n for n in name_list[0].split() if n.casefold() not in deceased_flags + ] title_finder = [t for t in parts_of_name if t.casefold() in common_titles] person_title = " ".join(title_finder) if len(title_finder) else None - name_without_title = [n for n in parts_of_name if n.casefold() not in common_titles] + name_without_title = [ + n for n in parts_of_name if n.casefold() not in common_titles + ] if len(name_without_title) == 1: last_name = name_without_title[0] elif len(name_without_title) == 2: @@ -112,17 +282,28 @@ def extract_person_name(name: str) -> (str, str, str, str, str): last_name = name_without_title[-1] middle_name = " ".join(name_without_title[1:-1]) elif len(name_list) == 2: - title_finder = [t for t in name_list[0].split() if t.casefold() in common_titles] + title_finder = [ + t for t in name_list[0].split() if t.casefold() in common_titles + ] person_title = " ".join(title_finder) if len(title_finder) else None last_name = " ".join( - [n for n in name_list[0].split() if n and n.casefold() not in deceased_flags + common_titles]) + [ + n + for n in name_list[0].split() + if n and n.casefold() not in deceased_flags + common_titles + ] + ) title_with_name = name_list[1].split() if not person_title: title_finder = [t for t in title_with_name if t.casefold() in common_titles] person_title = " ".join(title_finder) if len(title_finder) else None - remaining_name = [n for n in title_with_name if - n.casefold() != (person_title or "").casefold() and n.casefold() not in common_titles and - n.casefold() not in [".", "&"]] + remaining_name = [ + n + for n in title_with_name + if n.casefold() != (person_title or "").casefold() + and n.casefold() not in common_titles + and n.casefold() not in [".", "&"] + ] if len(remaining_name) == 1: first_name = remaining_name[0] @@ -130,9 +311,15 @@ def extract_person_name(name: str) -> (str, str, str, str, str): first_name, middle_name = remaining_name elif len(remaining_name) > 2: first_name = remaining_name[0] - middle_name = ' '.join(remaining_name[1:]) # middle name includes anything not in first name or last name - - title = " ".join(filter(None, (person_title, deceased_title))).strip() if person_title or deceased_title else None + middle_name = " ".join( + remaining_name[1:] + ) # middle name includes anything not in first name or last name + + title = ( + " ".join(filter(None, (person_title, deceased_title))).strip() + if person_title or deceased_title + else None + ) last_name = last_name if last_name else None return "Person", title, first_name, middle_name, last_name @@ -177,20 +364,22 @@ def categorise_title(title: Column) -> Column: category_sir = title.contains("sir") # Priority 12 category_rabbi = title.contains("rabbi") # Priority 13 - return when(category_master, lit("master")) \ - .when(category_ms, lit("ms")) \ - .when(category_mrs, lit("mrs")) \ - .when(category_miss, lit("miss")) \ - .when(category_mr, lit("mr")) \ - .when(category_dame, lit("dame")) \ - .when(category_lady, lit("lady")) \ - .when(category_lord, lit("lord")) \ - .when(category_prof, lit("prof")) \ - .when(category_dr, lit("dr")) \ - .when(category_rev, lit("rev")) \ - .when(category_sir, lit("sir")) \ - .when(category_rabbi, lit("rabbi")) \ + return ( + when(category_master, lit("master")) + .when(category_ms, lit("ms")) + .when(category_mrs, lit("mrs")) + .when(category_miss, lit("miss")) + .when(category_mr, lit("mr")) + .when(category_dame, lit("dame")) + .when(category_lady, lit("lady")) + .when(category_lord, lit("lord")) + .when(category_prof, lit("prof")) + .when(category_dr, lit("dr")) + .when(category_rev, lit("rev")) + .when(category_sir, lit("sir")) + .when(category_rabbi, lit("rabbi")) .otherwise("unknown") + ) def standardize_name(name: Column) -> Column: @@ -210,12 +399,22 @@ def standardize_name(name: Column) -> Column: Returns: Column after applying the rules """ - return when(name.isNull(), lit("")) \ - .otherwise( - lower(trim(regexp_replace(regexp_replace(regexp_replace(name, "0", "O"), "1", "L"), "^[\\&*./\\\]+", "")))) + return when(name.isNull(), lit("")).otherwise( + lower( + trim( + regexp_replace( + regexp_replace(regexp_replace(name, "0", "O"), "1", "L"), + r"^[\&*./\]+", + "", + ) + ) + ) + ) -def standardize_full_name(first_name: Column, middle_name: Column, last_name: Column) -> Column: +def standardize_full_name( + first_name: Column, middle_name: Column, last_name: Column +) -> Column: """A Dataframe helper function to sort person's name. This will help to create a full name composed of first_name, middle_name and last_name with any surplus whitespace removed. @@ -227,7 +426,9 @@ def standardize_full_name(first_name: Column, middle_name: Column, last_name: Co Returns: A single name with composed of first_name, middle_name and last_name. """ - return regexp_replace(trim(concat_ws(" ", first_name, middle_name, last_name)), r"\s+", " ") + return regexp_replace( + trim(concat_ws(" ", first_name, middle_name, last_name)), r"\s+", " " + ) def standardize_address_line(address_line: Column) -> Column: @@ -241,8 +442,12 @@ def standardize_address_line(address_line: Column) -> Column: return when(address_line.isNull(), lit("")).otherwise(trim(lower(address_line))) -def full_address(address_line_1: Column, address_line_2: Column, address_line_3: Column, - address_line_4: Column) -> Column: +def full_address( + address_line_1: Column, + address_line_2: Column, + address_line_3: Column, + address_line_4: Column, +) -> Column: """A DataFrame helper function that joins all the parts of the address to form a single address. For example if the address is: @@ -262,11 +467,14 @@ def full_address(address_line_1: Column, address_line_2: Column, address_line_3: Returns: Full address after joining all the lines. """ - return trim(concat_ws(" ", address_line_1, address_line_2, address_line_3, address_line_4)) + return trim( + concat_ws(" ", address_line_1, address_line_2, address_line_3, address_line_4) + ) -def prepare_clean_housing_data(person_reshape: DataFrame, assets_reshape: DataFrame, - tenure_reshape: DataFrame) -> DataFrame: +def prepare_clean_housing_data( + person_reshape: DataFrame, assets_reshape: DataFrame, tenure_reshape: DataFrame +) -> DataFrame: """A function to prepare and clean housing data. Args: person_reshape: Dataframe containing person reshape data @@ -275,45 +483,92 @@ def prepare_clean_housing_data(person_reshape: DataFrame, assets_reshape: DataFr Returns: A prepared and cleaned dataframe containing housing tenancy data. """ - tenure_reshape = tenure_reshape.filter((tenure_reshape["endoftenuredate"].isNull()) | ( - tenure_reshape["endoftenuredate"].cast(DateType()) > current_date())) + tenure_reshape = tenure_reshape.filter( + (tenure_reshape["endoftenuredate"].isNull()) + | (tenure_reshape["endoftenuredate"].cast(DateType()) > current_date()) + ) - assets_reshape = assets_reshape.filter(assets_reshape['assettype'] == 'Dwelling') + assets_reshape = assets_reshape.filter(assets_reshape["assettype"] == "Dwelling") person_reshape = person_reshape.filter( - (person_reshape["type"].isin( - ["Secure", "Introductory", "Leasehold (RTB)", "Mense Profit Ac", "Mesne Profit Ac"])) + ( + person_reshape["type"].isin( + [ + "Secure", + "Introductory", + "Leasehold (RTB)", + "Mense Profit Ac", + "Mesne Profit Ac", + ] + ) + ) & (person_reshape["enddate"].isNull()) - & (person_reshape["person_type"].isin(["Tenant", "HouseholdMember"]))) - - housing = person_reshape \ - .join(assets_reshape, person_reshape["assetid"] == assets_reshape["asset_id"], how="left") \ - .join(tenure_reshape, person_reshape["person_id"] == tenure_reshape["person_id"], how="left") \ - .withColumn("source", lit("housing")) \ - .withColumn("extracted_name", extract_name_udf(col("member_fullname"))) \ - .withColumn("title", - when((col("extracted_name.title").isNull()) | - (lower(col("extracted_name.title")) == lower(col("preferredTitle"))), - col("preferredTitle")) - .otherwise(concat_ws(" ", col("preferredTitle"), col("extracted_name.title")))) \ - .withColumn("first_name", col("extracted_name.first_name")) \ - .withColumn("middle_name", col("extracted_name.middle_name")) \ - .withColumn("last_name", col("extracted_name.last_name")) \ - .withColumn("dob", to_date(substring(person_reshape["dateofbirth"], 1, 10), format="yyyy-MM-dd")) \ - .withColumn("date_of_birth", # null value represented by 1900-01-01, so converting 1900-01-01 to null - when(col("dob") == to_date(lit("1900-01-01"), "yyyy-MM-dd"), lit(None).cast(DateType())) - .otherwise(col("dob"))) \ - .withColumnRenamed("postcode", "post_code") \ - .withColumnRenamed("addressline1", "address_line_1") \ - .withColumnRenamed("addressline2", "address_line_2") \ - .withColumnRenamed("addressline3", "address_line_3") \ - .withColumnRenamed("addressline4", "address_line_4") \ - .withColumnRenamed("placeOfBirth", "place_of_birth") \ - .filter((length(col("first_name")) > 0) | (length(col("last_name")) > 0)) \ - .select(col("source"), person_reshape["person_id"], person_reshape["uprn"], col("title"), - col("first_name"), col("middle_name"), col("last_name"), col("date_of_birth"), - col("post_code"), col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), person_reshape["type"]) + & (person_reshape["person_type"].isin(["Tenant", "HouseholdMember"])) + ) + + housing = ( + person_reshape.join( + assets_reshape, + person_reshape["assetid"] == assets_reshape["asset_id"], + how="left", + ) + .join( + tenure_reshape, + person_reshape["person_id"] == tenure_reshape["person_id"], + how="left", + ) + .withColumn("source", lit("housing")) + .withColumn("extracted_name", extract_name_udf(col("member_fullname"))) + .withColumn( + "title", + when( + (col("extracted_name.title").isNull()) + | (lower(col("extracted_name.title")) == lower(col("preferredTitle"))), + col("preferredTitle"), + ).otherwise( + concat_ws(" ", col("preferredTitle"), col("extracted_name.title")) + ), + ) + .withColumn("first_name", col("extracted_name.first_name")) + .withColumn("middle_name", col("extracted_name.middle_name")) + .withColumn("last_name", col("extracted_name.last_name")) + .withColumn( + "dob", + to_date( + substring(person_reshape["dateofbirth"], 1, 10), format="yyyy-MM-dd" + ), + ) + .withColumn( + "date_of_birth", # null value represented by 1900-01-01, so converting 1900-01-01 to null + when( + col("dob") == to_date(lit("1900-01-01"), "yyyy-MM-dd"), + lit(None).cast(DateType()), + ).otherwise(col("dob")), + ) + .withColumnRenamed("postcode", "post_code") + .withColumnRenamed("addressline1", "address_line_1") + .withColumnRenamed("addressline2", "address_line_2") + .withColumnRenamed("addressline3", "address_line_3") + .withColumnRenamed("addressline4", "address_line_4") + .withColumnRenamed("placeOfBirth", "place_of_birth") + .filter((length(col("first_name")) > 0) | (length(col("last_name")) > 0)) + .select( + col("source"), + person_reshape["person_id"], + person_reshape["uprn"], + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + person_reshape["type"], + ) + ) return housing @@ -323,56 +578,90 @@ def standardize_housing_data(housing_cleaned: DataFrame) -> DataFrame: standard names that will be used by various other functions like feature engineering etc.) The DataFrame returned will have the following columns: * source: Source of the data like parking, tax etc. Should be of type string and cannot be blank. - * source_id: Unique ID for reach record. It's ok to have same person with different source_id. Should be of type string and cannot be blank. + * source_id: Unique ID for reach record. It's ok to have same person with different source_id. Should be of type + string and cannot be blank. * uprn: UPRN of the address. Should be of type string and can be blank. * title: Title of the person. Should be of type string and can be blank. * first_name: First name of the person. Should be of type string and can be blank. * middle_name: Middle name of the person. Should be of type string and can be blank. * last_name: Last name of the person. Should be of type string and can be blank. - * name: Concatenation of first, middle and last name after sorting alphabetically of the person. Should be of type string and can be blank. + * name: Concatenation of first, middle and last name after sorting alphabetically of the person. Should be of + type string and can be blank. * date_of_birth: Date of birth of the person. Should be of type Date and can be blank. * post_code: Postal code of the address. Should be of type string and can be blank. * address_line_1: First line of the address. Should be of type string and can be blank. * address_line_2: Second line of the address. Should be of type string and can be blank. * address_line_3: Third line of the address. Should be of type string and can be blank. * address_line_4: Fourth line of the address. Should be of type string and can be blank. - * full_address: Concatenation of address line 1, address line 2, address line 3, address line 4 in that order. Should be of type string and can be blank. - * source_filter: A field containing more information on the datasource such as tenancy type; this allows the user to filter the dataset to only + * full_address: Concatenation of address line 1, address line 2, address line 3, address line 4 in that order. + Should be of type string and can be blank. + * source_filter: A field containing more information on the datasource such as tenancy type; this allows the user + to filter the dataset to only include records for certain tenancy types. Args: housing_cleaned: housing DataFrame after preparing and cleaning it. Returns: A housing DataFrame with all the standard columns listed above. """ - housing = housing_cleaned \ - .withColumnRenamed("person_id", "source_id") \ - .withColumnRenamed("type", "source_filter") \ - .withColumn("title", categorise_title(lower(col("title")))) \ - .withColumn("first_name", standardize_name(col("first_name"))) \ - .withColumn("middle_name", standardize_name(col("middle_name"))) \ - .withColumn("last_name", standardize_name(col("last_name"))) \ - .withColumn("name", standardize_full_name(col("first_name"), col("middle_name"), col("last_name"))) \ - .withColumn("post_code", lower(col("post_code"))) \ - .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) \ - .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) \ - .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) \ - .withColumn("address_line_4", standardize_address_line(col("address_line_4"))) \ - .withColumn("full_address", full_address(col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"))) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), col("full_address"), - col("source_filter")) \ + housing = ( + housing_cleaned.withColumnRenamed("person_id", "source_id") + .withColumnRenamed("type", "source_filter") + .withColumn("title", categorise_title(lower(col("title")))) + .withColumn("first_name", standardize_name(col("first_name"))) + .withColumn("middle_name", standardize_name(col("middle_name"))) + .withColumn("last_name", standardize_name(col("last_name"))) + .withColumn( + "name", + standardize_full_name( + col("first_name"), col("middle_name"), col("last_name") + ), + ) + .withColumn("post_code", lower(col("post_code"))) + .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) + .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) + .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) + .withColumn("address_line_4", standardize_address_line(col("address_line_4"))) + .withColumn( + "full_address", + full_address( + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + ), + ) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) .dropDuplicates(["source_id", "uprn", "date_of_birth"]) + ) return housing -def prepare_clean_council_tax_data(spark: SparkSession, council_tax_account: DataFrame, - council_tax_liability_person: DataFrame, - council_tax_non_liability_person: DataFrame, - council_tax_occupation: DataFrame, - council_tax_property: DataFrame) -> DataFrame: +def prepare_clean_council_tax_data( + spark: SparkSession, + council_tax_account: DataFrame, + council_tax_liability_person: DataFrame, + council_tax_non_liability_person: DataFrame, + council_tax_occupation: DataFrame, + council_tax_property: DataFrame, +) -> DataFrame: """A function to prepare and clean council tax data. Args: spark: SparkSession, @@ -385,69 +674,117 @@ def prepare_clean_council_tax_data(spark: SparkSession, council_tax_account: Dat A DataFrame after preparing and cleaning data from multiple council tax tables. """ council_tax_occupation = council_tax_occupation.filter( - (col("live_ind") == 1) & (col("vacation_date") > col("import_datetime"))) - - council_tax_property_occupancy = council_tax_occupation \ - .join(council_tax_property, "property_ref") \ - .withColumnRenamed("postcode", "post_code") \ - .withColumnRenamed("addr1", "address_line_1") \ - .withColumnRenamed("addr2", "address_line_2") \ - .withColumnRenamed("addr3", "address_line_3") \ - .withColumnRenamed("addr4", "address_line_4") \ - .select(col("uprn"), col("account_ref"), col("occupation_date"), col("vacation_date"), col("post_code"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), col("address_line_4")) + (col("live_ind") == 1) & (col("vacation_date") > col("import_date")) + ) + + council_tax_property_occupancy = ( + council_tax_occupation.join(council_tax_property, "property_ref") + .withColumnRenamed("postcode", "post_code") + .withColumnRenamed("addr1", "address_line_1") + .withColumnRenamed("addr2", "address_line_2") + .withColumnRenamed("addr3", "address_line_3") + .withColumnRenamed("addr4", "address_line_4") + .select( + col("uprn"), + col("account_ref"), + col("occupation_date"), + col("vacation_date"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + ) + ) liable_types = broadcast( - spark.createDataFrame([ - (0, 'Non-liable'), - (1, 'Joint & Several'), - (2, 'Freeholder'), - (3, 'Leaseholder'), - (4, 'Tenant'), - (5, 'Licencee'), - (6, 'Resident'), - (7, 'Owner'), - (8, 'Assumed'), - (9, 'VOID'), - (10, 'Other'), - (11, 'Suspense'), - (12, 'CTax Payer'), - (-1, '(DATA ERROR)')]).toDF("liability_id", "liability_type")) - - council_tax_lead_person = (council_tax_account - .join(liable_types, col("lead_liab_pos") == col("liability_id")) - .withColumn("source", lit("council_tax")) - .withColumn("sub_source", lit("lead")) - .withColumn("position", lit(0)) - .withColumnRenamed("lead_liab_name", "name") - .withColumn("extracted_name", extract_name_udf(col("name"))) - .select(col("source"), col("account_ref"), col("party_ref"), col("liability_type"), - col("sub_source"), - col("position"), col("extracted_name.*"), col("name"))) - - council_tax_liable_person = council_tax_liability_person \ - .join(liable_types, col("liab_pos") == col("liability_id")) \ - .withColumn("source", lit("council_tax")) \ - .withColumn("sub_source", lit("liable")) \ - .withColumn("position", col("liab_pers_occ")) \ - .withColumnRenamed("liab_name", "name") \ - .withColumn("extracted_name", extract_name_udf(col("name"))) \ - .select(col("source"), col("account_ref"), col("party_ref"), col("liability_type"), col("sub_source"), - col("position"), col("extracted_name.*"), col("name")) - - council_tax_non_liable_person = council_tax_non_liability_person \ - .withColumn("source", lit("council_tax")) \ - .withColumn("sub_source", lit("non liable")) \ - .withColumn("liability_type", lit(None).cast(StringType())) \ - .withColumn("position", col("nonliab_occ")) \ - .withColumnRenamed("nonliab_name", "name") \ - .withColumn("extracted_name", extract_name_udf(col("name"))) \ - .select(col("source"), col("account_ref"), col("party_ref"), col("liability_type"), col("sub_source"), - col("position"), col("extracted_name.*"), col("name")) - - council_tax_person = council_tax_lead_person.union(council_tax_liable_person).union(council_tax_non_liable_person) \ - .join(council_tax_property_occupancy, "account_ref") \ + spark.createDataFrame( + [ + (0, "Non-liable"), + (1, "Joint & Several"), + (2, "Freeholder"), + (3, "Leaseholder"), + (4, "Tenant"), + (5, "Licencee"), + (6, "Resident"), + (7, "Owner"), + (8, "Assumed"), + (9, "VOID"), + (10, "Other"), + (11, "Suspense"), + (12, "CTax Payer"), + (-1, "(DATA ERROR)"), + ] + ).toDF("liability_id", "liability_type") + ) + + council_tax_lead_person = ( + council_tax_account.join( + liable_types, col("lead_liab_pos") == col("liability_id") + ) + .withColumn("source", lit("council_tax")) + .withColumn("sub_source", lit("lead")) + .withColumn("position", lit(0)) + .withColumnRenamed("lead_liab_name", "name") + .withColumn("extracted_name", extract_name_udf(col("name"))) + .select( + col("source"), + col("account_ref"), + col("party_ref"), + col("liability_type"), + col("sub_source"), + col("position"), + col("extracted_name.*"), + col("name"), + ) + ) + + council_tax_liable_person = ( + council_tax_liability_person.join( + liable_types, col("liab_pos") == col("liability_id") + ) + .withColumn("source", lit("council_tax")) + .withColumn("sub_source", lit("liable")) + .withColumn("position", col("liab_pers_occ")) + .withColumnRenamed("liab_name", "name") + .withColumn("extracted_name", extract_name_udf(col("name"))) + .select( + col("source"), + col("account_ref"), + col("party_ref"), + col("liability_type"), + col("sub_source"), + col("position"), + col("extracted_name.*"), + col("name"), + ) + ) + + council_tax_non_liable_person = ( + council_tax_non_liability_person.withColumn("source", lit("council_tax")) + .withColumn("sub_source", lit("non liable")) + .withColumn("liability_type", lit(None).cast(StringType())) + .withColumn("position", col("nonliab_occ")) + .withColumnRenamed("nonliab_name", "name") + .withColumn("extracted_name", extract_name_udf(col("name"))) + .select( + col("source"), + col("account_ref"), + col("party_ref"), + col("liability_type"), + col("sub_source"), + col("position"), + col("extracted_name.*"), + col("name"), + ) + ) + + council_tax_person = ( + council_tax_lead_person.union(council_tax_liable_person) + .union(council_tax_non_liable_person) + .join(council_tax_property_occupancy, "account_ref") .withColumn("source_filter", lit("council_tax")) + ) return council_tax_person @@ -482,36 +819,68 @@ def standardize_council_tax_data(council_tax_cleaned: DataFrame) -> DataFrame: Returns: A council tax DataFrame with all the standard columns listed above. """ - council_tax = council_tax_cleaned \ - .filter(col("entity_type") == "Person") \ - .drop(col("entity_type")) \ - .withColumn("source_id", concat_ws("-", col("account_ref"), col("party_ref"), col("position"))) \ - .withColumn("date_of_birth", lit(None).cast(DateType())) \ - .withColumn("title", categorise_title(lower(col("title")))) \ - .withColumn("first_name", standardize_name(col("first_name"))) \ - .withColumn("middle_name", standardize_name(col("middle_name"))) \ - .withColumn("last_name", standardize_name(col("last_name"))) \ - .withColumn("name", standardize_full_name(col("first_name"), col("middle_name"), col("last_name"))) \ - .withColumn("post_code", lower(col("post_code"))) \ - .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) \ - .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) \ - .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) \ - .withColumn("address_line_4", standardize_address_line(col("address_line_4"))) \ - .withColumn("full_address", full_address(col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"))) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), col("full_address"), - col("source_filter")) \ + council_tax = ( + council_tax_cleaned.filter(col("entity_type") == "Person") + .drop(col("entity_type")) + .withColumn( + "source_id", + concat_ws("-", col("account_ref"), col("party_ref"), col("position")), + ) + .withColumn("date_of_birth", lit(None).cast(DateType())) + .withColumn("title", categorise_title(lower(col("title")))) + .withColumn("first_name", standardize_name(col("first_name"))) + .withColumn("middle_name", standardize_name(col("middle_name"))) + .withColumn("last_name", standardize_name(col("last_name"))) + .withColumn( + "name", + standardize_full_name( + col("first_name"), col("middle_name"), col("last_name") + ), + ) + .withColumn("post_code", lower(col("post_code"))) + .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) + .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) + .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) + .withColumn("address_line_4", standardize_address_line(col("address_line_4"))) + .withColumn( + "full_address", + full_address( + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + ), + ) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) .dropDuplicates(["source_id", "uprn"]) + ) return council_tax -def prepare_clean_housing_benefit_data(hb_member_df: DataFrame, - hb_household_df: DataFrame, - hb_rent_assessment_df: DataFrame, - hb_ctax_assessment_df: DataFrame) -> DataFrame: +def prepare_clean_housing_benefit_data( + hb_member_df: DataFrame, + hb_household_df: DataFrame, + hb_rent_assessment_df: DataFrame, + hb_ctax_assessment_df: DataFrame, +) -> DataFrame: """A function to prepare and clean housing benefit data. Data comes from multiple sources. This function is specific to this particular data source. For a new data source please add a new function. Args: @@ -522,50 +891,112 @@ def prepare_clean_housing_benefit_data(hb_member_df: DataFrame, Returns: A DataFrame after preparing and cleaning housing benefit data from multiple tables. """ - housing_benefit_member = hb_member_df \ - .withColumn("claim_house_id", concat_ws("-", col("claim_id"), col("house_id"))) \ - .withColumn("claim_person_ref", concat_ws("-", col("claim_id"), col("house_id"), col("member_id"))) \ - .withColumn("gender", when(col("gender") == 2, "F").when(col("gender") == 1, "M").otherwise("O")) \ - .withColumn("extracted_name", extract_name_udf(col("name"))) \ - .withColumn("source", lit("housing_benefit")) \ - .withColumn("date_of_birth", to_date(col("birth_date"))) \ - .select(col("source"), col("claim_person_ref"), col("claim_house_id"), col("extracted_name.*"), - col("date_of_birth"), col("gender")) - - housing_benefit_household = hb_household_df \ - .withColumn("claim_house_id", concat_ws("-", col("claim_id"), col("house_id"))) \ - .withColumnRenamed("addr1", "address_line_1") \ - .withColumnRenamed("addr2", "address_line_2") \ - .withColumnRenamed("addr3", "address_line_3") \ - .withColumnRenamed("addr4", "address_line_4") \ - .filter((col("from_date") < col("import_datetime")) & (col("to_date") > col("import_datetime"))) \ - .select(col("claim_id"), col("claim_house_id"), col("address_line_1"), col("address_line_2"), - col("address_line_3"), col("address_line_4"), col("post_code"), col("uprn")) - - housing_benefit_rent_assessment = hb_rent_assessment_df \ - .withColumn("source_filter", when((col("dhp_ind") == 1) & (col("type_ind") > 1), "DHP").otherwise("HB")) \ - .filter((col("from_date") < col("import_datetime")) & (col("to_date") > col("import_datetime")) - & ((col("type_ind") == 1) | (col("dhp_ind") == 1)) & (col("model_amt") > 0)) \ + housing_benefit_member = ( + hb_member_df.withColumn( + "claim_house_id", concat_ws("-", col("claim_id"), col("house_id")) + ) + .withColumn( + "claim_person_ref", + concat_ws("-", col("claim_id"), col("house_id"), col("member_id")), + ) + .withColumn( + "gender", + when(col("gender") == 2, "F").when(col("gender") == 1, "M").otherwise("O"), + ) + .withColumn("extracted_name", extract_name_udf(col("name"))) + .withColumn("source", lit("housing_benefit")) + .withColumn("date_of_birth", to_date(col("birth_date"))) + .select( + col("source"), + col("claim_person_ref"), + col("claim_house_id"), + col("extracted_name.*"), + col("date_of_birth"), + col("gender"), + ) + ) + + housing_benefit_household = ( + hb_household_df.withColumn( + "claim_house_id", concat_ws("-", col("claim_id"), col("house_id")) + ) + .withColumnRenamed("addr1", "address_line_1") + .withColumnRenamed("addr2", "address_line_2") + .withColumnRenamed("addr3", "address_line_3") + .withColumnRenamed("addr4", "address_line_4") + .filter( + (col("from_date") < col("import_date")) + & (col("to_date") > col("import_date")) + ) + .select( + col("claim_id"), + col("claim_house_id"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("post_code"), + col("uprn"), + ) + ) + + housing_benefit_rent_assessment = ( + hb_rent_assessment_df.withColumn( + "source_filter", + when((col("dhp_ind") == 1) & (col("type_ind") > 1), "DHP").otherwise("HB"), + ) + .filter( + (col("from_date") < col("import_date")) + & (col("to_date") > col("import_date")) + & ((col("type_ind") == 1) | (col("dhp_ind") == 1)) + & (col("model_amt") > 0) + ) .select(col("claim_id"), col("source_filter")) - - housing_benefit_ctax_assessment = hb_ctax_assessment_df \ - .withColumn("source_filter", lit("CTS")) \ - .filter((col("from_date") < col("import_datetime")) & (col("to_date") > col("import_datetime")) - & (col("model_amt") > 0) & ((col("type_ind") == 1) | (col("dhp_ind") == 1))) \ + ) + + housing_benefit_ctax_assessment = ( + hb_ctax_assessment_df.withColumn("source_filter", lit("CTS")) + .filter( + (col("from_date") < col("import_date")) + & (col("to_date") > col("import_date")) + & (col("model_amt") > 0) + & ((col("type_ind") == 1) | (col("dhp_ind") == 1)) + ) .select(col("claim_id"), col("source_filter")) - - housing_benefit_rent_ctax = housing_benefit_rent_assessment.union(housing_benefit_ctax_assessment) - - housing_benefit_household_claims = housing_benefit_household \ - .join(housing_benefit_rent_ctax, ["claim_id"]) - - housing_benefit_cleaned = housing_benefit_household_claims.join(housing_benefit_member, ["claim_house_id"]) \ - .withColumn("source", lit("housing_benefit")) \ - .withColumn("source_id", col("claim_id")) \ - .select(col("source"), col("claim_person_ref"), col("uprn"), col("title"), - col("first_name"), col("middle_name"), col("last_name"), col("date_of_birth"), col("gender"), - col("post_code"), col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter")) + ) + + housing_benefit_rent_ctax = housing_benefit_rent_assessment.union( + housing_benefit_ctax_assessment + ) + + housing_benefit_household_claims = housing_benefit_household.join( + housing_benefit_rent_ctax, ["claim_id"] + ) + + housing_benefit_cleaned = ( + housing_benefit_household_claims.join( + housing_benefit_member, ["claim_house_id"] + ) + .withColumn("source", lit("housing_benefit")) + .withColumn("source_id", col("claim_id")) + .select( + col("source"), + col("claim_person_ref"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("date_of_birth"), + col("gender"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), + ) + ) return housing_benefit_cleaned @@ -599,25 +1030,54 @@ def standardize_housing_benefit_data(housing_benefit_cleaned: DataFrame) -> Data Returns: A housing benefit DataFrame with all the standard columns listed above. """ - housing_benefit = housing_benefit_cleaned \ - .withColumn("source_id", col("claim_person_ref")) \ - .withColumn("title", categorise_title(lower(col("title")))) \ - .withColumn("first_name", standardize_name(col("first_name"))) \ - .withColumn("middle_name", standardize_name(col("middle_name"))) \ - .withColumn("last_name", standardize_name(col("last_name"))) \ - .withColumn("name", standardize_full_name(col("first_name"), col("middle_name"), col("last_name"))) \ - .withColumn("post_code", lower(col("post_code"))) \ - .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) \ - .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) \ - .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) \ - .withColumn("address_line_4", standardize_address_line(col("address_line_4"))) \ - .withColumn("full_address", full_address(col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"))) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), col("full_address"), - col("source_filter")) \ - .dropDuplicates(["source_id", "first_name", "last_name", "date_of_birth", "post_code"]) + housing_benefit = ( + housing_benefit_cleaned.withColumn("source_id", col("claim_person_ref")) + .withColumn("title", categorise_title(lower(col("title")))) + .withColumn("first_name", standardize_name(col("first_name"))) + .withColumn("middle_name", standardize_name(col("middle_name"))) + .withColumn("last_name", standardize_name(col("last_name"))) + .withColumn( + "name", + standardize_full_name( + col("first_name"), col("middle_name"), col("last_name") + ), + ) + .withColumn("post_code", lower(col("post_code"))) + .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) + .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) + .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) + .withColumn("address_line_4", standardize_address_line(col("address_line_4"))) + .withColumn( + "full_address", + full_address( + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + ), + ) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) + .dropDuplicates( + ["source_id", "first_name", "last_name", "date_of_birth", "post_code"] + ) + ) return housing_benefit @@ -631,19 +1091,41 @@ def prepare_clean_parking_permit_data(parking_permit_df: DataFrame) -> DataFrame A DataFrame after preparing and cleaning parking permit data. """ - parking_permit_cleaned = parking_permit_df \ - .withColumn("source", lit("parking_permit")) \ - .withColumn("source_filter", lit("live parking permit")) \ - .withColumn("extracted_name", - extract_name_udf(concat_ws(" ", col("forename_of_applicant"), col("surname_of_applicant")))) \ - .withColumn("date_of_birth", to_date(col("date_of_birth_of_applicant"), format="yyyy-MM-dd")) \ - .withColumnRenamed("postcode", "post_code") \ - .withColumnRenamed("email_address_of_applicant", "email") \ - .filter((col("permit_type").isin(["Residents", "Estate Resident"])) & (col("live_permit_flag") == 1)) \ - .select(col("source"), col("permit_reference"), - col("extracted_name.*"), - col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), col("source_filter")) + parking_permit_cleaned = ( + parking_permit_df.withColumn("source", lit("parking_permit")) + .withColumn("source_filter", lit("live " "parking " "permit")) + .withColumn( + "extracted_name", + extract_name_udf( + concat_ws( + " ", col("forename_of_applicant"), col("surname_of_applicant") + ) + ), + ) + .withColumn( + "date_of_birth", + to_date(col("date_of_birth_of_applicant"), format="yyyy-MM-dd"), + ) + .withColumnRenamed("postcode", "post_code") + .withColumnRenamed("email_address_of_applicant", "email") + .filter( + (col("permit_type").isin(["Residents", "Estate Resident"])) + & (col("live_permit_flag") == 1) + ) + .select( + col("source"), + col("permit_reference"), + col("extracted_name.*"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("source_filter"), + ) + ) return parking_permit_cleaned @@ -677,31 +1159,60 @@ def standardize_parking_permit_data(parking_permit_cleaned: DataFrame) -> DataFr Returns: A parking permit DataFrame with all the standard columns listed above. """ - parking_permit = parking_permit_cleaned \ - .filter(col("entity_type") == "Person") \ - .drop(col("entity_type")) \ - .withColumn("source_id", col("permit_reference")) \ - .withColumn("title", categorise_title(lower(col("title")))) \ - .withColumn("first_name", standardize_name(col("first_name"))) \ - .withColumn("middle_name", standardize_name(col("middle_name"))) \ - .withColumn("last_name", standardize_name(col("last_name"))) \ - .withColumn("name", standardize_full_name(col("first_name"), col("middle_name"), col("last_name"))) \ - .withColumn("post_code", lower(col("post_code"))) \ - .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) \ - .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) \ - .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) \ - .withColumn("address_line_4", lit("").cast(StringType())) \ - .withColumn("full_address", full_address(col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"))) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), col("full_address"), - col("source_filter")) + parking_permit = ( + parking_permit_cleaned.filter(col("entity_type") == "Person") + .drop(col("entity_type")) + .withColumn("source_id", col("permit_reference")) + .withColumn("title", categorise_title(lower(col("title")))) + .withColumn("first_name", standardize_name(col("first_name"))) + .withColumn("middle_name", standardize_name(col("middle_name"))) + .withColumn("last_name", standardize_name(col("last_name"))) + .withColumn( + "name", + standardize_full_name( + col("first_name"), col("middle_name"), col("last_name") + ), + ) + .withColumn("post_code", lower(col("post_code"))) + .withColumn("address_line_1", standardize_address_line(col("address_line_1"))) + .withColumn("address_line_2", standardize_address_line(col("address_line_2"))) + .withColumn("address_line_3", standardize_address_line(col("address_line_3"))) + .withColumn("address_line_4", lit("").cast(StringType())) + .withColumn( + "full_address", + full_address( + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + ), + ) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) + ) return parking_permit -def prepare_clean_schools_admissions_data(schools_admissions_df: DataFrame) -> DataFrame: +def prepare_clean_schools_admissions_data( + schools_admissions_df: DataFrame, +) -> DataFrame: """A function to prepare and clean schools admissions data. Splits ou middle name from first name. Sorts address columns so that they are consistent with other datasets. @@ -712,75 +1223,170 @@ def prepare_clean_schools_admissions_data(schools_admissions_df: DataFrame) -> D A DataFrame after preparing data from multiple sources and cleaning it. """ - address_cols = ["address_line_1", "address_line_2", "address_line_3", "address_line_4"] - - schools_admissions_cleaned = schools_admissions_df \ - .withColumn("source", lit("schools_admission")) \ - .withColumn("source_id", col("child_id")) \ - .withColumn("first_name", split(schools_admissions_df["contact_forename"], ' ').getItem(0)) \ - .withColumn("middle_name", split(schools_admissions_df["contact_forename"], ' ').getItem(1)) \ - .withColumn("last_name", col("contact_surname")) \ - .withColumn("name", regexp_replace(concat_ws(" ", col("first_name"), col("middle_name"), - col("last_name")), r"\s+", " ")) \ - .withColumn("date_of_birth", lit("")) \ - .withColumnRenamed("first_lLine", "address_line_1") \ - .withColumnRenamed("second_line", "address_line_2") \ - .withColumnRenamed("third_line", "address_line_3") \ - .withColumnRenamed("town", "address_line_4") \ - .withColumn("source_filter", lit("school admissions")) \ - .select(col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter")) + address_cols = [ + "address_line_1", + "address_line_2", + "address_line_3", + "address_line_4", + ] + + schools_admissions_cleaned = ( + schools_admissions_df.withColumn("source", lit("schools_admission")) + .withColumn("source_id", col("child_id")) + .withColumn( + "first_name", + split(schools_admissions_df["contact_forename"], " ").getItem(0), + ) + .withColumn( + "middle_name", + split(schools_admissions_df["contact_forename"], " ").getItem(1), + ) + .withColumn("last_name", col("contact_surname")) + .withColumn( + "name", + regexp_replace( + concat_ws(" ", col("first_name"), col("middle_name"), col("last_name")), + r"\s+", + " ", + ), + ) + .withColumn("date_of_birth", lit("")) + .withColumnRenamed("first_lLine", "address_line_1") + .withColumnRenamed("second_line", "address_line_2") + .withColumnRenamed("third_line", "address_line_3") + .withColumnRenamed("town", "address_line_4") + .withColumn("source_filter", lit("school admissions")) + .select( + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), + ) + ) # create a zip of address line arrays, sorted in the order of not null (False), column order schools_admissions_cleaned = schools_admissions_cleaned.select( - col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter"), + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), array_sort( arrays_zip( array([col(c).isNull() for c in address_cols]), array([lit(i) for i in range(4)]), - array([col(c) for c in address_cols]) + array([col(c) for c in address_cols]), ) - ).alias('address_sorted')) + ).alias("address_sorted"), + ) # disaggregate address_sorted arrays into columns schools_admissions_cleaned = schools_admissions_cleaned.select( - col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), col("source_filter"), - *[col("address_sorted")[i]['2'].alias(address_cols[i]) for i in range(4)]) + *[col("address_sorted")[i]["2"].alias(address_cols[i]) for i in range(4)], + ) # rejig address lines - schools_admissions_cleaned = schools_admissions_cleaned \ - .withColumn("address_line_1", when(col("address_line_1").rlike(r"\d+$") - & col("address_line_2").rlike(r"^[A-Za-z]"), - concat_ws(" ", col("address_line_1"), col("address_line_2"))) - .otherwise(col("address_line_1"))) \ - .withColumn("address_line_2", when(col("address_line_1").contains(col("address_line_2")), - col("address_line_3")) - .otherwise(concat_ws(" ", col("address_line_2"), col("address_line_3")))) \ - .withColumn("address_line_2", when(col("address_line_2").rlike(r"\d+$"), - concat_ws(" ", col("address_line_2"), col("address_line_4"))) - .otherwise(col("address_line_2"))) \ - .withColumn("address_line_3", when(col("address_line_2").contains(col("address_line_3")), lit("london"))) \ - .withColumn("address_line_2", when(col("address_line_2").isNull(), lit("hackney")) - .otherwise(col("address_line_2"))) \ - .withColumn("address_line_3", when(col("address_line_3").isNull(), lit("london")) - .otherwise(col("address_line_3"))) \ - .withColumn("address_line_4", lit("")) \ - .select(col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter")) + schools_admissions_cleaned = ( + schools_admissions_cleaned.withColumn( + "address_line_1", + when( + col("address_line_1").rlike(r"\d+$") + & col("address_line_2").rlike(r"^[A-Za-z]"), + concat_ws(" ", col("address_line_1"), col("address_line_2")), + ).otherwise(col("address_line_1")), + ) + .withColumn( + "address_line_2", + when( + col("address_line_1").contains(col("address_line_2")), + col("address_line_3"), + ).otherwise(concat_ws(" ", col("address_line_2"), col("address_line_3"))), + ) + .withColumn( + "address_line_2", + when( + col("address_line_2").rlike(r"\d+$"), + concat_ws(" ", col("address_line_2"), col("address_line_4")), + ).otherwise(col("address_line_2")), + ) + .withColumn( + "address_line_3", + when(col("address_line_2").contains(col("address_line_3")), lit("london")), + ) + .withColumn( + "address_line_2", + when(col("address_line_2").isNull(), lit("hackney")).otherwise( + col("address_line_2") + ), + ) + .withColumn( + "address_line_3", + when(col("address_line_3").isNull(), lit("london")).otherwise( + col("address_line_3") + ), + ) + .withColumn("address_line_4", lit("")) + .select( + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), + ) + ) return schools_admissions_cleaned -def standardize_schools_admissions_data(schools_admissions_cleaned: DataFrame) -> DataFrame: +def standardize_schools_admissions_data( + schools_admissions_cleaned: DataFrame, +) -> DataFrame: """Standardize schools admissions data. This function convert all the custom names (coming from their respective sources to standard names that will be used by various other functions like feature engineering etc.) The DataFrame returned will have the following columns: @@ -805,7 +1411,8 @@ def standardize_schools_admissions_data(schools_admissions_cleaned: DataFrame) - * address_line_4: Fourth line of the address. Should be of type string and can be blank. * full_address: Concatenation of address line 1, address line 2, address line 3, address line 4 in that order. Should be of type string and can be blank. - * source_filter: Field to contain additional information on schools admissions (only contains holding string for now). + * source_filter: Field to contain additional information on schools admissions (only contains holding string for + now). Should be of type string and can be blank. Args: @@ -815,26 +1422,55 @@ def standardize_schools_admissions_data(schools_admissions_cleaned: DataFrame) - A schools admissions DataFrame with all the standard column listed above. """ - schools_admissions = schools_admissions_cleaned \ - .withColumn("source_id", col("source_id")) \ - .withColumn("title", categorise_title(lower(trim(col("title"))))) \ - .withColumn("first_name", standardize_name(trim(col("first_name")))) \ - .withColumn("middle_name", standardize_name(trim(col("middle_name")))) \ - .withColumn("last_name", standardize_name(trim(col("last_name")))) \ - .withColumn("name", standardize_name(trim(col("name")))) \ - .withColumn("post_code", lower(trim(col("post_code")))) \ - .withColumn("address_line_1", standardize_address_line(trim(col("address_line_1")))) \ - .withColumn("address_line_2", standardize_address_line(trim(col("address_line_2")))) \ - .withColumn("address_line_3", standardize_address_line(trim(col("address_line_3")))) \ - .withColumn("address_line_4", standardize_address_line(trim(col("address_line_4")))) \ - .withColumn("full_address1", full_address(trim(col("address_line_1")), trim(col("address_line_2")), - trim(col("address_line_3")), - trim(col("address_line_4")))) \ - .withColumn("full_address", regexp_replace(col("full_address1"), r"\s+", " ")) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), - col("full_address"), col("source_filter")) + schools_admissions = ( + schools_admissions_cleaned.withColumn("source_id", col("source_id")) + .withColumn("title", categorise_title(lower(trim(col("title"))))) + .withColumn("first_name", standardize_name(trim(col("first_name")))) + .withColumn("middle_name", standardize_name(trim(col("middle_name")))) + .withColumn("last_name", standardize_name(trim(col("last_name")))) + .withColumn("name", standardize_name(trim(col("name")))) + .withColumn("post_code", lower(trim(col("post_code")))) + .withColumn( + "address_line_1", standardize_address_line(trim(col("address_line_1"))) + ) + .withColumn( + "address_line_2", standardize_address_line(trim(col("address_line_2"))) + ) + .withColumn( + "address_line_3", standardize_address_line(trim(col("address_line_3"))) + ) + .withColumn( + "address_line_4", standardize_address_line(trim(col("address_line_4"))) + ) + .withColumn( + "full_address1", + full_address( + trim(col("address_line_1")), + trim(col("address_line_2")), + trim(col("address_line_3")), + trim(col("address_line_4")), + ), + ) + .withColumn("full_address", regexp_replace(col("full_address1"), r"\s+", " ")) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) + ) return schools_admissions @@ -850,72 +1486,158 @@ def prepare_clean_freedom_pass_admissions_data(freedom_df: DataFrame) -> DataFra freedom_cleaned (Dataframe): A DataFrame after preparing data from multiple sources and cleaning it. """ - address_cols = ["address_line_1", "address_line_2", "address_line_3", "address_line_4"] - - freedom_cleaned = freedom_df \ - .withColumn("source", lit("freedom_passes")) \ - .withColumn("source_id", col("applicantid")) \ - .withColumn("first_name", col("forename")) \ - .withColumn("middle_name", lit("")) \ - .withColumn("last_name", col("surname")) \ - .withColumn("name", regexp_replace(concat_ws(" ", col("first_name"), col("last_name")), r"\s+", " ")) \ - .withColumnRenamed("house_name_number", "address_line_1") \ - .withColumnRenamed("building_name", "address_line_2") \ - .withColumnRenamed("street", "address_line_3") \ - .withColumnRenamed("district", "address_line_4") \ - .withColumnRenamed("postcode", "post_code") \ - .withColumnRenamed("email_address", "email") \ - .withColumn("date_of_birth", to_date(col("date_of_birth"), format="dd/MM/yyyy"))\ - .withColumn("uprn", lit("")) \ - .withColumn("source_filter", lit("freedom_passes_2024")) \ - .select(col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter")) + address_cols = [ + "address_line_1", + "address_line_2", + "address_line_3", + "address_line_4", + ] + + freedom_cleaned = ( + freedom_df.withColumn("source", lit("freedom_passes")) + .withColumn("source_id", col("applicantid")) + .withColumn("first_name", col("forename")) + .withColumn("middle_name", lit("")) + .withColumn("last_name", col("surname")) + .withColumn( + "name", + regexp_replace( + concat_ws(" ", col("first_name"), col("last_name")), r"\s+", " " + ), + ) + .withColumnRenamed("house_name_number", "address_line_1") + .withColumnRenamed("building_name", "address_line_2") + .withColumnRenamed("street", "address_line_3") + .withColumnRenamed("district", "address_line_4") + .withColumnRenamed("postcode", "post_code") + .withColumnRenamed("email_address", "email") + .withColumn("date_of_birth", to_date(col("date_of_birth"), format="dd/MM/yyyy")) + .withColumn("uprn", lit("")) + .withColumn("source_filter", lit("freedom_passes_2024")) + .select( + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), + ) + ) # create a zip of address line arrays, sorted in the order of not null (False), column order freedom_cleaned = freedom_cleaned.select( - col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter"), + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), array_sort( arrays_zip( array([col(c).isNull() for c in address_cols]), array([lit(i) for i in range(4)]), - array([col(c) for c in address_cols]) + array([col(c) for c in address_cols]), ) - ).alias('address_sorted')) + ).alias("address_sorted"), + ) # disaggregate address_sorted arrays into columns freedom_cleaned = freedom_cleaned.select( - col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), col("source_filter"), - *[col("address_sorted")[i]['2'].alias(address_cols[i]) for i in range(4)]) + *[col("address_sorted")[i]["2"].alias(address_cols[i]) for i in range(4)], + ) # rejig address lines - freedom_cleaned = freedom_cleaned \ - .withColumn("address_line_1", when(col("address_line_1").rlike(r"\d+[a-z]$") - & col("address_line_2").rlike(r"^[A-Za-z]"), - concat_ws(" ", col("address_line_1"), col("address_line_2"))) - .otherwise(col("address_line_1"))) \ - .withColumn("address_line_2", when(col("address_line_1").contains(col("address_line_2")), - col("address_line_3")) - .otherwise(concat_ws(" ", col("address_line_2"), col("address_line_3")))) \ - .withColumn("address_line_2", when(col("address_line_2").rlike(r"\d+$"), - concat_ws(" ", col("address_line_2"), col("address_line_4"))) - .otherwise(col("address_line_2"))) \ - .withColumn("address_line_3", when(col("address_line_2").contains(col("address_line_3")), lit("london"))) \ - .withColumn("address_line_2", when(col("address_line_2").isNull(), lit("hackney")) - .otherwise(col("address_line_2"))) \ - .withColumn("address_line_3", when(col("address_line_3").isNull(), lit("london")) - .otherwise(col("address_line_3"))) \ - .withColumn("address_line_4", lit("")) \ - .select(col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter")) + freedom_cleaned = ( + freedom_cleaned.withColumn( + "address_line_1", + when( + col("address_line_1").rlike(r"\d+[a-z]$") + & col("address_line_2").rlike(r"^[A-Za-z]"), + concat_ws(" ", col("address_line_1"), col("address_line_2")), + ).otherwise(col("address_line_1")), + ) + .withColumn( + "address_line_2", + when( + col("address_line_1").contains(col("address_line_2")), + col("address_line_3"), + ).otherwise(concat_ws(" ", col("address_line_2"), col("address_line_3"))), + ) + .withColumn( + "address_line_2", + when( + col("address_line_2").rlike(r"\d+$"), + concat_ws(" ", col("address_line_2"), col("address_line_4")), + ).otherwise(col("address_line_2")), + ) + .withColumn( + "address_line_3", + when(col("address_line_2").contains(col("address_line_3")), lit("london")), + ) + .withColumn( + "address_line_2", + when(col("address_line_2").isNull(), lit("hackney")).otherwise( + col("address_line_2") + ), + ) + .withColumn( + "address_line_3", + when(col("address_line_3").isNull(), lit("london")).otherwise( + col("address_line_3") + ), + ) + .withColumn("address_line_4", lit("")) + .select( + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), + ) + ) return freedom_cleaned @@ -945,7 +1667,8 @@ def standardize_freedom_pass_data(freedom_cleaned: DataFrame) -> DataFrame: * address_line_4: Fourth line of the address. Should be of type string and can be blank. * full_address: Concatenation of address line 1, address line 2, address line 3, address line 4 in that order. Should be of type string and can be blank. - * source_filter: Field to contain additional information on freedom pass dataset e.g year (only contains holding string for now). + * source_filter: Field to contain additional information on freedom pass dataset e.g year (only contains holding + string for now). Should be of type string and can be blank. Args: @@ -955,31 +1678,62 @@ def standardize_freedom_pass_data(freedom_cleaned: DataFrame) -> DataFrame: freedom_passes (Dataframe): Freedom pass dataframe with all the standardised columns listed above. """ - freedom_passes = freedom_cleaned \ - .withColumn("source_id", col("source_id")) \ - .withColumn("title", categorise_title(lower(trim(col("title"))))) \ - .withColumn("first_name", standardize_name(trim(col("first_name")))) \ - .withColumn("middle_name", standardize_name(trim(col("middle_name")))) \ - .withColumn("last_name", standardize_name(trim(col("last_name")))) \ - .withColumn("name", standardize_name(trim(col("name")))) \ - .withColumn("post_code", lower(trim(col("post_code")))) \ - .withColumn("address_line_1", standardize_address_line(trim(col("address_line_1")))) \ - .withColumn("address_line_2", standardize_address_line(trim(col("address_line_2")))) \ - .withColumn("address_line_3", standardize_address_line(trim(col("address_line_3")))) \ - .withColumn("address_line_4", standardize_address_line(trim(col("address_line_4")))) \ - .withColumn("full_address1", full_address(trim(col("address_line_1")), trim(col("address_line_2")), - trim(col("address_line_3")), - trim(col("address_line_4")))) \ - .withColumn("full_address", regexp_replace(col("full_address1"), r"\s+", " ")) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), - col("full_address"), col("source_filter")) + freedom_passes = ( + freedom_cleaned.withColumn("source_id", col("source_id")) + .withColumn("title", categorise_title(lower(trim(col("title"))))) + .withColumn("first_name", standardize_name(trim(col("first_name")))) + .withColumn("middle_name", standardize_name(trim(col("middle_name")))) + .withColumn("last_name", standardize_name(trim(col("last_name")))) + .withColumn("name", standardize_name(trim(col("name")))) + .withColumn("post_code", lower(trim(col("post_code")))) + .withColumn( + "address_line_1", standardize_address_line(trim(col("address_line_1"))) + ) + .withColumn( + "address_line_2", standardize_address_line(trim(col("address_line_2"))) + ) + .withColumn( + "address_line_3", standardize_address_line(trim(col("address_line_3"))) + ) + .withColumn( + "address_line_4", standardize_address_line(trim(col("address_line_4"))) + ) + .withColumn( + "full_address1", + full_address( + trim(col("address_line_1")), + trim(col("address_line_2")), + trim(col("address_line_3")), + trim(col("address_line_4")), + ), + ) + .withColumn("full_address", regexp_replace(col("full_address1"), r"\s+", " ")) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) + ) return freedom_passes -def prepare_clean_electoral_register_data(electoral_register_df: DataFrame) -> DataFrame: +def prepare_clean_electoral_register_data( + electoral_register_df: DataFrame, +) -> DataFrame: """ This function cleans raw electoral register data from Xpress read for standardising. Args: @@ -989,56 +1743,110 @@ def prepare_clean_electoral_register_data(electoral_register_df: DataFrame) -> D electoral_register_cleaned (Dataframe): Cleaned dataframe containing electoral register data. """ - address_cols = ["address_line_1", "address_line_2", "address_line_3", "address_line_4"] - - electoral_register_cleaned = electoral_register_df \ - .withColumn("source", lit("electoral_register")) \ - .withColumn("source_id", col("elector_id")) \ - .withColumn("first_name", split(electoral_register_df["elector_forename"], ' ').getItem(0)) \ - .withColumn("middle_name", col("elector_middle_name")) \ - .withColumn("last_name", col("elector_surname")) \ - .withColumn("name", regexp_replace(concat_ws(" ", col("first_name"), col("middle_name"), - col("last_name")), r"\s+", " ")) \ - .withColumn("date_of_birth", to_date(col("elector_dob"), format="yyyy-MM-dd")) \ - .withColumnRenamed("property_address_1", "address_line_1") \ - .withColumnRenamed("property_address_2", "address_line_2") \ - .withColumnRenamed("property_address_3", "address_line_3") \ - .withColumnRenamed("property_address_4", "address_line_4") \ - .withColumnRenamed("property_post_code", "post_code") \ - .withColumnRenamed("property_urn", "uprn") \ - .withColumn("email", lit("")) \ - .withColumn("title", lit("")) \ - .withColumn("source_filter", lit("electoral register jun23")) \ - .select(col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter")) + address_cols = [ + "address_line_1", + "address_line_2", + "address_line_3", + "address_line_4", + ] + + electoral_register_cleaned = ( + electoral_register_df.withColumn("source", lit("electoral_register")) + .withColumn("source_id", col("elector_id")) + .withColumn( + "first_name", + split(electoral_register_df["elector_forename"], " ").getItem(0), + ) + .withColumn("middle_name", col("elector_middle_name")) + .withColumn("last_name", col("elector_surname")) + .withColumn( + "name", + regexp_replace( + concat_ws(" ", col("first_name"), col("middle_name"), col("last_name")), + r"\s+", + " ", + ), + ) + .withColumn("date_of_birth", to_date(col("elector_dob"), format="yyyy-MM-dd")) + .withColumnRenamed("property_address_1", "address_line_1") + .withColumnRenamed("property_address_2", "address_line_2") + .withColumnRenamed("property_address_3", "address_line_3") + .withColumnRenamed("property_address_4", "address_line_4") + .withColumnRenamed("property_post_code", "post_code") + .withColumnRenamed("property_urn", "uprn") + .withColumn("email", lit("")) + .withColumn("title", lit("")) + .withColumn("source_filter", lit("electoral " "register " "jun23")) + .select( + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), + ) + ) # create a zip of address line arrays, sorted in the order of not null (False), column order electoral_register_cleaned = electoral_register_cleaned.select( - col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), - col("address_line_1"), col("address_line_2"), col("address_line_3"), - col("address_line_4"), col("source_filter"), + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("source_filter"), array_sort( arrays_zip( array([col(c).isNull() for c in address_cols]), array([lit(i) for i in range(4)]), - array([col(c) for c in address_cols]) + array([col(c) for c in address_cols]), ) - ).alias('address_sorted')) + ).alias("address_sorted"), + ) # disaggregate address_sorted arrays into columns electoral_register_cleaned = electoral_register_cleaned.select( - col("source"), col("source_id"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("email"), col("post_code"), col("uprn"), + col("source"), + col("source_id"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("email"), + col("post_code"), + col("uprn"), col("source_filter"), - *[col("address_sorted")[i]['2'].alias(address_cols[i]) for i in range(4)]) + *[col("address_sorted")[i]["2"].alias(address_cols[i]) for i in range(4)], + ) return electoral_register_cleaned -def standardize_electoral_register_data(electoral_register_cleaned: DataFrame) -> DataFrame: +def standardize_electoral_register_data( + electoral_register_cleaned: DataFrame, +) -> DataFrame: """Standardize electoral register data. This function convert all the custom names (coming from their respective sources to standard names that will be used by various other functions like feature engineering etc.) The DataFrame returned will have the following columns: @@ -1063,7 +1871,8 @@ def standardize_electoral_register_data(electoral_register_cleaned: DataFrame) - * address_line_4: Fourth line of the address. Should be of type string and can be blank. * full_address: Concatenation of address line 1, address line 2, address line 3, address line 4 in that order. Should be of type string and can be blank. - * source_filter: Field to contain additional information on electoral register (only contains holding string for now). + * source_filter: Field to contain additional information on electoral register (only contains holding string for + now). Should be of type string and can be blank. Args: @@ -1073,26 +1882,55 @@ def standardize_electoral_register_data(electoral_register_cleaned: DataFrame) - A electoral_register DataFrame with all the standard column listed above. """ - electoral_register = electoral_register_cleaned \ - .withColumn("source_id", col("source_id")) \ - .withColumn("title", categorise_title(lower(trim(col("title"))))) \ - .withColumn("first_name", standardize_name(trim(col("first_name")))) \ - .withColumn("middle_name", standardize_name(trim(col("middle_name")))) \ - .withColumn("last_name", standardize_name(trim(col("last_name")))) \ - .withColumn("name", standardize_name(trim(col("name")))) \ - .withColumn("post_code", lower(trim(col("post_code")))) \ - .withColumn("address_line_1", standardize_address_line(trim(col("address_line_1")))) \ - .withColumn("address_line_2", standardize_address_line(trim(col("address_line_2")))) \ - .withColumn("address_line_3", standardize_address_line(trim(col("address_line_3")))) \ - .withColumn("address_line_4", standardize_address_line(trim(col("address_line_4")))) \ - .withColumn("full_address1", full_address(trim(col("address_line_1")), trim(col("address_line_2")), - trim(col("address_line_3")), - trim(col("address_line_4")))) \ - .withColumn("full_address", regexp_replace(col("full_address1"), r"\s+", " ")) \ - .select(col("source"), col("source_id"), col("uprn"), col("title"), col("first_name"), col("middle_name"), - col("last_name"), col("name"), col("date_of_birth"), col("post_code"), col("address_line_1"), - col("address_line_2"), col("address_line_3"), col("address_line_4"), - col("full_address"), col("source_filter")) + electoral_register = ( + electoral_register_cleaned.withColumn("source_id", col("source_id")) + .withColumn("title", categorise_title(lower(trim(col("title"))))) + .withColumn("first_name", standardize_name(trim(col("first_name")))) + .withColumn("middle_name", standardize_name(trim(col("middle_name")))) + .withColumn("last_name", standardize_name(trim(col("last_name")))) + .withColumn("name", standardize_name(trim(col("name")))) + .withColumn("post_code", lower(trim(col("post_code")))) + .withColumn( + "address_line_1", standardize_address_line(trim(col("address_line_1"))) + ) + .withColumn( + "address_line_2", standardize_address_line(trim(col("address_line_2"))) + ) + .withColumn( + "address_line_3", standardize_address_line(trim(col("address_line_3"))) + ) + .withColumn( + "address_line_4", standardize_address_line(trim(col("address_line_4"))) + ) + .withColumn( + "full_address1", + full_address( + trim(col("address_line_1")), + trim(col("address_line_2")), + trim(col("address_line_3")), + trim(col("address_line_4")), + ), + ) + .withColumn("full_address", regexp_replace(col("full_address1"), r"\s+", " ")) + .select( + col("source"), + col("source_id"), + col("uprn"), + col("title"), + col("first_name"), + col("middle_name"), + col("last_name"), + col("name"), + col("date_of_birth"), + col("post_code"), + col("address_line_1"), + col("address_line_2"), + col("address_line_3"), + col("address_line_4"), + col("full_address"), + col("source_filter"), + ) + ) return electoral_register @@ -1110,15 +1948,17 @@ def remove_deceased(df: DataFrame) -> DataFrame: Returns: A DataFrame after removing all the deceased persons. """ - deceased_filter_cond = (lower(col("title")).contains("(deceased)") | - lower(col("title")).contains("executor") | - lower(col("title")).contains("exor") | - lower(col("title")).contains("rep") | - lower(col("title")).contains(" of") | - lower(col("title")).contains("of ") | - lower(col("title")).contains("the") | - lower(col("title")).contains("pe") | - lower(col("title")).contains("other")) + deceased_filter_cond = ( + lower(col("title")).contains("(deceased)") + | lower(col("title")).contains("executor") + | lower(col("title")).contains("exor") + | lower(col("title")).contains("rep") + | lower(col("title")).contains(" of") + | lower(col("title")).contains("of ") + | lower(col("title")).contains("the") + | lower(col("title")).contains("pe") + | lower(col("title")).contains("other") + ) return df.filter(~deceased_filter_cond) @@ -1138,21 +1978,26 @@ def generate_possible_matches(df: DataFrame) -> DataFrame: """ partitions = 5 - df_a = df.select(*[col(c).alias(f"a_{c}") for c in df.columns]) \ - .withColumn("first_name_soundex", soundex(col("a_first_name"))) \ - .withColumn("last_name_soundex", soundex(col("a_last_name"))) \ + df_a = ( + df.select(*[col(c).alias(f"a_{c}") for c in df.columns]) + .withColumn("first_name_soundex", soundex(col("a_first_name"))) + .withColumn("last_name_soundex", soundex(col("a_last_name"))) .repartition(partitions, col("first_name_soundex"), col("last_name_soundex")) + ) - df_b = df.select(*[col(c).alias(f"b_{c}") for c in df.columns]) \ - .withColumn("first_name_soundex", soundex(col("b_first_name"))) \ - .withColumn("last_name_soundex", soundex(col("b_last_name"))) \ + df_b = ( + df.select(*[col(c).alias(f"b_{c}") for c in df.columns]) + .withColumn("first_name_soundex", soundex(col("b_first_name"))) + .withColumn("last_name_soundex", soundex(col("b_last_name"))) .repartition(partitions, col("first_name_soundex"), col("last_name_soundex")) + ) - return df_a.join(df_b, - (df_a["a_source_id"] != df_b["b_source_id"]) & - (df_a["first_name_soundex"] == df_b["first_name_soundex"]) & - (df_a["last_name_soundex"] == df_b["last_name_soundex"])) \ - .drop(*["first_name_soundex", "last_name_soundex"]) + return df_a.join( + df_b, + (df_a["a_source_id"] != df_b["b_source_id"]) + & (df_a["first_name_soundex"] == df_b["first_name_soundex"]) + & (df_a["last_name_soundex"] == df_b["last_name_soundex"]), + ).drop(*["first_name_soundex", "last_name_soundex"]) def automatically_label_data(df: DataFrame) -> DataFrame: @@ -1165,14 +2010,20 @@ def automatically_label_data(df: DataFrame) -> DataFrame: Returns: A DataFrame with column auto_labels. """ - return df.withColumn("auto_labels", - when((col("a_source_id") == col("b_source_id")) | ( - (col("a_first_name") == col("b_first_name")) & - (col("a_last_name") == col("b_last_name")) & - (col("a_date_of_birth") == col("b_date_of_birth")) & - (col("a_uprn") == col("b_uprn")) & - (col("a_post_code") == col("b_post_code"))), lit(True)) - .otherwise(lit(None).cast(BooleanType()))) + return df.withColumn( + "auto_labels", + when( + (col("a_source_id") == col("b_source_id")) + | ( + (col("a_first_name") == col("b_first_name")) + & (col("a_last_name") == col("b_last_name")) + & (col("a_date_of_birth") == col("b_date_of_birth")) + & (col("a_uprn") == col("b_uprn")) + & (col("a_post_code") == col("b_post_code")) + ), + lit(True), + ).otherwise(lit(None).cast(BooleanType())), + ) @pandas_udf(features_schema) @@ -1213,24 +2064,52 @@ def generate_features(input_df: pd.DataFrame) -> pd.DataFrame: similarity_algo = Cosine() input_df["first_name_similar"] = input_df.apply( - lambda x: phonetic_algo.encode(x["a_first_name"]) == phonetic_algo.encode(x["b_first_name"]), axis=1) + lambda x: phonetic_algo.encode(x["a_first_name"]) + == phonetic_algo.encode(x["b_first_name"]), + axis=1, + ) input_df["middle_name_similar"] = input_df.apply( - lambda x: phonetic_algo.encode(x["a_middle_name"]) == phonetic_algo.encode(x["b_middle_name"]), axis=1) + lambda x: phonetic_algo.encode(x["a_middle_name"]) + == phonetic_algo.encode(x["b_middle_name"]), + axis=1, + ) input_df["last_name_similar"] = input_df.apply( - lambda x: phonetic_algo.encode(x["a_last_name"]) == phonetic_algo.encode(x["b_last_name"]), axis=1) - input_df["name_similarity"] = input_df.apply(lambda x: similarity_algo.sim(x["a_name"], x["b_name"]), axis=1) + lambda x: phonetic_algo.encode(x["a_last_name"]) + == phonetic_algo.encode(x["b_last_name"]), + axis=1, + ) + input_df["name_similarity"] = input_df.apply( + lambda x: similarity_algo.sim(x["a_name"], x["b_name"]), axis=1 + ) input_df["address_line_1_similarity"] = input_df.apply( - lambda x: similarity_algo.sim(x["a_address_line_1"], x["b_address_line_1"]), axis=1) + lambda x: similarity_algo.sim(x["a_address_line_1"], x["b_address_line_1"]), + axis=1, + ) input_df["address_line_2_similarity"] = input_df.apply( - lambda x: similarity_algo.sim(x["a_address_line_2"], x["b_address_line_2"]), axis=1) + lambda x: similarity_algo.sim(x["a_address_line_2"], x["b_address_line_2"]), + axis=1, + ) input_df["full_address_similarity"] = input_df.apply( - lambda x: similarity_algo.sim(x["a_full_address"], x["b_full_address"]), axis=1) - - return input_df.drop([ - "a_first_name", "b_first_name", "a_last_name", "b_last_name", - "a_name", "b_name", - "a_address_line_1", "b_address_line_1", "a_address_line_2", "b_address_line_2", - "a_full_address", "b_full_address"], axis=1) + lambda x: similarity_algo.sim(x["a_full_address"], x["b_full_address"]), axis=1 + ) + + return input_df.drop( + [ + "a_first_name", + "b_first_name", + "a_last_name", + "b_last_name", + "a_name", + "b_name", + "a_address_line_1", + "b_address_line_1", + "a_address_line_2", + "b_address_line_2", + "a_full_address", + "b_full_address", + ], + axis=1, + ) def feature_engineering(df: DataFrame) -> DataFrame: @@ -1262,48 +2141,85 @@ def feature_engineering(df: DataFrame) -> DataFrame: match = lit("match") non_match = lit("non-match") unknown = lit("unknown") - features_df = df \ - .withColumn("uprn_same", - when(col("a_uprn") == col("b_uprn"), match) - .when(col("a_uprn") != col("b_uprn"), non_match) - .otherwise(unknown)) \ - .withColumn("title_same", - when(col("a_title") == col("b_title"), match) - .when(col("a_title") != col("b_title"), non_match) - .otherwise(unknown)) \ - .withColumn("date_of_birth_same", - when(col("a_date_of_birth") == col("b_date_of_birth"), match) - .when(col("a_date_of_birth") != col("b_date_of_birth"), non_match) - .otherwise(unknown)) \ - .withColumn("similarity_features", - generate_features(struct( - col("a_first_name"), col("b_first_name"), - col("a_middle_name"), col("b_middle_name"), - col("a_last_name"), col("b_last_name"), - col("a_name"), col("b_name"), - col("a_address_line_1"), col("b_address_line_1"), - col("a_address_line_2"), col("b_address_line_2"), - col("a_full_address"), col("b_full_address")))) \ - .select(col("*"), col("similarity_features.*")).drop("similarity_features") + features_df = ( + df.withColumn( + "uprn_same", + when(col("a_uprn") == col("b_uprn"), match) + .when(col("a_uprn") != col("b_uprn"), non_match) + .otherwise(unknown), + ) + .withColumn( + "title_same", + when(col("a_title") == col("b_title"), match) + .when(col("a_title") != col("b_title"), non_match) + .otherwise(unknown), + ) + .withColumn( + "date_of_birth_same", + when(col("a_date_of_birth") == col("b_date_of_birth"), match) + .when(col("a_date_of_birth") != col("b_date_of_birth"), non_match) + .otherwise(unknown), + ) + .withColumn( + "similarity_features", + generate_features( + struct( + col("a_first_name"), + col("b_first_name"), + col("a_middle_name"), + col("b_middle_name"), + col("a_last_name"), + col("b_last_name"), + col("a_name"), + col("b_name"), + col("a_address_line_1"), + col("b_address_line_1"), + col("a_address_line_2"), + col("b_address_line_2"), + col("a_full_address"), + col("b_full_address"), + ) + ), + ) + .select(col("*"), col("similarity_features.*")) + .drop("similarity_features") + ) return features_df def evaluation_for_various_metrics(predictions: DataFrame): - metrics = MulticlassClassificationEvaluator(predictionCol="prediction", - labelCol="label", - weightCol="label_confidence_score", - probabilityCol="probability") + metrics = MulticlassClassificationEvaluator( + predictionCol="prediction", + labelCol="label", + weightCol="label_confidence_score", + probabilityCol="probability", + ) accuracy = metrics.evaluate(predictions, {metrics.metricName: "accuracy"}) - precision_non_match = metrics.evaluate(predictions, - {metrics.metricName: "precisionByLabel", metrics.metricLabel: 0.0}) - precision_match = metrics.evaluate(predictions, {metrics.metricName: "precisionByLabel", metrics.metricLabel: 1.0}) - recall_non_match = metrics.evaluate(predictions, {metrics.metricName: "recallByLabel", metrics.metricLabel: 0.0}) - recall_match = metrics.evaluate(predictions, {metrics.metricName: "recallByLabel", metrics.metricLabel: 1.0}) - return accuracy, precision_non_match, precision_match, recall_non_match, recall_match - - -def train_model(df: DataFrame, model_path: str, test_model: bool, save_model: bool) -> None: + precision_non_match = metrics.evaluate( + predictions, {metrics.metricName: "precisionByLabel", metrics.metricLabel: 0.0} + ) + precision_match = metrics.evaluate( + predictions, {metrics.metricName: "precisionByLabel", metrics.metricLabel: 1.0} + ) + recall_non_match = metrics.evaluate( + predictions, {metrics.metricName: "recallByLabel", metrics.metricLabel: 0.0} + ) + recall_match = metrics.evaluate( + predictions, {metrics.metricName: "recallByLabel", metrics.metricLabel: 1.0} + ) + return ( + accuracy, + precision_non_match, + precision_match, + recall_non_match, + recall_match, + ) + + +def train_model( + df: DataFrame, model_path: str, test_model: bool, save_model: bool +) -> None: """Trains the model Args: @@ -1323,43 +2239,83 @@ def train_model(df: DataFrame, model_path: str, test_model: bool, save_model: bo print(f"Training data size: {train.count()}") print(f"Test data size....: {test.count()}") - string_indexer = StringIndexer(inputCols=["uprn_same", "title_same", "date_of_birth_same"], - outputCols=["uprn_indexed", "title_indexed", "date_of_birth_indexed"], - stringOrderType="alphabetAsc") - one_hot_encoder = OneHotEncoder(inputCols=["uprn_indexed", "title_indexed", "date_of_birth_indexed"], - outputCols=["uprn_vec", "title_vec", "date_of_birth_vec"]) + string_indexer = StringIndexer( + inputCols=["uprn_same", "title_same", "date_of_birth_same"], + outputCols=["uprn_indexed", "title_indexed", "date_of_birth_indexed"], + stringOrderType="alphabetAsc", + ) + one_hot_encoder = OneHotEncoder( + inputCols=["uprn_indexed", "title_indexed", "date_of_birth_indexed"], + outputCols=["uprn_vec", "title_vec", "date_of_birth_vec"], + ) vector_assembler = VectorAssembler( - inputCols=["uprn_vec", "title_vec", "date_of_birth_vec", "first_name_similar", "middle_name_similar", - "last_name_similar", "name_similarity", "address_line_1_similarity", "address_line_2_similarity", - "full_address_similarity"], outputCol="features") - classifier = LogisticRegression(featuresCol="features", labelCol="label", weightCol="label_confidence_score", - standardization=False) - - pipeline = Pipeline(stages=[string_indexer, one_hot_encoder, vector_assembler, classifier]) + inputCols=[ + "uprn_vec", + "title_vec", + "date_of_birth_vec", + "first_name_similar", + "middle_name_similar", + "last_name_similar", + "name_similarity", + "address_line_1_similarity", + "address_line_2_similarity", + "full_address_similarity", + ], + outputCol="features", + ) + classifier = LogisticRegression( + featuresCol="features", + labelCol="label", + weightCol="label_confidence_score", + standardization=False, + ) + + pipeline = Pipeline( + stages=[string_indexer, one_hot_encoder, vector_assembler, classifier] + ) # Due to limited time I haven't searched on a larger space # param_grid = ParamGridBuilder() \ # .addGrid(classifier.regParam, [0.0001, 0.00005, 8e-05, 7e-05, 5e-05]) \ # .addGrid(classifier.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]) \ # .build() - param_grid = ParamGridBuilder() \ - .addGrid(classifier.regParam, [7e-05]) \ - .addGrid(classifier.elasticNetParam, [1.0]) \ + param_grid = ( + ParamGridBuilder() + .addGrid(classifier.regParam, [7e-05]) + .addGrid(classifier.elasticNetParam, [1.0]) .build() - evaluator = BinaryClassificationEvaluator(labelCol="label", - rawPredictionCol="rawPrediction", - weightCol="label_confidence_score") - - cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5, seed=42, - parallelism=5) + ) + evaluator = BinaryClassificationEvaluator( + labelCol="label", + rawPredictionCol="rawPrediction", + weightCol="label_confidence_score", + ) + + cv = CrossValidator( + estimator=pipeline, + estimatorParamMaps=param_grid, + evaluator=evaluator, + numFolds=5, + seed=42, + parallelism=5, + ) cv_model = cv.fit(train) train_prediction = cv_model.transform(train) - print(f"Training ROC AUC train score before fine-tuning..: {evaluator.evaluate(train_prediction):.5f}") - accuracy, precision_non_match, precision_match, recall_non_match, recall_match = evaluation_for_various_metrics( - train_prediction) + print( + f"Training ROC AUC train score before fine-tuning..: {evaluator.evaluate(train_prediction):.5f}" + ) + ( + accuracy, + precision_non_match, + precision_match, + recall_non_match, + recall_match, + ) = evaluation_for_various_metrics(train_prediction) print(f"Training Accuracy before fine-tuning............: {accuracy:.5f}") - print(f"Training Precision before fine-tuning (non-match): {precision_non_match:.5f}") + print( + f"Training Precision before fine-tuning (non-match): {precision_non_match:.5f}" + ) print(f"Training Precision before fine-tuning.....(match): {precision_match:.5f}") print(f"Training Recall before fine-tuning.(non-match): {recall_non_match:.5f}") print(f"Training Recall before fine-tuning.....(match): {recall_match:.5f}") @@ -1370,10 +2326,11 @@ def train_model(df: DataFrame, model_path: str, test_model: bool, save_model: bo # Fine-tuning the model to maximize performance f_measure = training_summary.fMeasureByThreshold max_f_measure = f_measure.groupBy().max("F-Measure").select("max(F-Measure)").head() - best_threshold = f_measure \ - .filter(f_measure["F-Measure"] == max_f_measure["max(F-Measure)"]) \ - .select("threshold") \ + best_threshold = ( + f_measure.filter(f_measure["F-Measure"] == max_f_measure["max(F-Measure)"]) + .select("threshold") .head()["threshold"] + ) print(f"Best threshold: {best_threshold}") cv_model.bestModel.stages[-1].setThreshold(best_threshold) @@ -1381,29 +2338,60 @@ def train_model(df: DataFrame, model_path: str, test_model: bool, save_model: bo cv_model.write().overwrite().save(model_path) train_prediction = cv_model.transform(train) - print(f"Training ROC AUC train score after fine-tuning...: {evaluator.evaluate(train_prediction):.5f}") - accuracy, precision_non_match, precision_match, recall_non_match, recall_match = evaluation_for_various_metrics( - train_prediction) + print( + f"Training ROC AUC train score after fine-tuning...: {evaluator.evaluate(train_prediction):.5f}" + ) + ( + accuracy, + precision_non_match, + precision_match, + recall_non_match, + recall_match, + ) = evaluation_for_various_metrics(train_prediction) print(f"Training Accuracy after fine-tuning.............: {accuracy:.5f}") - print(f"Training Precision after fine-tuning .(non-match): {precision_non_match:.5f}") + print( + f"Training Precision after fine-tuning .(non-match): {precision_non_match:.5f}" + ) print(f"Training Precision after fine-tuning......(match): {precision_match:.5f}") print(f"Training Recall after fine-tuning..(non-match): {recall_non_match:.5f}") print(f"Training Recall after fine-tuning......(match): {recall_match:.5f}") if test_model: - print("Only evaluate once in the end, so keep it commented for most of the time.") + print( + "Only evaluate once in the end, so keep it commented for most of the time." + ) test_prediction = cv_model.transform(test) test_prediction.show() - print(f'Write predictions to csv...') + print("Write predictions to csv...") test_prediction.printSchema() - test_prediction_for_export = test_prediction.withColumn('probability', vector_to_array(col('probability'))) \ - .withColumn('probability_str', concat_ws('probability')) \ - .drop('uprn_vec', 'title_vec', 'date_of_birth_vec', 'features', 'rawPrediction', 'uprn_indexed', - 'title_indexed', 'date_of_birth_indexed', 'probability') - test_prediction_for_export.write.csv(header=True, path=f"{model_path}/test_predictions") - - accuracy, precision_non_match, precision_match, recall_non_match, recall_match = evaluation_for_various_metrics( - test_prediction) + test_prediction_for_export = ( + test_prediction.withColumn( + "probability", vector_to_array(col("probability")) + ) + .withColumn("probability_str", concat_ws("probability")) + .drop( + "uprn_vec", + "title_vec", + "date_of_birth_vec", + "features", + "rawPrediction", + "uprn_indexed", + "title_indexed", + "date_of_birth_indexed", + "probability", + ) + ) + test_prediction_for_export.write.csv( + header=True, path=f"{model_path}/test_predictions" + ) + + ( + accuracy, + precision_non_match, + precision_match, + recall_non_match, + recall_match, + ) = evaluation_for_various_metrics(test_prediction) print(f"Test ROC AUC..............: {evaluator.evaluate(test_prediction):.5f}") print(f"Test Accuracy.............: {accuracy:.5f}") print(f"Test Precision (non-match): {precision_non_match:.5f}") @@ -1425,17 +2413,35 @@ def predict(features_df: DataFrame, model_path: str) -> DataFrame: Returns DataFrame with prediction. """ cv_model: CrossValidatorModel = CrossValidatorModel.load(model_path) - predictions = cv_model.transform(features_df).withColumn("predicted_label", - when(col("prediction") == 1.0, "match") - .when(col("prediction") == 0.0, "non-match") - .otherwise("unknown")) \ - .drop(*["uprn_indexed", "title_indexed", "date_of_birth_indexed", "uprn_vec", "title_vec", - "date_of_birth_vec", "features", "rawPrediction", "probability"]) + predictions = ( + cv_model.transform(features_df) + .withColumn( + "predicted_label", + when(col("prediction") == 1.0, "match") + .when(col("prediction") == 0.0, "non-match") + .otherwise("unknown"), + ) + .drop( + *[ + "uprn_indexed", + "title_indexed", + "date_of_birth_indexed", + "uprn_vec", + "title_vec", + "date_of_birth_vec", + "features", + "rawPrediction", + "probability", + ] + ) + ) return predictions -def link_all_matched_persons(standard_df: DataFrame, predicted_df: DataFrame) -> DataFrame: +def link_all_matched_persons( + standard_df: DataFrame, predicted_df: DataFrame +) -> DataFrame: """Finds all the matching person in the standard DataFrame. All the records having same matching_id are considered as same person. @@ -1448,37 +2454,32 @@ def link_all_matched_persons(standard_df: DataFrame, predicted_df: DataFrame) -> """ vertices = standard_df.withColumn("id", col("source_id")) - edges = predicted_df \ - .filter(col("prediction") == 1.0) \ - .withColumn("src", col("a_source_id")) \ + edges = ( + predicted_df.filter(col("prediction") == 1.0) + .withColumn("src", col("a_source_id")) .withColumn("dst", col("b_source_id")) + ) person_graph = GraphFrame(vertices, edges).dropIsolatedVertices() connected = person_graph.connectedComponents() - unique_connections = connected \ - .select(col("source"), col("source_id"), col("component").alias("matching_id")) \ - .distinct() - return standard_df \ - .join(unique_connections, ["source", "source_id"]) \ - .orderBy(col("matching_id")) + unique_connections = connected.select( + col("source"), col("source_id"), col("component").alias("matching_id") + ).distinct() + return standard_df.join(unique_connections, ["source", "source_id"]).orderBy( + col("matching_id") + ) # Extra analysis (for analyst only): if you need to do. - # To find how many connection are there - # person_graph.inDegrees.filter(col("inDegree") > 1).orderBy(col("inDegree").desc()).show(truncate=False) + # To find how many connection are there # person_graph.inDegrees.filter(col("inDegree") > 1).orderBy(col( # # # "inDegree").desc()).show(truncate=False) - # Graph query using motif to find where person 'a' is connected to person 'b', and person 'b' is also connected to - # person 'a' - # motif = person_graph.find("(a)-[]->(b); (b)-[]->(a)") - # motif.show(truncate=False) + # Graph query using motif to find where person 'a' is connected to person 'b', and person 'b' is also connected # to # person 'a' # motif = person_graph.find("(a)-[]->(b); (b)-[]->(a)") # motif.show(truncate=False) - # To count number of triangles i.e. a connected to b, b connected to c and c is connected back to a - # triangle_count = person_graph.triangleCount() - # triangle_count.orderBy(col("count").desc()).show(n=10, truncate=False) + # To count number of triangles i.e. a connected to b, b connected to c and c is connected back to a # # # # # triangle_count = person_graph.triangleCount() # triangle_count.orderBy(col("count").desc()).show(n=10, # truncate=False) def match_persons(model_path: str, standard_df: DataFrame) -> DataFrame: - """ A convenient method that facilitate the user of the module to perform person match. This method accepts a + """A convenient method that facilitate the user of the module to perform person match. This method accepts a standard DataFrame that represents the dataset containing the record records referring to the same person. Standard DataFrame should have the following columns though data it can be missing. @@ -1510,13 +2511,29 @@ def match_persons(model_path: str, standard_df: DataFrame) -> DataFrame: Raises: AssertionError is mandatory columns are missing. """ - mandatory_columns = ["source", "source_id", "uprn", "title", "first_name", "middle_name", "last_name", "name", - "date_of_birth", "post_code", "address_line_1", "address_line_2", "address_line_3", - "address_line_4", "full_address"] + mandatory_columns = [ + "source", + "source_id", + "uprn", + "title", + "first_name", + "middle_name", + "last_name", + "name", + "date_of_birth", + "post_code", + "address_line_1", + "address_line_2", + "address_line_3", + "address_line_4", + "full_address", + ] try: assert set(mandatory_columns).issubset(standard_df.columns) except AssertionError as e: - raise AssertionError(f"Standard DataFrame doesn't contain all the mandatory columns and error is {e}") + raise AssertionError( + f"Standard DataFrame doesn't contain all the mandatory columns and error is {e}" + ) possible_matches = generate_possible_matches(standard_df) features_df = feature_engineering(possible_matches)