From 45fc6ba264d1f07bfcff631d7f9425d2b45069c5 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Mon, 27 Mar 2023 17:59:26 +0100 Subject: [PATCH 01/12] add cli for debug --- digital_land/cli.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/digital_land/cli.py b/digital_land/cli.py index 8bf5b84b6..9d87ee9bc 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -224,3 +224,29 @@ def collection_add_source_cmd(ctx, collection, endpoint_url, collection_dir): @click.option("--data-quality-yaml", help="path to expectations yaml", required=True) def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): return expectations(results_path, sqlite_dataset_path, data_quality_yaml) + + +@cli.command( + "debug-pipeline", + short_help="runs an org through the entire process including collection,pipeline and dataset", +) +@click.option( + "--organisation", + help="string representing the organisation, must match organisation name in source.csv", + required=True, +) +@click.option( + "--pipline", + help="name of the pipeline to be ran, should be equivalent to the dataset", + required=True, +) +@click.option("--data-quality-yaml", help="path to expectations yaml", required=True) +def debug_pipline(org, pipeline): + print(f"running process for organisation:{org} and pipeline: {pipeline}") + # identify relevant sources and enpoints + # collect data + # collection step, this will need to be a bit different + # pipeline step for loop for each of the files + # once files are loaded create the dataset + # run expectations? 
this made need to be made so only certain ones are ran as they may be specific to certain datasets + # end From 3fe5b9d1f1cd0ae73242df06f104ba01971bc240 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Mon, 27 Mar 2023 20:21:52 +0100 Subject: [PATCH 02/12] add basic ommand and start getting the collector to work --- digital_land/cli.py | 21 ++++++++++----------- digital_land/commands.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index 9d87ee9bc..ba66aeacd 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -17,6 +17,7 @@ pipeline_run, collection_add_source, expectations, + debug_pipeline, ) from digital_land.command_arguments import ( @@ -236,17 +237,15 @@ def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): required=True, ) @click.option( - "--pipline", + "--pipeline", help="name of the pipeline to be ran, should be equivalent to the dataset", required=True, ) -@click.option("--data-quality-yaml", help="path to expectations yaml", required=True) -def debug_pipline(org, pipeline): - print(f"running process for organisation:{org} and pipeline: {pipeline}") - # identify relevant sources and enpoints - # collect data - # collection step, this will need to be a bit different - # pipeline step for loop for each of the files - # once files are loaded create the dataset - # run expectations? 
this made need to be made so only certain ones are ran as they may be specific to certain datasets - # end +@click.argument( + "endpoint-path", + type=click.Path(exists=True), + default="collection/endpoint.csv", +) +def debug_pipline(organisation, pipeline, endpoint_path): + debug_pipeline(organisation, pipeline, endpoint_path) + return diff --git a/digital_land/commands.py b/digital_land/commands.py index 658e9332d..3ea3ab062 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -397,3 +397,34 @@ def resource_from_path(path): def default_output_path(command, input_path): directory = "" if command in ["harmonised", "transformed"] else "var/" return f"{directory}{command}/{resource_from_path(input_path)}.csv" + + +def debug_pipeline(org, pipeline, endpoint_path): + print(f"running process for organisation:{org} and pipeline: {pipeline}") + # identify relevant sources and enpoints + # read in relevant end points + # organisation info is in the source csv, therefore to get the right enpoints we need to identify all of the relevant ones from source .csi + endpoint_hashes = [] + for row in csv.DictReader(open(endpoint_path, newline="")): + if row["organisation"] == org and row["pipelines"] == pipeline: + endpoint_hash = row["endpoint"] + endpoint_hashes.append(endpoint_hash) + + # check length of hashes to see if there are relevant endpoints + if len(endpoint_hashes) < 0: + print("no enpoints need collecting") + + # download endpoints + for row in csv.DictReader(open(endpoint_path, newline="")): + endpoint = row["endpoint"] + if endpoint in endpoint_hashes: + print(f"downloading {endpoint}") + # url = row["endpoint-url"] + # plugin = row.get("plugin", "") + + # collect data + # collection step, this will need to be a bit different + # pipeline step for loop for each of the files + # once files are loaded create the dataset + # run expectations? 
this made need to be made so only certain ones are ran as they may be specific to certain datasets + # end From d271e2a4e221a2a58a5078a97e345a0890b7be8a Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Tue, 28 Mar 2023 16:15:44 +0100 Subject: [PATCH 03/12] add collector step --- digital_land/cli.py | 9 +++++---- digital_land/collect.py | 1 + digital_land/commands.py | 25 +++++++++++++++++++++---- 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index ba66aeacd..2126413ed 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -228,8 +228,8 @@ def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): @cli.command( - "debug-pipeline", - short_help="runs an org through the entire process including collection,pipeline and dataset", + "debug-contribution", + short_help="runs an org through the entire process including collector,pipeline and dataset", ) @click.option( "--organisation", @@ -246,6 +246,7 @@ def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): type=click.Path(exists=True), default="collection/endpoint.csv", ) -def debug_pipline(organisation, pipeline, endpoint_path): - debug_pipeline(organisation, pipeline, endpoint_path) +@click.pass_context +def run_debug_pipeline(ctx, organisation, endpoint_path): + debug_pipeline(organisation, ctx.obj["PIPELINE"], endpoint_path) return diff --git a/digital_land/collect.py b/digital_land/collect.py index a3e03f644..34c696806 100755 --- a/digital_land/collect.py +++ b/digital_land/collect.py @@ -34,6 +34,7 @@ class Collector: log_dir = "collection/log/" def __init__(self, dataset="", collection_dir=None): + # TODO do we need to defin this here? 
self.dataset = dataset if collection_dir: self.resource_dir = collection_dir / "resource/" diff --git a/digital_land/commands.py b/digital_land/commands.py index 3ea3ab062..108bcde45 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -401,26 +401,43 @@ def default_output_path(command, input_path): def debug_pipeline(org, pipeline, endpoint_path): print(f"running process for organisation:{org} and pipeline: {pipeline}") - # identify relevant sources and enpoints + # identify relevant sources and endpoints # read in relevant end points # organisation info is in the source csv, therefore to get the right enpoints we need to identify all of the relevant ones from source .csi endpoint_hashes = [] for row in csv.DictReader(open(endpoint_path, newline="")): - if row["organisation"] == org and row["pipelines"] == pipeline: + if row["organisation"] == org and row["pipelines"] == pipeline.name: endpoint_hash = row["endpoint"] endpoint_hashes.append(endpoint_hash) # check length of hashes to see if there are relevant endpoints if len(endpoint_hashes) < 0: print("no enpoints need collecting") + else: + text = ",".join(endpoint_hashes) + print(f"endpoints found {text}") + # create a collector + collector = Collector(pipeline.name, collection_dir=None) # download endpoints for row in csv.DictReader(open(endpoint_path, newline="")): endpoint = row["endpoint"] if endpoint in endpoint_hashes: print(f"downloading {endpoint}") - # url = row["endpoint-url"] - # plugin = row.get("plugin", "") + endpoint = row["endpoint"] + url = row["endpoint-url"] + plugin = row.get("plugin", "") + + # skip manually added files .. 
+ if not url: + continue + + collector.fetch( + url, + endpoint=endpoint, + end_date=row.get("end-date", ""), + plugin=plugin, + ) # collect data # collection step, this will need to be a bit different From 7f06d12e31b9889220e32d79d47b00fe5418bccc Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Tue, 28 Mar 2023 16:36:17 +0100 Subject: [PATCH 04/12] inlcude collection directory --- digital_land/cli.py | 17 +++++------------ digital_land/commands.py | 8 +++++--- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index 2126413ed..56758a7d7 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -27,6 +27,7 @@ issue_dir, dataset_resource_dir, column_field_dir, + endpoint_path, ) @@ -236,17 +237,9 @@ def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): help="string representing the organisation, must match organisation name in source.csv", required=True, ) -@click.option( - "--pipeline", - help="name of the pipeline to be ran, should be equivalent to the dataset", - required=True, -) -@click.argument( - "endpoint-path", - type=click.Path(exists=True), - default="collection/endpoint.csv", -) +@endpoint_path +@collection_dir @click.pass_context -def run_debug_pipeline(ctx, organisation, endpoint_path): - debug_pipeline(organisation, ctx.obj["PIPELINE"], endpoint_path) +def run_debug_pipeline(ctx, organisation, endpoint_path, collection_dir): + debug_pipeline(organisation, ctx.obj["PIPELINE"], endpoint_path, collection_dir) return diff --git a/digital_land/commands.py b/digital_land/commands.py index 108bcde45..0c30a5e00 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -399,13 +399,15 @@ def default_output_path(command, input_path): return f"{directory}{command}/{resource_from_path(input_path)}.csv" -def debug_pipeline(org, pipeline, endpoint_path): - print(f"running process for organisation:{org} and pipeline: {pipeline}") +def debug_pipeline(org, pipeline, 
endpoint_path, collection_dir): + print(f"running process for organisation:{org} and pipeline: {pipeline.name}") # identify relevant sources and endpoints # read in relevant end points # organisation info is in the source csv, therefore to get the right enpoints we need to identify all of the relevant ones from source .csi endpoint_hashes = [] - for row in csv.DictReader(open(endpoint_path, newline="")): + for row in csv.DictReader( + open(os.path.join(collection_dir, "source.csv"), newline="") + ): if row["organisation"] == org and row["pipelines"] == pipeline.name: endpoint_hash = row["endpoint"] endpoint_hashes.append(endpoint_hash) From 86020a4fb4d021f5fcf03e7116518b2ce52ef4d7 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Tue, 28 Mar 2023 17:42:41 +0100 Subject: [PATCH 05/12] add alterations --- digital_land/commands.py | 1 - 1 file changed, 1 deletion(-) diff --git a/digital_land/commands.py b/digital_land/commands.py index 0c30a5e00..c9b891160 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -441,7 +441,6 @@ def debug_pipeline(org, pipeline, endpoint_path, collection_dir): plugin=plugin, ) - # collect data # collection step, this will need to be a bit different # pipeline step for loop for each of the files # once files are loaded create the dataset From e43b8fe0ef5c78c3b56c5552dd1e94e8c112009b Mon Sep 17 00:00:00 2001 From: morgan-sl <128702799+morgan-sl@users.noreply.github.com> Date: Thu, 30 Mar 2023 17:14:08 +0100 Subject: [PATCH 06/12] Initial Commit Isolated version (does not reference dl-python) --- .../funcs/test_cli_debug_funcs.py | 96 ++++++++++ .../debug_pipline/helpers/adapter/__init__.py | 0 tests/e2e/debug_pipline/helpers/collect.py | 178 ++++++++++++++++++ .../debug_pipline/helpers/plugins/__init__.py | 0 .../debug_pipline/helpers/plugins/arcgis.py | 18 ++ .../debug_pipline/helpers/plugins/sparql.py | 58 ++++++ .../e2e/debug_pipline/helpers/plugins/wfs.py | 28 +++ .../e2e/debug_pipline/test_debug_pipeline.py | 80 
++++++++ 8 files changed, 458 insertions(+) create mode 100644 tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py create mode 100644 tests/e2e/debug_pipline/helpers/adapter/__init__.py create mode 100644 tests/e2e/debug_pipline/helpers/collect.py create mode 100644 tests/e2e/debug_pipline/helpers/plugins/__init__.py create mode 100644 tests/e2e/debug_pipline/helpers/plugins/arcgis.py create mode 100644 tests/e2e/debug_pipline/helpers/plugins/sparql.py create mode 100644 tests/e2e/debug_pipline/helpers/plugins/wfs.py create mode 100644 tests/e2e/debug_pipline/test_debug_pipeline.py diff --git a/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py b/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py new file mode 100644 index 000000000..022e190d6 --- /dev/null +++ b/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py @@ -0,0 +1,96 @@ +import csv +import os + +from tests.e2e.debug_pipline.helpers.collect import Collector + + +def get_endpoints_info(collection_dir): + return_value = False + + try: + full_file_pathname = os.path.join(collection_dir, "source.csv") + reader = csv.DictReader(open(full_file_pathname, newline="")) + + print("") + print("") + print(">>> Endpoints found") + print("-------------------") + for row in reader: + print(f'organisation: {row["organisation"]}') + print(f' pipelines: {row["pipelines"]}') + print(f' endpoint: {row["endpoint"]}') + print("") + + return_value = True + except: + return_value = False + finally: + return return_value + + +def get_endpoints(organisation, pipeline_name, collection_dir): + endpoint_hashes = [] + return_value = None + + try: + full_file_pathname = os.path.join(collection_dir, "source.csv") + reader = csv.DictReader(open(full_file_pathname, newline="")) + + for row in reader: + if row["organisation"] == organisation and row["pipelines"] == pipeline_name: + endpoint_hash = row["endpoint"] + endpoint_hashes.append(endpoint_hash) + + return_value = endpoint_hashes + except: + return_value = None + finally: 
+ return return_value + + +def download_endpoints(pipeline_name, endpoint_path, endpoint_hashes): + return_value = False + + # create a collector + collector = Collector(pipeline_name, collection_dir=None) + # collector.log_dir = endpoint_path + + # download endpoints + full_file_pathname = os.path.join(endpoint_path, "endpoint.csv") + reader = csv.DictReader(open(full_file_pathname, newline="")) + + print("") + print("") + print(">>> Endpoint downloads") + print("----------------------") + for row in reader: + endpoint = row["endpoint"] + print(f"Checking endpoint: {endpoint}") + if endpoint in endpoint_hashes: + url = row["endpoint-url"] + plugin = row.get("plugin", "") + + # skip manually added files .. + if not url: + continue + + print(f" Using endpoint: {endpoint}") + print(f" with url: {url}") + print(f" with plugin: {plugin}") + print("Downloading...") + try: + collector.fetch( + url, + endpoint=endpoint, + end_date=row.get("end-date", ""), + plugin=plugin, + ) + + return_value = True + except: + return_value = None + + print("") + + return return_value + diff --git a/tests/e2e/debug_pipline/helpers/adapter/__init__.py b/tests/e2e/debug_pipline/helpers/adapter/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/e2e/debug_pipline/helpers/collect.py b/tests/e2e/debug_pipline/helpers/collect.py new file mode 100644 index 000000000..34c696806 --- /dev/null +++ b/tests/e2e/debug_pipline/helpers/collect.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 + +# +# collect resource +# +import csv +import hashlib +import logging +import os +from datetime import datetime +from enum import Enum +from timeit import default_timer as timer + +import canonicaljson +import requests + +from .adapter.file import FileAdapter +from .plugins.sparql import get as sparql_get +from .plugins.wfs import get as wfs_get +from .plugins.arcgis import get as arcgis_get + + +class FetchStatus(Enum): + OK = 1 + EXPIRED = 2 + HASH_FAILURE = 3 + ALREADY_FETCHED = 4 + 
FAILED = 5 + + +class Collector: + user_agent = "DLUHC Digital Land" + resource_dir = "collection/resource/" + log_dir = "collection/log/" + + def __init__(self, dataset="", collection_dir=None): + # TODO do we need to defin this here? + self.dataset = dataset + if collection_dir: + self.resource_dir = collection_dir / "resource/" + self.log_dir = collection_dir / "log/" + self.session = requests.Session() + self.session.mount("file:", FileAdapter()) + self.endpoint = {} + + def url_endpoint(self, url): + return hashlib.sha256(url.encode("utf-8")).hexdigest() + + def log_path(self, log_datetime, endpoint): + log_date = log_datetime.isoformat()[:10] + return os.path.join(self.log_dir, log_date, endpoint + ".json") + + def save_log(self, path, log): + self.save(path, canonicaljson.encode_canonical_json(log)) + + def save_content(self, content): + resource = hashlib.sha256(content).hexdigest() + path = os.path.join(self.resource_dir, resource) + self.save(path, content) + return resource + + def save(self, path, data): + os.makedirs(os.path.dirname(path), exist_ok=True) + if not os.path.exists(path): + logging.info(path) + with open(path, "wb") as f: + f.write(data) + + def get(self, url, log={}, verify_ssl=True, plugin="get"): + logging.info("%s %s" % (plugin, url)) + log["ssl-verify"] = verify_ssl + + try: + response = self.session.get( + url, + headers={"User-Agent": self.user_agent}, + timeout=120, + verify=verify_ssl, + ) + except ( + requests.exceptions.SSLError, + requests.ConnectionError, + requests.HTTPError, + requests.Timeout, + requests.TooManyRedirects, + requests.exceptions.MissingSchema, + requests.exceptions.ChunkedEncodingError, + ) as exception: + logging.warning(exception) + log["exception"] = type(exception).__name__ + response = None + + content = None + + if response is not None: + log["status"] = str(response.status_code) + log["request-headers"] = dict(response.request.headers) + log["response-headers"] = dict(response.headers) + + if 
log["status"] == "200" and not response.headers.get( + "Content-Type", "" + ).startswith("text/html"): + content = response.content + + return log, content + + def fetch( + self, + url, + endpoint=None, + log_datetime=datetime.utcnow(), + end_date="", + plugin="", + ): + if end_date and datetime.strptime(end_date, "%Y-%m-%d") < log_datetime: + return FetchStatus.EXPIRED + + url_endpoint = self.url_endpoint(url) + if not endpoint: + endpoint = url_endpoint + elif endpoint != url_endpoint: + logging.error( + "url '%s' given endpoint %s expected %s" % (url, endpoint, url_endpoint) + ) + return FetchStatus.HASH_FAILURE + + # fetch each source at most once per-day + log_path = self.log_path(log_datetime, endpoint) + if os.path.isfile(log_path): + logging.debug(f"{log_path} exists") + return FetchStatus.ALREADY_FETCHED + + log = { + "endpoint-url": url, + "entry-date": log_datetime.isoformat(), + } + + start = timer() + + # TBD: use pluggy and move modules to digital-land.plugin.xxx namespace? + if plugin == "": + log, content = self.get(url, log) + elif plugin == "arcgis": + log, content = arcgis_get(self, url, log) + elif plugin == "wfs": + log, content = wfs_get(self, url, log) + elif plugin == "sparql": + log, content = sparql_get(self, url, log) + else: + logging.error("unknown plugin '%s' for endpoint %s" % (plugin, endpoint)) + + log["elapsed"] = str(round(timer() - start, 3)) + + if content: + status = FetchStatus.OK + log["resource"] = self.save_content(content) + else: + status = FetchStatus.FAILED + + self.save_log(log_path, log) + return status + + def collect(self, endpoint_path): + for row in csv.DictReader(open(endpoint_path, newline="")): + endpoint = row["endpoint"] + url = row["endpoint-url"] + plugin = row.get("plugin", "") + + # skip manually added files .. 
+ if not url: + continue + + self.fetch( + url, + endpoint=endpoint, + end_date=row.get("end-date", ""), + plugin=plugin, + ) diff --git a/tests/e2e/debug_pipline/helpers/plugins/__init__.py b/tests/e2e/debug_pipline/helpers/plugins/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/e2e/debug_pipline/helpers/plugins/arcgis.py b/tests/e2e/debug_pipline/helpers/plugins/arcgis.py new file mode 100644 index 000000000..92ec57200 --- /dev/null +++ b/tests/e2e/debug_pipline/helpers/plugins/arcgis.py @@ -0,0 +1,18 @@ +import json +from esridump.dumper import EsriDumper + + +def get(collector, url, log={}, plugin="arcgis"): + dumper = EsriDumper(url, fields=None) + + content = '{"type":"FeatureCollection","features":[' + sep = "\n" + + for feature in dumper: + content += sep + json.dumps(feature) + sep = ",\n" + + content += "]}" + + content = str.encode(content) + return log, content diff --git a/tests/e2e/debug_pipline/helpers/plugins/sparql.py b/tests/e2e/debug_pipline/helpers/plugins/sparql.py new file mode 100644 index 000000000..cb37693d2 --- /dev/null +++ b/tests/e2e/debug_pipline/helpers/plugins/sparql.py @@ -0,0 +1,58 @@ +import io +import csv +import logging +from SPARQLWrapper import SPARQLWrapper, JSON + + +url_prefix = {"https://query.wikidata.org/sparql": "http://www.wikidata.org/entity/"} + + +def sparql(endpoint_url, query, prefix=""): + s = SPARQLWrapper( + endpoint_url, + agent="Mozilla/5.0 (Windows NT 5.1; rv:36.0) " "Gecko/20100101 Firefox/36.0", + ) + + s.setQuery(query) + s.setReturnFormat(JSON) + return s.query().convert() + + +def remove_prefix(value, prefix): + if prefix and value.startswith(prefix): + return value[len(prefix) :] + return value + + +def as_csv(data, prefix=""): + fields = [field.replace("_", "-") for field in data["head"]["vars"]] + + content = io.StringIO() + w = csv.DictWriter(content, sorted(fields), extrasaction="ignore") + w.writeheader() + + for o in data["results"]["bindings"]: + row = {} + for 
field in fields: + f = field.replace("-", "_") + if f in o: + value = o[f]["value"] + row[field] = remove_prefix(value, prefix) + + w.writerow(row) + + return content.getvalue().encode() + + +def get(collector, url, log={}, plugin="sparql"): + url, script = url.split("#") + prefix = url_prefix.get(url, "") + + logging.info("%s %s %s %s" % (plugin, url, script, prefix)) + + query = open(script).read() + + data = sparql(url, query) + content = as_csv(data, prefix) + + return log, content diff --git a/tests/e2e/debug_pipline/helpers/plugins/wfs.py b/tests/e2e/debug_pipline/helpers/plugins/wfs.py new file mode 100644 index 000000000..f119eece9 --- /dev/null +++ b/tests/e2e/debug_pipline/helpers/plugins/wfs.py @@ -0,0 +1,28 @@ +import io +import re + +from digital_land.phase.convert import detect_encoding + + +# TBD: split this code into a WFS API plugin and a canonicalisation step + + +strip_exps = [ + (re.compile(rb' ?timeStamp="[^"]*"'), rb""), + (re.compile(rb' ?fid="[^"]*"'), rb""), + (re.compile(rb'(gml:id="[^."]+)[^"]*'), rb"\1"), +] + + +def strip_variable_content(content): + for strip_exp, replacement in strip_exps: + content = strip_exp.sub(replacement, content) + return content + + +def get(collector, url, log={}, plugin="wfs"): + log, content = collector.get(url=url, log=log, plugin=plugin) + encoding = detect_encoding(io.BytesIO(content)) + if encoding: + content = strip_variable_content(content) + return log, content diff --git a/tests/e2e/debug_pipline/test_debug_pipeline.py b/tests/e2e/debug_pipline/test_debug_pipeline.py new file mode 100644 index 000000000..05987bdba --- /dev/null +++ b/tests/e2e/debug_pipline/test_debug_pipeline.py @@ -0,0 +1,80 @@ +import pytest + +from tests.e2e.debug_pipline.funcs.test_cli_debug_funcs \ + import get_endpoints_info, get_endpoints, download_endpoints + + +def pytest_namespace(): + return {"endpoint_hashes": []} + + +# This test is a utility function +# It expands the debug_pipeline function of commands.py, +# 
reporting on each step, and providing useful info +# to the user. +@pytest.fixture +def env_vars_default(): + return { + "organisation": "", + "pipeline": "", + "endpoint_path": "../data/collection/", + "collection_dir": "../data/collection/" + } + + +@pytest.fixture +def env_vars_national_park(): + return { + "organisation": "government-organisation:D303", + "pipeline": "national-park", + "endpoint_path": "../../data/national-park-collection/", + "collection_dir": "../../data/national-park-collection/" + } + + +@pytest.fixture +def env_vars(env_vars_default, env_vars_national_park): + return env_vars_national_park + + +def test_debug_pipeline_start(env_vars): + print("") + print("") + print("========================================") + print("Running debug_pipeline with:") + print(f" organisation: {env_vars['organisation']}") + print(f" pipeline: {env_vars['pipeline']}") + print(f" endpoint_path: {env_vars['endpoint_path']}") + print(f"collection_dir: {env_vars['collection_dir']}") + print("========================================") + + +def test_get_endpoints_info(env_vars): + collection_dir = env_vars['collection_dir'] + + sut_result = get_endpoints_info(collection_dir) + + assert sut_result is True + + +def test_get_endpoints(env_vars): + organisation = env_vars['organisation'] + pipeline = env_vars['pipeline'] + collection_dir = env_vars['collection_dir'] + + endpoint_hashes = get_endpoints(organisation, pipeline, collection_dir) + pytest.endpoint_hashes = endpoint_hashes + + assert endpoint_hashes is not None + assert endpoint_hashes != [] + + +def test_download_endpoints(env_vars): + pipeline = env_vars['pipeline'] + endpoint_path = env_vars['endpoint_path'] + endpoint_hashes = pytest.endpoint_hashes + + sut_result = download_endpoints(pipeline, endpoint_path, endpoint_hashes) + + assert sut_result is True + From a48d9574324e3c7a763e33e00372de5c64292385 Mon Sep 17 00:00:00 2001 From: morgan-sl <128702799+morgan-sl@users.noreply.github.com> Date: Thu, 30 Mar 
2023 17:58:43 +0100 Subject: [PATCH 07/12] Improved reporting for downloads UnIsolated version (does reference dl-python) --- .../funcs/test_cli_debug_funcs.py | 11 +- .../debug_pipline/helpers/adapter/__init__.py | 0 tests/e2e/debug_pipline/helpers/collect.py | 178 ------------------ .../debug_pipline/helpers/plugins/__init__.py | 0 .../debug_pipline/helpers/plugins/arcgis.py | 18 -- .../debug_pipline/helpers/plugins/sparql.py | 58 ------ .../e2e/debug_pipline/helpers/plugins/wfs.py | 28 --- .../e2e/debug_pipline/test_debug_pipeline.py | 8 + 8 files changed, 17 insertions(+), 284 deletions(-) delete mode 100644 tests/e2e/debug_pipline/helpers/adapter/__init__.py delete mode 100644 tests/e2e/debug_pipline/helpers/collect.py delete mode 100644 tests/e2e/debug_pipline/helpers/plugins/__init__.py delete mode 100644 tests/e2e/debug_pipline/helpers/plugins/arcgis.py delete mode 100644 tests/e2e/debug_pipline/helpers/plugins/sparql.py delete mode 100644 tests/e2e/debug_pipline/helpers/plugins/wfs.py diff --git a/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py b/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py index 022e190d6..c2efd5cea 100644 --- a/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py +++ b/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py @@ -1,7 +1,7 @@ import csv import os -from tests.e2e.debug_pipline.helpers.collect import Collector +from digital_land.collect import Collector, FetchStatus def get_endpoints_info(collection_dir): @@ -79,7 +79,7 @@ def download_endpoints(pipeline_name, endpoint_path, endpoint_hashes): print(f" with plugin: {plugin}") print("Downloading...") try: - collector.fetch( + status = collector.fetch( url, endpoint=endpoint, end_date=row.get("end-date", ""), @@ -89,7 +89,14 @@ def download_endpoints(pipeline_name, endpoint_path, endpoint_hashes): return_value = True except: return_value = None + finally: + if status == FetchStatus.FAILED: + print(f"Download FAILED for: {url}") + print(f" with status: {status}") + 
else: + print(f"Download SUCCESS for {url}") + return_value = True print("") return return_value diff --git a/tests/e2e/debug_pipline/helpers/adapter/__init__.py b/tests/e2e/debug_pipline/helpers/adapter/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/e2e/debug_pipline/helpers/collect.py b/tests/e2e/debug_pipline/helpers/collect.py deleted file mode 100644 index 34c696806..000000000 --- a/tests/e2e/debug_pipline/helpers/collect.py +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/env python3 - -# -# collect resource -# -import csv -import hashlib -import logging -import os -from datetime import datetime -from enum import Enum -from timeit import default_timer as timer - -import canonicaljson -import requests - -from .adapter.file import FileAdapter -from .plugins.sparql import get as sparql_get -from .plugins.wfs import get as wfs_get -from .plugins.arcgis import get as arcgis_get - - -class FetchStatus(Enum): - OK = 1 - EXPIRED = 2 - HASH_FAILURE = 3 - ALREADY_FETCHED = 4 - FAILED = 5 - - -class Collector: - user_agent = "DLUHC Digital Land" - resource_dir = "collection/resource/" - log_dir = "collection/log/" - - def __init__(self, dataset="", collection_dir=None): - # TODO do we need to defin this here? 
- self.dataset = dataset - if collection_dir: - self.resource_dir = collection_dir / "resource/" - self.log_dir = collection_dir / "log/" - self.session = requests.Session() - self.session.mount("file:", FileAdapter()) - self.endpoint = {} - - def url_endpoint(self, url): - return hashlib.sha256(url.encode("utf-8")).hexdigest() - - def log_path(self, log_datetime, endpoint): - log_date = log_datetime.isoformat()[:10] - return os.path.join(self.log_dir, log_date, endpoint + ".json") - - def save_log(self, path, log): - self.save(path, canonicaljson.encode_canonical_json(log)) - - def save_content(self, content): - resource = hashlib.sha256(content).hexdigest() - path = os.path.join(self.resource_dir, resource) - self.save(path, content) - return resource - - def save(self, path, data): - os.makedirs(os.path.dirname(path), exist_ok=True) - if not os.path.exists(path): - logging.info(path) - with open(path, "wb") as f: - f.write(data) - - def get(self, url, log={}, verify_ssl=True, plugin="get"): - logging.info("%s %s" % (plugin, url)) - log["ssl-verify"] = verify_ssl - - try: - response = self.session.get( - url, - headers={"User-Agent": self.user_agent}, - timeout=120, - verify=verify_ssl, - ) - except ( - requests.exceptions.SSLError, - requests.ConnectionError, - requests.HTTPError, - requests.Timeout, - requests.TooManyRedirects, - requests.exceptions.MissingSchema, - requests.exceptions.ChunkedEncodingError, - ) as exception: - logging.warning(exception) - log["exception"] = type(exception).__name__ - response = None - - content = None - - if response is not None: - log["status"] = str(response.status_code) - log["request-headers"] = dict(response.request.headers) - log["response-headers"] = dict(response.headers) - - if log["status"] == "200" and not response.headers.get( - "Content-Type", "" - ).startswith("text/html"): - content = response.content - - return log, content - - def fetch( - self, - url, - endpoint=None, - log_datetime=datetime.utcnow(), - 
end_date="", - plugin="", - ): - if end_date and datetime.strptime(end_date, "%Y-%m-%d") < log_datetime: - return FetchStatus.EXPIRED - - url_endpoint = self.url_endpoint(url) - if not endpoint: - endpoint = url_endpoint - elif endpoint != url_endpoint: - logging.error( - "url '%s' given endpoint %s expected %s" % (url, endpoint, url_endpoint) - ) - return FetchStatus.HASH_FAILURE - - # fetch each source at most once per-day - log_path = self.log_path(log_datetime, endpoint) - if os.path.isfile(log_path): - logging.debug(f"{log_path} exists") - return FetchStatus.ALREADY_FETCHED - - log = { - "endpoint-url": url, - "entry-date": log_datetime.isoformat(), - } - - start = timer() - - # TBD: use pluggy and move modules to digital-land.plugin.xxx namespace? - if plugin == "": - log, content = self.get(url, log) - elif plugin == "arcgis": - log, content = arcgis_get(self, url, log) - elif plugin == "wfs": - log, content = wfs_get(self, url, log) - elif plugin == "sparql": - log, content = sparql_get(self, url, log) - else: - logging.error("unknown plugin '%s' for endpoint %s" % (plugin, endpoint)) - - log["elapsed"] = str(round(timer() - start, 3)) - - if content: - status = FetchStatus.OK - log["resource"] = self.save_content(content) - else: - status = FetchStatus.FAILED - - self.save_log(log_path, log) - return status - - def collect(self, endpoint_path): - for row in csv.DictReader(open(endpoint_path, newline="")): - endpoint = row["endpoint"] - url = row["endpoint-url"] - plugin = row.get("plugin", "") - - # skip manually added files .. 
- if not url: - continue - - self.fetch( - url, - endpoint=endpoint, - end_date=row.get("end-date", ""), - plugin=plugin, - ) diff --git a/tests/e2e/debug_pipline/helpers/plugins/__init__.py b/tests/e2e/debug_pipline/helpers/plugins/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/e2e/debug_pipline/helpers/plugins/arcgis.py b/tests/e2e/debug_pipline/helpers/plugins/arcgis.py deleted file mode 100644 index 92ec57200..000000000 --- a/tests/e2e/debug_pipline/helpers/plugins/arcgis.py +++ /dev/null @@ -1,18 +0,0 @@ -import json -from esridump.dumper import EsriDumper - - -def get(collector, url, log={}, plugin="arcgis"): - dumper = EsriDumper(url, fields=None) - - content = '{"type":"FeatureCollection","features":[' - sep = "\n" - - for feature in dumper: - content += sep + json.dumps(feature) - sep = ",\n" - - content += "]}" - - content = str.encode(content) - return log, content diff --git a/tests/e2e/debug_pipline/helpers/plugins/sparql.py b/tests/e2e/debug_pipline/helpers/plugins/sparql.py deleted file mode 100644 index cb37693d2..000000000 --- a/tests/e2e/debug_pipline/helpers/plugins/sparql.py +++ /dev/null @@ -1,58 +0,0 @@ -import io -import csv -import logging -from SPARQLWrapper import SPARQLWrapper, JSON - - -url_prefix = {"https://query.wikidata.org/sparql": "http://www.wikidata.org/entity/"} - - -def sparql(endpoint_url, query, prefix=""): - s = SPARQLWrapper( - endpoint_url, - agent="Mozilla/5.0 (Windows NT 5.1; rv:36.0) " "Gecko/20100101 Firefox/36.0", - ) - - s.setQuery(query) - s.setReturnFormat(JSON) - return s.query().convert() - - -def remove_prefix(value, prefix): - if prefix and value.startswith(prefix): - return value[len(prefix) :] - return value - - -def as_csv(data, prefix=""): - fields = [field.replace("_", "-") for field in data["head"]["vars"]] - - content = io.StringIO() - w = csv.DictWriter(content, sorted(fields), extrasaction="ignore") - w.writeheader() - - for o in data["results"]["bindings"]: - row = 
{} - for field in fields: - f = field.replace("-", "_") - if f in o: - value = o[f]["value"] - row[field] = remove_prefix(value, prefix) - - w.writerow(row) - - return content.getvalue().encode() - - -def get(collector, url, log={}, plugin="sparql"): - url, script = url.split("#") - prefix = url_prefix.get(url, "") - - logging.info("%s %s %s %s" % (plugin, url, script, prefix)) - - query = open(script).read() - - data = sparql(url, query) - content = as_csv(data, prefix) - - return log, content diff --git a/tests/e2e/debug_pipline/helpers/plugins/wfs.py b/tests/e2e/debug_pipline/helpers/plugins/wfs.py deleted file mode 100644 index f119eece9..000000000 --- a/tests/e2e/debug_pipline/helpers/plugins/wfs.py +++ /dev/null @@ -1,28 +0,0 @@ -import io -import re - -from digital_land.phase.convert import detect_encoding - - -# TBD: split this code into a WFS API plugin and a canonicalisation step - - -strip_exps = [ - (re.compile(rb' ?timeStamp="[^"]*"'), rb""), - (re.compile(rb' ?fid="[^"]*"'), rb""), - (re.compile(rb'(gml:id="[^."]+)[^"]*'), rb"\1"), -] - - -def strip_variable_content(content): - for strip_exp, replacement in strip_exps: - content = strip_exp.sub(replacement, content) - return content - - -def get(collector, url, log={}, plugin="wfs"): - log, content = collector.get(url=url, log=log, plugin=plugin) - encoding = detect_encoding(io.BytesIO(content)) - if encoding: - content = strip_variable_content(content) - return log, content diff --git a/tests/e2e/debug_pipline/test_debug_pipeline.py b/tests/e2e/debug_pipline/test_debug_pipeline.py index 05987bdba..7b231e326 100644 --- a/tests/e2e/debug_pipline/test_debug_pipeline.py +++ b/tests/e2e/debug_pipline/test_debug_pipeline.py @@ -1,3 +1,7 @@ +import os +import pathlib +import shutil + import pytest from tests.e2e.debug_pipline.funcs.test_cli_debug_funcs \ @@ -48,6 +52,10 @@ def test_debug_pipeline_start(env_vars): print(f"collection_dir: {env_vars['collection_dir']}") 
print("========================================") + output_path = pathlib.Path("./collection") + if output_path.exists(): + shutil.rmtree(output_path) + def test_get_endpoints_info(env_vars): collection_dir = env_vars['collection_dir'] From 534fa36f4994e04d0d38ae4bc1d9098719bf4dd5 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Tue, 4 Apr 2023 18:37:44 +0100 Subject: [PATCH 08/12] update with collection step --- digital_land/cli.py | 8 ++++++- digital_land/commands.py | 52 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index 56758a7d7..ad889a0da 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -241,5 +241,11 @@ def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): @collection_dir @click.pass_context def run_debug_pipeline(ctx, organisation, endpoint_path, collection_dir): - debug_pipeline(organisation, ctx.obj["PIPELINE"], endpoint_path, collection_dir) + pipeline = ctx.obj["PIPELINE"] + dataset = ctx.obj["DATASET"] + specification = ctx.obj["SPECIFICATION"] + + debug_pipeline( + organisation, dataset, pipeline, endpoint_path, collection_dir, specification + ) return diff --git a/digital_land/commands.py b/digital_land/commands.py index c9b891160..6d34995ea 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -399,7 +399,9 @@ def default_output_path(command, input_path): return f"{directory}{command}/{resource_from_path(input_path)}.csv" -def debug_pipeline(org, pipeline, endpoint_path, collection_dir): +def debug_pipeline( + org, dataset, pipeline, endpoint_path, collection_dir, specification +): print(f"running process for organisation:{org} and pipeline: {pipeline.name}") # identify relevant sources and endpoints # read in relevant end points @@ -442,7 +444,55 @@ def debug_pipeline(org, pipeline, endpoint_path, collection_dir): ) # collection step, this will need to be a bit different + # remove previously 
created log.csv and resouorce.csv files + try: + os.remove(Path(collection_dir) / "log.csv") + os.remove(Path(collection_dir) / "resource.csv") + # seems unclear why we accept an error here, what happens if first line errors and second doesn't + except OSError: + pass + + collection = Collection(name=None, directory=collection_dir) + collection.load() + # no direct way to filter which logs/resources are save to the csv only built to do all at once + # we manually filter the entries for logs/resources using the enpoint hashes made above to achieve this + for endpoint in endpoint_hashes: + log_entries = [ + entry for entry in collection.log.entries if entry["endpoint"] == endpoint + ] + resource_entries = [ + entry + for entry in collection.resource.entries + if entry["endpoints"] == endpoint + ] + + collection.log.entries = log_entries + collection.resource.entries = resource_entries + collection.save_csv() + # pipeline step for loop for each of the files + # define additiionial files + # for resource in resource_entries: + # input_path = f"{collection_dir}/resource/{resource}" + # output_path = f"transformed/{dataset}/{resource}.csv" + # update below with correct values + # pipeline_run( + # dataset, + # pipeline, + # specification, + # input_path, + # output_path, + # issue_dir=issue_dir, + # column_field_dir=column_field_dir, + # dataset_resource_dir=dataset_resource_dir, + # organisation_path=organisation_path, + # save_harmonised=save_harmonised, + # endpoints=endpoints, + # organisations=organisations, + # entry_date=entry_date, + # custom_temp_dir=custom_temp_dir, + # ) + # once files are loaded create the dataset # run expectations? 
this made need to be made so only certain ones are ran as they may be specific to certain datasets # end From a898fc1e6b6d95c9f74d2b2f879679d0ebe79ecf Mon Sep 17 00:00:00 2001 From: morgan-sl <128702799+morgan-sl@users.noreply.github.com> Date: Wed, 5 Apr 2023 16:14:08 +0100 Subject: [PATCH 09/12] Added Collection step and beginnings of the pipeline run to the e2e test_debug_pipeline test suite --- digital_land/collection.py | 11 +- digital_land/commands.py | 2 +- .../funcs/test_cli_debug_funcs.py | 77 ++++++++ .../e2e/debug_pipeline/test_debug_pipeline.py | 167 ++++++++++++++++++ .../e2e/debug_pipline/test_debug_pipeline.py | 88 --------- 5 files changed, 255 insertions(+), 90 deletions(-) rename tests/e2e/{debug_pipline => debug_pipeline}/funcs/test_cli_debug_funcs.py (50%) create mode 100644 tests/e2e/debug_pipeline/test_debug_pipeline.py delete mode 100644 tests/e2e/debug_pipline/test_debug_pipeline.py diff --git a/digital_land/collection.py b/digital_land/collection.py index 1f40d639a..155ca5353 100644 --- a/digital_land/collection.py +++ b/digital_land/collection.py @@ -66,7 +66,16 @@ def save_item(self, item, path): def check_item_path(self, item, path): m = re.match(r"^.*\/([-\d]+)\/(\w+).json", path) - (date, endpoint) = m.groups() + + # This regex method is not compatible with running Tests on Windows + # Added an alternate method that is cross-platform + if m is None: + file_path = Path(path) + file_path_parts = file_path.parts + date = file_path_parts[len(file_path_parts) - 2] + endpoint = file_path.stem + else: + (date, endpoint) = m.groups() if not item.get("entry-date", "").startswith(date): logging.warning( diff --git a/digital_land/commands.py b/digital_land/commands.py index 6d34995ea..6445e6405 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -471,7 +471,7 @@ def debug_pipeline( collection.save_csv() # pipeline step for loop for each of the files - # define additiionial files + # define additional files # for resource in 
resource_entries: # input_path = f"{collection_dir}/resource/{resource}" # output_path = f"transformed/{dataset}/{resource}.csv" diff --git a/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py b/tests/e2e/debug_pipeline/funcs/test_cli_debug_funcs.py similarity index 50% rename from tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py rename to tests/e2e/debug_pipeline/funcs/test_cli_debug_funcs.py index c2efd5cea..bef34f918 100644 --- a/tests/e2e/debug_pipline/funcs/test_cli_debug_funcs.py +++ b/tests/e2e/debug_pipeline/funcs/test_cli_debug_funcs.py @@ -1,7 +1,9 @@ import csv import os +from pathlib import Path from digital_land.collect import Collector, FetchStatus +from digital_land.collection import Collection def get_endpoints_info(collection_dir): @@ -101,3 +103,78 @@ def download_endpoints(pipeline_name, endpoint_path, endpoint_hashes): return return_value + +def endpoint_collection(collection_dir, endpoint_hashes): + return_value = None + + # remove previously created log.csv and resouorce.csv files + try: + os.remove(Path(collection_dir) / "log.csv") + os.remove(Path(collection_dir) / "resource.csv") + except FileNotFoundError as exc: + print("") + print(">>> INFO: endpoint_collection:", exc) + except OSError as exc: + print("") + print("ERROR: endpoint_collection:", exc) + return_value = None + + collection = Collection(name=None, directory=collection_dir) + collection.load() + + print("") + print("") + print(">>> Collection endpoint stats") + print("-----------------------------") + print(f" Num source entries: {len(collection.source.entries)}") + print(f" Num source records: {len(collection.source.records)}") + print("-----------------------------") + print(f"Num endpoint entries: {len(collection.endpoint.entries)}") + print(f"Num endpoint records: {len(collection.endpoint.records)}") + print("-----------------------------") + print(f"Num resource entries: {len(collection.resource.entries)}") + print(f"Num resource records: 
{len(collection.resource.records)}") + print("-----------------------------") + print(f" Num log entries: {len(collection.log.entries)}") + print(f" Num log records: {len(collection.log.records)}") + + # there is no direct way to filter which logs/resources are saved to the csv + # this was originally built to do all collections at once + # we now manually filter the entries for logs/resources using the endpoint hashes made above to achieve this + for endpoint in endpoint_hashes: + log_entries = [ + entry for entry in collection.log.entries if entry["endpoint"] == endpoint + ] + resource_entries = [ + entry for entry in collection.resource.entries if entry["endpoints"] == endpoint + ] + + collection.log.entries = log_entries + collection.resource.entries = resource_entries + collection.save_csv() + + print("") + print("") + print(">>> Items added to Collection") + print("-----------------------------") + print(f"Num resource entries: {len(collection.resource.entries)}") + print("-----------------------------") + print(f" Num log entries: {len(collection.log.entries)}") + + print(">>>>", collection.resource.entries) + + return log_entries, resource_entries + + +def run_collection_pipeline(collection_dir, pipeline_name, dataset_name, resource_entries, log_entries): + print("") + + # for each of the files + # define additiionial files + for resource in resource_entries: + input_path = f"{collection_dir}resource/{resource['resource']}" + output_path = f"{collection_dir}transformed/{dataset_name}/{resource['resource']}.csv" + + print(f">>> input_path: {input_path}") + print(f">>> output_path:{output_path}") + diff --git a/tests/e2e/debug_pipeline/test_debug_pipeline.py b/tests/e2e/debug_pipeline/test_debug_pipeline.py new file mode 100644 index 000000000..bf5609f0e --- /dev/null +++ b/tests/e2e/debug_pipeline/test_debug_pipeline.py @@ -0,0 +1,167 @@ +import os +from pathlib import Path +import shutil + +import pytest + +from 
tests.e2e.debug_pipeline.funcs.test_cli_debug_funcs import \ + get_endpoints_info, get_endpoints, download_endpoints, \ + endpoint_collection, run_collection_pipeline + +def pytest_namespace(): + return { + "endpoint_hashes": [], + "log_entries": None, + "resource_entries": None + } + +# This test is a utility function +# It expands the debug_pipeline function of commands.py, +# reporting on each step, and providing useful info +# to the user. +@pytest.fixture +def env_vars_default(request): + + root_dir = request.config.rootdir + tests_dir = Path(root_dir).resolve().parent.parent + + return { + "organisation": "", + "pipeline_name": "", + "dataset_name": "", + "endpoint_path": f"{tests_dir}/tests/data/collection/", + "collection_dir": f"{tests_dir}/tests/data/collection/", + "collection_dir_out": f"{tests_dir}/tests/e2e/debug_pipeline/collection/", + "specification_dir": f"{tests_dir}/tests/data/specification/" + } + + +@pytest.fixture +def env_vars_national_park(request): + + root_dir = request.config.rootdir + tests_dir = Path(root_dir).resolve() + + return { + "organisation": "government-organisation:D303", + "pipeline_name": "national-park", + "dataset_name": "national-park", + "endpoint_path": f"{tests_dir}/tests/data/collection-national-park/", + "collection_dir": f"{tests_dir}/tests/data/collection-national-park/", + "collection_dir_out": f"{tests_dir}/tests/e2e/debug_pipeline/collection/", + "specification_dir": f"{tests_dir}/tests/data/specification/" + } + + +@pytest.fixture +def env_vars_article_4_direction(request): + + root_dir = request.config.rootdir + tests_dir = Path(root_dir).resolve() + + return { + "organisation": "local-authority-eng:SWK", + "pipeline_name": "article-4-direction-area", + "dataset_name": "article-4-direction", + "endpoint_path": f"{tests_dir}/tests/data/collection-article-4-direction/", + "collection_dir": f"{tests_dir}/tests/data/collection-article-4-direction/", + "collection_dir_out": 
f"{root_dir}/tests/e2e/debug_pipeline/collection/", + "specification_dir": f"{tests_dir}/tests/data/specification/" + } + + +@pytest.fixture +def env_vars(env_vars_default, env_vars_article_4_direction): + return env_vars_article_4_direction + + +def test_debug_pipeline_start(env_vars): + print("") + print("") + print("========================================") + print("Running debug_pipeline with:") + print(f" organisation: {env_vars['organisation']}") + print(f" pipeline_name: {env_vars['pipeline_name']}") + print(f" dataset_name: {env_vars['dataset_name']}") + print(f" endpoint_path: {env_vars['endpoint_path']}") + print(f" collection_dir: {env_vars['collection_dir']}") + print(f"collection_dir_out: {env_vars['collection_dir_out']}") + print(f" specification_dir: {env_vars['specification_dir']}") + print("========================================") + + output_path = Path("./collection") + if output_path.exists(): + shutil.rmtree(output_path) + + +def test_get_endpoints_info(env_vars): + collection_dir = env_vars['collection_dir'] + + sut_result = get_endpoints_info(collection_dir) + + assert sut_result is True + + +def test_get_endpoints(env_vars): + organisation = env_vars['organisation'] + pipeline_name = env_vars['pipeline_name'] + collection_dir = env_vars['collection_dir'] + + endpoint_hashes = get_endpoints(organisation, pipeline_name, collection_dir) + pytest.endpoint_hashes = endpoint_hashes + + assert endpoint_hashes is not None + assert endpoint_hashes != [] + + +def test_download_endpoints(env_vars): + pipeline_name = env_vars['pipeline_name'] + endpoint_path = env_vars['endpoint_path'] + endpoint_hashes = pytest.endpoint_hashes + + sut_result = download_endpoints(pipeline_name, endpoint_path, endpoint_hashes) + + assert sut_result is True + + +def move_collection_files_to_test_collection_dir(source_csv_path, endpoint_csv_path, collection_dir): + + if os.path.isfile(source_csv_path): + shutil.copy(source_csv_path, collection_dir) + + if 
os.path.isfile(endpoint_csv_path): + shutil.copy(endpoint_csv_path, collection_dir) + + +def test_endpoint_collection(env_vars): + source_csv_path = f"{env_vars['endpoint_path']}source.csv" + endpoint_csv_path = f"{env_vars['endpoint_path']}endpoint.csv" + collection_dir_out = env_vars['collection_dir_out'] + endpoint_hashes = pytest.endpoint_hashes + + # Move files from source collection dir to test dir, + # to keep original collection clean of test-run artifacts + move_collection_files_to_test_collection_dir(source_csv_path, endpoint_csv_path, collection_dir_out) + + (log_entries, resource_entries) = endpoint_collection(collection_dir_out, endpoint_hashes) + + pytest.log_entries = log_entries + pytest.resource_entries = resource_entries + + assert log_entries is not None + assert resource_entries is not None + + +def test_run_collection_pipeline(env_vars): + pipeline_name = env_vars['pipeline_name'] + collection_dir_out = env_vars['collection_dir_out'] + dataset_name = env_vars['dataset_name'] + endpoint_hashes = pytest.endpoint_hashes + log_entries = pytest.log_entries + resource_entries = pytest.resource_entries + + sut_result = run_collection_pipeline(collection_dir_out, pipeline_name, dataset_name, resource_entries, log_entries) + + +# use article-4-direction-collection +# specification = Specification(specification_dir) diff --git a/tests/e2e/debug_pipline/test_debug_pipeline.py b/tests/e2e/debug_pipline/test_debug_pipeline.py deleted file mode 100644 index 7b231e326..000000000 --- a/tests/e2e/debug_pipline/test_debug_pipeline.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -import pathlib -import shutil - -import pytest - -from tests.e2e.debug_pipline.funcs.test_cli_debug_funcs \ - import get_endpoints_info, get_endpoints, download_endpoints - - -def pytest_namespace(): - return {"endpoint_hashes": []} - - -# This test is a utility function -# It expands the debug_pipeline function of commands.py, -# reporting on each step, and providing useful info -# to the 
user. -@pytest.fixture -def env_vars_default(): - return { - "organisation": "", - "pipeline": "", - "endpoint_path": "../data/collection/", - "collection_dir": "../data/collection/" - } - - -@pytest.fixture -def env_vars_national_park(): - return { - "organisation": "government-organisation:D303", - "pipeline": "national-park", - "endpoint_path": "../../data/national-park-collection/", - "collection_dir": "../../data/national-park-collection/" - } - - -@pytest.fixture -def env_vars(env_vars_default, env_vars_national_park): - return env_vars_national_park - - -def test_debug_pipeline_start(env_vars): - print("") - print("") - print("========================================") - print("Running debug_pipeline with:") - print(f" organisation: {env_vars['organisation']}") - print(f" pipeline: {env_vars['pipeline']}") - print(f" endpoint_path: {env_vars['endpoint_path']}") - print(f"collection_dir: {env_vars['collection_dir']}") - print("========================================") - - output_path = pathlib.Path("./collection") - if output_path.exists(): - shutil.rmtree(output_path) - - -def test_get_endpoints_info(env_vars): - collection_dir = env_vars['collection_dir'] - - sut_result = get_endpoints_info(collection_dir) - - assert sut_result is True - - -def test_get_endpoints(env_vars): - organisation = env_vars['organisation'] - pipeline = env_vars['pipeline'] - collection_dir = env_vars['collection_dir'] - - endpoint_hashes = get_endpoints(organisation, pipeline, collection_dir) - pytest.endpoint_hashes = endpoint_hashes - - assert endpoint_hashes is not None - assert endpoint_hashes != [] - - -def test_download_endpoints(env_vars): - pipeline = env_vars['pipeline'] - endpoint_path = env_vars['endpoint_path'] - endpoint_hashes = pytest.endpoint_hashes - - sut_result = download_endpoints(pipeline, endpoint_path, endpoint_hashes) - - assert sut_result is True - From 8c21cb35c567e7f8e4f9f33e88a21aef33bc346f Mon Sep 17 00:00:00 2001 From: morgan-sl 
<128702799+morgan-sl@users.noreply.github.com> Date: Wed, 5 Apr 2023 18:34:30 +0100 Subject: [PATCH 10/12] Added 2 unit tests to check the change made in check_item_path() of collection.py --- tests/unit/test_collection.py | 49 +++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_collection.py b/tests/unit/test_collection.py index 369ce1d50..0a407eca7 100644 --- a/tests/unit/test_collection.py +++ b/tests/unit/test_collection.py @@ -1,5 +1,8 @@ -from digital_land.register import hash_value -from digital_land.collection import Collection +import logging + +from digital_land.register import hash_value, Item +from digital_land.collection import Collection, LogStore +from digital_land.schema import Schema test_collection_dir = "tests/data/collection" @@ -43,3 +46,45 @@ def test_collection(): assert collection.resource_organisations( "ae191fd3dc6a892d82337d9045bf4c1043804a1961131b0a9271280f86b6a8cf" ) == ["organisation:2"] + + +# This method does not return any data and only outputs to the log. +# Test expects to be run with pytest runner +# The sut throws warning messages, so we look for the presence of those +def test_check_item_path_with_good_data(request, caplog): + root_dir = request.config.rootdir + collection_dir = f"{root_dir}/{test_collection_dir}" + caplog.set_level(logging.WARNING) + + item = Item() + item.data["entry-date"] = "2020-02-12T16:55:54.703438" + item.data["endpoint"] = "50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e" + regex_friendly_directory = f"{collection_dir}/log/2020-02-12/50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e.json" + + store = LogStore(Schema("log")) + store.check_item_path(item, regex_friendly_directory) + + assert len(caplog.records) == 0 + + +# This method does not return any data and only outputs to the log. 
+# Test expects to be run with pytest runner +# The sut throws warning messages, so we look for the presence of those +def test_check_item_path_with_bad_data(request, caplog): + root_dir = request.config.rootdir + collection_dir = f"{root_dir}/{test_collection_dir}" + caplog.set_level(logging.WARNING) + + item = Item() + # date change to force an error + item.data["entry-date"] = "2021-02-12T16:55:54.703438" + item.data["endpoint"] = "50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e" + regex_friendly_directory = f"{collection_dir}/log/2020-02-12/50335d6703d9bebb683f1b27e02ad17e991ff527bed7e0ab620cd1b6e4b5689e.json" + + store = LogStore(Schema("log")) + store.check_item_path(item, regex_friendly_directory) + + assert len(caplog.records) > 0 + + + From f0949c7bc2d1f63c9dac9cc1728b50a777d896c7 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Fri, 21 Apr 2023 19:47:59 +0100 Subject: [PATCH 11/12] update code --- digital_land/cli.py | 23 +++++++++++++++++++-- digital_land/commands.py | 44 ++++++++++++++++++++++++++++------------ 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index ad889a0da..ecf82c0a2 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -238,14 +238,33 @@ def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml): required=True, ) @endpoint_path +@issue_dir +@column_field_dir +@dataset_resource_dir @collection_dir @click.pass_context -def run_debug_pipeline(ctx, organisation, endpoint_path, collection_dir): +def run_debug_pipeline( + ctx, + organisation, + endpoint_path, + collection_dir, + issue_dir, + column_field_dir, + dataset_resoource_dir, +): pipeline = ctx.obj["PIPELINE"] dataset = ctx.obj["DATASET"] specification = ctx.obj["SPECIFICATION"] debug_pipeline( - organisation, dataset, pipeline, endpoint_path, collection_dir, specification + organisation, + dataset, + pipeline, + endpoint_path, + collection_dir, + specification, + issue_dir, + 
column_field_dir, + dataset_resoource_dir, ) return diff --git a/digital_land/commands.py b/digital_land/commands.py index 6d34995ea..cb10cc73d 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -400,7 +400,7 @@ def default_output_path(command, input_path): def debug_pipeline( - org, dataset, pipeline, endpoint_path, collection_dir, specification + org, dataset, pipeline, endpoint_path, collection_dir, specification, issue_dir ): print(f"running process for organisation:{org} and pipeline: {pipeline.name}") # identify relevant sources and endpoints @@ -476,23 +476,41 @@ def debug_pipeline( # input_path = f"{collection_dir}/resource/{resource}" # output_path = f"transformed/{dataset}/{resource}.csv" # update below with correct values - # pipeline_run( + # pipeline_run( + # dataset, + # pipeline, + # specification, + # input_path, + # output_path, + # issue_dir=issue_dir, + # column_field_dir=column_field_dir, + # dataset_resource_dir=dataset_resource_dir, + # organisation_path=organisation_path, + # save_harmonised=save_harmonised, + # endpoints=endpoints, + # organisations=organisations, + # entry_date=entry_date, + # custom_temp_dir=custom_temp_dir, + # ) + + # # once files are loaded create the dataset + # def pipeline_run( # dataset, # pipeline, # specification, # input_path, # output_path, - # issue_dir=issue_dir, - # column_field_dir=column_field_dir, - # dataset_resource_dir=dataset_resource_dir, - # organisation_path=organisation_path, - # save_harmonised=save_harmonised, - # endpoints=endpoints, - # organisations=organisations, - # entry_date=entry_date, - # custom_temp_dir=custom_temp_dir, + # collection_dir="./collection", # TBD: remove, replaced by endpoints, organisations and entry_date + # null_path=None, # TBD: remove this + # issue_dir=None, + # organisation_path=None, + # save_harmonised=False, + # column_field_dir=None, + # dataset_resource_dir=None, + # custom_temp_dir=None, # TBD: rename to "tmpdir" + # endpoints=[], + # 
organisations=[], + # entry_date="", # ) - - # once files are loaded create the dataset # run expectations? this made need to be made so only certain ones are ran as they may be specific to certain datasets # end From 6062db3e895d78ad2be8b29af233fd452613f557 Mon Sep 17 00:00:00 2001 From: Owen Eveleigh Date: Tue, 2 May 2023 13:05:01 +0100 Subject: [PATCH 12/12] add resource history to logging out --- digital_land/cli.py | 4 ++-- digital_land/commands.py | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/digital_land/cli.py b/digital_land/cli.py index ecf82c0a2..172783189 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -250,7 +250,7 @@ def run_debug_pipeline( collection_dir, issue_dir, column_field_dir, - dataset_resoource_dir, + dataset_resource_dir, ): pipeline = ctx.obj["PIPELINE"] dataset = ctx.obj["DATASET"] @@ -265,6 +265,6 @@ def run_debug_pipeline( specification, issue_dir, column_field_dir, - dataset_resoource_dir, + dataset_resource_dir, ) return diff --git a/digital_land/commands.py b/digital_land/commands.py index 50c1d8bb3..d32150dfc 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -9,6 +9,7 @@ import geojson import shapely +import urllib.request from digital_land.collect import Collector from digital_land.collection import Collection, resource_path @@ -400,7 +401,15 @@ def default_output_path(command, input_path): def debug_pipeline( - org, dataset, pipeline, endpoint_path, collection_dir, specification, issue_dir + org, + dataset, + pipeline, + endpoint_path, + collection_dir, + specification, + issue_dir, + column_field_dir, + dataset_resource_dir, ): print(f"running process for organisation:{org} and pipeline: {pipeline.name}") # identify relevant sources and endpoints @@ -470,6 +479,32 @@ def debug_pipeline( collection.resource.entries = resource_entries collection.save_csv() + # print resource history this could be useful + print("Resource History") + 
print("start_date,end_date,resource")
+    for resource in resource_entries:
+        print(f"{resource['start-date']},{resource['end-date']},{resource['resource']}")
+
+    # download previously collected resources
+    collection = specification.dataset[dataset]["collection"]
+    for resource in resource_entries:
+        if not os.path.exists(f"collection/resource/{resource['resource']}"):
+            if not os.path.exists("collection/resource/"):
+                os.makedirs("collection/resource/")
+            files_url = "https://files.planning.data.gov.uk"
+            urllib.request.urlretrieve(
+                os.path.join(
+                    files_url,
+                    collection,
+                    "collection",
+                    "resource",
+                    resource["resource"],
+                ),
+                os.path.join("collection/resource", resource["resource"]),
+            )
+
+    # download from files url
+
     # pipeline step for loop for each of the files
     # define additional files
     # for resource in resource_entries: