diff --git a/digital_land/cli.py b/digital_land/cli.py index a4ce695c1..095d57066 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -142,6 +142,7 @@ def convert_cmd(input_path, output_path): @dataset_resource_dir @issue_dir @click.option("--cache-dir", type=click.Path(), default="var/cache/parquet") +@click.option("--resource-path", type=click.Path(), default="collection/resource.csv") @click.argument("input-paths", nargs=-1, type=click.Path(exists=True)) @click.pass_context def dataset_create_cmd( @@ -153,6 +154,7 @@ def dataset_create_cmd( dataset_resource_dir, issue_dir, cache_dir, + resource_path, ): return dataset_create( input_paths=input_paths, @@ -165,6 +167,7 @@ def dataset_create_cmd( dataset_resource_dir=dataset_resource_dir, issue_dir=issue_dir, cache_dir=cache_dir, + resource_path=resource_path, ) diff --git a/digital_land/commands.py b/digital_land/commands.py index d6373057e..b410a15ac 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -12,6 +12,8 @@ import geojson import shapely +import subprocess + from digital_land.package.organisation import OrganisationPackage from digital_land.check import duplicate_reference_check from digital_land.specification import Specification @@ -359,7 +361,10 @@ def dataset_create( column_field_dir="var/column-field", dataset_resource_dir="var/dataset-resource", cache_dir="var/cache/parquet", + resource_path="collection/resource.csv", ): + cache_dir = os.path.join(cache_dir, dataset) + if not output_path: print("missing output path", file=sys.stderr) sys.exit(2) @@ -402,20 +407,22 @@ def dataset_create( pqpackage = DatasetParquetPackage( dataset, + organisation=organisation, path=output_path, - input_paths=input_paths, + cache_dir=cache_dir, + resource_path=resource_path, specification_dir=None, # TBD: package should use this specification object ) pqpackage.create_temp_table(input_paths) - pqpackage.load_facts(input_paths, cache_dir) - pqpackage.load_fact_resource(input_paths, cache_dir) - pqpackage.load_entities(input_paths, cache_dir, organisation_path) - pqpackage.pq_to_sqlite(output_path, cache_dir) + pqpackage.load_facts() + pqpackage.load_fact_resource() + pqpackage.load_entities() + pqpackage.pq_to_sqlite() pqpackage.close_conn() def dataset_dump(input_path, output_path): - cmd = f"sqlite3 -header -csv {input_path} 'select * from entity;' > {output_path}" + cmd = f"sqlite3 -header -csv {input_path} 'select * from entity order by entity;' > {output_path}" logging.info(cmd) os.system(cmd) @@ -427,7 +434,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): elif isinstance(csv_path, Path): dataset_name = csv_path.stem else: - logging.error(f"Can't extract datapackage name from {csv_path}") + logging.error(f"Can't extract datapackage name from {csv_path}") sys.exit(-1) flattened_csv_path = os.path.join(flattened_dir, f"{dataset_name}.csv") @@ -474,6 +481,7 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): batch_size = 100000 temp_geojson_files = [] geography_entities = [e for e in entities if e["typology"] == "geography"] + for i in range(0, len(geography_entities), batch_size): batch = geography_entities[i : i + batch_size] feature_collection = process_data_in_batches(batch, flattened_dir, dataset_name) @@ -488,6 +496,13 @@ def dataset_dump_flattened(csv_path, flattened_dir, specification, dataset): if all(os.path.isfile(path) for path in temp_geojson_files): rfc7946_geojson_path = os.path.join(flattened_dir, f"{dataset_name}.geojson") + env = os.environ.copy() + + out, _ = subprocess.Popen( + ["ogr2ogr", "--version"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ).communicate() env = ( dict(os.environ, OGR_GEOJSON_MAX_OBJ_SIZE="0") if get_gdal_version() >= Version("3.5.2") @@ -892,9 +907,10 @@ def process_data_in_batches(entities, flattened_dir, dataset_name): logging.error(f"Error loading wkt from entity {entity['entity']}") logging.error(e) else: - logging.error( - f"No geometry or point data for entity {entity['entity']} with typology 'geography'" - ) + pass + # logging.error( + # f"No geometry or point data for entity {entity['entity']} with typology 'geography'" + # ) if features: feature_collection = geojson.FeatureCollection( diff --git a/digital_land/package/datasetparquet.py b/digital_land/package/datasetparquet.py index 36f5b161f..a306da477 100644 --- a/digital_land/package/datasetparquet.py +++ b/digital_land/package/datasetparquet.py @@ -1,6 +1,5 @@ import os import logging -from pathlib import Path import duckdb from .package import Package @@ -27,39 +26,35 @@ class DatasetParquetPackage(Package): - def __init__(self, dataset, input_paths, **kwargs): + def __init__(self, dataset, organisation, cache_dir, resource_path, **kwargs): self.suffix = ".parquet" super().__init__(dataset, tables=tables, indexes=indexes, **kwargs) self.dataset = dataset - self.input_paths = input_paths + self.organisation = organisation + self.cache_dir = cache_dir self._spatialite = None + self.resource_path = resource_path # Persistent connection for the class. Given name to ensure that table is stored on disk (not purely in memory) - self.duckdb_file = "input_paths_database.duckdb" + os.makedirs(cache_dir, exist_ok=True) + self.duckdb_file = os.path.join(cache_dir, f"{dataset}.duckdb") self.conn = duckdb.connect(self.duckdb_file) - self.schema = self.get_schema(input_paths) - - def get_schema(self, input_paths): - # There are issues with the schema when reading in lots of files, namely smaller files have few or zero rows - # Plan is to find the largest file, create an initial database schema from that then use that in future - largest_file = max(input_paths, key=os.path.getsize) - - create_temp_table_query = f""" - DROP TABLE IF EXISTS schema_table; - CREATE TEMP TABLE schema_table AS - SELECT * FROM read_csv_auto('{largest_file}') - LIMIT 1000; - """ - self.conn.query(create_temp_table_query) + self.schema = self.get_schema() + self.typology = self.specification.schema[dataset]["typology"] - # Extract the schema from the temporary table - schema_query = """ - SELECT column_name, data_type - FROM information_schema.columns - WHERE table_name = 'schema_table'; - """ - schema_df = self.conn.query(schema_query).df() + def get_schema(self): + schema = {} + + for field in sorted( + list( + set(self.specification.schema["fact"]["fields"]).union( + set(self.specification.schema["fact-resource"]["fields"]) + ) + ) + ): + datatype = self.specification.field[field]["datatype"] + schema[field] = "BIGINT" if datatype == "integer" else "VARCHAR" - return dict(zip(schema_df["column_name"], schema_df["data_type"])) + return schema def create_temp_table(self, input_paths): # Create a temp table of the data from input_paths as we need the information stored there at various times @@ -73,15 +68,18 @@ def create_temp_table(self, input_paths): query = f""" CREATE TEMPORARY TABLE temp_table AS SELECT * - FROM read_csv_auto( + FROM read_csv( [{input_paths_str}], - columns = {self.schema} + columns = {self.schema}, + header = true, + force_not_null = {[field for field in self.schema.keys()]} ) """ + self.conn.execute(query) - def load_facts(self, input_paths, output_path): - logging.info(f"loading facts from {os.path.dirname(input_paths[0])}") + def load_facts(self): + logging.info("loading facts from temp table") fact_fields = self.specification.schema["fact"]["fields"] fields_str = ", ".join( @@ -102,12 +100,12 @@ def load_facts(self, input_paths, output_path): f""" COPY ( {query} - ) TO '{output_path}/fact{self.suffix}' (FORMAT PARQUET); + ) TO '{self.cache_dir}/fact{self.suffix}' (FORMAT PARQUET); """ ) - def load_fact_resource(self, input_paths, output_path): - logging.info(f"loading fact resources from {os.path.dirname(input_paths[0])}") + def load_fact_resource(self): + logging.info("loading fact resources from temp table") fact_resource_fields = self.specification.schema["fact-resource"]["fields"] fields_str = ", ".join( @@ -124,19 +122,19 @@ def load_fact_resource(self, input_paths, output_path): f""" COPY ( {query} - ) TO '{output_path}/fact_resource{self.suffix}' (FORMAT PARQUET); + ) TO '{self.cache_dir}/fact_resource{self.suffix}' (FORMAT PARQUET); """ ) - def load_entities( - self, input_paths, output_path, organisation_path="./var/cache/organisation.csv" - ): - logging.info(f"loading entities from {os.path.dirname(input_paths[0])}") + def load_entities(self): + organisation_path = self.organisation.organisation_path + + logging.info("loading entities from temp table") entity_fields = self.specification.schema["entity"]["fields"] # Do this to match with later field names. entity_fields = [e.replace("-", "_") for e in entity_fields] - input_paths_str = f"{output_path}/fact{self.suffix}" + input_paths_str = f"{self.cache_dir}/fact{self.suffix}" query = f""" SELECT DISTINCT REPLACE(field,'-','_') @@ -149,7 +147,12 @@ def load_entities( # json fields - list of fields which are present in the fact table which # do not exist separately in the entity table - json_fields = [field for field in distinct_fields if field not in entity_fields] + # Need to ensure that 'organisation' is not included either + json_fields = [ + field + for field in distinct_fields + if field not in entity_fields + ["organisation"] + ] # null fields - list of fields which are not present in the fact tables which have # to be in the entity table as a column @@ -187,12 +190,16 @@ def load_entities( # Take original data, group by entity & field, and order by highest priority then latest record. # If there are still matches then pick the first resource (and fact, just to make sure) query = f""" - SELECT {fields_str} - FROM temp_table - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY entity, field - ORDER BY priority, "entry-date" DESC, "entry-number" DESC, resource, fact - ) = 1 + SELECT {fields_str} FROM ( + SELECT {fields_str}, CASE WHEN resource_csv."end-date" IS NULL THEN '2999-12-31' ELSE resource_csv."end-date" END AS resource_end_date + FROM temp_table + LEFT JOIN read_csv_auto('{self.resource_path}') resource_csv + ON temp_table.resource = resource_csv.resource + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY entity, field + ORDER BY priority, "entry-date" DESC, "entry-number" DESC, resource_end_date DESC, temp_table.resource, fact + ) = 1 + ) """ pivot_query = f""" @@ -207,12 +214,17 @@ def load_entities( # include columns in the json statement # Collate list of fields which don't exist but need to be in the final table select_statement = ", ".join([f"t1.{field}" for field in select_fields]) + # Don't want to include anything that ends with "_geom" null_fields_statement = ", ".join( - [f'NULL::VARCHAR AS "{field}"' for field in null_fields] + [ + f"''::VARCHAR AS \"{field}\"" + for field in null_fields + if not field.endswith("_geom") + ] ) json_statement = ", ".join( [ - f"CASE WHEN t1.{field} IS NOT NULL THEN '{field}' ELSE NULL END, t1.{field}" + f"CASE WHEN t1.{field} IS NOT NULL THEN REPLACE('{field}', '_', '-') ELSE NULL END, t1.{field}" for field in json_fields ] ) @@ -223,21 +235,20 @@ def load_entities( SELECT * FROM read_csv_auto('{org_csv}') """ - dataset = Path(output_path).name sql = f""" INSTALL spatial; LOAD spatial; COPY( WITH computed_centroid AS ( SELECT - * EXCLUDE (point), -- Calculate centroid point + * EXCLUDE (point), -- Calculate centroid point if not given CASE - WHEN geometry IS NOT NULL AND point IS NULL - THEN ST_AsText(ST_Centroid(ST_GeomFromText(geometry))) + WHEN (geometry IS NOT NULL and geometry <> '') AND (point IS NULL OR point = '') + THEN ST_AsText(ST_ReducePrecision(ST_Centroid(ST_GeomFromText(geometry)),0.000001)) ELSE point END AS point FROM ( - SELECT '{dataset}' as dataset, - '{dataset}' as typology, + SELECT '{self.dataset}' as dataset, + '{self.typology}' as typology, t2.entity as organisation_entity, {select_statement}, {null_fields_statement}, @@ -247,21 +258,27 @@ def load_entities( on t1.organisation = t2.organisation ) ) - SELECT * FROM computed_centroid - ) TO '{output_path}/entity{self.suffix}' (FORMAT PARQUET); + SELECT + * EXCLUDE (json), + CASE WHEN json = '{{}}' THEN NULL ELSE json END AS json + FROM computed_centroid + ) TO '{self.cache_dir}/entity{self.suffix}' (FORMAT PARQUET); """ self.conn.execute(sql) - def pq_to_sqlite(self, output_path, cache_dir): + def pq_to_sqlite(self): # At present we are saving the parquet files in 'cache' but saving the sqlite files produced in 'dataset' # In future when parquet files are saved to 'dataset' remove the 'cache_dir' in the function arguments and # replace 'cache_dir' with 'output_path' in this function's code - logging.info(f"loading sqlite3 tables from parquet files in {cache_dir}") + logging.info( + f"loading sqlite3 tables in {self.path} from parquet files in {self.cache_dir}" + ) query = "INSTALL sqlite; LOAD sqlite;" self.conn.execute(query) - parquet_files = [fn for fn in os.listdir(cache_dir) if fn.endswith(self.suffix)] - sqlite_file_path = output_path + parquet_files = [ + fn for fn in os.listdir(self.cache_dir) if fn.endswith(self.suffix) + ] for parquet_file in parquet_files: table_name = os.path.splitext(os.path.basename(parquet_file))[0] @@ -271,26 +288,35 @@ def pq_to_sqlite(self, output_path, cache_dir): self.conn.execute( f""" CREATE TABLE temp_table AS - SELECT * FROM parquet_scan('{cache_dir}/{parquet_file}'); + SELECT * FROM parquet_scan('{self.cache_dir}/{parquet_file}'); """ ) # Export the DuckDB table to the SQLite database self.conn.execute( - f"ATTACH DATABASE '{sqlite_file_path}' AS sqlite_db (TYPE SQLITE);" + f"ATTACH DATABASE '{self.path}' AS sqlite_db (TYPE SQLITE);" ) - self.conn.execute(f"DROP TABLE IF EXISTS sqlite_db.{table_name};") + + # Fix the column names + for column in self.conn.execute("DESCRIBE TABLE temp_table;").fetchall(): + if "-" in column[0]: + self.conn.execute( + f"ALTER TABLE temp_table RENAME COLUMN '{column[0]}' TO '{column[0].replace('-','_')}';" + ) + + # Copy the data self.conn.execute( - f"CREATE TABLE sqlite_db.{table_name} AS SELECT * FROM temp_table;" + f"INSERT INTO sqlite_db.{table_name} BY NAME (SELECT * FROM temp_table);" ) + self.conn.execute("DETACH DATABASE sqlite_db;") def close_conn(self): logging.info("Close connection to duckdb database in session") if self.conn is not None: + self.conn.close() if os.path.exists(self.duckdb_file): os.remove(self.duckdb_file) - self.conn.close() def load(self): pass diff --git a/tests/acceptance/test_dataset_create.py b/tests/acceptance/test_dataset_create.py index 687828a90..de67328af 100644 --- a/tests/acceptance/test_dataset_create.py +++ b/tests/acceptance/test_dataset_create.py @@ -68,6 +68,15 @@ def issue_dir(session_tmp_path): return issue_dir +@pytest.fixture +def resource_path(session_tmp_path): + resource_path = session_tmp_path / "resource.csv" + columns = ["resource", "end-date"] + with open(resource_path, "w") as f: + f.write(",".join(columns) + "\n") + return resource_path + + def test_acceptance_dataset_create( session_tmp_path, organisation_path, @@ -75,6 +84,7 @@ def test_acceptance_dataset_create( issue_dir, cache_path, dataset_dir, + resource_path, ): output_path = dataset_dir / f"{test_dataset}.sqlite3" @@ -99,6 +109,8 @@ def test_acceptance_dataset_create( str(issue_dir), "--cache-dir", str(cache_path), + "--resource-path", + str(resource_path), ] + input_paths, catch_exceptions=False, @@ -114,7 +126,8 @@ def test_acceptance_dataset_create( print(result.exception) assert result.exit_code == 0, "error returned when building dataset" - pq_files = [file for file in os.listdir(cache_path) if file.endswith(".parquet")] + pq_cache = os.path.join(cache_path, test_dataset) + pq_files = [file for file in os.listdir(pq_cache) if file.endswith(".parquet")] assert len(pq_files) == 3, "Not all parquet files created" assert np.all( np.sort(pq_files) == ["entity.parquet", "fact.parquet", "fact_resource.parquet"] @@ -136,7 +149,7 @@ def test_acceptance_dataset_create( ), f"Missing following tables in sqlite database: {missing_tables}" for table in list(expected_tables): - pq_rows = len(pd.read_parquet(f"{cache_path}/{table}.parquet")) + pq_rows = len(pd.read_parquet(f"{pq_cache}/{table}.parquet")) sql_rows = cursor.execute(f"SELECT COUNT(*) FROM {table};").fetchone()[0] assert ( pq_rows == sql_rows diff --git a/tests/integration/test_package_datasetparquet.py b/tests/integration/test_package_datasetparquet.py index 331bce18d..03ccc95d8 100644 --- a/tests/integration/test_package_datasetparquet.py +++ b/tests/integration/test_package_datasetparquet.py @@ -3,9 +3,15 @@ import pandas as pd import pytest import os +import json from digital_land.package.datasetparquet import DatasetParquetPackage +class MockOrganisation(object): + def __init__(self, organisation_path): + self.organisation_path = organisation_path + + # Fixture to create a shared temporary directory @pytest.fixture(scope="session") def temp_dir(tmpdir_factory): @@ -42,7 +48,7 @@ def test_dataset_parquet_package(temp_dir): 11, "2023-01-01", 2, - "abcdef", + "abcdef1", "entry-date", 2, np.nan, @@ -55,7 +61,7 @@ def test_dataset_parquet_package(temp_dir): 11, "2023-01-01", 2, - "abcdef", + "abcdef2", "geometry", 2, np.nan, @@ -68,7 +74,33 @@ def test_dataset_parquet_package(temp_dir): 11, "2023-01-01", 2, - "abcdef", + "abcdef2p", + "point", + 2, + np.nan, + "zyxwvu", + np.nan, + '"POINT(-0.481 53.788)"', # This checks that point is not recalculated if given + ], + [ + np.nan, + 11, + "2023-01-01", + 2, + "abcdef3", + "document-url", + 2, + np.nan, + "zyxwvu", + np.nan, + "https://www.test.xyz", + ], + [ + np.nan, + 11, + "2023-01-01", + 2, + "abcdef4", "organisation", 2, np.nan, @@ -81,7 +113,7 @@ def test_dataset_parquet_package(temp_dir): 12, "2023-02-01", 2, - "abc123", + "abc1231", "entry-date", 2, np.nan, @@ -94,7 +126,7 @@ def test_dataset_parquet_package(temp_dir): 12, "2023-02-01", 2, - "abc123", + "abc1232", "geometry", 2, np.nan, @@ -107,7 +139,7 @@ def test_dataset_parquet_package(temp_dir): 12, "2023-01-01", 2, - "abc123", + "abc1233", "organisation", 2, np.nan, @@ -120,7 +152,7 @@ def test_dataset_parquet_package(temp_dir): 13, "2023-01-01", 2, - "def456", + "def4561", "entry-date", 2, np.nan, @@ -133,7 +165,7 @@ def test_dataset_parquet_package(temp_dir): 13, "2023-01-01", 2, - "def456", + "def4562", "geometry", 2, np.nan, @@ -146,7 +178,7 @@ def test_dataset_parquet_package(temp_dir): 13, "2023-01-01", 2, - "def456", + "def4563", "organisation", 2, np.nan, @@ -159,7 +191,7 @@ def test_dataset_parquet_package(temp_dir): 14, "2023-01-01", 2, - "a1b2c3", + "a1b2c31", "entry-date", 2, np.nan, @@ -172,7 +204,7 @@ def test_dataset_parquet_package(temp_dir): 14, "2023-01-01", 2, - "a1b2c3", + "a1b2c32", "geometry", 2, np.nan, @@ -185,7 +217,33 @@ def test_dataset_parquet_package(temp_dir): 14, "2023-01-01", 2, - "a1b2c3", + "a1b2c33", + "document-url", + 2, + np.nan, + "zyxwvu", + np.nan, + "https://www.testing.yyz", + ], + [ + np.nan, + 14, + "2023-01-01", + 2, + "a1b2c34", + "notes-checking", + 2, + np.nan, + "zyxwvu", + np.nan, + "Something random", + ], + [ + np.nan, + 14, + "2023-01-01", + 2, + "a1b2c35", "organisation", 2, np.nan, @@ -197,9 +255,9 @@ def test_dataset_parquet_package(temp_dir): with open(input_paths[0], "w") as f: f.write(",".join(columns) + "\n") for row in data: - f.write(",".join(map(str, row)) + "\n") - - # df = pd.read_csv(input_paths[0]) + f.write( + ",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n" + ) # Test data for the tables. This has plenty of 'duplicates' to check data = [ @@ -208,7 +266,7 @@ def test_dataset_parquet_package(temp_dir): 110, "2023-01-01", 2, - "badcfe", + "badcfe1", "entry-date", 2, np.nan, @@ -221,7 +279,7 @@ def test_dataset_parquet_package(temp_dir): 110, "2023-01-01", 2, - "badcfe", + "badcfe2", "entry-date", 2, np.nan, @@ -234,7 +292,7 @@ def test_dataset_parquet_package(temp_dir): 110, "2023-01-01", 2, - "badcfe", + "badcfe3", "organisation", 2, np.nan, @@ -247,7 +305,7 @@ def test_dataset_parquet_package(temp_dir): 111, "2023-01-01", 2, - "fedcba", + "fedcba1", "entry-date", 2, np.nan, @@ -260,7 +318,7 @@ def test_dataset_parquet_package(temp_dir): 111, "2023-02-01", 2, - "fedcba", + "fedcba2", "entry-date", 2, np.nan, @@ -273,7 +331,7 @@ def test_dataset_parquet_package(temp_dir): 111, "2023-02-01", 2, - "fedcba", + "fedcba3", "organisation", 2, np.nan, @@ -286,7 +344,7 @@ def test_dataset_parquet_package(temp_dir): 112, "2023-02-01", 2, - "bcdefg", + "bcdefg1", "entry-date", 2, np.nan, @@ -299,7 +357,7 @@ def test_dataset_parquet_package(temp_dir): 112, "2023-02-01", 12, - "bcdefg", + "bcdefg2", "entry-date", 2, np.nan, @@ -312,7 +370,7 @@ def test_dataset_parquet_package(temp_dir): 112, "2023-01-01", 12, - "bcdefg", + "bcdefg3", "organisation", 2, np.nan, @@ -325,7 +383,7 @@ def test_dataset_parquet_package(temp_dir): 113, "2023-01-01", 2, - "cdefgh", + "cdefgh1", "entry-date", 2, np.nan, @@ -338,7 +396,7 @@ def test_dataset_parquet_package(temp_dir): 113, "2023-01-01", 2, - "hgfedc", + "hgfedc1", "entry-date", 2, np.nan, @@ -351,7 +409,7 @@ def test_dataset_parquet_package(temp_dir): 113, "2023-01-01", 2, - "cdefgh", + "cdefgh2", "organisation", 2, np.nan, @@ -364,7 +422,7 @@ def test_dataset_parquet_package(temp_dir): 114, "2023-04-01", 2, - "efghij", + "efghij1", "entry-date", 1, np.nan, @@ -375,22 +433,22 @@ def test_dataset_parquet_package(temp_dir): [ np.nan, 114, - "2023-04-01", + "2023-05-01", 2, - "efghij", + "efghij2", "entry-date", 2, np.nan, "xyz123", np.nan, - "2023-04-01", + "2023-05-01", ], # priority [ np.nan, 114, "2023-01-01", 2, - "efghij", + "efghij3", "organisation", 1, np.nan, @@ -403,7 +461,7 @@ def test_dataset_parquet_package(temp_dir): 115, "2023-01-01", 2, - "defghi", + "defghi1", "entry-date", 2, np.nan, @@ -416,7 +474,7 @@ def test_dataset_parquet_package(temp_dir): 115, "2023-01-01", 2, - "defghi", + "defghi2", "entry-date", 2, np.nan, @@ -429,7 +487,7 @@ def test_dataset_parquet_package(temp_dir): 115, "2023-01-01", 2, - "defghi", + "defghi3", "organisation", 2, np.nan, @@ -442,7 +500,7 @@ def test_dataset_parquet_package(temp_dir): 116, "2023-01-01", 2, - "ihgfed", + "ihgfed1", "entry-date", 2, np.nan, @@ -454,15 +512,28 @@ def test_dataset_parquet_package(temp_dir): with open(input_paths[1], "w") as f: f.write(",".join(columns) + "\n") for row in data: - f.write(",".join(map(str, row)) + "\n") + f.write( + ",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n" + ) # Leave hash3.csv empty except for the headers (to test that an empty csv doesn't screw things up). with open(input_paths[2], "w") as f: - f.write(",".join(columns) + "\n") # Only write the header row + f.write(",".join(columns) + "\n") + # f.write(",".join(map(lambda x: str(x) if x is not np.nan else "", row)) + "\n") + + resource_path = str(temp_dir / "resource.csv") + resource_columns = ["resource", "end-date"] + with open(resource_path, "w") as f: + f.write(",".join(resource_columns) + "\n") # Instantiate the DatasetParquetPackage with temp_dir input paths and a mock schema package = DatasetParquetPackage( - dataset="test_dataset", input_paths=input_paths, specification_dir=None + dataset="conservation-area", + organisation=MockOrganisation(os.path.join(temp_dir, "organisation.csv")), + path=os.path.join(temp_dir, "integration_test.sqlite3"), + cache_dir=temp_dir, + resource_path=resource_path, + specification_dir=None, ) package.create_temp_table(input_paths) @@ -471,24 +542,21 @@ def test_dataset_parquet_package(temp_dir): def test_load_fact_basic(test_dataset_parquet_package, temp_dir): output_dir = temp_dir - test_dataset_parquet_package.load_facts( - test_dataset_parquet_package.input_paths, output_dir - ) + test_dataset_parquet_package.load_facts() output_file = output_dir / "fact.parquet" assert os.path.exists(output_file), "fact.parquet file does not exist" df = pd.read_parquet(output_file) + assert len(df) > 0, "No data in fact.parquet file" - assert len(df) == 12, "No. of facts is not correct" # No of unique facts + assert len(df) == 35, "No. of facts is not correct" # No of unique facts assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" def test_load_fact_resource_basic(test_dataset_parquet_package, temp_dir): output_dir = temp_dir - test_dataset_parquet_package.load_fact_resource( - test_dataset_parquet_package.input_paths, output_dir - ) + test_dataset_parquet_package.load_fact_resource() # Check if the output parquet file exists and verify contents output_file = output_dir / "fact_resource.parquet" @@ -496,8 +564,10 @@ def test_load_fact_resource_basic(test_dataset_parquet_package, temp_dir): # Load Parquet into a DataFrame to verify data correctness df = pd.read_parquet(output_file) + assert len(df) > 0, "No data in fact-resource,parquet file" - assert len(df) == 31, "Not all data saved in fact-resource.parquet file" + assert len(df) == 35, "Not all data saved in fact-resource.parquet file" + assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" @@ -522,47 +592,114 @@ def test_load_entities_basic(test_dataset_parquet_package, temp_dir): for row in data: f.write(",".join(map(str, row)) + "\n") - test_dataset_parquet_package.load_entities( - test_dataset_parquet_package.input_paths, - output_dir, - f"{temp_dir}/organisation.csv", - ) + test_dataset_parquet_package.load_entities() output_file = os.path.join(output_dir, "entity.parquet") assert os.path.exists(output_file), "entity.parquet file does not exist" df = pd.read_parquet(output_file) + assert len(df) > 0, "No data in entity.parquet file" assert len(df) == 11, "No. of entities is not correct" - assert df.shape[1] == 16, "Not all columns saved in entity.parquet file" - assert df["end_date"].isnull().all() # Check null handling - assert df["geojson"].isnull().all() # Check null handling - assert df["geometry_geom"].isnull().all() # Check null handling - assert df["point_geom"].isnull().all() # Check null handling + assert df.shape[1] == 14, "Not all columns saved in entity.parquet file" + assert df["end_date"].isin([""]).all() # Check null handling + assert df["geojson"].isin([""]).all() # Check null handling def test_load_pq_to_sqlite_basic(test_dataset_parquet_package, temp_dir): output_path = os.path.join(temp_dir, "integration_test.sqlite3") - test_dataset_parquet_package.pq_to_sqlite(output_path, temp_dir) + conn = sqlite3.connect(output_path) + conn.execute( + """ + CREATE TABLE entity( + dataset TEXT, + end_date TEXT, + entity INTEGER PRIMARY KEY, + entry_date TEXT, + geojson JSON, + geometry TEXT, + json JSON, + name TEXT, + organisation_entity TEXT, + point TEXT, + prefix TEXT, + reference TEXT, + start_date TEXT, + typology TEXT + ); + """ + ) + conn.execute( + """ + CREATE TABLE fact( + end_date TEXT, + entity INTEGER, + fact TEXT PRIMARY KEY, + field TEXT, + entry_date TEXT, + priority INTEGER, + reference_entity TEXT, + start_date TEXT, + value TEXT, + FOREIGN KEY(entity) REFERENCES entity(entity) + ); + """ + ) + conn.execute( + """ + CREATE TABLE fact_resource( + end_date TEXT, + fact TEXT, + entry_date TEXT, + entry_number INTEGER, + priority INTEGER, + resource TEXT, + start_date TEXT, + FOREIGN KEY(fact) REFERENCES fact(fact) + ); + """ + ) + + conn.commit() + conn.close() + + test_dataset_parquet_package.pq_to_sqlite() assert os.path.exists(output_path), "sqlite3 file does not exist" cnx = sqlite3.connect(output_path) df_sql = pd.read_sql_query("SELECT * FROM fact_resource", cnx) assert len(df_sql) > 0, "No data in fact_resource table" + assert len(df_sql) == 35, "Not all data saved in fact_resource table" assert np.all( - pd.isnull(df_sql["end-date"]) + len(df_sql["end_date"] == 0) ), "Non-empty strings in end_date from fact_resource table" df_sql = pd.read_sql_query("SELECT * FROM fact", cnx) assert len(df_sql) > 0, "No data in fact table" + assert len(df_sql) == 35, "Not all data saved in fact table" assert np.all( - pd.isnull(df_sql["end-date"]) + len(df_sql["end_date"] == 0) ), "Non-empty strings in end_date from fact table" df_sql = pd.read_sql_query("SELECT * FROM entity", cnx) assert len(df_sql) > 0, "No data in entity table" + assert len(df_sql) == 11, "Not all data saved in entity table" assert np.any( - pd.isnull(df_sql["geometry"]) + len(df_sql["geometry"] == 0) ), "All geometries from entity table have values" + assert np.any( + len(df_sql["geometry"] == 0) + ), "All geometries from entity table have non-blank values" + assert not any( + [ + ( + any("_" in key for key in json.loads(row).keys()) + if isinstance(row, str) + else False + ) + for row in df_sql["json"] + ] + ), "Some json object have underscores in their 'keys'" + cnx.close()