@cli.command(
    "debug-contribution",
    short_help="runs an org through the entire process including collector,pipeline and dataset",
)
@click.option(
    "--organisation",
    help="string representing the organisation, must match organisation name in source.csv",
    required=True,
)
@endpoint_path
@issue_dir
@column_field_dir
@dataset_resource_dir
@collection_dir
@click.pass_context
def run_debug_pipeline(
    ctx,
    organisation,
    endpoint_path,
    collection_dir,
    issue_dir,
    column_field_dir,
    dataset_resource_dir,
):
    # Thin CLI wrapper: pull the pipeline/dataset/specification objects that
    # the cli group stored on the click context, then delegate all work to
    # commands.debug_pipeline.
    pipeline = ctx.obj["PIPELINE"]
    dataset = ctx.obj["DATASET"]
    specification = ctx.obj["SPECIFICATION"]

    # BUG FIX (minor): the original ended with a dangling bare `return`
    # after this call; removed as dead code.
    debug_pipeline(
        organisation,
        dataset,
        pipeline,
        endpoint_path,
        collection_dir,
        specification,
        issue_dir,
        column_field_dir,
        dataset_resource_dir,
    )
def debug_pipeline(
    org,
    dataset,
    pipeline,
    endpoint_path,
    collection_dir,
    specification,
    issue_dir,
    column_field_dir,
    dataset_resource_dir,
):
    """Run a single organisation's data through collection for debugging.

    Finds the organisation's endpoint hashes in source.csv, downloads the
    matching endpoints, rebuilds log.csv/resource.csv filtered to those
    endpoints, prints the resource history, and fetches any previously
    collected resources from the files server.
    """
    print(f"running process for organisation:{org} and pipeline: {pipeline.name}")

    # Organisation info lives in source.csv, so to get the right endpoints we
    # identify all of the relevant source rows and record their endpoint hashes.
    endpoint_hashes = []
    for row in csv.DictReader(
        open(os.path.join(collection_dir, "source.csv"), newline="")
    ):
        if row["organisation"] == org and row["pipelines"] == pipeline.name:
            endpoint_hashes.append(row["endpoint"])

    # BUG FIX: the original tested `len(endpoint_hashes) < 0`, which is never
    # true, so the "nothing to collect" branch was unreachable.
    if not endpoint_hashes:
        print("no enpoints need collecting")
    else:
        text = ",".join(endpoint_hashes)
        print(f"endpoints found {text}")

    # download each relevant endpoint
    collector = Collector(pipeline.name, collection_dir=None)
    for row in csv.DictReader(open(endpoint_path, newline="")):
        endpoint = row["endpoint"]
        if endpoint in endpoint_hashes:
            print(f"downloading {endpoint}")
            url = row["endpoint-url"]
            plugin = row.get("plugin", "")

            # skip manually added files (they have no URL to fetch)
            if not url:
                continue

            collector.fetch(
                url,
                endpoint=endpoint,
                end_date=row.get("end-date", ""),
                plugin=plugin,
            )

    # Remove previously created log.csv and resource.csv so they can be
    # rebuilt below containing only this organisation's entries. Each file is
    # removed independently so a missing first file cannot skip the second.
    for filename in ("log.csv", "resource.csv"):
        try:
            os.remove(Path(collection_dir) / filename)
        except OSError:
            # the file may simply not exist yet on a first run
            pass

    collection = Collection(name=None, directory=collection_dir)
    collection.load()

    # There is no direct way to filter which logs/resources are saved to csv
    # (the Collection API was built to save everything at once), so we filter
    # the loaded entries down to the relevant endpoint hashes manually.
    # BUG FIX: the original rebound log_entries/resource_entries inside a
    # `for endpoint in endpoint_hashes` loop, so only the LAST endpoint's
    # entries survived; filter against the whole hash list instead.
    # NOTE(review): assumes entry["endpoints"] holds a single endpoint hash
    # string, as the original equality comparison did — confirm.
    log_entries = [
        entry
        for entry in collection.log.entries
        if entry["endpoint"] in endpoint_hashes
    ]
    resource_entries = [
        entry
        for entry in collection.resource.entries
        if entry["endpoints"] in endpoint_hashes
    ]
    collection.log.entries = log_entries
    collection.resource.entries = resource_entries
    collection.save_csv()

    # print resource history, useful when debugging
    print("Resource History")
    print("start_date,end_date,resource")
    for resource in resource_entries:
        print(f"{resource['start-date']},{resource['end-date']},{resource['resource']}")

    # download previously collected resources from the files server
    collection_name = specification.dataset[dataset]["collection"]
    files_url = "https://files.planning.data.gov.uk"
    for resource in resource_entries:
        local_path = os.path.join("collection/resource", resource["resource"])
        if not os.path.exists(local_path):
            # BUG FIX: os.path.exists() was called with no argument, which
            # raises TypeError at runtime.
            if not os.path.exists("collection/resource/"):
                os.makedirs("collection/resource/")
            # BUG FIX: the download destination previously joined the whole
            # resource entry dict instead of its "resource" hash.
            urllib.request.urlretrieve(
                os.path.join(
                    files_url,
                    collection_name,
                    "collection",
                    "resource",
                    resource["resource"],
                ),
                local_path,
            )

    # TODO: run pipeline_run() for each downloaded resource (input
    # collection/resource/<hash>, output transformed/<dataset>/<hash>.csv),
    # then build the dataset and run any dataset-specific expectations.
