Skip to content
44 changes: 44 additions & 0 deletions digital_land/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pipeline_run,
collection_add_source,
expectations,
debug_pipeline,
)

from digital_land.command_arguments import (
Expand All @@ -26,6 +27,7 @@
issue_dir,
dataset_resource_dir,
column_field_dir,
endpoint_path,
)


Expand Down Expand Up @@ -224,3 +226,45 @@ def collection_add_source_cmd(ctx, collection, endpoint_url, collection_dir):
@click.option("--data-quality-yaml", help="path to expectations yaml", required=True)
def call_expectations(results_path, sqlite_dataset_path, data_quality_yaml):
return expectations(results_path, sqlite_dataset_path, data_quality_yaml)


@cli.command(
    "debug-contribution",
    short_help="runs an org through the entire process including collector,pipeline and dataset",
)
@click.option(
    "--organisation",
    help="string representing the organisation, must match organisation name in source.csv",
    required=True,
)
@endpoint_path
@issue_dir
@column_field_dir
@dataset_resource_dir
@collection_dir
@click.pass_context
def run_debug_pipeline(
    ctx,
    organisation,
    endpoint_path,
    collection_dir,
    issue_dir,
    column_field_dir,
    dataset_resource_dir,
):
    """Debug a single organisation's contribution end to end.

    Thin CLI wrapper: pulls the pipeline, dataset and specification objects
    off the click context (set up by the group callback) and delegates all
    work to ``debug_pipeline``.
    """
    pipeline = ctx.obj["PIPELINE"]
    dataset = ctx.obj["DATASET"]
    specification = ctx.obj["SPECIFICATION"]

    # Delegate to the library function; the command itself returns nothing
    # (the redundant trailing `return` from the original has been dropped).
    debug_pipeline(
        organisation,
        dataset,
        pipeline,
        endpoint_path,
        collection_dir,
        specification,
        issue_dir,
        column_field_dir,
        dataset_resource_dir,
    )
1 change: 1 addition & 0 deletions digital_land/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Collector:
log_dir = "collection/log/"

def __init__(self, dataset="", collection_dir=None):
# TODO do we need to define this here?
self.dataset = dataset
if collection_dir:
self.resource_dir = collection_dir / "resource/"
Expand Down
11 changes: 10 additions & 1 deletion digital_land/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ def save_item(self, item, path):

def check_item_path(self, item, path):
m = re.match(r"^.*\/([-\d]+)\/(\w+).json", path)
(date, endpoint) = m.groups()

# This regex method is not compatible with running Tests on Windows
# Added an alternate method that is cross-platform
if m is None:
file_path = Path(path)
file_path_parts = file_path.parts
date = file_path_parts[len(file_path_parts) - 2]
endpoint = file_path.stem
else:
(date, endpoint) = m.groups()

if not item.get("entry-date", "").startswith(date):
logging.warning(
Expand Down
152 changes: 152 additions & 0 deletions digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import geojson
import shapely
import urllib.request

from digital_land.collect import Collector
from digital_land.collection import Collection, resource_path
Expand Down Expand Up @@ -397,3 +398,154 @@ def resource_from_path(path):
def default_output_path(command, input_path):
directory = "" if command in ["harmonised", "transformed"] else "var/"
return f"{directory}{command}/{resource_from_path(input_path)}.csv"


def debug_pipeline(
    org,
    dataset,
    pipeline,
    endpoint_path,
    collection_dir,
    specification,
    issue_dir,
    column_field_dir,
    dataset_resource_dir,
):
    """Run a single organisation's data through the collection stage for debugging.

    Identifies the endpoints registered for ``org``/``pipeline`` in the
    collection's source.csv, re-collects them, filters the in-memory log and
    resource registers down to those endpoints before saving, and downloads
    any previously collected resources missing locally.  The transform and
    dataset-build steps are still TODO (see the commented-out scaffold at
    the end of this function).
    """
    print(f"running process for organisation:{org} and pipeline: {pipeline.name}")

    # Identify relevant endpoints.  Organisation info lives in source.csv, so
    # gather the endpoint hash of every source row matching this org/pipeline.
    endpoint_hashes = []
    with open(os.path.join(collection_dir, "source.csv"), newline="") as source_file:
        for row in csv.DictReader(source_file):
            if row["organisation"] == org and row["pipelines"] == pipeline.name:
                endpoint_hashes.append(row["endpoint"])

    # BUG FIX: the original tested `len(endpoint_hashes) < 0`, which can never
    # be true, so the "no endpoints" message was unreachable.
    if not endpoint_hashes:
        print("no enpoints need collecting")
    else:
        text = ",".join(endpoint_hashes)
        print(f"endpoints found {text}")

    endpoint_set = set(endpoint_hashes)  # O(1) membership tests below

    # Create a collector and download each matching endpoint.
    collector = Collector(pipeline.name, collection_dir=None)
    with open(endpoint_path, newline="") as endpoint_file:
        for row in csv.DictReader(endpoint_file):
            endpoint = row["endpoint"]
            if endpoint not in endpoint_set:
                continue
            url = row["endpoint-url"]
            # skip manually added files (nothing to fetch)
            if not url:
                continue
            print(f"downloading {endpoint}")
            collector.fetch(
                url,
                endpoint=endpoint,
                end_date=row.get("end-date", ""),
                plugin=row.get("plugin", ""),
            )

    # Remove previously created log.csv and resource.csv files.
    # BUG FIX: handled per-file so a missing log.csv no longer prevents
    # resource.csv from being removed (the original wrapped both removals in
    # a single try/except, which its own comment flagged as unclear).
    for stale_name in ("log.csv", "resource.csv"):
        try:
            os.remove(Path(collection_dir) / stale_name)
        except OSError:
            pass

    collection = Collection(name=None, directory=collection_dir)
    collection.load()

    # There is no direct way to make Collection save only some logs/resources,
    # so filter the in-memory entries by endpoint hash before saving.
    # BUG FIX: the original filtered inside a `for endpoint in endpoint_hashes`
    # loop while reassigning the entry lists on every iteration, so with more
    # than one endpoint each pass filtered the already-narrowed list (losing
    # earlier endpoints' entries), and `resource_entries` raised NameError
    # later when no endpoints matched.  Filter once with set membership.
    log_entries = [
        entry for entry in collection.log.entries if entry["endpoint"] in endpoint_set
    ]
    resource_entries = [
        entry
        for entry in collection.resource.entries
        # NOTE(review): the original compared entry["endpoints"] with == to a
        # single hash; assuming each entry holds one endpoint hash — confirm.
        if entry["endpoints"] in endpoint_set
    ]
    collection.log.entries = log_entries
    collection.resource.entries = resource_entries
    collection.save_csv()

    # Print the resource history — useful when debugging a contribution.
    print("Resource History")
    print("start_date,end_date,resource")
    for resource in resource_entries:
        print(f"{resource['start-date']},{resource['end-date']},{resource['resource']}")

    # Download previously collected resources that are not present locally.
    collection_name = specification.dataset[dataset]["collection"]
    files_url = "https://files.planning.data.gov.uk"
    for resource in resource_entries:
        local_path = os.path.join("collection/resource", resource["resource"])
        if os.path.exists(local_path):
            continue
        # BUG FIX: the original called os.path.exists() with no argument
        # (TypeError) before makedirs, and passed the whole entry dict as the
        # urlretrieve target filename instead of resource["resource"].
        os.makedirs("collection/resource/", exist_ok=True)
        urllib.request.urlretrieve(
            # Build the URL with explicit "/" separators rather than
            # os.path.join, which would emit backslashes on Windows.
            f"{files_url}/{collection_name}/collection/resource/{resource['resource']}",
            local_path,
        )

    # pipeline step for loop for each of the files
    # define additional files
    # for resource in resource_entries:
    # input_path = f"{collection_dir}/resource/{resource}"
    # output_path = f"transformed/{dataset}/{resource}.csv"
    # update below with correct values
    # pipeline_run(
    #     dataset,
    #     pipeline,
    #     specification,
    #     input_path,
    #     output_path,
    #     issue_dir=issue_dir,
    #     column_field_dir=column_field_dir,
    #     dataset_resource_dir=dataset_resource_dir,
    #     organisation_path=organisation_path,
    #     save_harmonised=save_harmonised,
    #     endpoints=endpoints,
    #     organisations=organisations,
    #     entry_date=entry_date,
    #     custom_temp_dir=custom_temp_dir,
    # )

    # # once files are loaded create the dataset
    # def pipeline_run(
    #     dataset,
    #     pipeline,
    #     specification,
    #     input_path,
    #     output_path,
    #     collection_dir="./collection",  # TBD: remove, replaced by endpoints, organisations and entry_date
    #     null_path=None,  # TBD: remove this
    #     issue_dir=None,
    #     organisation_path=None,
    #     save_harmonised=False,
    #     column_field_dir=None,
    #     dataset_resource_dir=None,
    #     custom_temp_dir=None,  # TBD: rename to "tmpdir"
    #     endpoints=[],
    #     organisations=[],
    #     entry_date="",
    # )
    # TODO run expectations? may need to run only certain ones, as some are
    # specific to certain datasets
    # end
Loading