From 3bfc2cce9e25e68e5fc7958dfff03a5b2e60385c Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Mon, 25 Nov 2024 14:04:49 +0000 Subject: [PATCH 01/54] Use passed dataset name typology from the spec. --- digital_land/package/datasetparquet.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 36f5b161f..3f21b16be 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -1,6 +1,5 @@ import os import logging -from pathlib import Path import duckdb from .package import Package @@ -37,6 +36,7 @@ def __init__(self, dataset, input_paths, **kwargs): self.duckdb_file = "input_paths_database.duckdb" self.conn = duckdb.connect(self.duckdb_file) self.schema = self.get_schema(input_paths) + self.typology = self.specification.schema[dataset]["typology"] def get_schema(self, input_paths): # There are issues with the schema when reading in lots of files, namely smaller files have few or zero rows @@ -223,7 +223,6 @@ def load_entities( SELECT * FROM read_csv_auto('{org_csv}') """ - dataset = Path(output_path).name sql = f""" INSTALL spatial; LOAD spatial; COPY( @@ -236,8 +235,8 @@ def load_entities( ELSE point END AS point FROM ( - SELECT '{dataset}' as dataset, - '{dataset}' as typology, + SELECT '{self.dataset}' as dataset, + '{self.typology}' as typology, t2.entity as organisation_entity, {select_statement}, {null_fields_statement}, From df661b632d21e2cb8ce939b06c0ce2e738844f66 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Mon, 25 Nov 2024 14:14:05 +0000 Subject: [PATCH 02/54] Changed dataset used in test. 
--- tests/integration/test_package_datasetparquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 331bce18d..b706f08d2 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -462,7 +462,7 @@ def test_dataset_parquet_package(temp_dir): # Instantiate the DatasetParquetPackage with temp_dir input paths and a mock schema package = DatasetParquetPackage( - dataset="test_dataset", input_paths=input_paths, specification_dir=None + dataset="conservation-area", input_paths=input_paths, specification_dir=None ) package.create_temp_table(input_paths) From 41c112e31a675996338349e450853f42d0bb39a7 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 25 Nov 2024 15:59:53 +0000 Subject: [PATCH 03/54] Ensure 'organisation' is not included in json_fields --- digital_land/package/datasetparquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 3f21b16be..e1c70eebd 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -149,7 +149,7 @@ def load_entities( # json fields - list of fields which are present in the fact table which # do not exist separately in the entity table - json_fields = [field for field in distinct_fields if field not in entity_fields] + json_fields = [field for field in distinct_fields if field not in entity_fields + ['organisation']] # null fields - list of fields which are not present in the fact tables which have # to be in the entity table as a column From ce104251d83813e9e111f752f77a7241c155cdea Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 25 Nov 2024 16:04:32 +0000 Subject: [PATCH 04/54] Ran black on code --- digital_land/package/datasetparquet.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) 
diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index e1c70eebd..a83e03535 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -149,8 +149,12 @@ def load_entities( # json fields - list of fields which are present in the fact table which # do not exist separately in the entity table - json_fields = [field for field in distinct_fields if field not in entity_fields + ['organisation']] - + json_fields = [ + field + for field in distinct_fields + if field not in entity_fields + ["organisation"] + ] +:wq # null fields - list of fields which are not present in the fact tables which have # to be in the entity table as a column extra_fields = [ From 133d577ef13c86e5d67d48500602a1eafbb98a85 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 25 Nov 2024 16:21:38 +0000 Subject: [PATCH 05/54] Print statements to search where _geoms are --- digital_land/package/datasetparquet.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index a83e03535..0a03566fc 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -154,7 +154,7 @@ def load_entities( for field in distinct_fields if field not in entity_fields + ["organisation"] ] -:wq + # null fields - list of fields which are not present in the fact tables which have # to be in the entity table as a column extra_fields = [ @@ -227,6 +227,13 @@ def load_entities( SELECT * FROM read_csv_auto('{org_csv}') """ + print("\n") + print("select_statement") + print(select_statement) + print("null_fields_statement") + print(null_fields_statement) + print("\n") + sql = f""" INSTALL spatial; LOAD spatial; COPY( From 96fa0bd3d606c655058e34d334fc05cd5a72df89 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 25 Nov 2024 16:31:00 +0000 Subject: [PATCH 06/54] Removed '_geom' from null fields statement --- 
digital_land/package/datasetparquet.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 0a03566fc..d578096bb 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -211,8 +211,9 @@ def load_entities( # include columns in the json statement # Collate list of fields which don't exist but need to be in the final table select_statement = ", ".join([f"t1.{field}" for field in select_fields]) + # Don't want to include anything that ends with "_geom" null_fields_statement = ", ".join( - [f'NULL::VARCHAR AS "{field}"' for field in null_fields] + [f'NULL::VARCHAR AS "{field}"' for field in null_fields if not field.endswith("_geom")] ) json_statement = ", ".join( [ From e95644935d3179ac2e96136eb1b3ce0036b2c5d5 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 25 Nov 2024 16:35:44 +0000 Subject: [PATCH 07/54] Removed print statements --- digital_land/package/datasetparquet.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index d578096bb..5d978a44f 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -149,6 +149,7 @@ def load_entities( # json fields - list of fields which are present in the fact table which # do not exist separately in the entity table + # Need to ensure that 'organisation' is not included either json_fields = [ field for field in distinct_fields @@ -213,7 +214,11 @@ def load_entities( select_statement = ", ".join([f"t1.{field}" for field in select_fields]) # Don't want to include anything that ends with "_geom" null_fields_statement = ", ".join( - [f'NULL::VARCHAR AS "{field}"' for field in null_fields if not field.endswith("_geom")] + [ + f'NULL::VARCHAR AS "{field}"' + for field in null_fields + if not field.endswith("_geom") + ] ) 
json_statement = ", ".join( [ @@ -228,13 +233,6 @@ def load_entities( SELECT * FROM read_csv_auto('{org_csv}') """ - print("\n") - print("select_statement") - print(select_statement) - print("null_fields_statement") - print(null_fields_statement) - print("\n") - sql = f""" INSTALL spatial; LOAD spatial; COPY( From b0b391c4455377e027742a3c533c85f61114bd59 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Mon, 25 Nov 2024 16:44:49 +0000 Subject: [PATCH 08/54] Altered code as '_geom' columns no longer in output --- tests/integration/test_package_datasetparquet.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index b706f08d2..d33b927f5 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -199,8 +199,6 @@ def test_dataset_parquet_package(temp_dir): for row in data: f.write(",".join(map(str, row)) + "\n") - # df = pd.read_csv(input_paths[0]) - # Test data for the tables. This has plenty of 'duplicates' to check data = [ [ @@ -534,11 +532,9 @@ def test_load_entities_basic(test_dataset_parquet_package, temp_dir): df = pd.read_parquet(output_file) assert len(df) > 0, "No data in entity.parquet file" assert len(df) == 11, "No. 
of entities is not correct" - assert df.shape[1] == 16, "Not all columns saved in entity.parquet file" + assert df.shape[1] == 14, "Not all columns saved in entity.parquet file" assert df["end_date"].isnull().all() # Check null handling assert df["geojson"].isnull().all() # Check null handling - assert df["geometry_geom"].isnull().all() # Check null handling - assert df["point_geom"].isnull().all() # Check null handling def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): From b8c5b04c51437dcf4c68d199ea88fea34d22a717 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 10:45:26 +0000 Subject: [PATCH 09/54] Commented out parquet commands to check old sqlite outputs --- digital_land/commands.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index d6373057e..8eac10950 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -395,23 +395,23 @@ def dataset_create( package.add_counts() - # Repeat for parquet - # Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset - if not os.path.exists(cache_dir): - os.makedirs(cache_dir) - - pqpackage = DatasetParquetPackage( - dataset, - path=output_path, - input_paths=input_paths, - specification_dir=None, # TBD: package should use this specification object - ) - pqpackage.create_temp_table(input_paths) - pqpackage.load_facts(input_paths, cache_dir) - pqpackage.load_fact_resource(input_paths, cache_dir) - pqpackage.load_entities(input_paths, cache_dir, organisation_path) - pqpackage.pq_to_sqlite(output_path, cache_dir) - pqpackage.close_conn() + # # Repeat for parquet + # # Set up cache directory to store parquet files. 
The sqlite files created from this will be saved in the dataset + # if not os.path.exists(cache_dir): + # os.makedirs(cache_dir) + # + # pqpackage = DatasetParquetPackage( + # dataset, + # path=output_path, + # input_paths=input_paths, + # specification_dir=None, # TBD: package should use this specification object + # ) + # pqpackage.create_temp_table(input_paths) + # pqpackage.load_facts(input_paths, cache_dir) + # pqpackage.load_fact_resource(input_paths, cache_dir) + # pqpackage.load_entities(input_paths, cache_dir, organisation_path) + # pqpackage.pq_to_sqlite(output_path, cache_dir) + # pqpackage.close_conn() def dataset_dump(input_path, output_path): From 4b5cfab0f60ad31975495ffa6b59b8d373d0e2f7 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 10:46:55 +0000 Subject: [PATCH 10/54] Added parquet commands back in --- digital_land/commands.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 8eac10950..d6373057e 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -395,23 +395,23 @@ def dataset_create( package.add_counts() - # # Repeat for parquet - # # Set up cache directory to store parquet files. The sqlite files created from this will be saved in the dataset - # if not os.path.exists(cache_dir): - # os.makedirs(cache_dir) - # - # pqpackage = DatasetParquetPackage( - # dataset, - # path=output_path, - # input_paths=input_paths, - # specification_dir=None, # TBD: package should use this specification object - # ) - # pqpackage.create_temp_table(input_paths) - # pqpackage.load_facts(input_paths, cache_dir) - # pqpackage.load_fact_resource(input_paths, cache_dir) - # pqpackage.load_entities(input_paths, cache_dir, organisation_path) - # pqpackage.pq_to_sqlite(output_path, cache_dir) - # pqpackage.close_conn() + # Repeat for parquet + # Set up cache directory to store parquet files. 
The sqlite files created from this will be saved in the dataset + if not os.path.exists(cache_dir): + os.makedirs(cache_dir) + + pqpackage = DatasetParquetPackage( + dataset, + path=output_path, + input_paths=input_paths, + specification_dir=None, # TBD: package should use this specification object + ) + pqpackage.create_temp_table(input_paths) + pqpackage.load_facts(input_paths, cache_dir) + pqpackage.load_fact_resource(input_paths, cache_dir) + pqpackage.load_entities(input_paths, cache_dir, organisation_path) + pqpackage.pq_to_sqlite(output_path, cache_dir) + pqpackage.close_conn() def dataset_dump(input_path, output_path): From c382ae97665b50a8cfbef445fcfdbd115d8e21b4 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:13:51 +0000 Subject: [PATCH 11/54] Added print statement to find where 'organisation' is in dataset_dump_flattened function --- digital_land/commands.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index d6373057e..912b16a42 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -464,6 +464,11 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): kebab_case_row = dict( [(key.replace("_", "-"), val) for key, val in row.items()] ) + if 'organisation' in kebab_case_row.keys(): + print("kebab_case_row") + print(kebab_case_row) + print(row) + print("\n") writer.writerow(kebab_case_row) entities.append(kebab_case_row) From aab2246611854f764947609a4946f495ca3d1cc2 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:18:56 +0000 Subject: [PATCH 12/54] Filtered 'field_names' --- digital_land/commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 912b16a42..62b035994 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -462,7 +462,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): 
json_string = row.pop("json") or "{}" row.update(json.loads(json_string)) kebab_case_row = dict( - [(key.replace("_", "-"), val) for key, val in row.items()] + [(key.replace("_", "-"), val) for key, val in row.items() if key.replace("_", "-") in field_names] ) if 'organisation' in kebab_case_row.keys(): print("kebab_case_row") From dc14e9ffcc8f0956a5d5330d5a8e44469f32d52e Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:23:29 +0000 Subject: [PATCH 13/54] Print every row for debug purposes --- digital_land/commands.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 62b035994..58423eeef 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -462,13 +462,12 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): json_string = row.pop("json") or "{}" row.update(json.loads(json_string)) kebab_case_row = dict( - [(key.replace("_", "-"), val) for key, val in row.items() if key.replace("_", "-") in field_names] + [(key.replace("_", "-"), val) for key, val in row.items()] ) - if 'organisation' in kebab_case_row.keys(): - print("kebab_case_row") - print(kebab_case_row) - print(row) - print("\n") + print("kebab_case_row") + print(kebab_case_row) + print(row) + print("\n") writer.writerow(kebab_case_row) entities.append(kebab_case_row) From 09a34e865555e3a25f510facd37800bcb2199889 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:28:59 +0000 Subject: [PATCH 14/54] More print statements for debug purposes --- digital_land/commands.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 58423eeef..ccb6d8614 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -430,6 +430,8 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): logging.error(f"Can't extract datapackage name from 
{csv_path}") sys.exit(-1) + print("dataset_name") + print(dataset_name) flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv") with open(csv_path, "r") as read_file, open(flattened_csv_path, "w+") as write_file: reader = csv.DictReader(read_file) @@ -464,20 +466,20 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): kebab_case_row = dict( [(key.replace("_", "-"), val) for key, val in row.items()] ) - print("kebab_case_row") - print(kebab_case_row) - print(row) - print("\n") writer.writerow(kebab_case_row) entities.append(kebab_case_row) + print(f"Finished {dataset_name}") # write the entities to json file as well flattened_json_path = os.path.join(flattened_dir, f"{dataset_name}.json") + print("flattened_json_path") + print(flattened_json_path) with open(flattened_json_path, "w") as out_json: out_json.write(json.dumps({"entities": entities})) batch_size = 100000 temp_geojson_files = [] geography_entities = [e for e in entities if e["typology"] == "geography"] + print("Before process_data_in_batches") for i in range(0, len(geography_entities), batch_size): batch = geography_entities[i : i + batch_size] feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name) @@ -489,6 +491,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): out_geojson.write(geojson.dumps(feature_collection)) except Exception as e: logging.error(f"Error writing to GeoJSON file: {e}") + print("After process_data_in_batches") if all(os.path.isfile(path) for path in temp_geojson_files): rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") From cdbabe7085e249e51b53081247c4d6d9673f8a00 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:32:29 +0000 Subject: [PATCH 15/54] More print statements for debug purposes --- digital_land/commands.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/digital_land/commands.py 
b/digital_land/commands.py index ccb6d8614..8983d0c3b 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -430,8 +430,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): logging.error(f"Can't extract datapackage name from {csv_path}") sys.exit(-1) - print("dataset_name") - print(dataset_name) flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv") with open(csv_path, "r") as read_file, open(flattened_csv_path, "w+") as write_file: reader = csv.DictReader(read_file) @@ -468,18 +466,15 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): ) writer.writerow(kebab_case_row) entities.append(kebab_case_row) - print(f"Finished {dataset_name}") # write the entities to json file as well flattened_json_path = os.path.join(flattened_dir, f"{dataset_name}.json") - print("flattened_json_path") - print(flattened_json_path) with open(flattened_json_path, "w") as out_json: out_json.write(json.dumps({"entities": entities})) batch_size = 100000 temp_geojson_files = [] geography_entities = [e for e in entities if e["typology"] == "geography"] - print("Before process_data_in_batches") + for i in range(0, len(geography_entities), batch_size): batch = geography_entities[i : i + batch_size] feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name) @@ -491,8 +486,8 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): out_geojson.write(geojson.dumps(feature_collection)) except Exception as e: logging.error(f"Error writing to GeoJSON file: {e}") - print("After process_data_in_batches") + print("Pre temp_geojson_files") if all(os.path.isfile(path) for path in temp_geojson_files): rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") env = ( @@ -533,6 +528,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): # clear up input geojson file if os.path.isfile(temp_path): os.remove(temp_path) + print("Post 
temp_geojson_files") # @@ -899,9 +895,10 @@ def process_data_in_batches(entities, flattened_dir, dataset_name): logging.error(f"Error loading wkt from entity {entity['entity']}") logging.error(e) else: - logging.error( - f"No geometry or point data for entity {entity['entity']} with typology 'geography'" - ) + pass + # logging.error( + # f"No geometry or point data for entity {entity['entity']} with typology 'geography'" + # ) if features: feature_collection = geojson.FeatureCollection( From cfccd9cf52eb3d565f577fe83c110ab6c7292952 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Tue, 26 Nov 2024 11:32:30 +0000 Subject: [PATCH 16/54] Add dataset to parquet path. --- digital_land/commands.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index ccb6d8614..52b3d8d1a 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -360,6 +360,8 @@ def dataset_create( dataset_resource_dir="var/dataset-resource", cache_dir="var/cache/parquet", ): + cache_dir = os.path.join(cache_dir, dataset) + if not output_path: print("missing output path", file=sys.stderr) sys.exit(2) From b56e7747f6cb79c98fd8c68785c9c1be8c9e690a Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:36:04 +0000 Subject: [PATCH 17/54] More print statements for debug purposes --- digital_land/commands.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 2d63670ca..79618abcc 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -490,6 +490,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): logging.error(f"Error writing to GeoJSON file: {e}") print("Pre temp_geojson_files") + print(len(temp_geojson_files)) if all(os.path.isfile(path) for path in temp_geojson_files): rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") env = ( @@ -498,6 +499,7 @@ def dataset_dump_flattened(csv_path, 
flattened_dir, specification, dataset): else os.environ ) for temp_path in temp_geojson_files: + print(temp_path) responseCode, _, _ = execute( [ "ogr2ogr", From c48729fe9f1de3a10c91bfc7de19788f91a8685c Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:37:57 +0000 Subject: [PATCH 18/54] More print statements for debug purposes --- digital_land/commands.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 79618abcc..74c64f96e 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -492,12 +492,15 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): print("Pre temp_geojson_files") print(len(temp_geojson_files)) if all(os.path.isfile(path) for path in temp_geojson_files): + print('a') rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") + print('b') env = ( dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0") if get_gdal_version() >= Version("3.5.2") else os.environ ) + print('c') for temp_path in temp_geojson_files: print(temp_path) responseCode, _, _ = execute( From 6168a8ed7d8a3ef95539e39ce29a54afb9180ea7 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 11:43:49 +0000 Subject: [PATCH 19/54] More print statements for debug purposes --- digital_land/commands.py | 1 + 1 file changed, 1 insertion(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 74c64f96e..e56047790 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -495,6 +495,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): print('a') rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") print('b') + print(get_gdal_version()) env = ( dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0") if get_gdal_version() >= Version("3.5.2") From c66e231578625468a5e59740a641bc0520b75758 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Tue, 26 Nov 2024 11:53:09 +0000 
Subject: [PATCH 20/54] Updated test. --- tests/acceptance/test_dataset_create.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/acceptance/test_dataset_create.py b/tests/acceptance/test_dataset_create.py index 687828a90..8be577484 100644 --- a/tests/acceptance/test_dataset_create.py +++ b/tests/acceptance/test_dataset_create.py @@ -114,7 +114,8 @@ def test_acceptance_dataset_create( print(result.exception) assert result.exit_code == 0, "error returned when building dataset" - pq_files = [file for file in os.listdir(cache_path) if file.endswith(".parquet")] + pq_cache = os.path.join(cache_path, test_dataset) + pq_files = [file for file in os.listdir(pq_cache) if file.endswith(".parquet")] assert len(pq_files) == 3, "Not all parquet files created" assert np.all( np.sort(pq_files) == ["entity.parquet", "fact.parquet", "fact_resource.parquet"] @@ -136,7 +137,7 @@ def test_acceptance_dataset_create( ), f"Missing following tables in sqlite database: {missing_tables}" for table in list(expected_tables): - pq_rows = len(pd.read_parquet(f"{cache_path}/{table}.parquet")) + pq_rows = len(pd.read_parquet(f"{pq_cache}/{table}.parquet")) sql_rows = cursor.execute(f"SELECT COUNT(*) FROM {table};").fetchone()[0] assert ( pq_rows == sql_rows From 88c4f927de599b30785dd3a1edd3775fcc24ddf4 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Tue, 26 Nov 2024 11:53:43 +0000 Subject: [PATCH 21/54] Fixed black issues. 
--- digital_land/commands.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index e56047790..8bb302584 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -492,16 +492,16 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): print("Pre temp_geojson_files") print(len(temp_geojson_files)) if all(os.path.isfile(path) for path in temp_geojson_files): - print('a') + print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") - print('b') + print("b") print(get_gdal_version()) env = ( dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0") if get_gdal_version() >= Version("3.5.2") else os.environ ) - print('c') + print("c") for temp_path in temp_geojson_files: print(temp_path) responseCode, _, _ = execute( From 9e1e38490ad1dcdb56b51fae6b9f4b6a7f97a7bc Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 12:15:18 +0000 Subject: [PATCH 22/54] More print statements for debugging --- digital_land/commands.py | 1 + 1 file changed, 1 insertion(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index e56047790..5f1519cfb 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,6 +496,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") print('b') print(get_gdal_version()) + print(get_gdal_version() >= Version("3.5.2")) env = ( dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0") if get_gdal_version() >= Version("3.5.2") From c3b2e88ee6226c7b8698b4df080cf27c8783155b Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Tue, 26 Nov 2024 13:34:35 +0000 Subject: [PATCH 23/54] Use dataset name in duckdb file. 
--- digital_land/package/datasetparquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 5d978a44f..1ddd95c9c 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -33,7 +33,7 @@ def __init__(self, dataset, input_paths, **kwargs): self.input_paths = input_paths self._spatialite = None # Persistent connection for the class. Given name to ensure that table is stored on disk (not purely in memory) - self.duckdb_file = "input_paths_database.duckdb" + self.duckdb_file = f"input_paths_{dataset}.duckdb" self.conn = duckdb.connect(self.duckdb_file) self.schema = self.get_schema(input_paths) self.typology = self.specification.schema[dataset]["typology"] From ac111dfea97c819db4460df3c9b331a3ac673f04 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 17:24:10 +0000 Subject: [PATCH 24/54] More print statements for debugging --- digital_land/commands.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 5afbeffd5..37cb6656d 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -12,6 +12,8 @@ import geojson import shapely +import subprocess + from digital_land.package.organisation import OrganisationPackage from digital_land.check import duplicate_reference_check from digital_land.specification import Specification @@ -495,6 +497,12 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") print("b") + out, _ = subprocess.Popen( + ["ogr2ogr", "--version"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ).communicate() + print(out) print(get_gdal_version()) print(get_gdal_version() >= Version("3.5.2")) env = ( From e6493798b880500a5d8e906348e7f50ac7c15f04 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 
Nov 2024 17:29:23 +0000 Subject: [PATCH 25/54] More print statements for debugging --- digital_land/commands.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 37cb6656d..5fdb196d6 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -497,6 +497,8 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") print("b") + print(os.environ.get("PATH")) + print(os.environ.get("GDAL_DATA")) out, _ = subprocess.Popen( ["ogr2ogr", "--version"], stdout=subprocess.PIPE, From 7d34d4cdb0eb885df65fc96a76152f94b193b8a0 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 17:31:02 +0000 Subject: [PATCH 26/54] More print statements for debugging --- digital_land/commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 5fdb196d6..f4c4d3223 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,9 +496,9 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") - print("b") print(os.environ.get("PATH")) print(os.environ.get("GDAL_DATA")) + print("b") out, _ = subprocess.Popen( ["ogr2ogr", "--version"], stdout=subprocess.PIPE, From 6df361aa43b8f04036558b2fabbbb30347cd12a1 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 17:32:59 +0000 Subject: [PATCH 27/54] More print statements for debugging --- digital_land/commands.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index f4c4d3223..1805db6fd 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,7 +496,9 @@ def dataset_dump_flattened(csv_path, flattened_dir, 
specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") + print("os.environ.get('PATH')") print(os.environ.get("PATH")) + print("os.environ.get('GDAL_DATA')") print(os.environ.get("GDAL_DATA")) print("b") out, _ = subprocess.Popen( From ce7e58c444577af840d2e07beac246467c702e88 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 17:39:56 +0000 Subject: [PATCH 28/54] More print statements for debugging --- digital_land/commands.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 1805db6fd..21435de0d 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,10 +496,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") - print("os.environ.get('PATH')") - print(os.environ.get("PATH")) - print("os.environ.get('GDAL_DATA')") - print(os.environ.get("GDAL_DATA")) + print(subprocess.run(["ogr2ogr", "--version"]).stdout) print("b") out, _ = subprocess.Popen( ["ogr2ogr", "--version"], From d689d95155088a499e1efe967c75a186ab454f5c Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 17:45:20 +0000 Subject: [PATCH 29/54] More print statements for debugging --- digital_land/commands.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 21435de0d..798006143 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,7 +496,24 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") - 
print(subprocess.run(["ogr2ogr", "--version"]).stdout) + print("subprocess.run(['ogr2ogr', '--version']).stdout") + try: + result = subprocess.run(["ogr2ogr", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + print(result.stdout.decode()) + except Exception as e: + print(f"Error: {e}") + print("subprocess.Popen") + try: + process = subprocess.Popen( + ["ogr2ogr", "--version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + stdout, stderr = process.communicate() + print(stdout.decode(), stderr.decode()) + except Exception as e: + print(f"Error: {e}") + print("b") out, _ = subprocess.Popen( ["ogr2ogr", "--version"], From 1f492eccf85d421b2242e17e035cb5cdb4024432 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 18:27:18 +0000 Subject: [PATCH 30/54] Trying os.environ in subprocess --- digital_land/commands.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 798006143..71770fd58 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -497,8 +497,9 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") print("subprocess.run(['ogr2ogr', '--version']).stdout") + env = os.environ.copy() try: - result = subprocess.run(["ogr2ogr", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result = subprocess.run(["ogr2ogr", "--version"], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) print(result.stdout.decode()) except Exception as e: print(f"Error: {e}") From a93cde0c2ad45a42cae2409df26286cc8a94aeda Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 18:43:04 +0000 Subject: [PATCH 31/54] Trying os.environ in subprocess --- digital_land/commands.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/digital_land/commands.py b/digital_land/commands.py index 71770fd58..9ea948a26 100644 --- 
a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,6 +496,8 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") + print("subprocess.run(['ls']).stdout") + print(subprocess.run(['ls']).stdout) print("subprocess.run(['ogr2ogr', '--version']).stdout") env = os.environ.copy() try: From 7dea1c9bdf9b48521b25a9f587459a5a0c2b3b13 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 18:48:46 +0000 Subject: [PATCH 32/54] Trying os.environ in subprocess --- digital_land/commands.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 9ea948a26..c9b8c04ad 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,8 +496,14 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") - print("subprocess.run(['ls']).stdout") - print(subprocess.run(['ls']).stdout) + print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") + try: + process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + print(f"stdout: {stdout.decode()}") + print(f"stderr: {stderr.decode()}") + except Exception as e: + print(f"Error: {e}") print("subprocess.run(['ogr2ogr', '--version']).stdout") env = os.environ.copy() try: From 3ac12f1320c411761a7589ea8870b0909e595eaa Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Tue, 26 Nov 2024 18:52:50 +0000 Subject: [PATCH 33/54] Added print statments to debug --- digital_land/commands.py | 53 +++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git 
a/digital_land/commands.py b/digital_land/commands.py index c9b8c04ad..60c84359e 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -496,32 +496,35 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") - print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") - try: - process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - print(f"stdout: {stdout.decode()}") - print(f"stderr: {stderr.decode()}") - except Exception as e: - print(f"Error: {e}") - print("subprocess.run(['ogr2ogr', '--version']).stdout") env = os.environ.copy() - try: - result = subprocess.run(["ogr2ogr", "--version"], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - print(result.stdout.decode()) - except Exception as e: - print(f"Error: {e}") - print("subprocess.Popen") - try: - process = subprocess.Popen( - ["ogr2ogr", "--version"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - stdout, stderr = process.communicate() - print(stdout.decode(), stderr.decode()) - except Exception as e: - print(f"Error: {e}") + print("env") + print(env) + # print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") + # try: + # process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # stdout, stderr = process.communicate() + # print(f"stdout: {stdout.decode()}") + # print(f"stderr: {stderr.decode()}") + # except Exception as e: + # print(f"Error: {e}") + # print("subprocess.run(['ogr2ogr', '--version']).stdout") + # env = os.environ.copy() + # try: + # result = subprocess.run(["ogr2ogr", "--version"], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # print(result.stdout.decode()) + # except Exception as e: + # print(f"Error: {e}") + # 
print("subprocess.Popen") + # try: + # process = subprocess.Popen( + # ["ogr2ogr", "--version"], + # stdout=subprocess.PIPE, + # stderr=subprocess.PIPE + # ) + # stdout, stderr = process.communicate() + # print(stdout.decode(), stderr.decode()) + # except Exception as e: + # print(f"Error: {e}") print("b") out, _ = subprocess.Popen( From 4ff097220b021b80bee96e390040bacf248ffa38 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Wed, 27 Nov 2024 10:07:56 +0000 Subject: [PATCH 34/54] insert into the SQLite table rather than recreate it. --- digital_land/commands.py | 4 ++-- digital_land/package/datasetparquet.py | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 60c84359e..a6eff8d50 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -419,7 +419,7 @@ def dataset_create( def dataset_dump(input_path, output_path): - cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}" + cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}" logging.info(cmd) os.system(cmd) @@ -431,7 +431,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): elif isinstance(csv_path, Path): dataset_name = csv_path.stem else: - logging.error(f"Can't extract datapackage name from {csv_path}") + logging.error(f"Can't extract datapackage name from {csv_path}") sys.exit(-1) flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv") diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 1ddd95c9c..26900179d 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -288,10 +288,22 @@ def pq_to_sqlite(self, output_path, cache_dir): self.conn.execute( f"ATTACH DATABASE '{sqlite_file_path}' AS sqlite_db (TYPE SQLITE);" ) - self.conn.execute(f"DROP TABLE IF EXISTS sqlite_db.{table_name};") + + # Fix the 
column names + for column in self.conn.execute("DESCRIBE TABLE temp_table;").fetchall(): + if "-" in column[0]: + self.conn.execute( + f"ALTER TABLE temp_table RENAME COLUMN '{column[0]}' TO '{column[0].replace('-','_')}';" + ) + self.conn.execute( - f"CREATE TABLE sqlite_db.{table_name} AS SELECT * FROM temp_table;" + f"INSERT INTO sqlite_db.{table_name} BY NAME (SELECT * FROM temp_table);" ) + + # self.conn.execute(f"DROP TABLE IF EXISTS sqlite_db.{table_name};") + # self.conn.execute( + # f"CREATE TABLE sqlite_db.{table_name} AS SELECT * FROM temp_table;" + # ) self.conn.execute("DETACH DATABASE sqlite_db;") def close_conn(self): From 53a4b3d47e921cd484bc0d01fe8a6a8fbfd6b01a Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Wed, 27 Nov 2024 11:07:08 +0000 Subject: [PATCH 35/54] Trying gartbage collect --- digital_land/commands.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 60c84359e..64bf75ca5 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -13,6 +13,7 @@ import shapely import subprocess +import gc from digital_land.package.organisation import OrganisationPackage from digital_land.check import duplicate_reference_check @@ -499,14 +500,15 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): env = os.environ.copy() print("env") print(env) - # print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") - # try: - # process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # stdout, stderr = process.communicate() - # print(f"stdout: {stdout.decode()}") - # print(f"stderr: {stderr.decode()}") - # except Exception as e: - # print(f"Error: {e}") + gc() + print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") + try: + process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + 
print(f"stdout: {stdout.decode()}") + print(f"stderr: {stderr.decode()}") + except Exception as e: + print(f"Error: {e}") # print("subprocess.run(['ogr2ogr', '--version']).stdout") # env = os.environ.copy() # try: From 7afa43f77881876ed62d931d74787dae6b54ee98 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Wed, 27 Nov 2024 11:09:55 +0000 Subject: [PATCH 36/54] Trying gartbage collect --- digital_land/commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 48905a8e6..1310bce58 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -500,7 +500,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): env = os.environ.copy() print("env") print(env) - gc() + gc.collect() print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") try: process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) From a1d22f8065d69ef8eb46c18f232772da8584bd47 Mon Sep 17 00:00:00 2001 From: alexglasertpx Date: Wed, 27 Nov 2024 13:06:32 +0000 Subject: [PATCH 37/54] Added print statments to debug --- digital_land/commands.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 1310bce58..263b4b25c 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -13,7 +13,6 @@ import shapely import subprocess -import gc from digital_land.package.organisation import OrganisationPackage from digital_land.check import duplicate_reference_check @@ -363,6 +362,7 @@ def dataset_create( dataset_resource_dir="var/dataset-resource", cache_dir="var/cache/parquet", ): + print("In dataset_create") cache_dir = os.path.join(cache_dir, dataset) if not output_path: @@ -405,6 +405,7 @@ def dataset_create( if not os.path.exists(cache_dir): os.makedirs(cache_dir) + print("\nPre DatasetParquetPackage\n") pqpackage = DatasetParquetPackage( dataset, path=output_path, 
@@ -417,6 +418,7 @@ def dataset_create( pqpackage.load_entities(input_paths, cache_dir, organisation_path) pqpackage.pq_to_sqlite(output_path, cache_dir) pqpackage.close_conn() + print("\nPost DatasetParquetPackage\n") def dataset_dump(input_path, output_path): @@ -500,7 +502,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): env = os.environ.copy() print("env") print(env) - gc.collect() print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") try: process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) From 75453426aa041fd35626bce0aac909eec24362b3 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Wed, 27 Nov 2024 15:30:12 +0000 Subject: [PATCH 38/54] Replace empty json with NULL --- digital_land/commands.py | 8 ++++++-- digital_land/package/datasetparquet.py | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 263b4b25c..17e0ee1f5 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -502,9 +502,13 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): env = os.environ.copy() print("env") print(env) - print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") + print( + "subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)" + ) try: - process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + process = subprocess.Popen( + ["ls"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) stdout, stderr = process.communicate() print(f"stdout: {stdout.decode()}") print(f"stderr: {stderr.decode()}") diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 26900179d..06e7e949a 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -256,7 +256,10 @@ def load_entities( on t1.organisation = t2.organisation ) ) - SELECT * 
FROM computed_centroid + SELECT + * EXCLUDE (json), + CASE WHEN json = '{{}}' THEN NULL ELSE json END AS json + FROM computed_centroid ) TO '{output_path}/entity{self.suffix}' (FORMAT PARQUET); """ self.conn.execute(sql) From 0d29d727d2f4767c93c3a6538614cd6bd54a967b Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Wed, 27 Nov 2024 16:03:39 +0000 Subject: [PATCH 39/54] Get schema from specification --- digital_land/commands.py | 2 +- digital_land/package/datasetparquet.py | 24 +++++++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 17e0ee1f5..17de04efb 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -422,7 +422,7 @@ def dataset_create( def dataset_dump(input_path, output_path): - cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}" + cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}" logging.info(cmd) os.system(cmd) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 06e7e949a..0173a6abf 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -39,6 +39,20 @@ def __init__(self, dataset, input_paths, **kwargs): self.typology = self.specification.schema[dataset]["typology"] def get_schema(self, input_paths): + schema = {} + for field in sorted( + list( + set(self.specification.schema["fact"]["fields"]).union( + set(self.specification.schema["fact-resource"]["fields"]) + ) + ) + ): + schema[field] = "VARCHAR" + + return schema + + set(self.specification.schema["fact"]["fields"]) + # There are issues with the schema when reading in lots of files, namely smaller files have few or zero rows # Plan is to find the largest file, create an initial database schema from that then use that in future largest_file = max(input_paths, key=os.path.getsize) @@ -68,14 +82,17 @@ def 
create_temp_table(self, input_paths): ) input_paths_str = ", ".join([f"'{path}'" for path in input_paths]) + all_columns_str = ", ".join(f"'{key}'" for key in self.schema.keys()) self.conn.execute("DROP TABLE IF EXISTS temp_table") query = f""" CREATE TEMPORARY TABLE temp_table AS SELECT * - FROM read_csv_auto( + FROM read_csv( [{input_paths_str}], - columns = {self.schema} + columns = {self.schema}, + header = true, + force_not_null = [{all_columns_str}] ) """ self.conn.execute(query) @@ -299,6 +316,7 @@ def pq_to_sqlite(self, output_path, cache_dir): f"ALTER TABLE temp_table RENAME COLUMN '{column[0]}' TO '{column[0].replace('-','_')}';" ) + # Copy the data self.conn.execute( f"INSERT INTO sqlite_db.{table_name} BY NAME (SELECT * FROM temp_table);" ) @@ -312,9 +330,9 @@ def pq_to_sqlite(self, output_path, cache_dir): def close_conn(self): logging.info("Close connection to duckdb database in session") if self.conn is not None: + self.conn.close() if os.path.exists(self.duckdb_file): os.remove(self.duckdb_file) - self.conn.close() def load(self): pass From 12a59a4f1ac7ec10fb3ec75023ade7ca16e4673f Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Wed, 27 Nov 2024 17:09:48 +0000 Subject: [PATCH 40/54] Updated tests. --- .../test_package_datasetparquet.py | 75 +++++++++++++++++-- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index d33b927f5..912decc2a 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -197,7 +197,9 @@ def test_dataset_parquet_package(temp_dir): with open(input_paths[0], "w") as f: f.write(",".join(columns) + "\n") for row in data: - f.write(",".join(map(str, row)) + "\n") + f.write( + ",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n" + ) # Test data for the tables. 
This has plenty of 'duplicates' to check data = [ @@ -452,11 +454,13 @@ def test_dataset_parquet_package(temp_dir): with open(input_paths[1], "w") as f: f.write(",".join(columns) + "\n") for row in data: - f.write(",".join(map(str, row)) + "\n") + f.write( + ",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n" + ) # Leave hash3.csv empty except for the headers (to test that an empty csv doesn't screw things up). with open(input_paths[2], "w") as f: - f.write(",".join(columns) + "\n") # Only write the header row + f.write(",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n") # Instantiate the DatasetParquetPackage with temp_dir input paths and a mock schema package = DatasetParquetPackage( @@ -496,6 +500,7 @@ def test_load_fact_resource_basic(test_dataset_parquet_package, temp_dir): df = pd.read_parquet(output_file) assert len(df) > 0, "No data in fact-resource,parquet file" assert len(df) == 31, "Not all data saved in fact-resource.parquet file" + assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" @@ -539,6 +544,61 @@ def test_load_entities_basic(test_dataset_parquet_package, temp_dir): def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): output_path = os.path.join(temp_dir, "integration_test.sqlite3") + conn = sqlite3.connect(output_path) + conn.execute( + """ + CREATE TABLE entity( + dataset TEXT, + end_date TEXT, + entity INTEGER PRIMARY KEY, + entry_date TEXT, + geojson JSON, + geometry TEXT, + json JSON, + name TEXT, + organisation_entity TEXT, + point TEXT, + prefix TEXT, + reference TEXT, + start_date TEXT, + typology TEXT + ); + """ + ) + conn.execute( + """ + CREATE TABLE fact( + end_date TEXT, + entity INTEGER, + fact TEXT PRIMARY KEY, + field TEXT, + entry_date TEXT, + priority INTEGER, + reference_entity TEXT, + start_date TEXT, + value TEXT, + FOREIGN KEY(entity) REFERENCES entity(entity) + ); + """ + ) + conn.execute( + """ + CREATE TABLE fact_resource( + end_date 
TEXT, + fact TEXT, + entry_date TEXT, + entry_number INTEGER, + priority INTEGER, + resource TEXT, + start_date TEXT, + FOREIGN KEY(fact) REFERENCES fact(fact) + ); + """ + ) + + conn.commit() + conn.close() + test_dataset_parquet_package.pq_to_sqlite(output_path, temp_dir) assert os.path.exists(output_path), "sqlite3 file does not exist" @@ -546,14 +606,15 @@ def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): cnx = sqlite3.connect(output_path) df_sql = pd.read_sql_query("SELECT * FROM fact_resource", cnx) assert len(df_sql) > 0, "No data in fact_resource table" + assert np.all( - pd.isnull(df_sql["end-date"]) + len(df_sql["end_date"] == 0) ), "Non-empty strings in end_date from fact_resource table" df_sql = pd.read_sql_query("SELECT * FROM fact", cnx) assert len(df_sql) > 0, "No data in fact table" assert np.all( - pd.isnull(df_sql["end-date"]) + len(df_sql["end_date"] == 0) ), "Non-empty strings in end_date from fact table" df_sql = pd.read_sql_query("SELECT * FROM entity", cnx) @@ -561,4 +622,8 @@ def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): assert np.any( pd.isnull(df_sql["geometry"]) ), "All geometries from entity table have values" + assert np.any( + len(df_sql["geometry"] == 0) + ), "All geometries from entity table have non-blank values" + cnx.close() From 408fdf35de4523fd9b0fb1a6ee53534df9c08138 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Wed, 27 Nov 2024 22:37:21 +0000 Subject: [PATCH 41/54] Replace empty data with blank strings to match sqlite version. 
--- digital_land/package/datasetparquet.py | 37 ++++--------------- .../test_package_datasetparquet.py | 6 +-- 2 files changed, 10 insertions(+), 33 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 0173a6abf..6bde07a45 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -35,11 +35,12 @@ def __init__(self, dataset, input_paths, **kwargs): # Persistent connection for the class. Given name to ensure that table is stored on disk (not purely in memory) self.duckdb_file = f"input_paths_{dataset}.duckdb" self.conn = duckdb.connect(self.duckdb_file) - self.schema = self.get_schema(input_paths) + self.schema = self.get_schema() self.typology = self.specification.schema[dataset]["typology"] - def get_schema(self, input_paths): + def get_schema(self): schema = {} + for field in sorted( list( set(self.specification.schema["fact"]["fields"]).union( @@ -47,34 +48,11 @@ def get_schema(self, input_paths): ) ) ): - schema[field] = "VARCHAR" + datatype = self.specification.field[field]["datatype"] + schema[field] = "BIGINT" if datatype == "integer" else "VARCHAR" return schema - set(self.specification.schema["fact"]["fields"]) - - # There are issues with the schema when reading in lots of files, namely smaller files have few or zero rows - # Plan is to find the largest file, create an initial database schema from that then use that in future - largest_file = max(input_paths, key=os.path.getsize) - - create_temp_table_query = f""" - DROP TABLE IF EXISTS schema_table; - CREATE TEMP TABLE schema_table AS - SELECT * FROM read_csv_auto('{largest_file}') - LIMIT 1000; - """ - self.conn.query(create_temp_table_query) - - # Extract the schema from the temporary table - schema_query = """ - SELECT column_name, data_type - FROM information_schema.columns - WHERE table_name = 'schema_table'; - """ - schema_df = self.conn.query(schema_query).df() - - return 
dict(zip(schema_df["column_name"], schema_df["data_type"])) - def create_temp_table(self, input_paths): # Create a temp table of the data from input_paths as we need the information stored there at various times logging.info( @@ -82,7 +60,6 @@ def create_temp_table(self, input_paths): ) input_paths_str = ", ".join([f"'{path}'" for path in input_paths]) - all_columns_str = ", ".join(f"'{key}'" for key in self.schema.keys()) self.conn.execute("DROP TABLE IF EXISTS temp_table") query = f""" @@ -92,7 +69,7 @@ def create_temp_table(self, input_paths): [{input_paths_str}], columns = {self.schema}, header = true, - force_not_null = [{all_columns_str}] + force_not_null = {[field for field in self.schema.keys()]} ) """ self.conn.execute(query) @@ -232,7 +209,7 @@ def load_entities( # Don't want to include anything that ends with "_geom" null_fields_statement = ", ".join( [ - f'NULL::VARCHAR AS "{field}"' + f"''::VARCHAR AS \"{field}\"" for field in null_fields if not field.endswith("_geom") ] diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 912decc2a..6656e9904 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -538,8 +538,8 @@ def test_load_entities_basic(test_dataset_parquet_package, temp_dir): assert len(df) > 0, "No data in entity.parquet file" assert len(df) == 11, "No. 
of entities is not correct" assert df.shape[1] == 14, "Not all columns saved in entity.parquet file" - assert df["end_date"].isnull().all() # Check null handling - assert df["geojson"].isnull().all() # Check null handling + assert df["end_date"].isin([""]).all() # Check null handling + assert df["geojson"].isin([""]).all() # Check null handling def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): @@ -620,7 +620,7 @@ def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): df_sql = pd.read_sql_query("SELECT * FROM entity", cnx) assert len(df_sql) > 0, "No data in entity table" assert np.any( - pd.isnull(df_sql["geometry"]) + len(df_sql["geometry"] == 0) ), "All geometries from entity table have values" assert np.any( len(df_sql["geometry"] == 0) From 130aade0a8fb39074597690a868489c7614090db Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Wed, 27 Nov 2024 23:31:46 +0000 Subject: [PATCH 42/54] Put the duckdb file in the cache. --- digital_land/commands.py | 36 ++++++-------- digital_land/package/datasetparquet.py | 47 ++++++++++--------- .../test_package_datasetparquet.py | 27 ++++++----- 3 files changed, 55 insertions(+), 55 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 17de04efb..e924e2951 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -362,7 +362,6 @@ def dataset_create( dataset_resource_dir="var/dataset-resource", cache_dir="var/cache/parquet", ): - print("In dataset_create") cache_dir = os.path.join(cache_dir, dataset) if not output_path: @@ -405,24 +404,23 @@ def dataset_create( if not os.path.exists(cache_dir): os.makedirs(cache_dir) - print("\nPre DatasetParquetPackage\n") pqpackage = DatasetParquetPackage( dataset, + organisation=organisation, path=output_path, - input_paths=input_paths, + cache_dir=cache_dir, specification_dir=None, # TBD: package should use this specification object ) pqpackage.create_temp_table(input_paths) - 
pqpackage.load_facts(input_paths, cache_dir) - pqpackage.load_fact_resource(input_paths, cache_dir) - pqpackage.load_entities(input_paths, cache_dir, organisation_path) - pqpackage.pq_to_sqlite(output_path, cache_dir) + pqpackage.load_facts() + pqpackage.load_fact_resource() + pqpackage.load_entities() + pqpackage.pq_to_sqlite() pqpackage.close_conn() - print("\nPost DatasetParquetPackage\n") def dataset_dump(input_path, output_path): - cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}" + cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}" logging.info(cmd) os.system(cmd) @@ -502,18 +500,14 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): env = os.environ.copy() print("env") print(env) - print( - "subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)" - ) - try: - process = subprocess.Popen( - ["ls"], stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - stdout, stderr = process.communicate() - print(f"stdout: {stdout.decode()}") - print(f"stderr: {stderr.decode()}") - except Exception as e: - print(f"Error: {e}") + # print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") + # try: + # process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # stdout, stderr = process.communicate() + # print(f"stdout: {stdout.decode()}") + # print(f"stderr: {stderr.decode()}") + # except Exception as e: + # print(f"Error: {e}") # print("subprocess.run(['ogr2ogr', '--version']).stdout") # env = os.environ.copy() # try: diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 6bde07a45..a75e026d6 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -26,14 +26,16 @@ class DatasetParquetPackage(Package): - def __init__(self, dataset, input_paths, **kwargs): + def __init__(self, dataset, organisation, cache_dir, 
**kwargs): self.suffix = ".parquet" super().__init__(dataset, tables=tables, indexes=indexes, **kwargs) self.dataset = dataset - self.input_paths = input_paths + self.organisation = organisation + self.cache_dir = cache_dir self._spatialite = None # Persistent connection for the class. Given name to ensure that table is stored on disk (not purely in memory) - self.duckdb_file = f"input_paths_{dataset}.duckdb" + os.makedirs(cache_dir, exist_ok=True) + self.duckdb_file = os.path.join(cache_dir, f"{dataset}.duckdb") self.conn = duckdb.connect(self.duckdb_file) self.schema = self.get_schema() self.typology = self.specification.schema[dataset]["typology"] @@ -74,8 +76,8 @@ def create_temp_table(self, input_paths): """ self.conn.execute(query) - def load_facts(self, input_paths, output_path): - logging.info(f"loading facts from {os.path.dirname(input_paths[0])}") + def load_facts(self): + logging.info("loading facts from temp table") fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join( @@ -96,12 +98,12 @@ def load_facts(self, input_paths, output_path): f""" COPY ( {query} - ) TO '{output_path}/fact{self.suffix}' (FORMAT PARQUET); + ) TO '{self.cache_dir}/fact{self.suffix}' (FORMAT PARQUET); """ ) - def load_fact_resource(self, input_paths, output_path): - logging.info(f"loading fact resources from {os.path.dirname(input_paths[0])}") + def load_fact_resource(self): + logging.info("loading fact resources from temp table") fact_resource_fields = self.specification.schema["fact-resource"]["fields"] fields_str = ", ".join( @@ -118,19 +120,19 @@ def load_fact_resource(self, input_paths, output_path): f""" COPY ( {query} - ) TO '{output_path}/fact_resource{self.suffix}' (FORMAT PARQUET); + ) TO '{self.cache_dir}/fact_resource{self.suffix}' (FORMAT PARQUET); """ ) - def load_entities( - self, input_paths, output_path, organisation_path="./var/cache/organisation.csv" - ): - logging.info(f"loading entities from {os.path.dirname(input_paths[0])}") + def 
load_entities(self): + organisation_path = self.organisation.organisation_path + + logging.info("loading entities from temp table") entity_fields = self.specification.schema["entity"]["fields"] # Do this to match with later field names. entity_fields = [e.replace("-", "_") for e in entity_fields] - input_paths_str = f"{output_path}/fact{self.suffix}" + input_paths_str = f"{self.cache_dir}/fact{self.suffix}" query = f""" SELECT DISTINCT REPLACE(field,'-','_') @@ -254,20 +256,23 @@ def load_entities( * EXCLUDE (json), CASE WHEN json = '{{}}' THEN NULL ELSE json END AS json FROM computed_centroid - ) TO '{output_path}/entity{self.suffix}' (FORMAT PARQUET); + ) TO '{self.cache_dir}/entity{self.suffix}' (FORMAT PARQUET); """ self.conn.execute(sql) - def pq_to_sqlite(self, output_path, cache_dir): + def pq_to_sqlite(self): # At present we are saving the parquet files in 'cache' but saving the sqlite files produced in 'dataset' # In future when parquet files are saved to 'dataset' remove the 'cache_dir' in the function arguments and # replace 'cache_dir' with 'output_path' in this function's code - logging.info(f"loading sqlite3 tables from parquet files in {cache_dir}") + logging.info( + f"loading sqlite3 tables in {self.path} from parquet files in {self.cache_dir}" + ) query = "INSTALL sqlite; LOAD sqlite;" self.conn.execute(query) - parquet_files = [fn for fn in os.listdir(cache_dir) if fn.endswith(self.suffix)] - sqlite_file_path = output_path + parquet_files = [ + fn for fn in os.listdir(self.cache_dir) if fn.endswith(self.suffix) + ] for parquet_file in parquet_files: table_name = os.path.splitext(os.path.basename(parquet_file))[0] @@ -277,13 +282,13 @@ def pq_to_sqlite(self, output_path, cache_dir): self.conn.execute( f""" CREATE TABLE temp_table AS - SELECT * FROM parquet_scan('{cache_dir}/{parquet_file}'); + SELECT * FROM parquet_scan('{self.cache_dir}/{parquet_file}'); """ ) # Export the DuckDB table to the SQLite database self.conn.execute( - f"ATTACH DATABASE 
'{sqlite_file_path}' AS sqlite_db (TYPE SQLITE);" + f"ATTACH DATABASE '{self.path}' AS sqlite_db (TYPE SQLITE);" ) # Fix the column names diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 6656e9904..5b2e3a570 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -6,6 +6,11 @@ from digital_land.package.datasetparquet import DatasetParquetPackage +class MockOrganisation(object): + def __init__(self, organisation_path): + self.organisation_path = organisation_path + + # Fixture to create a shared temporary directory @pytest.fixture(scope="session") def temp_dir(tmpdir_factory): @@ -464,7 +469,11 @@ def test_dataset_parquet_package(temp_dir): # Instantiate the DatasetParquetPackage with temp_dir input paths and a mock schema package = DatasetParquetPackage( - dataset="conservation-area", input_paths=input_paths, specification_dir=None + dataset="conservation-area", + organisation=MockOrganisation(os.path.join(temp_dir, "organisation.csv")), + path=os.path.join(temp_dir, "integration_test.sqlite3"), + cache_dir=temp_dir, + specification_dir=None, ) package.create_temp_table(input_paths) @@ -473,9 +482,7 @@ def test_dataset_parquet_package(temp_dir): def test_load_fact_basic(test_dataset_parquet_package, temp_dir): output_dir = temp_dir - test_dataset_parquet_package.load_facts( - test_dataset_parquet_package.input_paths, output_dir - ) + test_dataset_parquet_package.load_facts() output_file = output_dir / "fact.parquet" assert os.path.exists(output_file), "fact.parquet file does not exist" @@ -488,9 +495,7 @@ def test_load_fact_basic(test_dataset_parquet_package, temp_dir): def test_load_fact_resource_basic(test_dataset_parquet_package, temp_dir): output_dir = temp_dir - test_dataset_parquet_package.load_fact_resource( - test_dataset_parquet_package.input_paths, output_dir - ) + test_dataset_parquet_package.load_fact_resource() # Check 
if the output parquet file exists and verify contents output_file = output_dir / "fact_resource.parquet" @@ -525,11 +530,7 @@ def test_load_entities_basic(test_dataset_parquet_package, temp_dir): for row in data: f.write(",".join(map(str, row)) + "\n") - test_dataset_parquet_package.load_entities( - test_dataset_parquet_package.input_paths, - output_dir, - f"{temp_dir}/organisation.csv", - ) + test_dataset_parquet_package.load_entities() output_file = os.path.join(output_dir, "entity.parquet") assert os.path.exists(output_file), "entity.parquet file does not exist" @@ -599,7 +600,7 @@ def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): conn.commit() conn.close() - test_dataset_parquet_package.pq_to_sqlite(output_path, temp_dir) + test_dataset_parquet_package.pq_to_sqlite() assert os.path.exists(output_path), "sqlite3 file does not exist" From 3ab2e7201e3a4646bfe6b6b0e7e3e65d209fe353 Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Fri, 29 Nov 2024 12:11:51 +0000 Subject: [PATCH 43/54] Tests relating to missing points --- digital_land/package/datasetparquet.py | 8 +- .../test_package_datasetparquet.py | 125 +++++++++++++----- 2 files changed, 92 insertions(+), 41 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index a75e026d6..f11910dd0 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -234,9 +234,9 @@ def load_entities(self): COPY( WITH computed_centroid AS ( SELECT - * EXCLUDE (point), -- Calculate centroid point + * EXCLUDE (point), -- Calculate centroid point if not given CASE - WHEN geometry IS NOT NULL AND point IS NULL + WHEN geometry IS NOT NULL AND (point IS NULL OR point = '') THEN ST_AsText(ST_Centroid(ST_GeomFromText(geometry))) ELSE point END AS point @@ -303,10 +303,6 @@ def pq_to_sqlite(self): f"INSERT INTO sqlite_db.{table_name} BY NAME (SELECT * FROM temp_table);" ) - # self.conn.execute(f"DROP TABLE IF EXISTS 
sqlite_db.{table_name};") - # self.conn.execute( - # f"CREATE TABLE sqlite_db.{table_name} AS SELECT * FROM temp_table;" - # ) self.conn.execute("DETACH DATABASE sqlite_db;") def close_conn(self): diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 5b2e3a570..704748586 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -47,7 +47,7 @@ def test_dataset_parquet_package(temp_dir): 11, "2023-01-01", 2, - "abcdef", + "abcdef1", "entry-date", 2, np.nan, @@ -60,7 +60,7 @@ def test_dataset_parquet_package(temp_dir): 11, "2023-01-01", 2, - "abcdef", + "abcdef2", "geometry", 2, np.nan, @@ -73,7 +73,33 @@ def test_dataset_parquet_package(temp_dir): 11, "2023-01-01", 2, - "abcdef", + "abcdef2p", + "point", + 2, + np.nan, + "zyxwvu", + np.nan, + '"POINT(-0.4812, 53.788)"', # This checks that point is not recalculated if given + ], + [ + np.nan, + 11, + "2023-01-01", + 2, + "abcdef3", + "document-url", + 2, + np.nan, + "zyxwvu", + np.nan, + "https://www.test.xyz", + ], + [ + np.nan, + 11, + "2023-01-01", + 2, + "abcdef4", "organisation", 2, np.nan, @@ -86,7 +112,7 @@ def test_dataset_parquet_package(temp_dir): 12, "2023-02-01", 2, - "abc123", + "abc1231", "entry-date", 2, np.nan, @@ -99,7 +125,7 @@ def test_dataset_parquet_package(temp_dir): 12, "2023-02-01", 2, - "abc123", + "abc1232", "geometry", 2, np.nan, @@ -112,7 +138,7 @@ def test_dataset_parquet_package(temp_dir): 12, "2023-01-01", 2, - "abc123", + "abc1233", "organisation", 2, np.nan, @@ -125,7 +151,7 @@ def test_dataset_parquet_package(temp_dir): 13, "2023-01-01", 2, - "def456", + "def4561", "entry-date", 2, np.nan, @@ -138,7 +164,7 @@ def test_dataset_parquet_package(temp_dir): 13, "2023-01-01", 2, - "def456", + "def4562", "geometry", 2, np.nan, @@ -151,7 +177,7 @@ def test_dataset_parquet_package(temp_dir): 13, "2023-01-01", 2, - "def456", + "def4563", "organisation", 2, np.nan, @@ 
-164,7 +190,7 @@ def test_dataset_parquet_package(temp_dir): 14, "2023-01-01", 2, - "a1b2c3", + "a1b2c31", "entry-date", 2, np.nan, @@ -177,7 +203,7 @@ def test_dataset_parquet_package(temp_dir): 14, "2023-01-01", 2, - "a1b2c3", + "a1b2c32", "geometry", 2, np.nan, @@ -190,7 +216,33 @@ def test_dataset_parquet_package(temp_dir): 14, "2023-01-01", 2, - "a1b2c3", + "a1b2c33", + "document-url", + 2, + np.nan, + "zyxwvu", + np.nan, + "https://www.testing.yyz", + ], + [ + np.nan, + 14, + "2023-01-01", + 2, + "a1b2c34", + "notes-checking", + 2, + np.nan, + "zyxwvu", + np.nan, + "Something random", + ], + [ + np.nan, + 14, + "2023-01-01", + 2, + "a1b2c35", "organisation", 2, np.nan, @@ -213,7 +265,7 @@ def test_dataset_parquet_package(temp_dir): 110, "2023-01-01", 2, - "badcfe", + "badcfe1", "entry-date", 2, np.nan, @@ -226,7 +278,7 @@ def test_dataset_parquet_package(temp_dir): 110, "2023-01-01", 2, - "badcfe", + "badcfe2", "entry-date", 2, np.nan, @@ -239,7 +291,7 @@ def test_dataset_parquet_package(temp_dir): 110, "2023-01-01", 2, - "badcfe", + "badcfe3", "organisation", 2, np.nan, @@ -252,7 +304,7 @@ def test_dataset_parquet_package(temp_dir): 111, "2023-01-01", 2, - "fedcba", + "fedcba1", "entry-date", 2, np.nan, @@ -265,7 +317,7 @@ def test_dataset_parquet_package(temp_dir): 111, "2023-02-01", 2, - "fedcba", + "fedcba2", "entry-date", 2, np.nan, @@ -278,7 +330,7 @@ def test_dataset_parquet_package(temp_dir): 111, "2023-02-01", 2, - "fedcba", + "fedcba3", "organisation", 2, np.nan, @@ -291,7 +343,7 @@ def test_dataset_parquet_package(temp_dir): 112, "2023-02-01", 2, - "bcdefg", + "bcdefg1", "entry-date", 2, np.nan, @@ -304,7 +356,7 @@ def test_dataset_parquet_package(temp_dir): 112, "2023-02-01", 12, - "bcdefg", + "bcdefg2", "entry-date", 2, np.nan, @@ -317,7 +369,7 @@ def test_dataset_parquet_package(temp_dir): 112, "2023-01-01", 12, - "bcdefg", + "bcdefg3", "organisation", 2, np.nan, @@ -330,7 +382,7 @@ def test_dataset_parquet_package(temp_dir): 113, "2023-01-01", 
2, - "cdefgh", + "cdefgh1", "entry-date", 2, np.nan, @@ -343,7 +395,7 @@ def test_dataset_parquet_package(temp_dir): 113, "2023-01-01", 2, - "hgfedc", + "hgfedc1", "entry-date", 2, np.nan, @@ -356,7 +408,7 @@ def test_dataset_parquet_package(temp_dir): 113, "2023-01-01", 2, - "cdefgh", + "cdefgh2", "organisation", 2, np.nan, @@ -369,7 +421,7 @@ def test_dataset_parquet_package(temp_dir): 114, "2023-04-01", 2, - "efghij", + "efghij1", "entry-date", 1, np.nan, @@ -380,22 +432,22 @@ def test_dataset_parquet_package(temp_dir): [ np.nan, 114, - "2023-04-01", + "2023-05-01", 2, - "efghij", + "efghij2", "entry-date", 2, np.nan, "xyz123", np.nan, - "2023-04-01", + "2023-05-01", ], # priority [ np.nan, 114, "2023-01-01", 2, - "efghij", + "efghij3", "organisation", 1, np.nan, @@ -408,7 +460,7 @@ def test_dataset_parquet_package(temp_dir): 115, "2023-01-01", 2, - "defghi", + "defghi1", "entry-date", 2, np.nan, @@ -421,7 +473,7 @@ def test_dataset_parquet_package(temp_dir): 115, "2023-01-01", 2, - "defghi", + "defghi2", "entry-date", 2, np.nan, @@ -434,7 +486,7 @@ def test_dataset_parquet_package(temp_dir): 115, "2023-01-01", 2, - "defghi", + "defghi3", "organisation", 2, np.nan, @@ -447,7 +499,7 @@ def test_dataset_parquet_package(temp_dir): 116, "2023-01-01", 2, - "ihgfed", + "ihgfed1", "entry-date", 2, np.nan, @@ -488,8 +540,9 @@ def test_load_fact_basic(test_dataset_parquet_package, temp_dir): assert os.path.exists(output_file), "fact.parquet file does not exist" df = pd.read_parquet(output_file) + assert len(df) > 0, "No data in fact.parquet file" - assert len(df) == 12, "No. of facts is not correct" # No of unique facts + assert len(df) == 35, "No. 
of facts is not correct" # No of unique facts assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" @@ -503,8 +556,9 @@ def test_load_fact_resource_basic(test_dataset_parquet_package, temp_dir): # Load Parquet into a DataFrame to verify data correctness df = pd.read_parquet(output_file) + assert len(df) > 0, "No data in fact-resource,parquet file" - assert len(df) == 31, "Not all data saved in fact-resource.parquet file" + assert len(df) == 35, "Not all data saved in fact-resource.parquet file" assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" @@ -536,6 +590,7 @@ def test_load_entities_basic(test_dataset_parquet_package, temp_dir): assert os.path.exists(output_file), "entity.parquet file does not exist" df = pd.read_parquet(output_file) + assert len(df) > 0, "No data in entity.parquet file" assert len(df) == 11, "No. of entities is not correct" assert df.shape[1] == 14, "Not all columns saved in entity.parquet file" From f19304bc5a9fcfa05bade208a6b82f82b8d0dcb7 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Fri, 29 Nov 2024 12:12:41 +0000 Subject: [PATCH 44/54] Fix json field names. --- digital_land/package/datasetparquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index f11910dd0..e8b9cb94e 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -218,7 +218,7 @@ def load_entities(self): ) json_statement = ", ".join( [ - f"CASE WHEN t1.{field} IS NOT NULL THEN '{field}' ELSE NULL END, t1.{field}" + f"CASE WHEN t1.{field} IS NOT NULL THEN REPLACE('{field}', '_', '-') ELSE NULL END, t1.{field}" for field in json_fields ] ) From cb8564edc2357b161829c698b45b5b7b7805371b Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Fri, 29 Nov 2024 13:49:22 +0000 Subject: [PATCH 45/54] Don't try to compute point if geometry is blank. 
--- digital_land/package/datasetparquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index e8b9cb94e..d99901400 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -236,7 +236,7 @@ def load_entities(self): SELECT * EXCLUDE (point), -- Calculate centroid point if not given CASE - WHEN geometry IS NOT NULL AND (point IS NULL OR point = '') + WHEN (geometry IS NOT NULL and geometry <> '') AND (point IS NULL OR point = '') THEN ST_AsText(ST_Centroid(ST_GeomFromText(geometry))) ELSE point END AS point From 8b08b2b1d0dd21203be323e1c5a832eef7efdcf6 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Fri, 29 Nov 2024 14:06:04 +0000 Subject: [PATCH 46/54] Reduce the computed points to 6dp --- digital_land/package/datasetparquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index d99901400..79b5ee74f 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -237,7 +237,7 @@ def load_entities(self): * EXCLUDE (point), -- Calculate centroid point if not given CASE WHEN (geometry IS NOT NULL and geometry <> '') AND (point IS NULL OR point = '') - THEN ST_AsText(ST_Centroid(ST_GeomFromText(geometry))) + THEN ST_AsText(ST_ReducePrecision(ST_Centroid(ST_GeomFromText(geometry)),0.000001)) ELSE point END AS point FROM ( From 9db4bbb401a55b8c05221c207fcb0c4e521367c3 Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Fri, 29 Nov 2024 14:47:08 +0000 Subject: [PATCH 47/54] Added new tests and edited point data --- tests/integration/test_package_datasetparquet.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 704748586..d8e95e0fe 100644 --- 
a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -3,6 +3,7 @@ import pandas as pd import pytest import os +import json from digital_land.package.datasetparquet import DatasetParquetPackage @@ -79,7 +80,7 @@ def test_dataset_parquet_package(temp_dir): np.nan, "zyxwvu", np.nan, - '"POINT(-0.4812, 53.788)"', # This checks that point is not recalculated if given + '"POINT(-0.481 53.788)"', # This checks that point is not recalculated if given ], [ np.nan, @@ -662,24 +663,30 @@ def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): cnx = sqlite3.connect(output_path) df_sql = pd.read_sql_query("SELECT * FROM fact_resource", cnx) assert len(df_sql) > 0, "No data in fact_resource table" - + assert len(df_sql) == 35, "Not all data saved in fact_resource table" assert np.all( len(df_sql["end_date"] == 0) ), "Non-empty strings in end_date from fact_resource table" df_sql = pd.read_sql_query("SELECT * FROM fact", cnx) assert len(df_sql) > 0, "No data in fact table" + assert len(df_sql) == 35, "Not all data saved in fact table" assert np.all( len(df_sql["end_date"] == 0) ), "Non-empty strings in end_date from fact table" df_sql = pd.read_sql_query("SELECT * FROM entity", cnx) assert len(df_sql) > 0, "No data in entity table" + assert len(df_sql) == 11, "Not all data saved in entity table" assert np.any( len(df_sql["geometry"] == 0) ), "All geometries from entity table have values" assert np.any( len(df_sql["geometry"] == 0) ), "All geometries from entity table have non-blank values" + assert not any([ + any("_" in key for key in json.loads(row).keys()) if isinstance(row, str) else False + for row in df_sql['json'] + ]), "Some json object have underscores in their 'keys'" cnx.close() From 6bf3c2ee0a87988c69d3e07ae0d7e68f46bf420d Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Fri, 29 Nov 2024 15:08:41 +0000 Subject: [PATCH 48/54] black --- tests/integration/test_package_datasetparquet.py | 14 
++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index d8e95e0fe..0e15ae049 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -684,9 +684,15 @@ def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): assert np.any( len(df_sql["geometry"] == 0) ), "All geometries from entity table have non-blank values" - assert not any([ - any("_" in key for key in json.loads(row).keys()) if isinstance(row, str) else False - for row in df_sql['json'] - ]), "Some json object have underscores in their 'keys'" + assert not any( + [ + ( + any("_" in key for key in json.loads(row).keys()) + if isinstance(row, str) + else False + ) + for row in df_sql["json"] + ] + ), "Some json object have underscores in their 'keys'" cnx.close() From 63abd9bc67d408953ad8747ac27e97352d198635 Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Fri, 29 Nov 2024 15:23:08 +0000 Subject: [PATCH 49/54] Removed print statements --- digital_land/commands.py | 40 +--------------------------------------- 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index e924e2951..32ffc9b2e 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -492,58 +492,21 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): except Exception as e: logging.error(f"Error writing to GeoJSON file: {e}") - print("Pre temp_geojson_files") - print(len(temp_geojson_files)) if all(os.path.isfile(path) for path in temp_geojson_files): - print("a") rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") env = os.environ.copy() - print("env") - print(env) - # print("subprocess.Popen(['ls'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)") - # try: - # process = subprocess.Popen(['ls'], stdout=subprocess.PIPE, 
stderr=subprocess.PIPE) - # stdout, stderr = process.communicate() - # print(f"stdout: {stdout.decode()}") - # print(f"stderr: {stderr.decode()}") - # except Exception as e: - # print(f"Error: {e}") - # print("subprocess.run(['ogr2ogr', '--version']).stdout") - # env = os.environ.copy() - # try: - # result = subprocess.run(["ogr2ogr", "--version"], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # print(result.stdout.decode()) - # except Exception as e: - # print(f"Error: {e}") - # print("subprocess.Popen") - # try: - # process = subprocess.Popen( - # ["ogr2ogr", "--version"], - # stdout=subprocess.PIPE, - # stderr=subprocess.PIPE - # ) - # stdout, stderr = process.communicate() - # print(stdout.decode(), stderr.decode()) - # except Exception as e: - # print(f"Error: {e}") - - print("b") + out, _ = subprocess.Popen( ["ogr2ogr", "--version"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, ).communicate() - print(out) - print(get_gdal_version()) - print(get_gdal_version() >= Version("3.5.2")) env = ( dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0") if get_gdal_version() >= Version("3.5.2") else os.environ ) - print("c") for temp_path in temp_geojson_files: - print(temp_path) responseCode, _, _ = execute( [ "ogr2ogr", @@ -576,7 +539,6 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): # clear up input geojson file if os.path.isfile(temp_path): os.remove(temp_path) - print("Post temp_geojson_files") # From ab3904ae8ff4ce1131a6f1bb75ca58e1f56d4335 Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Fri, 29 Nov 2024 16:11:06 +0000 Subject: [PATCH 50/54] Using row_number to split ties --- digital_land/package/datasetparquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 79b5ee74f..36acc367b 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -66,7 +66,7 @@ def 
create_temp_table(self, input_paths): self.conn.execute("DROP TABLE IF EXISTS temp_table") query = f""" CREATE TEMPORARY TABLE temp_table AS - SELECT * + SELECT *, ROW_NUMBER() OVER () AS row_number FROM read_csv( [{input_paths_str}], columns = {self.schema}, @@ -74,6 +74,7 @@ def create_temp_table(self, input_paths): force_not_null = {[field for field in self.schema.keys()]} ) """ + self.conn.execute(query) def load_facts(self): @@ -192,7 +193,7 @@ def load_entities(self): FROM temp_table QUALIFY ROW_NUMBER() OVER ( PARTITION BY entity, field - ORDER BY priority, "entry-date" DESC, "entry-number" DESC, resource, fact + ORDER BY priority, "entry-date" DESC, "entry-number" DESC, row_number DESC --resource, fact ) = 1 """ From 28c00c6afa48a41fdd9ec080a362c26db5f1a28b Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Fri, 29 Nov 2024 16:24:39 +0000 Subject: [PATCH 51/54] Removing row_number --- digital_land/package/datasetparquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 36acc367b..6c19514c5 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -66,7 +66,7 @@ def create_temp_table(self, input_paths): self.conn.execute("DROP TABLE IF EXISTS temp_table") query = f""" CREATE TEMPORARY TABLE temp_table AS - SELECT *, ROW_NUMBER() OVER () AS row_number + SELECT * FROM read_csv( [{input_paths_str}], columns = {self.schema}, @@ -193,7 +193,7 @@ def load_entities(self): FROM temp_table QUALIFY ROW_NUMBER() OVER ( PARTITION BY entity, field - ORDER BY priority, "entry-date" DESC, "entry-number" DESC, row_number DESC --resource, fact + ORDER BY priority, "entry-date" DESC, "entry-number" DESC, resource, fact ) = 1 """ From 69ba0ade8a87e4f455b2e76f08e8be5e12a719e4 Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Mon, 2 Dec 2024 10:57:41 +0000 Subject: [PATCH 52/54] Added an end date to choice of entity and field --- 
digital_land/package/datasetparquet.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 6c19514c5..fdf609a35 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -189,12 +189,16 @@ def load_entities(self): # Take original data, group by entity & field, and order by highest priority then latest record. # If there are still matches then pick the first resource (and fact, just to make sure) query = f""" - SELECT {fields_str} - FROM temp_table - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY entity, field - ORDER BY priority, "entry-date" DESC, "entry-number" DESC, resource, fact - ) = 1 + SELECT {fields_str} FROM ( + SELECT {fields_str}, CASE WHEN b.end_date IS NULL THEN '2999-12-31' + FROM temp_table a + LEFT JOIN read_csv_auto('collection/resource.csv') b + ON a.resource = b.resource + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY entity, field + ORDER BY a.priority, "a.entry-date" DESC, "a.entry-number" DESC, b.end_date DESC, a.resource, a.fact + ) = 1 + ) """ pivot_query = f""" From 2f9000ecbecee90a0e83a42547e6c730c093c662 Mon Sep 17 00:00:00 2001 From: Christopher Johns Date: Mon, 2 Dec 2024 11:53:47 +0000 Subject: [PATCH 53/54] Updated SQL --- digital_land/package/datasetparquet.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index fdf609a35..4fd75153a 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -190,13 +190,13 @@ def load_entities(self): # If there are still matches then pick the first resource (and fact, just to make sure) query = f""" SELECT {fields_str} FROM ( - SELECT {fields_str}, CASE WHEN b.end_date IS NULL THEN '2999-12-31' - FROM temp_table a - LEFT JOIN read_csv_auto('collection/resource.csv') b - ON a.resource = b.resource + 
SELECT {fields_str}, CASE WHEN resource_csv."end-date" IS NULL THEN '2999-12-31' ELSE resource_csv."end-date" END AS resource_end_date + FROM temp_table + LEFT JOIN read_csv_auto('collection/resource.csv') resource_csv + ON temp_table.resource = resource_csv.resource QUALIFY ROW_NUMBER() OVER ( PARTITION BY entity, field - ORDER BY a.priority, "a.entry-date" DESC, "a.entry-number" DESC, b.end_date DESC, a.resource, a.fact + ORDER BY priority, "entry-date" DESC, "entry-number" DESC, resource_end_date DESC, temp_table.resource, fact ) = 1 ) """ From a079ccb1996d65b2795cf114aa9057e3d9e4f7bc Mon Sep 17 00:00:00 2001 From: alexiglaser Date: Mon, 2 Dec 2024 16:31:01 +0000 Subject: [PATCH 54/54] Added resource end_date --- digital_land/cli.py | 3 +++ digital_land/commands.py | 2 ++ digital_land/package/datasetparquet.py | 5 +++-- tests/acceptance/test_dataset_create.py | 12 ++++++++++++ tests/integration/test_package_datasetparquet.py | 9 ++++++++- 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index a4ce695c1..095d57066 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -142,6 +142,7 @@ def convert_cmd(input_path, output_path): @dataset_resource_dir @issue_dir @click.option("--cache-dir", type=click.Path(), default="var/cache/parquet") +@click.option("--resource-path", type=click.Path(), default="collection/resource.csv") @click.argument("input-paths", nargs=-1, type=click.Path(exists=True)) @click.pass_context def dataset_create_cmd( @@ -153,6 +154,7 @@ def dataset_create_cmd( dataset_resource_dir, issue_dir, cache_dir, + resource_path, ): return dataset_create( input_paths=input_paths, @@ -165,6 +167,7 @@ def dataset_create_cmd( dataset_resource_dir=dataset_resource_dir, issue_dir=issue_dir, cache_dir=cache_dir, + resource_path=resource_path, ) diff --git a/digital_land/commands.py b/digital_land/commands.py index 32ffc9b2e..b410a15ac 100644 --- a/digital_land/commands.py +++ 
b/digital_land/commands.py @@ -361,6 +361,7 @@ def dataset_create( column_field_dir="var/column-field", dataset_resource_dir="var/dataset-resource", cache_dir="var/cache/parquet", + resource_path="collection/resource.csv", ): cache_dir = os.path.join(cache_dir, dataset) @@ -409,6 +410,7 @@ def dataset_create( organisation=organisation, path=output_path, cache_dir=cache_dir, + resource_path=resource_path, specification_dir=None, # TBD: package should use this specification object ) pqpackage.create_temp_table(input_paths) diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 4fd75153a..a306da477 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -26,13 +26,14 @@ class DatasetParquetPackage(Package): - def __init__(self, dataset, organisation, cache_dir, **kwargs): + def __init__(self, dataset, organisation, cache_dir, resource_path, **kwargs): self.suffix = ".parquet" super().__init__(dataset, tables=tables, indexes=indexes, **kwargs) self.dataset = dataset self.organisation = organisation self.cache_dir = cache_dir self._spatialite = None + self.resource_path = resource_path # Persistent connection for the class. 
Given name to ensure that table is stored on disk (not purely in memory) os.makedirs(cache_dir, exist_ok=True) self.duckdb_file = os.path.join(cache_dir, f"{dataset}.duckdb") @@ -192,7 +193,7 @@ def load_entities(self): SELECT {fields_str} FROM ( SELECT {fields_str}, CASE WHEN resource_csv."end-date" IS NULL THEN '2999-12-31' ELSE resource_csv."end-date" END AS resource_end_date FROM temp_table - LEFT JOIN read_csv_auto('collection/resource.csv') resource_csv + LEFT JOIN read_csv_auto('{self.resource_path}') resource_csv ON temp_table.resource = resource_csv.resource QUALIFY ROW_NUMBER() OVER ( PARTITION BY entity, field diff --git a/tests/acceptance/test_dataset_create.py b/tests/acceptance/test_dataset_create.py index 8be577484..de67328af 100644 --- a/tests/acceptance/test_dataset_create.py +++ b/tests/acceptance/test_dataset_create.py @@ -68,6 +68,15 @@ def issue_dir(session_tmp_path): return issue_dir +@pytest.fixture +def resource_path(session_tmp_path): + resource_path = session_tmp_path / "resource.csv" + columns = ["resource", "end-date"] + with open(resource_path, "w") as f: + f.write(",".join(columns) + "\n") + return resource_path + + def test_acceptance_dataset_create( session_tmp_path, organisation_path, @@ -75,6 +84,7 @@ def test_acceptance_dataset_create( issue_dir, cache_path, dataset_dir, + resource_path, ): output_path = dataset_dir / f"{test_dataset}.sqlite3" @@ -99,6 +109,8 @@ def test_acceptance_dataset_create( str(issue_dir), "--cache-dir", str(cache_path), + "--resource-path", + str(resource_path), ] + input_paths, catch_exceptions=False, diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 0e15ae049..03ccc95d8 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -518,7 +518,13 @@ def test_dataset_parquet_package(temp_dir): # Leave hash3.csv empty except for the headers (to test that an empty csv doesn't screw 
things up). with open(input_paths[2], "w") as f: - f.write(",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n") + f.write(",".join(columns) + "\n") + # f.write(",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n") + + resource_path = str(temp_dir / "resource.csv") + resource_columns = ["resource", "end-date"] + with open(resource_path, "w") as f: + f.write(",".join(resource_columns) + "\n") # Instantiate the DatasetParquetPackage with temp_dir input paths and a mock schema package = DatasetParquetPackage( @@ -526,6 +532,7 @@ def test_dataset_parquet_package(temp_dir): organisation=MockOrganisation(os.path.join(temp_dir, "organisation.csv")), path=os.path.join(temp_dir, "integration_test.sqlite3"), cache_dir=temp_dir, + resource_path=resource_path, specification_dir=None, ) package.create_temp_table(input_paths)