def get_endpoints_info(collection_dir):
    """Print every source row found in collection_dir/source.csv.

    Returns True when the file could be read, False otherwise.
    """
    try:
        full_file_pathname = os.path.join(collection_dir, "source.csv")
        with open(full_file_pathname, newline="") as f:
            reader = csv.DictReader(f)
            print("")
            print("")
            print(">>> Endpoints found")
            print("-------------------")
            for row in reader:
                print(f'organisation: {row["organisation"]}')
                print(f'   pipelines: {row["pipelines"]}')
                print(f'    endpoint: {row["endpoint"]}')
                print("")
        return True
    except (OSError, KeyError):
        # BUG FIX: was a bare `except:` with the return inside `finally`,
        # which silently swallowed every exception (even KeyboardInterrupt)
        return False


def get_endpoints(organisation, pipeline_name, collection_dir):
    """Return endpoint hashes of source rows matching organisation+pipeline.

    Returns an empty list when nothing matches, or None when source.csv
    cannot be read or lacks the expected columns.
    """
    try:
        full_file_pathname = os.path.join(collection_dir, "source.csv")
        with open(full_file_pathname, newline="") as f:
            return [
                row["endpoint"]
                for row in csv.DictReader(f)
                if row["organisation"] == organisation
                and row["pipelines"] == pipeline_name
            ]
    except (OSError, KeyError):
        # BUG FIX: was a bare `except:` + return-in-finally; now only I/O and
        # missing-column errors map to the None sentinel
        return None


def download_endpoints(pipeline_name, endpoint_path, endpoint_hashes):
    """Download every endpoint listed in endpoint.csv whose hash is wanted.

    Returns True when downloads were attempted, None when a fetch raised,
    False when no matching endpoint had a URL.
    """
    return_value = False

    collector = Collector(pipeline_name, collection_dir=None)

    full_file_pathname = os.path.join(endpoint_path, "endpoint.csv")
    reader = csv.DictReader(open(full_file_pathname, newline=""))

    print("")
    print("")
    print(">>> Endpoint downloads")
    print("----------------------")
    for row in reader:
        endpoint = row["endpoint"]
        print(f"Checking endpoint: {endpoint}")
        if endpoint not in endpoint_hashes:
            continue

        url = row["endpoint-url"]
        plugin = row.get("plugin", "")

        # skip manually added files (no URL to fetch)
        if not url:
            continue

        print(f"    Using endpoint: {endpoint}")
        print(f"          with url: {url}")
        print(f"       with plugin: {plugin}")
        print("Downloading...")
        # BUG FIX: in the original, `status` was referenced in a `finally`
        # block but was unbound whenever fetch() raised before assignment
        # (NameError); it also used a bare `except:`.
        status = None
        try:
            status = collector.fetch(
                url,
                endpoint=endpoint,
                end_date=row.get("end-date", ""),
                plugin=plugin,
            )
            return_value = True
        except Exception:
            return_value = None

        if status == FetchStatus.FAILED:
            print(f"Download FAILED for: {url}")
            print(f"        with status: {status}")
        else:
            print(f"Download SUCCESS for {url}")
            return_value = True
        print("")

    return return_value


