From 775737137d7a9b98212e3a73acff82b78e64fdea Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 1 May 2025 16:55:16 +0100 Subject: [PATCH 01/26] Added time and memory check --- digital_land/package/dataset_parquet.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 0e236165d..424f25363 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -4,6 +4,8 @@ import shutil from pathlib import Path from .package import Package +import time +import psutil logger = logging.getLogger(__name__) @@ -136,6 +138,11 @@ def load_facts(self, transformed_parquet_dir): fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) + start_time = time.time() + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # query to extract data from the temp table (containing raw data), group by a fact, and get the highest # priority or latest record @@ -153,6 +160,12 @@ def load_facts(self, transformed_parquet_dir): ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for fact query {elapsed_time:.2f}") + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + logger.info(f"[Memory usage] After fact query: {mem:.2f} MB") def load_fact_resource(self, transformed_parquet_dir): logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") @@ -163,6 +176,11 @@ def load_fact_resource(self, transformed_parquet_dir): [field.replace("-", "_") for field in fact_resource_fields] ) + start_time = time.time() + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # All CSV files have been loaded into a temporary table. 
Extract several columns and export query = f""" SELECT {fields_str} @@ -176,6 +194,12 @@ def load_fact_resource(self, transformed_parquet_dir): ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for fact-resource query {elapsed_time:.2f}") + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + logger.info(f"[Memory usage] After fact-resource query: {mem:.2f} MB") def load_entities_range( self, @@ -466,6 +490,7 @@ def load_to_sqlite(self, sqlite_path): ) logger.info("loading fact data") + logger.info(self.fact_path) # insert fact data fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) From 48e2b11e1bbe22780e94189f0c79cffd9538ac05 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 1 May 2025 17:17:23 +0100 Subject: [PATCH 02/26] Added libraries required for testing --- setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.py b/setup.py index e2d16f876..4d02b9581 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,8 @@ def get_long_description(): "pygit2", "boto3", "moto", + "psutil", + "time" ], entry_points={"console_scripts": ["digital-land=digital_land.cli:cli"]}, setup_requires=["pytest-runner"], From fc94974c8008a1231321d23a502e0689d51f394d Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 1 May 2025 17:21:41 +0100 Subject: [PATCH 03/26] Added libraries required for testing --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index 4d02b9581..a051b7356 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,6 @@ def get_long_description(): "boto3", "moto", "psutil", - "time" ], entry_points={"console_scripts": ["digital-land=digital_land.cli:cli"]}, setup_requires=["pytest-runner"], From 7656cd2358708db74318d8c509b1de7544c3500f Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 1 May 2025 22:03:19 +0100 Subject: [PATCH 04/26] Ran black --- digital_land/package/dataset_parquet.py | 223 +++++++++++++++++++++++- 1 file changed, 219 insertions(+), 4 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 424f25363..d41111826 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -140,7 +140,7 @@ def load_facts(self, transformed_parquet_dir): start_time = time.time() process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") # query to extract data from the temp table (containing raw data), group by a fact, and get the highest @@ -164,7 +164,7 @@ def load_facts(self, transformed_parquet_dir): elapsed_time = end_time - start_time logger.info(f"Time for fact query {elapsed_time:.2f}") process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] After fact query: {mem:.2f} MB") def load_fact_resource(self, transformed_parquet_dir): @@ -178,7 +178,7 @@ def load_fact_resource(self, transformed_parquet_dir): start_time = time.time() process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") # All CSV files have 
been loaded into a temporary table. Extract several columns and export @@ -198,9 +198,224 @@ def load_fact_resource(self, transformed_parquet_dir): elapsed_time = end_time - start_time logger.info(f"Time for fact-resource query {elapsed_time:.2f}") process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] After fact-resource query: {mem:.2f} MB") + def load_details_into_temp_parquet(self, transformed_parquet_dir): + """ + Save all details into a temporary parquet file, so use later. + """ + logger.info(f"loading all details from {str(transformed_parquet_dir)}") + output_path = self.fact_path + temp_dir = output_path.parent.parent.parent + logger.info(f"output_path {str(output_path)}") + logger.info(f"temp_dir {str(temp_dir)}") + temp_dir.mkdir(parents=True, exist_ok=True) + output_path = temp_dir / "temp_table.parquet" + + start_time = time.time() + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At start: {mem:.2f} MB") + + fact_fields = self.specification.schema["fact"]["fields"] + fact_resource_fields = self.specification.schema["fact-resource"]["fields"] + fields = list(set([*fact_fields, *fact_resource_fields])) + fields_str = ", ".join([field.replace("-", "_") for field in fields]) + + # All CSV files have been loaded into a temporary table. Extract needed columns and export + # Need to add entry_number as it is needed for fact table + query = f""" + SELECT {fields_str}, entry_number + FROM '{str(transformed_parquet_dir)}/*.parquet' + """ + + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for new query {elapsed_time:.2f}") + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + + return output_path + + # def load_facts_from_partitions(self, transformed_parquet_dir, temp_parquet): + # """ + # This method loads facts into a fact table from a directory containing all transformed files as parquet files + # """ + # logger.info(f"self.fact_path: {self.fact_path}") + # logger.info(f"temp_parquet exists: {os.path.exists(temp_parquet)}") + # + # output_path = self.fact_path + # output_path.parent.mkdir(parents=True, exist_ok=True) + # logger.info(f"loading facts from {str(transformed_parquet_dir)}") + # logger.info(f"loading facts into {str(output_path)}") + # + # fact_fields = self.specification.schema["fact"]["fields"] + # fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) + # + # # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # # priority or latest record + # start_time = time.time() + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + # logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # + # import tempfile + # temp_dir = Path(tempfile.mkdtemp(prefix="duckdb_buckets_")) + # n_buckets = 50 + # bucket_files = [] + # # Break down the work into n_buckets and loop over that + # for bucket in range(n_buckets): + # bucket_path = temp_dir / f"bucket_{bucket}.parquet" + # where_clause = f"abs(hash(fact)) % {n_buckets} = {bucket}" + # query = f""" + # SELECT {fields_str} + # FROM '{temp_parquet}' + # WHERE 
{where_clause} + # QUALIFY ROW_NUMBER() OVER ( + # PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + # ) = 1 + # """ + # self.conn.execute( + # f""" + # COPY ( + # {query} + # ) TO '{bucket_path}' (FORMAT PARQUET, PARTITION_BY (fact)); + # """ + # ) + # bucket_files.append(str(output_path)) + # + # chunk_files = [] + # for i, parquet_file in enumerate(bucket_files): + # chunk_output = temp_dir / f"chunk_{i}.parquet" + # self.conn.execute(f""" + # COPY ( + # SELECT * FROM read_parquet('{parquet_file}') + # ) TO '{chunk_output}' (FORMAT PARQUET, PARTITION_BY (fact)); + # """) + # chunk_files.append(chunk_output) + # + # # Final merge + # file_list_str = ', '.join([f"'{f}'" for f in chunk_files]) + # merged_query = f""" + # COPY ( + # SELECT * FROM read_parquet([{file_list_str}]) + # ) TO '{str(output_path)}' (FORMAT PARQUET); + # """ + # self.conn.execute(merged_query) + # end_time = time.time() + # elapsed_time = end_time - start_time + # logger.info(f"Time for new query {elapsed_time:.2f}") + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024 ** 2 # Memory in MB + # logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + # + # logger.info(f"fact parquet file: {output_path}") + # logger.info(f"fact parquet file exists: {os.path.exists(output_path)}") + + def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): + """ + This method loads facts into a fact table from a directory containing all transformed files as parquet files + """ + logger.info(f"self.fact_path: {self.fact_path}") + logger.info(f"temp_parquet exists: {os.path.exists(temp_parquet)}") + + output_path = self.fact_path + output_path.parent.mkdir(parents=True, exist_ok=True) + logger.info(f"loading facts from {str(transformed_parquet_dir)}") + logger.info(f"loading facts into {str(output_path)}") + + fact_fields = self.specification.schema["fact"]["fields"] + fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) + + # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # priority or latest record + start_time = time.time() + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At start: {mem:.2f} MB") + + query = f""" + SELECT {fields_str} + FROM '{temp_parquet}' + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + """ + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for new query {elapsed_time:.2f}") + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + + logger.info(f"fact parquet file: {output_path}") + logger.info(f"fact parquet file exists: {os.path.exists(output_path)}") + + def load_fact_resource_from_temp_parquet( + self, transformed_parquet_dir, temp_parquet + ): + """ + This method loads facts into a fact table from a directory containing all transformed files as parquet files + """ + output_path = self.fact_resource_path + output_path.parent.mkdir(parents=True, exist_ok=True) + logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") + logger.info(f"loading fact resources into {str(output_path)}") + + fact_resource_fields = 
self.specification.schema["fact-resource"]["fields"] + fields_str = ", ".join( + [field.replace("-", "_") for field in fact_resource_fields] + ) + + # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # priority or latest record + start_time = time.time() + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At start: {mem:.2f} MB") + logger.info(f"fields_str: {fields_str}") + + query = f""" + SELECT {fields_str} + FROM '{temp_parquet}' + """ + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for new query {elapsed_time:.2f}") + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + + logger.info(f"temp_parquet: {temp_parquet}") + os.remove(temp_parquet) + def load_entities_range( self, transformed_parquet_dir, From 05a1bad4a25a765a2a9b14b5d1cdc20ba4df1009 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 2 May 2025 10:06:28 +0100 Subject: [PATCH 05/26] Calling new table commands --- digital_land/commands.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 172c04ac9..d439c0c9d 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -478,8 +478,14 @@ def dataset_create( specification_dir=None, # TBD: package should use this specification object duckdb_path=cache_dir / "overflow.duckdb", ) - pqpackage.load_facts(transformed_parquet_dir) - pqpackage.load_fact_resource(transformed_parquet_dir) + # pqpackage.load_facts(transformed_parquet_dir) + # pqpackage.load_fact_resource(transformed_parquet_dir) + temp_parquet = pqpackage.load_details_into_temp_parquet(transformed_parquet_dir) + # pqpackage.load_facts_from_partitions(transformed_parquet_dir, temp_parquet) + pqpackage.load_facts_from_temp_parquet(transformed_parquet_dir, temp_parquet) + pqpackage.load_fact_resource_from_temp_parquet( + transformed_parquet_dir, temp_parquet + ) pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path) logger.info("loading fact,fact_resource and entity into {output_path}") From 724389aac2828139038ea0c1700f8118710aec8e Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 2 May 2025 10:07:29 +0100 Subject: [PATCH 06/26] Edited logging message --- digital_land/package/dataset_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index d41111826..0738f743f 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -243,7 +243,7 @@ def load_details_into_temp_parquet(self, transformed_parquet_dir): process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + logger.info(f"[Memory usage] At end of temp_parquet query: {mem:.2f} MB") return output_path From b6e9ad42835877c79d162896ed8cd61fa408e810 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 2 May 2025 11:25:16 +0100 Subject: [PATCH 07/26] Bucketed window function --- digital_land/package/dataset_parquet.py | 83 ++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git 
a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 0738f743f..7e1536ee6 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -6,6 +6,7 @@ from .package import Package import time import psutil +import tempfile logger = logging.getLogger(__name__) @@ -207,7 +208,8 @@ def load_details_into_temp_parquet(self, transformed_parquet_dir): """ logger.info(f"loading all details from {str(transformed_parquet_dir)}") output_path = self.fact_path - temp_dir = output_path.parent.parent.parent + temp_dir = Path(tempfile.mkdtemp()) + # temp_dir = output_path.parent.parent.parent logger.info(f"output_path {str(output_path)}") logger.info(f"temp_dir {str(temp_dir)}") temp_dir.mkdir(parents=True, exist_ok=True) @@ -245,6 +247,15 @@ def load_details_into_temp_parquet(self, transformed_parquet_dir): mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At end of temp_parquet query: {mem:.2f} MB") + # Check if a garbage collect helps + time.sleep(10) + import gc + gc.collect() + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] After garbage collect: {mem:.2f} MB") + return output_path # def load_facts_from_partitions(self, transformed_parquet_dir, temp_parquet): @@ -345,6 +356,76 @@ def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") + n_buckets = 50 # Try 100 or 200 depending on memory + temp_dir = Path(tempfile.mkdtemp(prefix="duckdb_buckets_")) + bucket_outputs = [] + + for bucket in range(n_buckets): + output_file = temp_dir / f"bucket_{bucket}.parquet" + + # Bucketed window query + query = f""" + COPY ( + SELECT {fields_str} + FROM read_parquet('{temp_parquet}') + WHERE MOD(HASH(fact), {n_buckets}) = {bucket} + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + ) TO '{output_file}' (FORMAT PARQUET); + """ + self.conn.execute(query) + bucket_outputs.append(output_file) + + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for new query {elapsed_time:.2f}") + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + + files_str = ', '.join([f"'{str(f)}'" for f in bucket_outputs]) + + self.conn.execute(f""" + COPY ( + SELECT * FROM read_parquet([{files_str}]) + ) TO '{output_path}' (FORMAT PARQUET); + """) + + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time for concatenation {elapsed_time:.2f}") + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") + + logger.info(f"fact parquet file: {output_path}") + logger.info(f"fact parquet file exists: {os.path.exists(output_path)}") + + def load_facts_from_temp_parquet_orig(self, transformed_parquet_dir, temp_parquet): + """ + This method loads facts into a fact table from a directory containing all transformed files as parquet files + """ + logger.info(f"self.fact_path: {self.fact_path}") + logger.info(f"temp_parquet exists: {os.path.exists(temp_parquet)}") + + output_path = self.fact_path + output_path.parent.mkdir(parents=True, exist_ok=True) + logger.info(f"loading facts from 
{str(transformed_parquet_dir)}") + logger.info(f"loading facts into {str(output_path)}") + + fact_fields = self.specification.schema["fact"]["fields"] + fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) + + # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # priority or latest record + start_time = time.time() + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At start: {mem:.2f} MB") + query = f""" SELECT {fields_str} FROM '{temp_parquet}' From 7c1d37a55545bcbd591ed13ded85fbc45d29038a Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 2 May 2025 11:29:23 +0100 Subject: [PATCH 08/26] Ran black --- digital_land/package/dataset_parquet.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 7e1536ee6..4aacf1162 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -250,6 +250,7 @@ def load_details_into_temp_parquet(self, transformed_parquet_dir): # Check if a garbage collect helps time.sleep(10) import gc + gc.collect() process = psutil.Process(os.getpid()) @@ -385,13 +386,15 @@ def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - files_str = ', '.join([f"'{str(f)}'" for f in bucket_outputs]) + files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) - self.conn.execute(f""" + self.conn.execute( + f""" COPY ( SELECT * FROM read_parquet([{files_str}]) ) TO '{output_path}' (FORMAT PARQUET); - """) + """ + ) end_time = time.time() elapsed_time = end_time - start_time From 987ecd2f8106c3e13716e5982f9d030ef959ef8b Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 2 May 2025 16:18:33 +0100 Subject: [PATCH 09/26] Tidy up of code --- digital_land/commands.py | 4 +- digital_land/package/dataset_parquet.py | 209 ++---------------------- 2 files changed, 16 insertions(+), 197 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index d439c0c9d..41e07b9c7 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -478,10 +478,12 @@ def dataset_create( specification_dir=None, # TBD: package should use this specification object duckdb_path=cache_dir / "overflow.duckdb", ) + # Original methods of getting fact and fact_resource tables. 
Keeping in case we need to revert, but currently + # there is an issue with transport_access_node_collection and the amount of memory required # pqpackage.load_facts(transformed_parquet_dir) # pqpackage.load_fact_resource(transformed_parquet_dir) + # Newer methods which reduce the memory usage temp_parquet = pqpackage.load_details_into_temp_parquet(transformed_parquet_dir) - # pqpackage.load_facts_from_partitions(transformed_parquet_dir, temp_parquet) pqpackage.load_facts_from_temp_parquet(transformed_parquet_dir, temp_parquet) pqpackage.load_fact_resource_from_temp_parquet( transformed_parquet_dir, temp_parquet diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 4aacf1162..2fff0a73b 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -4,7 +4,6 @@ import shutil from pathlib import Path from .package import Package -import time import psutil import tempfile @@ -139,11 +138,6 @@ def load_facts(self, transformed_parquet_dir): fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) - start_time = time.time() - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At start: {mem:.2f} MB") - # query to extract data from the temp table (containing raw data), group by a fact, and get the highest # priority or latest record @@ -161,12 +155,6 @@ def load_facts(self, transformed_parquet_dir): ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for fact query {elapsed_time:.2f}") - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] After fact query: {mem:.2f} MB") def load_fact_resource(self, transformed_parquet_dir): logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") @@ -177,7 +165,6 @@ def load_fact_resource(self, transformed_parquet_dir): [field.replace("-", "_") for field in fact_resource_fields] ) - start_time = time.time() process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") @@ -195,27 +182,16 @@ def load_fact_resource(self, transformed_parquet_dir): ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for fact-resource query {elapsed_time:.2f}") - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] After fact-resource query: {mem:.2f} MB") def load_details_into_temp_parquet(self, transformed_parquet_dir): """ - Save all details into a temporary parquet file, so use later. 
+ Save all details into a temporary parquet file, to use later to """ logger.info(f"loading all details from {str(transformed_parquet_dir)}") - output_path = self.fact_path temp_dir = Path(tempfile.mkdtemp()) - # temp_dir = output_path.parent.parent.parent - logger.info(f"output_path {str(output_path)}") - logger.info(f"temp_dir {str(temp_dir)}") temp_dir.mkdir(parents=True, exist_ok=True) output_path = temp_dir / "temp_table.parquet" - start_time = time.time() process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") @@ -225,7 +201,7 @@ def load_details_into_temp_parquet(self, transformed_parquet_dir): fields = list(set([*fact_fields, *fact_resource_fields])) fields_str = ", ".join([field.replace("-", "_") for field in fields]) - # All CSV files have been loaded into a temporary table. Extract needed columns and export + # All parquet files to be loaded into a temporary parquet file. Extract needed columns and export # Need to add entry_number as it is needed for fact table query = f""" SELECT {fields_str}, entry_number @@ -239,132 +215,38 @@ def load_details_into_temp_parquet(self, transformed_parquet_dir): ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for new query {elapsed_time:.2f}") process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At end of temp_parquet query: {mem:.2f} MB") - # Check if a garbage collect helps - time.sleep(10) - import gc - - gc.collect() - - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] After garbage collect: {mem:.2f} MB") - return output_path - # def load_facts_from_partitions(self, transformed_parquet_dir, temp_parquet): - # """ - # This method loads facts into a fact table from a directory containing all transformed files as parquet files - # """ - # logger.info(f"self.fact_path: {self.fact_path}") - # logger.info(f"temp_parquet exists: {os.path.exists(temp_parquet)}") - # - # output_path = self.fact_path - # output_path.parent.mkdir(parents=True, exist_ok=True) - # logger.info(f"loading facts from {str(transformed_parquet_dir)}") - # logger.info(f"loading facts into {str(output_path)}") - # - # fact_fields = self.specification.schema["fact"]["fields"] - # fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) - # - # # query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # # priority or latest record - # start_time = time.time() - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024 ** 2 # Memory in MB - # logger.info(f"[Memory usage] At start: {mem:.2f} MB") - # - # import tempfile - # temp_dir = Path(tempfile.mkdtemp(prefix="duckdb_buckets_")) - # n_buckets = 50 - # bucket_files = [] - # # Break down the work into n_buckets and loop over that - # for bucket in range(n_buckets): - # bucket_path = temp_dir / f"bucket_{bucket}.parquet" - # where_clause = f"abs(hash(fact)) % {n_buckets} = {bucket}" - # query = f""" - # SELECT {fields_str} - # FROM '{temp_parquet}' - # WHERE {where_clause} - # QUALIFY ROW_NUMBER() OVER ( - # PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - # ) = 1 - # """ - # self.conn.execute( - # f""" - # COPY ( - # {query} - # ) TO '{bucket_path}' (FORMAT PARQUET, PARTITION_BY (fact)); - # 
""" - # ) - # bucket_files.append(str(output_path)) - # - # chunk_files = [] - # for i, parquet_file in enumerate(bucket_files): - # chunk_output = temp_dir / f"chunk_{i}.parquet" - # self.conn.execute(f""" - # COPY ( - # SELECT * FROM read_parquet('{parquet_file}') - # ) TO '{chunk_output}' (FORMAT PARQUET, PARTITION_BY (fact)); - # """) - # chunk_files.append(chunk_output) - # - # # Final merge - # file_list_str = ', '.join([f"'{f}'" for f in chunk_files]) - # merged_query = f""" - # COPY ( - # SELECT * FROM read_parquet([{file_list_str}]) - # ) TO '{str(output_path)}' (FORMAT PARQUET); - # """ - # self.conn.execute(merged_query) - # end_time = time.time() - # elapsed_time = end_time - start_time - # logger.info(f"Time for new query {elapsed_time:.2f}") - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024 ** 2 # Memory in MB - # logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - # - # logger.info(f"fact parquet file: {output_path}") - # logger.info(f"fact parquet file exists: {os.path.exists(output_path)}") - def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): """ - This method loads facts into a fact table from a directory containing all transformed files as parquet files + This method loads facts into a parquet table using the temporary parquet table from earlier """ - logger.info(f"self.fact_path: {self.fact_path}") - logger.info(f"temp_parquet exists: {os.path.exists(temp_parquet)}") - output_path = self.fact_path output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"loading facts from {str(transformed_parquet_dir)}") - logger.info(f"loading facts into {str(output_path)}") + logger.info(f"loading facts") fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) - # query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # priority or latest record - start_time = time.time() process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") - n_buckets = 50 # Try 100 or 200 depending on memory + # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # priority or latest record + n_buckets = 50 temp_dir = Path(tempfile.mkdtemp(prefix="duckdb_buckets_")) bucket_outputs = [] + # Split this query into buckets to avoid using too much memory at the same time. 
for bucket in range(n_buckets): output_file = temp_dir / f"bucket_{bucket}.parquet" - # Bucketed window query query = f""" COPY ( SELECT {fields_str} @@ -378,10 +260,6 @@ def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): self.conn.execute(query) bucket_outputs.append(output_file) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for new query {elapsed_time:.2f}") - process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") @@ -396,87 +274,29 @@ def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): """ ) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for concatenation {elapsed_time:.2f}") - process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") - logger.info(f"fact parquet file: {output_path}") - logger.info(f"fact parquet file exists: {os.path.exists(output_path)}") - - def load_facts_from_temp_parquet_orig(self, transformed_parquet_dir, temp_parquet): - """ - This method loads facts into a fact table from a directory containing all transformed files as parquet files - """ - logger.info(f"self.fact_path: {self.fact_path}") - logger.info(f"temp_parquet exists: {os.path.exists(temp_parquet)}") - - output_path = self.fact_path - output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"loading facts from {str(transformed_parquet_dir)}") - logger.info(f"loading facts into {str(output_path)}") - - fact_fields = self.specification.schema["fact"]["fields"] - fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) - - # query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # priority or latest record - start_time = time.time() - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At start: {mem:.2f} MB") - - query = f""" - SELECT {fields_str} - FROM '{temp_parquet}' - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - ) = 1 - """ - self.conn.execute( - f""" - COPY ( - {query} - ) TO '{str(output_path)}' (FORMAT PARQUET); - """ - ) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for new query {elapsed_time:.2f}") - - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - - logger.info(f"fact parquet file: {output_path}") - logger.info(f"fact parquet file exists: {os.path.exists(output_path)}") - def load_fact_resource_from_temp_parquet( self, transformed_parquet_dir, temp_parquet ): """ - This method loads facts into a fact table from a directory containing all transformed files as parquet files + This method loads fact resources into a parquet table using the temporary parquet table from earlier """ output_path = self.fact_resource_path output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") - logger.info(f"loading fact resources into {str(output_path)}") + logger.info(f"loading fact resources") fact_resource_fields = self.specification.schema["fact-resource"]["fields"] fields_str = ", ".join( [field.replace("-", "_") for field in fact_resource_fields] ) - 
# query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # priority or latest record - start_time = time.time() + # All CSV files have been loaded into a temporary table. Extract required columns and export process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") - logger.info(f"fields_str: {fields_str}") query = f""" SELECT {fields_str} @@ -489,15 +309,12 @@ def load_fact_resource_from_temp_parquet( ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) - end_time = time.time() - elapsed_time = end_time - start_time - logger.info(f"Time for new query {elapsed_time:.2f}") process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB + mem = process.memory_info().rss / 1024**2 logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - logger.info(f"temp_parquet: {temp_parquet}") + # parquet file no longer needed, remove to clear up memory os.remove(temp_parquet) def load_entities_range( From 285593be6ecd997fea0eb0c48bad108d6596aa9c Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 2 May 2025 16:25:51 +0100 Subject: [PATCH 10/26] Tidy up of code --- digital_land/package/dataset_parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 2fff0a73b..4890b443a 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -228,7 +228,7 @@ def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): """ output_path = self.fact_path output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"loading facts") + logger.info("loading facts") fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) @@ -286,7 +286,7 @@ def load_fact_resource_from_temp_parquet( """ output_path = self.fact_resource_path output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"loading fact resources") + logger.info("loading fact resources") fact_resource_fields = self.specification.schema["fact-resource"]["fields"] fields_str = ", ".join( From fdd293be988e531dd0d712bc1850fb9858e3609f Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 6 May 2025 14:49:22 +0100 Subject: [PATCH 11/26] Using 'with' to ensure cleanup if any issues --- digital_land/package/dataset_parquet.py | 67 +++++++++++++------------ 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 4890b443a..9f6b6aab2 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -240,43 +240,46 @@ def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): # query to extract data from the temp table (containing raw data), group by a fact, and get the highest # priority or latest record n_buckets = 50 - temp_dir = Path(tempfile.mkdtemp(prefix="duckdb_buckets_")) bucket_outputs = [] - # Split this query into buckets to avoid using too much memory at the same time. 
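The memory checks that recur throughout these methods repeat the same three lines: resolve the current process, read its resident set size, and log it in megabytes. A minimal sketch of a helper they could share — the log_memory name and the module-level logger are assumptions for illustration, not part of the patch:

import logging
import os

import psutil

logger = logging.getLogger(__name__)


def log_memory(label):
    # Resident set size of the current process, reported in MB.
    mem_mb = psutil.Process(os.getpid()).memory_info().rss / 1024**2
    logger.info(f"[Memory usage] {label}: {mem_mb:.2f} MB")
    return mem_mb

Each inline block of this kind would then collapse to a single call such as log_memory("At start").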
- for bucket in range(n_buckets): - output_file = temp_dir / f"bucket_{bucket}.parquet" - - query = f""" + # Using 'with' to ensure temp_dir is cleaned up if there are any issues + with tempfile.TemporaryDirectory(prefix="duckdb_buckets_") as temp_dir_str: + temp_dir = Path(temp_dir_str) + + # Split this query into buckets to avoid using too much memory at the same time. + for bucket in range(n_buckets): + output_file = temp_dir / f"bucket_{bucket}.parquet" + + query = f""" + COPY ( + SELECT {fields_str} + FROM read_parquet('{temp_parquet}') + WHERE MOD(HASH(fact), {n_buckets}) = {bucket} + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + ) TO '{output_file}' (FORMAT PARQUET); + """ + self.conn.execute(query) + bucket_outputs.append(output_file) + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + + files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) + + self.conn.execute( + f""" COPY ( - SELECT {fields_str} - FROM read_parquet('{temp_parquet}') - WHERE MOD(HASH(fact), {n_buckets}) = {bucket} - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - ) = 1 - ) TO '{output_file}' (FORMAT PARQUET); + SELECT * FROM read_parquet([{files_str}]) + ) TO '{output_path}' (FORMAT PARQUET); """ - self.conn.execute(query) - bucket_outputs.append(output_file) - - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - - files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) - - self.conn.execute( - f""" - COPY ( - SELECT * FROM read_parquet([{files_str}]) - ) TO '{output_path}' (FORMAT PARQUET); - """ - ) + ) - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") def load_fact_resource_from_temp_parquet( self, transformed_parquet_dir, temp_parquet From 7ced24ed2ec9c000e0a01545b186c8cbaecc5a97 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 6 May 2025 14:50:08 +0100 Subject: [PATCH 12/26] Added tests for new parquet functions --- .../package/test_dataset_parquet.py | 109 +++++++++++++++++- 1 file changed, 106 insertions(+), 3 deletions(-) diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py index f94244dd1..9ada32fb7 100644 --- a/tests/integration/package/test_dataset_parquet.py +++ b/tests/integration/package/test_dataset_parquet.py @@ -374,9 +374,9 @@ def test_load_facts_single_file(data: dict, expected: int, tmp_path): specification_dir=None, ) - # this method is explicitely designed to load facts from the temp table - # however it shouldn't need this, it's dupllicating all of the same data in a emporary space - # we should try leveraging the power of duckdb and parquet. + # this method is explicitly designed to load facts from the temp table + # however it shouldn't need this, it's duplicating all of the same data in a temporary space + # we should try leveraging the power of duckdb and parquet. 
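Because the fact queries keep exactly one row per distinct fact value, the expected counts asserted in these tests are the number of distinct facts in the fixture data. A quick way to derive that figure when building new fixtures — a sketch only, assuming the fixture parquet files sit in a local transformed/ directory:

import duckdb

con = duckdb.connect()  # in-memory database
expected_facts = con.execute(
    "SELECT COUNT(DISTINCT fact) FROM read_parquet('transformed/*.parquet')"
).fetchone()[0]
print(expected_facts)  # the row count fact.parquet should end up with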
package.load_facts(transformed_parquet_dir=transformed_parquet_dir) output_file = ( @@ -497,6 +497,109 @@ def test_load_facts_one_file_with_empty_file(data, expected, tmp_path): assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" +@pytest.mark.parametrize( + "data1,data2,expected", [(transformed_1_data, transformed_2_data, 35)] +) +def test_load_facts_from_temp_parquet(data1, data2, expected, tmp_path): + """ + tests loading from a directory when there are multiple files, loads them + into a single parquet file and gets facts from that file + """ + # convert data to df's and save to a single parquet file + df1 = pd.DataFrame.from_dict(data1) + df2 = pd.DataFrame.from_dict(data2) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df1.to_parquet( + transformed_parquet_dir / "transformed_resource_1.parquet", index=False + ) + df2.to_parquet( + transformed_parquet_dir / "transformed_resource_2.parquet", index=False + ) + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + temp_parquet = package.load_details_into_temp_parquet( + transformed_parquet_dir=transformed_parquet_dir + ) + assert os.path.exists(temp_parquet), "temp parquet file not created" + package.load_facts_from_temp_parquet( + transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet + ) + + output_file = ( + tmp_path + / "conservation-area" + / "fact" + / "dataset=conservation-area" + / "fact.parquet" + ) + assert os.path.exists(output_file), "fact.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact.parquet file" + assert ( + len(df) == expected + ), "No. 
of facts does not match expected" # No of unique facts + assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" + + +@pytest.mark.parametrize( + "data_1,data_2,expected", [(transformed_1_data, transformed_2_data, 35)] +) +def test_load_fact_resource_from_temp_parquet(data_1, data_2, expected, tmp_path): + """ + tests loading from a directory when there are multiple files, loads them + into a single parquet file and gets fact_resources from that file + """ + # convert data to df's and save to a single parquet file + df_1 = pd.DataFrame.from_dict(data_1) + df_2 = pd.DataFrame.from_dict(data_2) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df_1.to_parquet( + transformed_parquet_dir / "transformed_resource_1.parquet", index=False + ) + df_2.to_parquet( + transformed_parquet_dir / "transformed_resource_2.parquet", index=False + ) + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + temp_parquet = package.load_details_into_temp_parquet( + transformed_parquet_dir=transformed_parquet_dir + ) + assert os.path.exists(temp_parquet), "temp parquet file not created" + package.load_fact_resource_from_temp_parquet( + transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet + ) + + # Check if the output parquet file exists and verify contents + output_file = ( + tmp_path + / "conservation-area" + / "fact-resource" + / "dataset=conservation-area" + / "fact-resource.parquet" + ) + assert os.path.exists(output_file), "fact-resource.parquet file does not exist" + + # Load Parquet into a DataFrame to verify data correctness + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact-resource,parquet file" + assert len(df) == expected, "Not all data saved in fact-resource.parquet file" + + assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" + + @pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) def test_load_fact_resource_single_file(data, expected, tmp_path): From 77b80f8ae489d20c91ba4b496942b0589cdd6676 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 8 May 2025 16:35:17 +0100 Subject: [PATCH 13/26] Incorporating batching and bucketing into the calculation of facts, fact_resources and entities --- digital_land/commands.py | 23 +- digital_land/package/dataset_parquet.py | 560 ++++++++++++++++++------ 2 files changed, 434 insertions(+), 149 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 41e07b9c7..72bfda28a 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -477,17 +477,20 @@ def dataset_create( path=dataset_parquet_path, specification_dir=None, # TBD: package should use this specification object duckdb_path=cache_dir / "overflow.duckdb", + transformed_parquet_dir=transformed_parquet_dir, ) - # Original methods of getting fact and fact_resource tables. 
Keeping in case we need to revert, but currently - # there is an issue with transport_access_node_collection and the amount of memory required - # pqpackage.load_facts(transformed_parquet_dir) - # pqpackage.load_fact_resource(transformed_parquet_dir) - # Newer methods which reduce the memory usage - temp_parquet = pqpackage.load_details_into_temp_parquet(transformed_parquet_dir) - pqpackage.load_facts_from_temp_parquet(transformed_parquet_dir, temp_parquet) - pqpackage.load_fact_resource_from_temp_parquet( - transformed_parquet_dir, temp_parquet - ) + # To find facts we have a complex SQL window function that can cause memory issues. To aid the allocation of memory + # we decide on a parquet strategy, based on how many parquet files we have, the overall size of these + # files and the available memory. We will look at the following strategies: + # 1) if we have a small number of files or the total size of the files is small then we can run the SQL over all of + # these files. + # 2) Grouping the parquet files into 256MB batches. Then running SQL either on all of these batches at once, or + # bucketing the data so that we run the window SQL function on a subset of facts (then concatenate them) + + # Group parquet files into approx 256MB batches. If this is unnecessary we exit immediately + pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256) + pqpackage.load_facts(transformed_parquet_dir) + pqpackage.load_fact_resource(transformed_parquet_dir) pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path) logger.info("loading fact,fact_resource and entity into {output_path}") diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 9f6b6aab2..7de27ac03 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -30,7 +30,9 @@ class DatasetParquetPackage(Package): - def __init__(self, dataset, path, duckdb_path=None, **kwargs): + def __init__( + self, dataset, path, duckdb_path=None, transformed_parquet_dir=None, **kwargs + ): """ Initialisation method to set up information as needed @@ -66,6 +68,10 @@ def __init__(self, dataset, path, duckdb_path=None, **kwargs): self.entity_path = ( self.path / "entity" / f"dataset={self.dataset}" / "entity.parquet" ) + self.parquet_dir_details = self.analyze_parquet_dir( + transformed_parquet_dir=transformed_parquet_dir + ) + self.strategy = self.choose_strategy(self.parquet_dir_details) def get_schema(self): schema = {} @@ -127,184 +133,309 @@ def get_schema(self): # logging.info(f"Failed to read in when max_size = {max_size}") # raise - def load_facts(self, transformed_parquet_dir): + def analyze_parquet_dir(self, transformed_parquet_dir): """ - This method loads facts into a fact table from a directory containing all transformed files as parquet files + Get details about the transformed_parquet_dir to decide on which strategy to use for + creating the fact and fact_resource tables """ - output_path = self.fact_path - output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info(f"loading facts from from {str(transformed_parquet_dir)}") - - fact_fields = self.specification.schema["fact"]["fields"] - fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) - - # query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # priority or latest record - - query = f""" - SELECT {fields_str} - FROM '{str(transformed_parquet_dir)}/*.parquet' - QUALIFY ROW_NUMBER() OVER ( - 
PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - ) = 1 + files = list(transformed_parquet_dir.glob("*.parquet")) + no_parquet_files = len(files) + total_size_bytes = sum(f.stat().st_size for f in files) + total_size_mb = total_size_bytes / (1024 * 1024) + avg_size_mb = total_size_mb / len(files) if files else 0 + max_size_mb = max(f.stat().st_size for f in files) / (1024 * 1024) + mem = psutil.virtual_memory() + memory_available = mem.total / 1024**2 + + return { + "no_parquet_files": no_parquet_files, + "total_size_mb": total_size_mb, + "avg_size_mb": avg_size_mb, + "max_size_mb": max_size_mb, + "memory_available": memory_available, + } + + def choose_strategy(self, parquet_dir_details): """ - self.conn.execute( - f""" - COPY ( - {query} - ) TO '{str(output_path)}' (FORMAT PARQUET); + What strategy should we use to create fact, fact_resource and entity tables: + Return one of: + - "direct" - analyse all parquet files at once + - "batch" - group the parquet files into batch files of approx. 256MB + - Did have the other following as potential strategies but it appears as if 'batch' and 'direct' will suffice + - since batching everything into one fie is the equivalent os the 'single_file' option. + - "single_file" - put all parquet files into a single parquet file + - "consolidate_then_bucket" - put all parquet files into several larger files """ - ) - - def load_fact_resource(self, transformed_parquet_dir): - logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") - output_path = self.fact_resource_path - output_path.parent.mkdir(parents=True, exist_ok=True) - fact_resource_fields = self.specification.schema["fact-resource"]["fields"] - fields_str = ", ".join( - [field.replace("-", "_") for field in fact_resource_fields] - ) - - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At start: {mem:.2f} MB") - - # All CSV files have been loaded into a temporary table. Extract several columns and export - query = f""" - SELECT {fields_str} - FROM '{str(transformed_parquet_dir)}/*.parquet' - """ - - self.conn.execute( - f""" - COPY ( - {query} - ) TO '{str(output_path)}' (FORMAT PARQUET); - """ - ) - - def load_details_into_temp_parquet(self, transformed_parquet_dir): + # If memory is less than 2GB (or 1/4 of available memory, whichever is smaller) then can potentially process + # them directly + memory_check = min(2048, parquet_dir_details["memory_available"] / 4) + if ( + parquet_dir_details["total_size_mb"] < memory_check + and parquet_dir_details["no_parquet_files"] < 100 + ) or (parquet_dir_details["no_parquet_files"] < 4): + return "direct" + + # if parquet_dir_details["no_parquet_files"] > 500 or parquet_dir_details[ + # "total_size_mb" + # ] > (parquet_dir_details["memory_available"] * 0.75): + # return "consolidate_then_bucket" + + return "batch" + + def group_parquet_files( + self, transformed_parquet_dir, target_mb=256, delete_originals=False + ): """ - Save all details into a temporary parquet file, to use later to + group parquet files into batches, each aiming for approximately 'target_mb' in size. 
""" - logger.info(f"loading all details from {str(transformed_parquet_dir)}") - temp_dir = Path(tempfile.mkdtemp()) - temp_dir.mkdir(parents=True, exist_ok=True) - output_path = temp_dir / "temp_table.parquet" - process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At start: {mem:.2f} MB") + logger.info(f"[Memory usage] Before grouping: {mem:.2f} MB") - fact_fields = self.specification.schema["fact"]["fields"] - fact_resource_fields = self.specification.schema["fact-resource"]["fields"] - fields = list(set([*fact_fields, *fact_resource_fields])) - fields_str = ", ".join([field.replace("-", "_") for field in fields]) - - # All parquet files to be loaded into a temporary parquet file. Extract needed columns and export - # Need to add entry_number as it is needed for fact table - query = f""" - SELECT {fields_str}, entry_number - FROM '{str(transformed_parquet_dir)}/*.parquet' - """ + if self.strategy == "direct": + logger.info(f"No batching required for {str(transformed_parquet_dir)}") + else: + logger.info(f"Batching all files from {str(transformed_parquet_dir)}") + target_bytes = target_mb * 1024 * 1024 + parquet_files = list(transformed_parquet_dir.glob("*.parquet")) + + # List of (file_path, file_size_in_bytes) + file_sizes = [(f, f.stat().st_size) for f in parquet_files] + file_sizes.sort(key=lambda x: x[1], reverse=True) + + batches = [] + + # apply first-fit decreasing heuristic + for f, size in file_sizes: + placed = False + for batch in batches: + if batch["total_size"] + size <= target_bytes: + batch["files"].append(f) + batch["total_size"] += size + placed = True + break + if not placed: + # Start a new batch + batches.append({"files": [f], "total_size": size}) + + digits = max(2, len(str(len(batches) - 1))) + batch_dir = transformed_parquet_dir / "batch" + batch_dir.mkdir(parents=True, exist_ok=True) + for i, batch in enumerate(batches): + files = batch["files"] + output_file = batch_dir / f"batch_{i:0{digits}}.parquet" + files_str = ", ".join(f"'{str(f)}'" for f in files) + query = f""" + COPY ( + SELECT * FROM read_parquet([{files_str}]) + ) TO '{str(output_file)}' (FORMAT PARQUET) + """ + self.conn.execute(query) - self.conn.execute( - f""" - COPY ( - {query} - ) TO '{str(output_path)}' (FORMAT PARQUET); - """ - ) + # Should we delete the files now that they have been 'batched'? 
+ if delete_originals: + for f in files: + f.unlink() process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At end of temp_parquet query: {mem:.2f} MB") + logger.info(f"[Memory usage] After grouping: {mem:.2f} MB") - return output_path - - def load_facts_from_temp_parquet(self, transformed_parquet_dir, temp_parquet): + def load_facts(self, transformed_parquet_dir): """ - This method loads facts into a parquet table using the temporary parquet table from earlier + This method loads facts into a fact table from a directory containing all transformed files as parquet files """ output_path = self.fact_path output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info("loading facts") + logger.info(f"loading facts from from {str(transformed_parquet_dir)}") fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] At start: {mem:.2f} MB") - - # query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # priority or latest record - n_buckets = 50 - bucket_outputs = [] - - # Using 'with' to ensure temp_dir is cleaned up if there are any issues - with tempfile.TemporaryDirectory(prefix="duckdb_buckets_") as temp_dir_str: - temp_dir = Path(temp_dir_str) + logger.info(f"[Memory usage] At start of load_facts: {mem:.2f} MB") + + # query to extract data from either the transformed parquet files or batched ones. Data is grouped by fact, + # and we get the highest priority or latest record + if self.strategy == "direct": + logger.info("Using direct strategy for facts") + query = f""" + SELECT {fields_str} + FROM '{str(transformed_parquet_dir)}/*.parquet' + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + """ + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + else: + no_of_batched_files = len( + list(transformed_parquet_dir.glob("batch/batch_*.parquet")) + ) + if no_of_batched_files == 1: + logger.info( + "Only have one batched file for facts - using 'simple' query" + ) + n_buckets = 1 + else: + # Max partition size should the smallest value of either be 4GB or 1/4 of the available memory + max_partition_memory_mb = min( + 2048, self.parquet_dir_details["memory_available"] / 4 + ) + n_buckets = ( + self.parquet_dir_details["total_size_mb"] // max_partition_memory_mb + + 1 + ) + n_buckets = int(min(n_buckets, no_of_batched_files)) + if n_buckets == 0: + exit( + "Have got a value of zero for n_buckets in `load_facts`. Cannot continue." 
+ ) + if n_buckets == 1: + logger.info("Only need one bucket for facts - using 'simple' query") + if no_of_batched_files == 1 or n_buckets == 1: + query = f""" + SELECT {fields_str} + FROM '{str(transformed_parquet_dir)}/batch/batch_*.parquet' + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + """ + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + else: + # Multiple buckets and batched files used + logger.info( + "Need to use multiple buckets in windowed function for facts" + ) + bucket_dir = transformed_parquet_dir / "bucket" + bucket_dir.mkdir(parents=True, exist_ok=True) + digits = max(2, len(str(n_buckets - 1))) + bucket_paths = [ + bucket_dir / f"bucket_{i:0{digits}}.parquet" + for i in range(n_buckets) + ] + logger.info( + f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files" + ) - # Split this query into buckets to avoid using too much memory at the same time. - for bucket in range(n_buckets): - output_file = temp_dir / f"bucket_{bucket}.parquet" + # Loop over each batch file and assign to a bucket file + logger.info(f"Assigning to {n_buckets} buckets") + for f in transformed_parquet_dir.glob("batch/batch_*.parquet"): + for i in range(n_buckets): + self.conn.execute( + f""" + COPY ( + SELECT * + FROM read_parquet('{f}') + WHERE MOD(HASH(fact), {n_buckets}) = {i} + ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE); + """ + ) - query = f""" + logger.info( + f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files" + ) + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB") + + result_dir = transformed_parquet_dir / "result" + result_dir.mkdir(parents=True, exist_ok=True) + result_paths = [ + result_dir / f"result_{i:0{digits}}.parquet" + for i in range(n_buckets) + ] + for i in range(n_buckets): + bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet" + self.conn.execute( + f""" COPY ( - SELECT {fields_str} - FROM read_parquet('{temp_parquet}') - WHERE MOD(HASH(fact), {n_buckets}) = {bucket} + SELECT * + FROM read_parquet('{bucket_path}') QUALIFY ROW_NUMBER() OVER ( PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC ) = 1 - ) TO '{output_file}' (FORMAT PARQUET); + ) TO '{result_paths[i]}' (FORMAT PARQUET); + """ + ) + logger.info( + f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files" + ) + + # for path in bucket_paths: + # path.unlink(missing_ok=True) + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] After 'result': {mem:.2f} MB") + + self.conn.execute( + f""" + COPY( + SELECT * FROM + read_parquet('{str(result_dir)}/result_*.parquet') + ) TO + '{output_path}.parquet'(FORMAT + PARQUET); """ - self.conn.execute(query) - bucket_outputs.append(output_file) + ) + + # for path in result_paths: + # path.unlink(missing_ok=True) process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) + # files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) + # + # self.conn.execute( + # f""" + # COPY ( + # SELECT * FROM read_parquet([{files_str}]) + # ) TO '{output_path}' (FORMAT PARQUET); + # """ + # ) + # + # process = 
psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") - self.conn.execute( - f""" - COPY ( - SELECT * FROM read_parquet([{files_str}]) - ) TO '{output_path}' (FORMAT PARQUET); - """ - ) - - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 # Memory in MB - logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") - - def load_fact_resource_from_temp_parquet( - self, transformed_parquet_dir, temp_parquet - ): - """ - This method loads fact resources into a parquet table using the temporary parquet table from earlier - """ + def load_fact_resource(self, transformed_parquet_dir): + logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") output_path = self.fact_resource_path output_path.parent.mkdir(parents=True, exist_ok=True) - logger.info("loading fact resources") - fact_resource_fields = self.specification.schema["fact-resource"]["fields"] fields_str = ", ".join( [field.replace("-", "_") for field in fact_resource_fields] ) - # All CSV files have been loaded into a temporary table. Extract required columns and export process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # Extract relevant columns from original parquet or batched parquet files + if self.strategy == "direct": + parquet_str = "*.parquet" + else: + parquet_str = "batch/batch_*.parquet" query = f""" SELECT {fields_str} - FROM '{temp_parquet}' + FROM '{str(transformed_parquet_dir)}/{parquet_str}' """ + self.conn.execute( f""" COPY ( @@ -313,12 +444,147 @@ def load_fact_resource_from_temp_parquet( """ ) - process = psutil.Process(os.getpid()) - mem = process.memory_info().rss / 1024**2 - logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - - # parquet file no longer needed, remove to clear up memory - os.remove(temp_parquet) + # def load_details_into_temp_parquet(self, transformed_parquet_dir): + # """ + # Save all details into a temporary parquet file, to use later to + # """ + # logger.info(f"loading all details from {str(transformed_parquet_dir)}") + # temp_dir = Path(tempfile.mkdtemp()) + # temp_dir.mkdir(parents=True, exist_ok=True) + # output_path = temp_dir / "temp_table.parquet" + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # + # fact_fields = self.specification.schema["fact"]["fields"] + # fact_resource_fields = self.specification.schema["fact-resource"]["fields"] + # fields = list(set([*fact_fields, *fact_resource_fields])) + # fields_str = ", ".join([field.replace("-", "_") for field in fields]) + # + # # All parquet files to be loaded into a temporary parquet file. 
Extract needed columns and export + # # Need to add entry_number as it is needed for fact table + # query = f""" + # SELECT {fields_str}, entry_number + # FROM '{str(transformed_parquet_dir)}/*.parquet' + # """ + # + # self.conn.execute( + # f""" + # COPY ( + # {query} + # ) TO '{str(output_path)}' (FORMAT PARQUET); + # """ + # ) + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] At end of temp_parquet query: {mem:.2f} MB") + # + # return output_path + # + # def load_facts_from_temp_parquet( + # self, transformed_parquet_dir, temp_parquet, parquet_dir_details + # ): + # """ + # This method loads facts into a parquet table using the temporary parquet table from earlier + # """ + # output_path = self.fact_path + # output_path.parent.mkdir(parents=True, exist_ok=True) + # logger.info("loading facts") + # + # fact_fields = self.specification.schema["fact"]["fields"] + # fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # + # # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # # priority or latest record + # # Max partition size should either be 1GB or twice the maximum + # max_partition_memory_mb = 1024 # Set it to be 1GB per partition + # n_buckets = parquet_dir_details["total_size_mb"] // max_partition_memory_mb + 1 + # n_buckets = min(n_buckets, parquet_dir_details["no_parquet_files"]) + # bucket_outputs = [] + # + # # Using 'with' to ensure temp_dir is cleaned up if there are any issues + # with tempfile.TemporaryDirectory(prefix="duckdb_buckets_") as temp_dir_str: + # temp_dir = Path(temp_dir_str) + # + # # Split this query into buckets to avoid using too much memory at the same time. 
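The bucketing used by the new load_facts above keeps the window-function deduplication within a memory budget: rows are first split into bucket files by MOD(HASH(fact), n_buckets), so every row for a given fact lands in the same bucket, and the ROW_NUMBER() dedup then runs one bucket at a time before the results are concatenated. As a worked example of the bucket count (figures hypothetical, assuming memory_available is reported in MB): with 10 000 MB of batched parquet and 16 384 MB available, max_partition_memory_mb = min(2048, 16384 / 4) = 2048, so n_buckets = 10000 // 2048 + 1 = 5, capped at the number of batch files. A small in-memory sketch of the per-bucket dedup itself, with toy values:

import duckdb

con = duckdb.connect()
# Toy stand-in for the batched fact rows; columns follow the ordering used in the query above.
con.execute("""
    CREATE TABLE batched AS SELECT * FROM (VALUES
        ('f1', 1, '2024-01-02', 2),
        ('f1', 1, '2024-01-01', 1),
        ('f2', 2, '2024-01-01', 1)
    ) AS t(fact, priority, entry_date, entry_number)
""")

n_buckets = 2
rows = []
for i in range(n_buckets):
    # A fact always hashes to the same bucket, so each window query
    # only ever has to hold one bucket's facts in memory.
    rows += con.execute(f"""
        SELECT * FROM batched
        WHERE MOD(HASH(fact), {n_buckets}) = {i}
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
        ) = 1
    """).fetchall()
print(rows)  # one row per fact: the highest-priority, most recent entry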
+ # for bucket in range(n_buckets): + # output_file = temp_dir / f"bucket_{bucket}.parquet" + # + # query = f""" + # COPY ( + # SELECT {fields_str} + # FROM read_parquet('{temp_parquet}') + # WHERE MOD(HASH(fact), {n_buckets}) = {bucket} + # QUALIFY ROW_NUMBER() OVER ( + # PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + # ) = 1 + # ) TO '{output_file}' (FORMAT PARQUET); + # """ + # self.conn.execute(query) + # bucket_outputs.append(output_file) + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + # + # files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) + # + # self.conn.execute( + # f""" + # COPY ( + # SELECT * FROM read_parquet([{files_str}]) + # ) TO '{output_path}' (FORMAT PARQUET); + # """ + # ) + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") + # + # def load_fact_resource_from_temp_parquet( + # self, transformed_parquet_dir, temp_parquet + # ): + # """ + # This method loads fact resources into a parquet table using the temporary parquet table from earlier + # """ + # output_path = self.fact_resource_path + # output_path.parent.mkdir(parents=True, exist_ok=True) + # logger.info("loading fact resources") + # + # fact_resource_fields = self.specification.schema["fact-resource"]["fields"] + # fields_str = ", ".join( + # [field.replace("-", "_") for field in fact_resource_fields] + # ) + # + # # All CSV files have been loaded into a temporary table. Extract required columns and export + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 # Memory in MB + # logger.info(f"[Memory usage] At start: {mem:.2f} MB") + # + # query = f""" + # SELECT {fields_str} + # FROM '{temp_parquet}' + # """ + # self.conn.execute( + # f""" + # COPY ( + # {query} + # ) TO '{str(output_path)}' (FORMAT PARQUET); + # """ + # ) + # + # process = psutil.Process(os.getpid()) + # mem = process.memory_info().rss / 1024**2 + # logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") + # + # # parquet file no longer needed, remove to clear up memory + # os.remove(temp_parquet) def load_entities_range( self, @@ -353,9 +619,13 @@ def load_entities_range( else: entity_where_clause = "" + if self.strategy == "direct": + parquet_str = "*.parquet" + else: + parquet_str = "batch/batch_*.parquet" query = f""" SELECT DISTINCT REPLACE(field,'-','_') - FROM parquet_scan('{transformed_parquet_dir}/*.parquet') + FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}') {entity_where_clause} """ @@ -435,10 +705,15 @@ def load_entities_range( # craft a where clause to limit entities in quetion, this chunking helps solve memory issues + if self.strategy == "direct": + parquet_str = "*.parquet" + else: + parquet_str = "batch/batch_*.parquet" + query = f""" SELECT {fields_str}{optional_org_str} FROM ( SELECT {fields_str}, CASE WHEN resource_csv."end-date" IS NULL THEN '2999-12-31' ELSE resource_csv."end-date" END AS resource_end_date - FROM parquet_scan('{transformed_parquet_dir}/*.parquet') tf + FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}') tf LEFT JOIN read_csv_auto('{resource_path}', max_line_size=40000000) resource_csv ON tf.resource = resource_csv.resource {entity_where_clause} @@ -531,9 +806,13 @@ def load_entities(self, transformed_parquet_dir, resource_path, organisation_pat 
output_path.parent.mkdir(parents=True, exist_ok=True) # retrieve entity counnts including and minimum - min_sql = f"select MIN(entity) FROM parquet_scan('{transformed_parquet_dir}/*.parquet');" + if self.strategy == "direct": + parquet_str = "*.parquet" + else: + parquet_str = "batch/batch_*.parquet" + min_sql = f"select MIN(entity) FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}');" min_entity = self.conn.execute(min_sql).fetchone()[0] - max_sql = f"select MAX(entity) FROM parquet_scan('{transformed_parquet_dir}/*.parquet');" + max_sql = f"select MAX(entity) FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}');" max_entity = self.conn.execute(max_sql).fetchone()[0] total_entities = max_entity - min_entity entity_limit = 100000 @@ -571,7 +850,10 @@ def load_entities(self, transformed_parquet_dir, resource_path, organisation_pat shutil.rmtree(temp_dir) else: self.load_entities_range( - transformed_parquet_dir, resource_path, organisation_path, output_path + transformed_parquet_dir, + resource_path, + organisation_path, + output_path, ) def load_to_sqlite(self, sqlite_path): From 739336178b9aaf4edebfbf36505b17c9ae4587f4 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 8 May 2025 16:36:05 +0100 Subject: [PATCH 14/26] Removing temp_parquet tests as no longer using those functions --- .../package/test_dataset_parquet.py | 202 +++++++++--------- 1 file changed, 101 insertions(+), 101 deletions(-) diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py index 9ada32fb7..c20a92dca 100644 --- a/tests/integration/package/test_dataset_parquet.py +++ b/tests/integration/package/test_dataset_parquet.py @@ -497,107 +497,107 @@ def test_load_facts_one_file_with_empty_file(data, expected, tmp_path): assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" -@pytest.mark.parametrize( - "data1,data2,expected", [(transformed_1_data, transformed_2_data, 35)] -) -def test_load_facts_from_temp_parquet(data1, data2, expected, tmp_path): - """ - tests loading from a directory when there are multiple files, loads them - into a single parquet file and gets facts from that file - """ - # convert data to df's and save to a single parquet file - df1 = pd.DataFrame.from_dict(data1) - df2 = pd.DataFrame.from_dict(data2) - transformed_parquet_dir = tmp_path / "transformed" - transformed_parquet_dir.mkdir(parents=True, exist_ok=True) - df1.to_parquet( - transformed_parquet_dir / "transformed_resource_1.parquet", index=False - ) - df2.to_parquet( - transformed_parquet_dir / "transformed_resource_2.parquet", index=False - ) - - package = DatasetParquetPackage( - dataset="conservation-area", - path=tmp_path / "conservation-area", - specification_dir=None, - ) - temp_parquet = package.load_details_into_temp_parquet( - transformed_parquet_dir=transformed_parquet_dir - ) - assert os.path.exists(temp_parquet), "temp parquet file not created" - package.load_facts_from_temp_parquet( - transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet - ) - - output_file = ( - tmp_path - / "conservation-area" - / "fact" - / "dataset=conservation-area" - / "fact.parquet" - ) - assert os.path.exists(output_file), "fact.parquet file does not exist" - - df = pd.read_parquet(output_file) - - assert len(df) > 0, "No data in fact.parquet file" - assert ( - len(df) == expected - ), "No. 
of facts does not match expected" # No of unique facts - assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" - - -@pytest.mark.parametrize( - "data_1,data_2,expected", [(transformed_1_data, transformed_2_data, 35)] -) -def test_load_fact_resource_from_temp_parquet(data_1, data_2, expected, tmp_path): - """ - tests loading from a directory when there are multiple files, loads them - into a single parquet file and gets fact_resources from that file - """ - # convert data to df's and save to a single parquet file - df_1 = pd.DataFrame.from_dict(data_1) - df_2 = pd.DataFrame.from_dict(data_2) - transformed_parquet_dir = tmp_path / "transformed" - transformed_parquet_dir.mkdir(parents=True, exist_ok=True) - df_1.to_parquet( - transformed_parquet_dir / "transformed_resource_1.parquet", index=False - ) - df_2.to_parquet( - transformed_parquet_dir / "transformed_resource_2.parquet", index=False - ) - - package = DatasetParquetPackage( - dataset="conservation-area", - path=tmp_path / "conservation-area", - specification_dir=None, - ) - temp_parquet = package.load_details_into_temp_parquet( - transformed_parquet_dir=transformed_parquet_dir - ) - assert os.path.exists(temp_parquet), "temp parquet file not created" - package.load_fact_resource_from_temp_parquet( - transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet - ) - - # Check if the output parquet file exists and verify contents - output_file = ( - tmp_path - / "conservation-area" - / "fact-resource" - / "dataset=conservation-area" - / "fact-resource.parquet" - ) - assert os.path.exists(output_file), "fact-resource.parquet file does not exist" - - # Load Parquet into a DataFrame to verify data correctness - df = pd.read_parquet(output_file) - - assert len(df) > 0, "No data in fact-resource,parquet file" - assert len(df) == expected, "Not all data saved in fact-resource.parquet file" - - assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" +# @pytest.mark.parametrize( +# "data1,data2,expected", [(transformed_1_data, transformed_2_data, 35)] +# ) +# def test_load_facts_from_temp_parquet(data1, data2, expected, tmp_path): +# """ +# tests loading from a directory when there are multiple files, loads them +# into a single parquet file and gets facts from that file +# """ +# # convert data to df's and save to a single parquet file +# df1 = pd.DataFrame.from_dict(data1) +# df2 = pd.DataFrame.from_dict(data2) +# transformed_parquet_dir = tmp_path / "transformed" +# transformed_parquet_dir.mkdir(parents=True, exist_ok=True) +# df1.to_parquet( +# transformed_parquet_dir / "transformed_resource_1.parquet", index=False +# ) +# df2.to_parquet( +# transformed_parquet_dir / "transformed_resource_2.parquet", index=False +# ) +# +# package = DatasetParquetPackage( +# dataset="conservation-area", +# path=tmp_path / "conservation-area", +# specification_dir=None, +# ) +# temp_parquet = package.load_details_into_temp_parquet( +# transformed_parquet_dir=transformed_parquet_dir +# ) +# assert os.path.exists(temp_parquet), "temp parquet file not created" +# package.load_facts_from_temp_parquet( +# transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet +# ) +# +# output_file = ( +# tmp_path +# / "conservation-area" +# / "fact" +# / "dataset=conservation-area" +# / "fact.parquet" +# ) +# assert os.path.exists(output_file), "fact.parquet file does not exist" +# +# df = pd.read_parquet(output_file) +# +# assert len(df) > 0, "No data in fact.parquet file" +# assert ( +# len(df) == 
expected +# ), "No. of facts does not match expected" # No of unique facts +# assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" +# +# +# @pytest.mark.parametrize( +# "data_1,data_2,expected", [(transformed_1_data, transformed_2_data, 35)] +# ) +# def test_load_fact_resource_from_temp_parquet(data_1, data_2, expected, tmp_path): +# """ +# tests loading from a directory when there are multiple files, loads them +# into a single parquet file and gets fact_resources from that file +# """ +# # convert data to df's and save to a single parquet file +# df_1 = pd.DataFrame.from_dict(data_1) +# df_2 = pd.DataFrame.from_dict(data_2) +# transformed_parquet_dir = tmp_path / "transformed" +# transformed_parquet_dir.mkdir(parents=True, exist_ok=True) +# df_1.to_parquet( +# transformed_parquet_dir / "transformed_resource_1.parquet", index=False +# ) +# df_2.to_parquet( +# transformed_parquet_dir / "transformed_resource_2.parquet", index=False +# ) +# +# package = DatasetParquetPackage( +# dataset="conservation-area", +# path=tmp_path / "conservation-area", +# specification_dir=None, +# ) +# temp_parquet = package.load_details_into_temp_parquet( +# transformed_parquet_dir=transformed_parquet_dir +# ) +# assert os.path.exists(temp_parquet), "temp parquet file not created" +# package.load_fact_resource_from_temp_parquet( +# transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet +# ) +# +# # Check if the output parquet file exists and verify contents +# output_file = ( +# tmp_path +# / "conservation-area" +# / "fact-resource" +# / "dataset=conservation-area" +# / "fact-resource.parquet" +# ) +# assert os.path.exists(output_file), "fact-resource.parquet file does not exist" +# +# # Load Parquet into a DataFrame to verify data correctness +# df = pd.read_parquet(output_file) +# +# assert len(df) > 0, "No data in fact-resource,parquet file" +# assert len(df) == expected, "Not all data saved in fact-resource.parquet file" +# +# assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" @pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) From 422485e58e00440e905d44e6485141d1c1113e11 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 8 May 2025 16:40:37 +0100 Subject: [PATCH 15/26] Removing tempfile library --- digital_land/package/dataset_parquet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 7de27ac03..c29374701 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -5,7 +5,6 @@ from pathlib import Path from .package import Package import psutil -import tempfile logger = logging.getLogger(__name__) From 572a2de0456e73271d805362f179676102095b05 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 8 May 2025 16:54:02 +0100 Subject: [PATCH 16/26] Corrected typo --- digital_land/package/dataset_parquet.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index c29374701..25371ee59 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -67,10 +67,13 @@ def __init__( self.entity_path = ( self.path / "entity" / f"dataset={self.dataset}" / "entity.parquet" ) - self.parquet_dir_details = self.analyze_parquet_dir( - transformed_parquet_dir=transformed_parquet_dir - ) - self.strategy = self.choose_strategy(self.parquet_dir_details) + if 
transformed_parquet_dir is None: + self.strategy = "direct" + else: + self.parquet_dir_details = self.analyze_parquet_dir( + transformed_parquet_dir=transformed_parquet_dir + ) + self.strategy = self.choose_strategy(self.parquet_dir_details) def get_schema(self): schema = {} From b7842313db552dab8f7441e675bb452b4786d54a Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 8 May 2025 17:39:27 +0100 Subject: [PATCH 17/26] Edited comment --- digital_land/commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 72bfda28a..bc68cd0dd 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -487,7 +487,7 @@ def dataset_create( # 2) Grouping the parquet files into 256MB batches. Then running SQL either on all of these batches at once, or # bucketing the data so that we run the window SQL function on a subset of facts (then concatenate them) - # Group parquet files into approx 256MB batches. If this is unnecessary we exit immediately + # Group parquet files into approx 256MB batches. If this is unnecessary we exit the function immediately pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256) pqpackage.load_facts(transformed_parquet_dir) pqpackage.load_fact_resource(transformed_parquet_dir) From 137c9a264f9ecbdce9cf6e74b25c43abea7839ff Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Thu, 8 May 2025 17:40:06 +0100 Subject: [PATCH 18/26] Edited exit message --- digital_land/package/dataset_parquet.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 25371ee59..b39aacea7 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -1,5 +1,7 @@ import os import logging +import sys + import duckdb import shutil from pathlib import Path @@ -296,7 +298,7 @@ def load_facts(self, transformed_parquet_dir): ) n_buckets = int(min(n_buckets, no_of_batched_files)) if n_buckets == 0: - exit( + sys.exit( "Have got a value of zero for n_buckets in `load_facts`. Cannot continue." ) if n_buckets == 1: From a07b535a090524499886557a5a1175c7a1cac22b Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 9 May 2025 09:41:08 +0100 Subject: [PATCH 19/26] Check strategy before calling function --- digital_land/commands.py | 5 +- digital_land/package/dataset_parquet.py | 83 ++++++++++++------------- 2 files changed, 43 insertions(+), 45 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index bc68cd0dd..204937176 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -487,8 +487,9 @@ def dataset_create( # 2) Grouping the parquet files into 256MB batches. Then running SQL either on all of these batches at once, or # bucketing the data so that we run the window SQL function on a subset of facts (then concatenate them) - # Group parquet files into approx 256MB batches. 
If this is unnecessary we exit the function immediately - pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256) + # Group parquet files into approx 256MB batches (if needed) + if pqpackage.strategy != "direct": + pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256) pqpackage.load_facts(transformed_parquet_dir) pqpackage.load_fact_resource(transformed_parquet_dir) pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index b39aacea7..39fe9175e 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -196,50 +196,47 @@ def group_parquet_files( mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] Before grouping: {mem:.2f} MB") - if self.strategy == "direct": - logger.info(f"No batching required for {str(transformed_parquet_dir)}") - else: - logger.info(f"Batching all files from {str(transformed_parquet_dir)}") - target_bytes = target_mb * 1024 * 1024 - parquet_files = list(transformed_parquet_dir.glob("*.parquet")) - - # List of (file_path, file_size_in_bytes) - file_sizes = [(f, f.stat().st_size) for f in parquet_files] - file_sizes.sort(key=lambda x: x[1], reverse=True) - - batches = [] - - # apply first-fit decreasing heuristic - for f, size in file_sizes: - placed = False - for batch in batches: - if batch["total_size"] + size <= target_bytes: - batch["files"].append(f) - batch["total_size"] += size - placed = True - break - if not placed: - # Start a new batch - batches.append({"files": [f], "total_size": size}) - - digits = max(2, len(str(len(batches) - 1))) - batch_dir = transformed_parquet_dir / "batch" - batch_dir.mkdir(parents=True, exist_ok=True) - for i, batch in enumerate(batches): - files = batch["files"] - output_file = batch_dir / f"batch_{i:0{digits}}.parquet" - files_str = ", ".join(f"'{str(f)}'" for f in files) - query = f""" - COPY ( - SELECT * FROM read_parquet([{files_str}]) - ) TO '{str(output_file)}' (FORMAT PARQUET) - """ - self.conn.execute(query) + logger.info(f"Batching all files from {str(transformed_parquet_dir)}") + target_bytes = target_mb * 1024 * 1024 + parquet_files = list(transformed_parquet_dir.glob("*.parquet")) + + # List of (file_path, file_size_in_bytes) + file_sizes = [(f, f.stat().st_size) for f in parquet_files] + file_sizes.sort(key=lambda x: x[1], reverse=True) + + batches = [] + + # apply first-fit decreasing heuristic + for f, size in file_sizes: + placed = False + for batch in batches: + if batch["total_size"] + size <= target_bytes: + batch["files"].append(f) + batch["total_size"] += size + placed = True + break + if not placed: + # Start a new batch + batches.append({"files": [f], "total_size": size}) + + digits = max(2, len(str(len(batches) - 1))) + batch_dir = transformed_parquet_dir / "batch" + batch_dir.mkdir(parents=True, exist_ok=True) + for i, batch in enumerate(batches): + files = batch["files"] + output_file = batch_dir / f"batch_{i:0{digits}}.parquet" + files_str = ", ".join(f"'{str(f)}'" for f in files) + query = f""" + COPY ( + SELECT * FROM read_parquet([{files_str}]) + ) TO '{str(output_file)}' (FORMAT PARQUET) + """ + self.conn.execute(query) - # Should we delete the files now that they have been 'batched'? - if delete_originals: - for f in files: - f.unlink() + # Should we delete the files now that they have been 'batched'? 
+ if delete_originals: + for f in files: + f.unlink() process = psutil.Process(os.getpid()) mem = process.memory_info().rss / 1024**2 # Memory in MB From 58002c044a9a7a23583f34c233d585f4980322ef Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 9 May 2025 12:18:17 +0100 Subject: [PATCH 20/26] Added test for batch processing of parquet files --- .../package/test_dataset_parquet.py | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py index c20a92dca..64ceb1cb7 100644 --- a/tests/integration/package/test_dataset_parquet.py +++ b/tests/integration/package/test_dataset_parquet.py @@ -8,6 +8,8 @@ import pyarrow.parquet as pq import pyarrow as pa from digital_land.package.dataset_parquet import DatasetParquetPackage +import random +import string class MockOrganisation(object): @@ -497,6 +499,123 @@ def test_load_facts_one_file_with_empty_file(data, expected, tmp_path): assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" +@pytest.mark.parametrize( + "data1,data2,expected_facts,expected_entities", + [(transformed_1_data, transformed_2_data, 35 * 100 / 2, 3 * 100)], +) +def test_load_functions_batch( + data1, data2, expected_facts, expected_entities, tmp_path, org_path, resource_path +): + """ + test loading multiple files into the fact table when they're from a single directory and batched into a single file + """ + # Use current data, but edit the fact and entities so that we have many more facts and entities than current test + # data gives us + df1 = pd.DataFrame.from_dict(data1) + df2 = pd.DataFrame.from_dict(data2) + df1_copy = df1.copy() + df2_copy = df2.copy() + ldf1 = len(df1_copy) + one_third1 = ldf1 // 3 + two_third1 = 2 * ldf1 // 3 + ldf2 = len(df2_copy) + one_third2 = ldf2 // 3 + two_third2 = 2 * ldf2 // 3 + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + random.seed(42) # Set seed to ensure consistency + chars = string.ascii_lowercase + string.digits + # Create 100 files so that strategy is 'batch' + for i in range(100): + # Change the fact and entity values to ensure uniqueness + base = i * 10 # base entity value (assigned to first third of rows) + if i % 2 == 0: + df1_copy = df1_copy.assign( + fact=["".join(random.choices(chars, k=10)) for _ in range(ldf1)], + entity=[base] * one_third1 + + [base + 1] * (two_third1 - one_third1) + + [base + 2] * (ldf1 - two_third1), + ) + df1_copy.to_parquet( + transformed_parquet_dir / f"transformed_resource_{i}.parquet", + index=False, + ) + else: + df2_copy = df2_copy.assign( + fact=["".join(random.choices(chars, k=10)) for _ in range(ldf2)], + entity=[base] * one_third2 + + [base + 1] * (two_third2 - one_third2) + + [base + 2] * (ldf2 - two_third2), + ) + df2_copy.to_parquet( + transformed_parquet_dir / f"transformed_resource_{i}.parquet", + index=False, + ) + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + transformed_parquet_dir=transformed_parquet_dir, + ) + + package.group_parquet_files(transformed_parquet_dir=transformed_parquet_dir) + package.load_facts(transformed_parquet_dir=transformed_parquet_dir) + package.load_fact_resource(transformed_parquet_dir=transformed_parquet_dir) + package.load_entities(transformed_parquet_dir, resource_path, org_path) + + # test facts + output_file = ( + tmp_path + / "conservation-area" + / "fact" + / 
"dataset=conservation-area" + / "fact.parquet" + ) + assert os.path.exists(output_file), "fact.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact.parquet file" + assert ( + len(df) == expected_facts + ), "No. of facts does not match expected" # No of unique facts + assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" + + # test fact_resource + output_file = ( + tmp_path + / "conservation-area" + / "fact-resource" + / "dataset=conservation-area" + / "fact-resource.parquet" + ) + assert os.path.exists(output_file), "fact-resource.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact-resource,parquet file" + assert len(df) == expected_facts, "Not all data saved in fact-resource.parquet file" + + assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" + + # test entities + output_file = ( + tmp_path + / "conservation-area" + / "entity" + / "dataset=conservation-area" + / "entity.parquet" + ) + assert os.path.exists(output_file), "entity.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in entity.parquet file" + assert len(df) == expected_entities, "No. of entities is not correct" + assert df["entity"].nunique() == len(df), "Entity column contains duplicate values" + + # @pytest.mark.parametrize( # "data1,data2,expected", [(transformed_1_data, transformed_2_data, 35)] # ) From 635e5a5cc32139f76de60986a1346f837675af25 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 9 May 2025 12:26:17 +0100 Subject: [PATCH 21/26] Remove commented out test --- .../package/test_dataset_parquet.py | 103 ------------------ 1 file changed, 103 deletions(-) diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py index 64ceb1cb7..8797385e3 100644 --- a/tests/integration/package/test_dataset_parquet.py +++ b/tests/integration/package/test_dataset_parquet.py @@ -616,109 +616,6 @@ def test_load_functions_batch( assert df["entity"].nunique() == len(df), "Entity column contains duplicate values" -# @pytest.mark.parametrize( -# "data1,data2,expected", [(transformed_1_data, transformed_2_data, 35)] -# ) -# def test_load_facts_from_temp_parquet(data1, data2, expected, tmp_path): -# """ -# tests loading from a directory when there are multiple files, loads them -# into a single parquet file and gets facts from that file -# """ -# # convert data to df's and save to a single parquet file -# df1 = pd.DataFrame.from_dict(data1) -# df2 = pd.DataFrame.from_dict(data2) -# transformed_parquet_dir = tmp_path / "transformed" -# transformed_parquet_dir.mkdir(parents=True, exist_ok=True) -# df1.to_parquet( -# transformed_parquet_dir / "transformed_resource_1.parquet", index=False -# ) -# df2.to_parquet( -# transformed_parquet_dir / "transformed_resource_2.parquet", index=False -# ) -# -# package = DatasetParquetPackage( -# dataset="conservation-area", -# path=tmp_path / "conservation-area", -# specification_dir=None, -# ) -# temp_parquet = package.load_details_into_temp_parquet( -# transformed_parquet_dir=transformed_parquet_dir -# ) -# assert os.path.exists(temp_parquet), "temp parquet file not created" -# package.load_facts_from_temp_parquet( -# transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet -# ) -# -# output_file = ( -# tmp_path -# / "conservation-area" -# / "fact" -# / "dataset=conservation-area" -# / "fact.parquet" -# ) -# assert 
os.path.exists(output_file), "fact.parquet file does not exist" -# -# df = pd.read_parquet(output_file) -# -# assert len(df) > 0, "No data in fact.parquet file" -# assert ( -# len(df) == expected -# ), "No. of facts does not match expected" # No of unique facts -# assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" -# -# -# @pytest.mark.parametrize( -# "data_1,data_2,expected", [(transformed_1_data, transformed_2_data, 35)] -# ) -# def test_load_fact_resource_from_temp_parquet(data_1, data_2, expected, tmp_path): -# """ -# tests loading from a directory when there are multiple files, loads them -# into a single parquet file and gets fact_resources from that file -# """ -# # convert data to df's and save to a single parquet file -# df_1 = pd.DataFrame.from_dict(data_1) -# df_2 = pd.DataFrame.from_dict(data_2) -# transformed_parquet_dir = tmp_path / "transformed" -# transformed_parquet_dir.mkdir(parents=True, exist_ok=True) -# df_1.to_parquet( -# transformed_parquet_dir / "transformed_resource_1.parquet", index=False -# ) -# df_2.to_parquet( -# transformed_parquet_dir / "transformed_resource_2.parquet", index=False -# ) -# -# package = DatasetParquetPackage( -# dataset="conservation-area", -# path=tmp_path / "conservation-area", -# specification_dir=None, -# ) -# temp_parquet = package.load_details_into_temp_parquet( -# transformed_parquet_dir=transformed_parquet_dir -# ) -# assert os.path.exists(temp_parquet), "temp parquet file not created" -# package.load_fact_resource_from_temp_parquet( -# transformed_parquet_dir=transformed_parquet_dir, temp_parquet=temp_parquet -# ) -# -# # Check if the output parquet file exists and verify contents -# output_file = ( -# tmp_path -# / "conservation-area" -# / "fact-resource" -# / "dataset=conservation-area" -# / "fact-resource.parquet" -# ) -# assert os.path.exists(output_file), "fact-resource.parquet file does not exist" -# -# # Load Parquet into a DataFrame to verify data correctness -# df = pd.read_parquet(output_file) -# -# assert len(df) > 0, "No data in fact-resource,parquet file" -# assert len(df) == expected, "Not all data saved in fact-resource.parquet file" -# -# assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" - - @pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) def test_load_fact_resource_single_file(data, expected, tmp_path): From 0fb9035f9271e53e9bf116b1949110362912d3d3 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 9 May 2025 15:25:25 +0100 Subject: [PATCH 22/26] Removed old functions using a single temporary parquet file --- digital_land/package/dataset_parquet.py | 142 ------------------------ 1 file changed, 142 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 39fe9175e..f3a2dac87 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -445,148 +445,6 @@ def load_fact_resource(self, transformed_parquet_dir): """ ) - # def load_details_into_temp_parquet(self, transformed_parquet_dir): - # """ - # Save all details into a temporary parquet file, to use later to - # """ - # logger.info(f"loading all details from {str(transformed_parquet_dir)}") - # temp_dir = Path(tempfile.mkdtemp()) - # temp_dir.mkdir(parents=True, exist_ok=True) - # output_path = temp_dir / "temp_table.parquet" - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] At start: {mem:.2f} MB") - # - # 
fact_fields = self.specification.schema["fact"]["fields"] - # fact_resource_fields = self.specification.schema["fact-resource"]["fields"] - # fields = list(set([*fact_fields, *fact_resource_fields])) - # fields_str = ", ".join([field.replace("-", "_") for field in fields]) - # - # # All parquet files to be loaded into a temporary parquet file. Extract needed columns and export - # # Need to add entry_number as it is needed for fact table - # query = f""" - # SELECT {fields_str}, entry_number - # FROM '{str(transformed_parquet_dir)}/*.parquet' - # """ - # - # self.conn.execute( - # f""" - # COPY ( - # {query} - # ) TO '{str(output_path)}' (FORMAT PARQUET); - # """ - # ) - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] At end of temp_parquet query: {mem:.2f} MB") - # - # return output_path - # - # def load_facts_from_temp_parquet( - # self, transformed_parquet_dir, temp_parquet, parquet_dir_details - # ): - # """ - # This method loads facts into a parquet table using the temporary parquet table from earlier - # """ - # output_path = self.fact_path - # output_path.parent.mkdir(parents=True, exist_ok=True) - # logger.info("loading facts") - # - # fact_fields = self.specification.schema["fact"]["fields"] - # fields_str = ", ".join([field.replace("-", "_") for field in fact_fields]) - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] At start: {mem:.2f} MB") - # - # # query to extract data from the temp table (containing raw data), group by a fact, and get the highest - # # priority or latest record - # # Max partition size should either be 1GB or twice the maximum - # max_partition_memory_mb = 1024 # Set it to be 1GB per partition - # n_buckets = parquet_dir_details["total_size_mb"] // max_partition_memory_mb + 1 - # n_buckets = min(n_buckets, parquet_dir_details["no_parquet_files"]) - # bucket_outputs = [] - # - # # Using 'with' to ensure temp_dir is cleaned up if there are any issues - # with tempfile.TemporaryDirectory(prefix="duckdb_buckets_") as temp_dir_str: - # temp_dir = Path(temp_dir_str) - # - # # Split this query into buckets to avoid using too much memory at the same time. 
- # for bucket in range(n_buckets): - # output_file = temp_dir / f"bucket_{bucket}.parquet" - # - # query = f""" - # COPY ( - # SELECT {fields_str} - # FROM read_parquet('{temp_parquet}') - # WHERE MOD(HASH(fact), {n_buckets}) = {bucket} - # QUALIFY ROW_NUMBER() OVER ( - # PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - # ) = 1 - # ) TO '{output_file}' (FORMAT PARQUET); - # """ - # self.conn.execute(query) - # bucket_outputs.append(output_file) - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - # - # files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) - # - # self.conn.execute( - # f""" - # COPY ( - # SELECT * FROM read_parquet([{files_str}]) - # ) TO '{output_path}' (FORMAT PARQUET); - # """ - # ) - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") - # - # def load_fact_resource_from_temp_parquet( - # self, transformed_parquet_dir, temp_parquet - # ): - # """ - # This method loads fact resources into a parquet table using the temporary parquet table from earlier - # """ - # output_path = self.fact_resource_path - # output_path.parent.mkdir(parents=True, exist_ok=True) - # logger.info("loading fact resources") - # - # fact_resource_fields = self.specification.schema["fact-resource"]["fields"] - # fields_str = ", ".join( - # [field.replace("-", "_") for field in fact_resource_fields] - # ) - # - # # All CSV files have been loaded into a temporary table. Extract required columns and export - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] At start: {mem:.2f} MB") - # - # query = f""" - # SELECT {fields_str} - # FROM '{temp_parquet}' - # """ - # self.conn.execute( - # f""" - # COPY ( - # {query} - # ) TO '{str(output_path)}' (FORMAT PARQUET); - # """ - # ) - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 - # logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - # - # # parquet file no longer needed, remove to clear up memory - # os.remove(temp_parquet) - def load_entities_range( self, transformed_parquet_dir, From c669ba6f5ea874a2443b9fd2e7b52458217489d6 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 9 May 2025 15:34:15 +0100 Subject: [PATCH 23/26] Removed comments --- digital_land/package/dataset_parquet.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index f3a2dac87..3c8980cd8 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -400,20 +400,6 @@ def load_facts(self, transformed_parquet_dir): mem = process.memory_info().rss / 1024**2 # Memory in MB logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") - # files_str = ", ".join([f"'{str(f)}'" for f in bucket_outputs]) - # - # self.conn.execute( - # f""" - # COPY ( - # SELECT * FROM read_parquet([{files_str}]) - # ) TO '{output_path}' (FORMAT PARQUET); - # """ - # ) - # - # process = psutil.Process(os.getpid()) - # mem = process.memory_info().rss / 1024**2 # Memory in MB - # logger.info(f"[Memory usage] For concatenation: {mem:.2f} MB") - def load_fact_resource(self, transformed_parquet_dir): logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") 
output_path = self.fact_resource_path From 4f4c91092085b580a4ed64fe12dba58429657d1d Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Fri, 9 May 2025 16:12:20 +0100 Subject: [PATCH 24/26] Minor edit to comment --- digital_land/package/dataset_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 3c8980cd8..cdabdf631 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -285,7 +285,7 @@ def load_facts(self, transformed_parquet_dir): ) n_buckets = 1 else: - # Max partition size should the smallest value of either be 4GB or 1/4 of the available memory + # Max partition size should the smallest value of either be 2GB or 1/4 of the available memory max_partition_memory_mb = min( 2048, self.parquet_dir_details["memory_available"] / 4 ) From 8eea573ddf3a6aac55d86b9550ff77b64cac8971 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 12 May 2025 14:28:08 +0100 Subject: [PATCH 25/26] Edits to logger info --- digital_land/package/dataset_parquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index cdabdf631..49fe0b657 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -372,7 +372,7 @@ def load_facts(self, transformed_parquet_dir): """ ) logger.info( - f"Have {len(list(transformed_parquet_dir.glob('result_*.parquet')))} result files" + f"Have {len(list(result_dir.glob('result_*.parquet')))} result files" ) # for path in bucket_paths: @@ -392,7 +392,8 @@ def load_facts(self, transformed_parquet_dir): PARQUET); """ ) - + logger.info(f"output_path: {output_path}") + logger.info(f"output_path exists: {os.path.exists(output_path)}") # for path in result_paths: # path.unlink(missing_ok=True) From f06d1583bece29884623b6d486e80558caf1b30d Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 12 May 2025 14:37:41 +0100 Subject: [PATCH 26/26] Edit to output_path --- digital_land/package/dataset_parquet.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py index 49fe0b657..7db81b899 100644 --- a/digital_land/package/dataset_parquet.py +++ b/digital_land/package/dataset_parquet.py @@ -387,9 +387,7 @@ def load_facts(self, transformed_parquet_dir): COPY( SELECT * FROM read_parquet('{str(result_dir)}/result_*.parquet') - ) TO - '{output_path}.parquet'(FORMAT - PARQUET); + ) TO '{str(output_path)}' (FORMAT PARQUET); """ ) logger.info(f"output_path: {output_path}")
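The constructor above picks a strategy up front: "direct" runs the queries straight over the transformed parquet files, while the batch path first groups them into ~256 MB files and, for facts, buckets them by HASH(fact). The bodies of analyze_parquet_dir and choose_strategy are not shown in this series, so the following is only a plausible shape under that assumption — the key names mirror those referenced above, but the 1 GB threshold and the rule itself are illustrative:

import psutil

def analyze_parquet_dir(transformed_parquet_dir):
    # transformed_parquet_dir is a pathlib.Path; sizes reported in MB.
    files = list(transformed_parquet_dir.glob("*.parquet"))
    return {
        "no_parquet_files": len(files),
        "total_size_mb": sum(f.stat().st_size for f in files) / 1024**2,
        "memory_available": psutil.virtual_memory().available / 1024**2,
    }

def choose_strategy(details, direct_limit_mb=1024):
    # Assumed rule: small inputs are deduplicated in a single pass ("direct");
    # anything larger goes through the batch/bucket path.
    if details["total_size_mb"] <= min(direct_limit_mb, details["memory_available"] / 4):
        return "direct"
    return "batch"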