diff --git a/digital_land/commands.py b/digital_land/commands.py
index 172c04ac9..204937176 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -477,7 +477,19 @@ def dataset_create(
         path=dataset_parquet_path,
         specification_dir=None,  # TBD: package should use this specification object
         duckdb_path=cache_dir / "overflow.duckdb",
+        transformed_parquet_dir=transformed_parquet_dir,
     )
+    # To find facts we use a complex SQL window function that can cause memory issues. To aid the allocation of
+    # memory we decide on a parquet strategy, based on how many parquet files we have, the overall size of these
+    # files and the available memory. We consider the following strategies:
+    # 1) if we have a small number of files, or the total size of the files is small, run the SQL over all of
+    #    these files directly.
+    # 2) group the parquet files into 256MB batches, then run the SQL either on all of these batches at once, or
+    #    bucket the data so that the window function runs on a subset of facts at a time (concatenating the results)
+
+    # Group parquet files into approx 256MB batches (if needed)
+    if pqpackage.strategy != "direct":
+        pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256)
     pqpackage.load_facts(transformed_parquet_dir)
     pqpackage.load_fact_resource(transformed_parquet_dir)
     pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
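The strategy choice referenced above is implemented in `DatasetParquetPackage.choose_strategy` (added below). As a quick orientation, the rule reduces to two thresholds; a minimal sketch with illustrative numbers, assuming roughly 8 GiB of available memory:

```python
def choose(total_size_mb: float, n_files: int, memory_available_mb: float) -> str:
    # mirrors DatasetParquetPackage.choose_strategy (a sketch, not the real method)
    memory_check = min(2048, memory_available_mb / 4)
    if (total_size_mb < memory_check and n_files < 100) or n_files < 4:
        return "direct"
    return "batch"

assert choose(1500, 150, 8192) == "batch"   # small total, but too many files
assert choose(1500, 80, 8192) == "direct"   # small total and few files
assert choose(5120, 3, 8192) == "direct"    # fewer than 4 files is always direct
```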
diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py
index 0e236165d..7db81b899 100644
--- a/digital_land/package/dataset_parquet.py
+++ b/digital_land/package/dataset_parquet.py
@@ -1,9 +1,12 @@
 import os
 import logging
+import sys
+
 import duckdb
 import shutil
 from pathlib import Path
 from .package import Package
+import psutil
 
 logger = logging.getLogger(__name__)
 
@@ -28,7 +31,9 @@ class DatasetParquetPackage(Package):
-    def __init__(self, dataset, path, duckdb_path=None, **kwargs):
+    def __init__(
+        self, dataset, path, duckdb_path=None, transformed_parquet_dir=None, **kwargs
+    ):
         """
         Initialisation method to set up information as needed
 
@@ -64,6 +69,13 @@ def __init__(self, dataset, path, duckdb_path=None, **kwargs):
         self.entity_path = (
             self.path / "entity" / f"dataset={self.dataset}" / "entity.parquet"
         )
+        if transformed_parquet_dir is None:
+            self.strategy = "direct"
+        else:
+            self.parquet_dir_details = self.analyze_parquet_dir(
+                transformed_parquet_dir=transformed_parquet_dir
+            )
+            self.strategy = self.choose_strategy(self.parquet_dir_details)
 
     def get_schema(self):
         schema = {}
@@ -125,6 +137,111 @@ def get_schema(self):
     #             logging.info(f"Failed to read in when max_size = {max_size}")
     #             raise
 
+    def analyze_parquet_dir(self, transformed_parquet_dir):
+        """
+        Get details about the transformed_parquet_dir to decide which strategy to use for
+        creating the fact and fact_resource tables
+        """
+        files = list(transformed_parquet_dir.glob("*.parquet"))
+        no_parquet_files = len(files)
+        total_size_bytes = sum(f.stat().st_size for f in files)
+        total_size_mb = total_size_bytes / (1024 * 1024)
+        avg_size_mb = total_size_mb / len(files) if files else 0
+        max_size_mb = (
+            max(f.stat().st_size for f in files) / (1024 * 1024) if files else 0
+        )
+        mem = psutil.virtual_memory()
+        memory_available = mem.available / 1024**2
+
+        return {
+            "no_parquet_files": no_parquet_files,
+            "total_size_mb": total_size_mb,
+            "avg_size_mb": avg_size_mb,
+            "max_size_mb": max_size_mb,
+            "memory_available": memory_available,
+        }
+
+    def choose_strategy(self, parquet_dir_details):
+        """
+        What strategy should we use to create the fact, fact_resource and entity tables?
+        Return one of:
+        - "direct" - analyse all parquet files at once
+        - "batch" - group the parquet files into batch files of approx. 256MB
+        The following were also considered as potential strategies, but it appears that 'batch' and
+        'direct' will suffice, since batching everything into one file is equivalent to the
+        'single_file' option:
+        - "single_file" - put all parquet files into a single parquet file
+        - "consolidate_then_bucket" - put all parquet files into several larger files
+        """
+        # If the total size is less than 2GB (or 1/4 of the available memory, whichever is smaller)
+        # then we can potentially process the files directly
+        memory_check = min(2048, parquet_dir_details["memory_available"] / 4)
+        if (
+            parquet_dir_details["total_size_mb"] < memory_check
+            and parquet_dir_details["no_parquet_files"] < 100
+        ) or (parquet_dir_details["no_parquet_files"] < 4):
+            return "direct"
+
+        # if parquet_dir_details["no_parquet_files"] > 500 or parquet_dir_details[
+        #     "total_size_mb"
+        # ] > (parquet_dir_details["memory_available"] * 0.75):
+        #     return "consolidate_then_bucket"
+
+        return "batch"
+
+    def group_parquet_files(
+        self, transformed_parquet_dir, target_mb=256, delete_originals=False
+    ):
+        """
+        Group parquet files into batches, each aiming for approximately 'target_mb' in size.
+        """
+        process = psutil.Process(os.getpid())
+        mem = process.memory_info().rss / 1024**2  # Memory in MB
+        logger.info(f"[Memory usage] Before grouping: {mem:.2f} MB")
+
+        logger.info(f"Batching all files from {str(transformed_parquet_dir)}")
+        target_bytes = target_mb * 1024 * 1024
+        parquet_files = list(transformed_parquet_dir.glob("*.parquet"))
+
+        # List of (file_path, file_size_in_bytes)
+        file_sizes = [(f, f.stat().st_size) for f in parquet_files]
+        file_sizes.sort(key=lambda x: x[1], reverse=True)
+
+        batches = []
+
+        # apply the first-fit decreasing heuristic
+        for f, size in file_sizes:
+            placed = False
+            for batch in batches:
+                if batch["total_size"] + size <= target_bytes:
+                    batch["files"].append(f)
+                    batch["total_size"] += size
+                    placed = True
+                    break
+            if not placed:
+                # Start a new batch
+                batches.append({"files": [f], "total_size": size})
+
+        digits = max(2, len(str(len(batches) - 1)))
+        batch_dir = transformed_parquet_dir / "batch"
+        batch_dir.mkdir(parents=True, exist_ok=True)
+        for i, batch in enumerate(batches):
+            files = batch["files"]
+            output_file = batch_dir / f"batch_{i:0{digits}}.parquet"
+            files_str = ", ".join(f"'{str(f)}'" for f in files)
+            query = f"""
+                COPY (
+                    SELECT * FROM read_parquet([{files_str}])
+                ) TO '{str(output_file)}' (FORMAT PARQUET)
+            """
+            self.conn.execute(query)
+
+            # Should we delete the files now that they have been 'batched'?
+            if delete_originals:
+                for f in files:
+                    f.unlink()
+
+        process = psutil.Process(os.getpid())
+        mem = process.memory_info().rss / 1024**2  # Memory in MB
+        logger.info(f"[Memory usage] After grouping: {mem:.2f} MB")
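`group_parquet_files` is a first-fit decreasing bin pack over file sizes. The same heuristic on bare numbers, as a runnable sketch (the sizes are hypothetical):

```python
# Minimal sketch of the first-fit decreasing grouping, on sizes in MB
# with a 256 MB target (hypothetical file sizes):
sizes = [200, 180, 120, 90, 60, 30]
batches = []
for size in sorted(sizes, reverse=True):
    for batch in batches:
        if sum(batch) + size <= 256:
            batch.append(size)  # first batch with room wins
            break
    else:
        batches.append([size])  # no batch had room: open a new one
assert batches == [[200, 30], [180, 60], [120, 90]]
```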
 
     def load_facts(self, transformed_parquet_dir):
         """
         This method loads facts into a fact table from a directory containing all transformed files as parquet files
@@ -136,23 +253,151 @@ def load_facts(self, transformed_parquet_dir):
         fact_fields = self.specification.schema["fact"]["fields"]
         fields_str = ", ".join([field.replace("-", "_") for field in fact_fields])
 
-        # query to extract data from the temp table (containing raw data), group by a fact, and get the highest
-        # priority or latest record
+        process = psutil.Process(os.getpid())
+        mem = process.memory_info().rss / 1024**2  # Memory in MB
+        logger.info(f"[Memory usage] At start of load_facts: {mem:.2f} MB")
+
+        # query to extract data from either the transformed parquet files or the batched ones. Data is
+        # grouped by fact, and we keep the highest priority or latest record
+        if self.strategy == "direct":
+            logger.info("Using direct strategy for facts")
+            query = f"""
+                SELECT {fields_str}
+                FROM '{str(transformed_parquet_dir)}/*.parquet'
+                QUALIFY ROW_NUMBER() OVER (
+                    PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
+                ) = 1
+            """
+            self.conn.execute(
+                f"""
+                COPY (
+                    {query}
+                ) TO '{str(output_path)}' (FORMAT PARQUET);
+                """
+            )
+        else:
+            no_of_batched_files = len(
+                list(transformed_parquet_dir.glob("batch/batch_*.parquet"))
+            )
+            if no_of_batched_files == 1:
+                logger.info(
+                    "Only have one batched file for facts - using 'simple' query"
+                )
+                n_buckets = 1
+            else:
+                # Max partition size should be the smaller of 2GB and 1/4 of the available memory
+                max_partition_memory_mb = min(
+                    2048, self.parquet_dir_details["memory_available"] / 4
+                )
+                n_buckets = (
+                    self.parquet_dir_details["total_size_mb"] // max_partition_memory_mb
+                    + 1
+                )
+                n_buckets = int(min(n_buckets, no_of_batched_files))
+                if n_buckets == 0:
+                    sys.exit(
+                        "Have got a value of zero for n_buckets in `load_facts`. Cannot continue."
+                    )
+                if n_buckets == 1:
+                    logger.info("Only need one bucket for facts - using 'simple' query")
+            if no_of_batched_files == 1 or n_buckets == 1:
+                query = f"""
+                    SELECT {fields_str}
+                    FROM '{str(transformed_parquet_dir)}/batch/batch_*.parquet'
+                    QUALIFY ROW_NUMBER() OVER (
+                        PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
+                    ) = 1
+                """
+                self.conn.execute(
+                    f"""
+                    COPY (
+                        {query}
+                    ) TO '{str(output_path)}' (FORMAT PARQUET);
+                    """
+                )
+            else:
+                # Multiple buckets and batched files used
+                logger.info(
+                    "Need to use multiple buckets in windowed function for facts"
+                )
+                bucket_dir = transformed_parquet_dir / "bucket"
+                bucket_dir.mkdir(parents=True, exist_ok=True)
+                digits = max(2, len(str(n_buckets - 1)))
+                bucket_paths = [
+                    bucket_dir / f"bucket_{i:0{digits}}.parquet"
+                    for i in range(n_buckets)
+                ]
+                logger.info(
+                    f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files"
+                )
+
+                # Loop over each batch file and assign its rows to bucket files
+                logger.info(f"Assigning to {n_buckets} buckets")
+                for f in transformed_parquet_dir.glob("batch/batch_*.parquet"):
+                    for i in range(n_buckets):
+                        self.conn.execute(
+                            f"""
+                            COPY (
+                                SELECT *
+                                FROM read_parquet('{f}')
+                                WHERE MOD(HASH(fact), {n_buckets}) = {i}
+                            ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE);
+                            """
+                        )
 
-        query = f"""
-            SELECT {fields_str}
-            FROM '{str(transformed_parquet_dir)}/*.parquet'
-            QUALIFY ROW_NUMBER() OVER (
-                PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
-            ) = 1
-        """
-        self.conn.execute(
-            f"""
-            COPY (
-                {query}
-            ) TO '{str(output_path)}' (FORMAT PARQUET);
-            """
-        )
+                logger.info(
+                    f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files"
+                )
+                process = psutil.Process(os.getpid())
+                mem = process.memory_info().rss / 1024**2  # Memory in MB
+                logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB")
+
+                result_dir = transformed_parquet_dir / "result"
+                result_dir.mkdir(parents=True, exist_ok=True)
+                result_paths = [
+                    result_dir / f"result_{i:0{digits}}.parquet"
+                    for i in range(n_buckets)
+                ]
+                for i in range(n_buckets):
+                    bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet"
+                    self.conn.execute(
+                        f"""
+                        COPY (
+                            SELECT *
+                            FROM read_parquet('{bucket_path}')
+                            QUALIFY ROW_NUMBER() OVER (
+                                PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC
+                            ) = 1
+                        ) TO '{result_paths[i]}' (FORMAT PARQUET);
+                        """
+                    )
+                logger.info(
+                    f"Have {len(list(result_dir.glob('result_*.parquet')))} result files"
+                )
+
+                # for path in bucket_paths:
+                #     path.unlink(missing_ok=True)
+
+                process = psutil.Process(os.getpid())
+                mem = process.memory_info().rss / 1024**2  # Memory in MB
+                logger.info(f"[Memory usage] After 'result': {mem:.2f} MB")
+
+                self.conn.execute(
+                    f"""
+                    COPY(
+                        SELECT * FROM
+                        read_parquet('{str(result_dir)}/result_*.parquet')
+                    ) TO '{str(output_path)}' (FORMAT PARQUET);
+                    """
+                )
+                logger.info(f"output_path: {output_path}")
+                logger.info(f"output_path exists: {os.path.exists(output_path)}")
+                # for path in result_paths:
+                #     path.unlink(missing_ok=True)
+
+        process = psutil.Process(os.getpid())
+        mem = process.memory_info().rss / 1024**2  # Memory in MB
+        logger.info(f"[Memory usage] At end of query: {mem:.2f} MB")
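The bucketed path relies on `MOD(HASH(fact), n)` sending every row of a given fact to the same bucket, so each per-bucket `QUALIFY` sees complete partitions and the concatenated results match a single global pass. A toy DuckDB check of that property (the in-memory connection and table are made up for illustration):

```python
import duckdb

# toy table: fact 'a' appears twice with different priorities
con = duckdb.connect()
con.execute(
    "CREATE TABLE t AS "
    "SELECT * FROM (VALUES ('a', 2), ('a', 1), ('b', 1)) v(fact, priority)"
)
rows = []
for i in range(2):  # 2 buckets; all rows of a given fact land in one bucket
    rows += con.execute(
        """
        SELECT fact, priority FROM t
        WHERE MOD(HASH(fact), 2) = ?
        QUALIFY ROW_NUMBER() OVER (PARTITION BY fact ORDER BY priority) = 1
        """,
        [i],
    ).fetchall()
assert sorted(rows) == [("a", 1), ("b", 1)]  # same winners as a single pass
```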
+ ) + if n_buckets == 1: + logger.info("Only need one bucket for facts - using 'simple' query") + if no_of_batched_files == 1 or n_buckets == 1: + query = f""" + SELECT {fields_str} + FROM '{str(transformed_parquet_dir)}/batch/batch_*.parquet' + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + """ + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + else: + # Multiple buckets and batched files used + logger.info( + "Need to use multiple buckets in windowed function for facts" + ) + bucket_dir = transformed_parquet_dir / "bucket" + bucket_dir.mkdir(parents=True, exist_ok=True) + digits = max(2, len(str(n_buckets - 1))) + bucket_paths = [ + bucket_dir / f"bucket_{i:0{digits}}.parquet" + for i in range(n_buckets) + ] + logger.info( + f"Have {len(list(transformed_parquet_dir.glob('batch/batch_*.parquet')))} batch files" + ) + + # Loop over each batch file and assign to a bucket file + logger.info(f"Assigning to {n_buckets} buckets") + for f in transformed_parquet_dir.glob("batch/batch_*.parquet"): + for i in range(n_buckets): + self.conn.execute( + f""" + COPY ( + SELECT * + FROM read_parquet('{f}') + WHERE MOD(HASH(fact), {n_buckets}) = {i} + ) TO '{bucket_paths[i]}' (FORMAT PARQUET, APPEND TRUE); + """ + ) - query = f""" - SELECT {fields_str} - FROM '{str(transformed_parquet_dir)}/*.parquet' - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC - ) = 1 - """ - self.conn.execute( - f""" - COPY ( - {query} - ) TO '{str(output_path)}' (FORMAT PARQUET); - """ - ) + logger.info( + f"Have {len(list(bucket_dir.glob('bucket_*.parquet')))} bucket files" + ) + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] After 'bucketing': {mem:.2f} MB") + + result_dir = transformed_parquet_dir / "result" + result_dir.mkdir(parents=True, exist_ok=True) + result_paths = [ + result_dir / f"result_{i:0{digits}}.parquet" + for i in range(n_buckets) + ] + for i in range(n_buckets): + bucket_path = bucket_dir / f"bucket_{i:0{digits}}.parquet" + self.conn.execute( + f""" + COPY ( + SELECT * + FROM read_parquet('{bucket_path}') + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + ) TO '{result_paths[i]}' (FORMAT PARQUET); + """ + ) + logger.info( + f"Have {len(list(result_dir.glob('result_*.parquet')))} result files" + ) + + # for path in bucket_paths: + # path.unlink(missing_ok=True) + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] After 'result': {mem:.2f} MB") + + self.conn.execute( + f""" + COPY( + SELECT * FROM + read_parquet('{str(result_dir)}/result_*.parquet') + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + logger.info(f"output_path: {output_path}") + logger.info(f"output_path exists: {os.path.exists(output_path)}") + # for path in result_paths: + # path.unlink(missing_ok=True) + + process = psutil.Process(os.getpid()) + mem = process.memory_info().rss / 1024**2 # Memory in MB + logger.info(f"[Memory usage] At end of query: {mem:.2f} MB") def load_fact_resource(self, transformed_parquet_dir): logger.info(f"loading fact resources from {str(transformed_parquet_dir)}") @@ -163,10 +408,18 @@ def load_fact_resource(self, transformed_parquet_dir): [field.replace("-", "_") for field in fact_resource_fields] ) - # All CSV files 
@@ -210,9 +463,13 @@ def load_entities_range(
         else:
             entity_where_clause = ""
 
+        if self.strategy == "direct":
+            parquet_str = "*.parquet"
+        else:
+            parquet_str = "batch/batch_*.parquet"
         query = f"""
             SELECT DISTINCT REPLACE(field,'-','_')
-            FROM parquet_scan('{transformed_parquet_dir}/*.parquet')
+            FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}')
             {entity_where_clause}
         """
 
@@ -292,10 +549,15 @@ def load_entities_range(
 
         # craft a where clause to limit entities in question, this chunking helps solve memory issues
+        if self.strategy == "direct":
+            parquet_str = "*.parquet"
+        else:
+            parquet_str = "batch/batch_*.parquet"
+
         query = f"""
             SELECT {fields_str}{optional_org_str}
             FROM (
                 SELECT {fields_str}, CASE WHEN resource_csv."end-date" IS NULL THEN '2999-12-31' ELSE resource_csv."end-date" END AS resource_end_date
-                FROM parquet_scan('{transformed_parquet_dir}/*.parquet') tf
+                FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}') tf
                 LEFT JOIN read_csv_auto('{resource_path}', max_line_size=40000000) resource_csv
                 ON tf.resource = resource_csv.resource
                 {entity_where_clause}
@@ -388,9 +650,13 @@ def load_entities(self, transformed_parquet_dir, resource_path, organisation_pat
         output_path.parent.mkdir(parents=True, exist_ok=True)
 
         # retrieve entity counts including the minimum and maximum
-        min_sql = f"select MIN(entity) FROM parquet_scan('{transformed_parquet_dir}/*.parquet');"
+        if self.strategy == "direct":
+            parquet_str = "*.parquet"
+        else:
+            parquet_str = "batch/batch_*.parquet"
+        min_sql = f"select MIN(entity) FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}');"
         min_entity = self.conn.execute(min_sql).fetchone()[0]
-        max_sql = f"select MAX(entity) FROM parquet_scan('{transformed_parquet_dir}/*.parquet');"
+        max_sql = f"select MAX(entity) FROM parquet_scan('{transformed_parquet_dir}/{parquet_str}');"
         max_entity = self.conn.execute(max_sql).fetchone()[0]
         total_entities = max_entity - min_entity
         entity_limit = 100000
@@ -428,7 +694,10 @@ def load_entities(self, transformed_parquet_dir, resource_path, organisation_pat
             shutil.rmtree(temp_dir)
         else:
             self.load_entities_range(
-                transformed_parquet_dir, resource_path, organisation_path, output_path
+                transformed_parquet_dir,
+                resource_path,
+                organisation_path,
+                output_path,
             )
 
     def load_to_sqlite(self, sqlite_path):
@@ -466,6 +735,7 @@ def load_to_sqlite(self, sqlite_path):
         )
 
         logger.info("loading fact data")
+        logger.info(self.fact_path)
         # insert fact data
         fact_fields = self.specification.schema["fact"]["fields"]
         fields_str = ", ".join([field.replace("-", "_") for field in fact_fields])
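A similar consolidation could apply to the recurring three-line RSS logging throughout this file; a sketch of a module-level helper (hypothetical, not in the patch):

```python
import os
import logging

import psutil

logger = logging.getLogger(__name__)


def log_rss(label: str) -> None:
    # resident set size of the current process, in MB
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024**2
    logger.info(f"[Memory usage] {label}: {rss_mb:.2f} MB")
```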
diff --git a/setup.py b/setup.py
index e2d16f876..a051b7356 100644
--- a/setup.py
+++ b/setup.py
@@ -60,6 +60,7 @@ def get_long_description():
         "pygit2",
         "boto3",
         "moto",
+        "psutil",
     ],
     entry_points={"console_scripts": ["digital-land=digital_land.cli:cli"]},
     setup_requires=["pytest-runner"],
diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py
index f94244dd1..8797385e3 100644
--- a/tests/integration/package/test_dataset_parquet.py
+++ b/tests/integration/package/test_dataset_parquet.py
@@ -8,6 +8,8 @@
 import pyarrow.parquet as pq
 import pyarrow as pa
 from digital_land.package.dataset_parquet import DatasetParquetPackage
+import random
+import string
 
 
 class MockOrganisation(object):
@@ -374,9 +376,9 @@ def test_load_facts_single_file(data: dict, expected: int, tmp_path):
         specification_dir=None,
     )
 
-    # this method is explicitely designed to load facts from the temp table
-    # however it shouldn't need this, it's dupllicating all of the same data in a emporary space
-    # we should try leveraging the power of duckdb and parquet.
+    # this method is explicitly designed to load facts from the temp table
+    # however it shouldn't need this, it's duplicating all of the same data in a temporary space
+    # we should try leveraging the power of duckdb and parquet.
     package.load_facts(transformed_parquet_dir=transformed_parquet_dir)
 
     output_file = (
@@ -497,6 +499,123 @@ def test_load_facts_one_file_with_empty_file(data, expected, tmp_path):
     assert df.shape[1] == 9, "Not all columns saved in fact.parquet file"
 
 
+@pytest.mark.parametrize(
+    "data1,data2,expected_facts,expected_entities",
+    [(transformed_1_data, transformed_2_data, 35 * 100 / 2, 3 * 100)],
+)
+def test_load_functions_batch(
+    data1, data2, expected_facts, expected_entities, tmp_path, org_path, resource_path
+):
+    """
+    test the 'batch' strategy end to end: many transformed parquet files are grouped into
+    batches before the fact, fact_resource and entity tables are loaded
+    """
+    # Use the existing test data, but vary the fact and entity values so that we have many more
+    # facts and entities than the current test data gives us
+    df1 = pd.DataFrame.from_dict(data1)
+    df2 = pd.DataFrame.from_dict(data2)
+    df1_copy = df1.copy()
+    df2_copy = df2.copy()
+    ldf1 = len(df1_copy)
+    one_third1 = ldf1 // 3
+    two_third1 = 2 * ldf1 // 3
+    ldf2 = len(df2_copy)
+    one_third2 = ldf2 // 3
+    two_third2 = 2 * ldf2 // 3
+    transformed_parquet_dir = tmp_path / "transformed"
+    transformed_parquet_dir.mkdir(parents=True, exist_ok=True)
+    random.seed(42)  # Set seed to ensure consistency
+    chars = string.ascii_lowercase + string.digits
+    # Create 100 files so that the strategy is 'batch'
+    for i in range(100):
+        # Change the fact and entity values to ensure uniqueness
+        base = i * 10  # base entity value (assigned to first third of rows)
+        if i % 2 == 0:
+            df1_copy = df1_copy.assign(
+                fact=["".join(random.choices(chars, k=10)) for _ in range(ldf1)],
+                entity=[base] * one_third1
+                + [base + 1] * (two_third1 - one_third1)
+                + [base + 2] * (ldf1 - two_third1),
+            )
+            df1_copy.to_parquet(
+                transformed_parquet_dir / f"transformed_resource_{i}.parquet",
+                index=False,
+            )
+        else:
+            df2_copy = df2_copy.assign(
+                fact=["".join(random.choices(chars, k=10)) for _ in range(ldf2)],
+                entity=[base] * one_third2
+                + [base + 1] * (two_third2 - one_third2)
+                + [base + 2] * (ldf2 - two_third2),
+            )
+            df2_copy.to_parquet(
+                transformed_parquet_dir / f"transformed_resource_{i}.parquet",
+                index=False,
+            )
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "conservation-area",
+        specification_dir=None,
+        transformed_parquet_dir=transformed_parquet_dir,
+    )
+
+    package.group_parquet_files(transformed_parquet_dir=transformed_parquet_dir)
+    package.load_facts(transformed_parquet_dir=transformed_parquet_dir)
+    package.load_fact_resource(transformed_parquet_dir=transformed_parquet_dir)
+    package.load_entities(transformed_parquet_dir, resource_path, org_path)
+
+    # test facts
+    output_file = (
+        tmp_path
+        / "conservation-area"
+        / "fact"
+        / "dataset=conservation-area"
+        / "fact.parquet"
+    )
+    assert os.path.exists(output_file), "fact.parquet file does not exist"
+
+    df = pd.read_parquet(output_file)
+
+    assert len(df) > 0, "No data in fact.parquet file"
+    assert (
+        len(df) == expected_facts
+    ), "No. of facts does not match expected"  # No. of unique facts
+    assert df.shape[1] == 9, "Not all columns saved in fact.parquet file"
+
+    # test fact_resource
+    output_file = (
+        tmp_path
+        / "conservation-area"
+        / "fact-resource"
+        / "dataset=conservation-area"
+        / "fact-resource.parquet"
+    )
+    assert os.path.exists(output_file), "fact-resource.parquet file does not exist"
+
+    df = pd.read_parquet(output_file)
+
+    assert len(df) > 0, "No data in fact-resource.parquet file"
+    assert len(df) == expected_facts, "Not all data saved in fact-resource.parquet file"
+
+    assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file"
+
+    # test entities
+    output_file = (
+        tmp_path
+        / "conservation-area"
+        / "entity"
+        / "dataset=conservation-area"
+        / "entity.parquet"
+    )
+    assert os.path.exists(output_file), "entity.parquet file does not exist"
+
+    df = pd.read_parquet(output_file)
+
+    assert len(df) > 0, "No data in entity.parquet file"
+    assert len(df) == expected_entities, "No. of entities is not correct"
+    assert df["entity"].nunique() == len(df), "Entity column contains duplicate values"
+
+
 @pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)])
 def test_load_fact_resource_single_file(data, expected, tmp_path):
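For reference, the parametrised expectations in `test_load_functions_batch` follow from the fixture sizes, assuming 16 rows in `transformed_1_data` (as the `(transformed_1_data, 16)` parametrisation above implies) and, from `35 * 100 / 2`, 19 rows in `transformed_2_data`:

```python
# 100 files alternate between the two templates, and each write assigns a fresh
# set of random 10-character facts, so every row carries a unique fact:
expected_facts = (16 + 19) * 100 / 2   # = 35 * 100 / 2 = 1750 (rows sizes assumed above)
# each file i uses entities i*10, i*10 + 1 and i*10 + 2; the stride of 10
# keeps the triples disjoint across files:
expected_entities = 3 * 100            # = 300
```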