def endpoint_collection(collection_dir, endpoint_hashes):
    """Rebuild log.csv/resource.csv filtered to the given endpoint hashes.

    Returns (log_entries, resource_entries) — the filtered entry lists.
    """
    # remove previously created log.csv and resource.csv files, each
    # independently so a missing first file cannot skip the second
    for filename in ("log.csv", "resource.csv"):
        try:
            os.remove(Path(collection_dir) / filename)
        except FileNotFoundError as exc:
            print("")
            print(">>> INFO: endpoint_collection:", exc)
        except OSError as exc:
            print("")
            print("ERROR: endpoint_collection:", exc)

    collection = Collection(name=None, directory=collection_dir)
    collection.load()

    print("")
    print("")
    print(">>> Collection endpoint stats")
    print("-----------------------------")
    print(f"  Num source entries: {len(collection.source.entries)}")
    print(f"  Num source records: {len(collection.source.records)}")
    print("-----------------------------")
    print(f"Num endpoint entries: {len(collection.endpoint.entries)}")
    print(f"Num endpoint records: {len(collection.endpoint.records)}")
    print("-----------------------------")
    print(f"Num resource entries: {len(collection.resource.entries)}")
    print(f"Num resource records: {len(collection.resource.records)}")
    print("-----------------------------")
    print(f"     Num log entries: {len(collection.log.entries)}")
    print(f"     Num log records: {len(collection.log.records)}")

    # There is no direct way to filter which logs/resources are saved to csv
    # (the API was built to do all collections at once), so filter the
    # entries manually against the wanted endpoint hashes.
    # BUG FIX: the original rebound log_entries/resource_entries on every
    # loop iteration, keeping only the LAST endpoint's entries, and raised
    # NameError on return when endpoint_hashes was empty.
    # NOTE(review): assumes entry["endpoints"] holds one endpoint hash
    # string, as the original equality comparison did — confirm.
    log_entries = [
        entry
        for entry in collection.log.entries
        if entry["endpoint"] in endpoint_hashes
    ]
    resource_entries = [
        entry
        for entry in collection.resource.entries
        if entry["endpoints"] in endpoint_hashes
    ]

    collection.log.entries = log_entries
    collection.resource.entries = resource_entries
    collection.save_csv()

    print("")
    print("")
    print(">>> Items added to Collection")
    print("-----------------------------")
    print(f"Num resource entries: {len(collection.resource.entries)}")
    print("-----------------------------")
    print(f"     Num log entries: {len(collection.log.entries)}")

    print(">>>>", collection.resource.entries)

    return log_entries, resource_entries


def run_collection_pipeline(collection_dir, pipeline_name, dataset_name, resource_entries, log_entries):
    """Print the input/output path each collected resource would use."""
    print("")

    for resource in resource_entries:
        # BUG FIX: the original concatenated f"{collection_dir}resource/..."
        # and silently produced broken paths unless collection_dir carried a
        # trailing slash; os.path.join handles both forms.
        input_path = os.path.join(collection_dir, "resource", resource["resource"])
        output_path = os.path.join(
            collection_dir, "transformed", dataset_name, f"{resource['resource']}.csv"
        )

        print(f">>> input_path: {input_path}")
        print(f">>> output_path:{output_path}")
