diff --git a/scripts/jobs/rentsense_former_tenants_to_refined.py b/scripts/jobs/rentsense_former_tenants_to_refined.py
index ac258ba8e..1964e5a19 100644
--- a/scripts/jobs/rentsense_former_tenants_to_refined.py
+++ b/scripts/jobs/rentsense_former_tenants_to_refined.py
@@ -23,6 +23,7 @@
     datediff,
     when,
 )
+from pyspark.sql.window import Window
 from scripts.helpers.helpers import (
     PARTITION_KEYS,
     add_import_time_columns,
@@ -980,23 +981,31 @@ def export_dynamic_frame_as_xml_gzip(
         case_priorities, accounts4.tenancy_ref == case_priorities.tenancy_ref2, "left"
     )

-    accounts6 = accounts5.selectExpr(
-        "AccountReference as AccountReference",
-        # "TenureType",
-        # "TenureTypeCode",
-        # "max_date as TenancyStartDate",
-        "TenancyEndDate",
-        "LocalAuthority",
-        # "HousingOfficerName",
-        "Patch",
-        "'Hackney' as Region",
-        # "import_date as import_date",
-        # "tenancy_ref as TenReference",
-        # "is_paused_until as BreathingSpaceEndDate",
-        "Case when Deceased=1 then 'Y' else 'N' end as Deceased"
-        # "previousweekbalance"
-    )
+    # Keep one row per AccountReference: the one with the latest TenancyEndDate.
+    # Null references are dropped before the window so they are not all shuffled
+    # into one partition just to be discarded by the filter that follows.
+    # NOTE(review): the extra sort keys only make row_number() repeatable when
+    # several rows share an end date; confirm the preferred precedence.
+    w = Window.partitionBy("AccountReference").orderBy(
+        F.col("TenancyEndDate").desc_nulls_last(),
+        F.col("Deceased").desc_nulls_last(),
+        F.col("LocalAuthority").asc_nulls_last(),
+        F.col("Patch").asc_nulls_last(),
+    )
+    accounts6 = (
+        accounts5.filter(F.col("AccountReference").isNotNull())
+        .withColumn("rn", F.row_number().over(w))
+        .filter(F.col("rn") == 1)
+        .selectExpr(
+            "AccountReference",
+            "TenancyEndDate",
+            "LocalAuthority",
+            "Patch",
+            "'Hackney' as Region",
+            "case when Deceased=1 then 'Y' else 'N' end as Deceased",
+        )
+    )

     accounts7 = accounts6.filter("AccountReference is not null")

     accounts8 = accounts7.distinct()