def pytest_namespace():
    # NOTE(review): the pytest_namespace hook was removed in pytest 4.0 and
    # is never invoked by modern pytest; state is actually shared between the
    # tests below via direct attribute assignment (pytest.endpoint_hashes =
    # ...). Consider a session-scoped fixture instead.
    return {
        "endpoint_hashes": [],
        "log_entries": None,
        "resource_entries": None,
    }


# This test module is a utility: it expands the debug_pipeline function of
# commands.py, reporting on each step and providing useful info to the user.
@pytest.fixture
def env_vars_default(request):
    root_dir = request.config.rootdir
    # BUG FIX: this previously used Path(root_dir).resolve().parent.parent,
    # which pointed two directories ABOVE the repo root, unlike the sibling
    # env_vars_* fixtures below.
    tests_dir = Path(root_dir).resolve()

    return {
        "organisation": "",
        "pipeline_name": "",
        "dataset_name": "",
        "endpoint_path": f"{tests_dir}/tests/data/collection/",
        "collection_dir": f"{tests_dir}/tests/data/collection/",
        "collection_dir_out": f"{tests_dir}/tests/e2e/debug_pipeline/collection/",
        "specification_dir": f"{tests_dir}/tests/data/specification/",
    }


@pytest.fixture
def env_vars_national_park(request):
    root_dir = request.config.rootdir
    tests_dir = Path(root_dir).resolve()

    return {
        "organisation": "government-organisation:D303",
        "pipeline_name": "national-park",
        "dataset_name": "national-park",
        "endpoint_path": f"{tests_dir}/tests/data/collection-national-park/",
        "collection_dir": f"{tests_dir}/tests/data/collection-national-park/",
        "collection_dir_out": f"{tests_dir}/tests/e2e/debug_pipeline/collection/",
        "specification_dir": f"{tests_dir}/tests/data/specification/",
    }


@pytest.fixture
def env_vars_article_4_direction(request):
    root_dir = request.config.rootdir
    tests_dir = Path(root_dir).resolve()

    return {
        "organisation": "local-authority-eng:SWK",
        "pipeline_name": "article-4-direction-area",
        "dataset_name": "article-4-direction",
        "endpoint_path": f"{tests_dir}/tests/data/collection-article-4-direction/",
        "collection_dir": f"{tests_dir}/tests/data/collection-article-4-direction/",
        "collection_dir_out": f"{root_dir}/tests/e2e/debug_pipeline/collection/",
        "specification_dir": f"{tests_dir}/tests/data/specification/",
    }


@pytest.fixture
def env_vars(env_vars_default, env_vars_article_4_direction):
    # swap the returned fixture to point the suite at another dataset
    return env_vars_article_4_direction


def test_debug_pipeline_start(env_vars):
    # banner plus clean-up of output left behind by a previous run
    print("")
    print("")
    print("========================================")
    print("Running debug_pipeline with:")
    print(f"      organisation: {env_vars['organisation']}")
    print(f"     pipeline_name: {env_vars['pipeline_name']}")
    print(f"      dataset_name: {env_vars['dataset_name']}")
    print(f"     endpoint_path: {env_vars['endpoint_path']}")
    print(f"    collection_dir: {env_vars['collection_dir']}")
    print(f"collection_dir_out: {env_vars['collection_dir_out']}")
    print(f" specification_dir: {env_vars['specification_dir']}")
    print("========================================")

    output_path = Path("./collection")
    if output_path.exists():
        shutil.rmtree(output_path)


def test_get_endpoints_info(env_vars):
    collection_dir = env_vars["collection_dir"]

    sut_result = get_endpoints_info(collection_dir)

    assert sut_result is True


def test_get_endpoints(env_vars):
    organisation = env_vars["organisation"]
    pipeline_name = env_vars["pipeline_name"]
    collection_dir = env_vars["collection_dir"]

    endpoint_hashes = get_endpoints(organisation, pipeline_name, collection_dir)
    # share with the later tests (see pytest_namespace note above)
    pytest.endpoint_hashes = endpoint_hashes

    assert endpoint_hashes is not None
    assert endpoint_hashes != []


def test_download_endpoints(env_vars):
    pipeline_name = env_vars["pipeline_name"]
    endpoint_path = env_vars["endpoint_path"]
    endpoint_hashes = pytest.endpoint_hashes

    sut_result = download_endpoints(pipeline_name, endpoint_path, endpoint_hashes)

    assert sut_result is True


def move_collection_files_to_test_collection_dir(source_csv_path, endpoint_csv_path, collection_dir):
    # copy the fixture csvs into the test output dir, keeping the original
    # collection clean of test-run artifacts
    if os.path.isfile(source_csv_path):
        shutil.copy(source_csv_path, collection_dir)

    if os.path.isfile(endpoint_csv_path):
        shutil.copy(endpoint_csv_path, collection_dir)


def test_endpoint_collection(env_vars):
    source_csv_path = f"{env_vars['endpoint_path']}source.csv"
    endpoint_csv_path = f"{env_vars['endpoint_path']}endpoint.csv"
    collection_dir_out = env_vars["collection_dir_out"]
    endpoint_hashes = pytest.endpoint_hashes

    move_collection_files_to_test_collection_dir(
        source_csv_path, endpoint_csv_path, collection_dir_out
    )

    (log_entries, resource_entries) = endpoint_collection(
        collection_dir_out, endpoint_hashes
    )

    pytest.log_entries = log_entries
    pytest.resource_entries = resource_entries

    assert log_entries is not None
    assert resource_entries is not None


def test_run_collection_pipeline(env_vars):
    pipeline_name = env_vars["pipeline_name"]
    collection_dir_out = env_vars["collection_dir_out"]
    dataset_name = env_vars["dataset_name"]
    log_entries = pytest.log_entries
    resource_entries = pytest.resource_entries

    sut_result = run_collection_pipeline(
        collection_dir_out, pipeline_name, dataset_name, resource_entries, log_entries
    )

    # BUG FIX: the original stored sut_result but asserted nothing, so this
    # test could never fail; run_collection_pipeline currently returns None
    assert sut_result is None
# check_item_path() returns no data and only reports via the logging module,
# so these tests run under the pytest runner and inspect caplog records.
def test_check_item_path_with_good_data(request, caplog):
    # An item whose entry-date matches the date directory in its path should
    # produce NO warnings.
    # BUG FIX (docs): the original comment claimed this test looks for the
    # presence of warning messages, but it asserts their absence.
    root_dir = request.config.rootdir
    collection_dir = f"{root_dir}/{test_collection_dir}"
    caplog.set_level(logging.WARNING)

    item = Item()
    item.data["entry-date"] = "2020-02-12T16:55:54.703438"
    item.data["endpoint"] = "50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e"
    regex_friendly_directory = f"{collection_dir}/log/2020-02-12/50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e.json"

    store = LogStore(Schema("log"))
    store.check_item_path(item, regex_friendly_directory)

    assert len(caplog.records) == 0


def test_check_item_path_with_bad_data(request, caplog):
    # An item whose entry-date disagrees with the date directory in its path
    # should log at least one warning.
    root_dir = request.config.rootdir
    collection_dir = f"{root_dir}/{test_collection_dir}"
    caplog.set_level(logging.WARNING)

    item = Item()
    # entry-date year changed to 2021 to mismatch the 2020-02-12 path below
    item.data["entry-date"] = "2021-02-12T16:55:54.703438"
    item.data["endpoint"] = "50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e"
    regex_friendly_directory = f"{collection_dir}/log/2020-02-12/50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e.json"

    store = LogStore(Schema("log"))
    store.check_item_path(item, regex_friendly_directory)

    assert len(caplog.records) > 0