
Commit f187086

Merge pull request #456 from digital-land/implement-pipeline-phase-manage
2 parents 055e4ae + e783f53 commit f187086

8 files changed

Lines changed: 749 additions & 739 deletions


digital_land/commands.py

Lines changed: 35 additions & 139 deletions
@@ -17,7 +17,6 @@
 import duckdb

 from digital_land.package.organisation import OrganisationPackage
-from digital_land.check import duplicate_reference_check
 from digital_land.specification import Specification
 from digital_land.collect import Collector
 from digital_land.collection import Collection, resource_path
@@ -32,17 +31,13 @@

 from digital_land.package.dataset import DatasetPackage
 from digital_land.package.dataset_parquet import DatasetParquetPackage
-from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
 from digital_land.phase.default import DefaultPhase
 from digital_land.phase.dump import DumpPhase
-from digital_land.phase.factor import FactorPhase
 from digital_land.phase.filter import FilterPhase
 from digital_land.phase.harmonise import HarmonisePhase
 from digital_land.phase.lookup import (
-    EntityLookupPhase,
-    FactLookupPhase,
     PrintLookupPhase,
 )
 from digital_land.phase.map import MapPhase
@@ -51,12 +46,9 @@
 from digital_land.phase.organisation import OrganisationPhase
 from digital_land.phase.parse import ParsePhase
 from digital_land.phase.patch import PatchPhase
-from digital_land.phase.priority import PriorityPhase
-from digital_land.phase.pivot import PivotPhase
 from digital_land.phase.prefix import EntityPrefixPhase
-from digital_land.phase.prune import FieldPrunePhase, EntityPrunePhase, FactPrunePhase
-from digital_land.phase.reference import EntityReferencePhase, FactReferencePhase
-from digital_land.phase.save import SavePhase
+from digital_land.phase.prune import FieldPrunePhase
+from digital_land.phase.reference import EntityReferencePhase
 from digital_land.pipeline import run_pipeline, Lookups, Pipeline
 from digital_land.pipeline.process import convert_tranformed_csv_to_pq
 from digital_land.schema import Schema
@@ -220,7 +212,6 @@ def pipeline_run(
     input_path,
     output_path: Path,
     collection_dir,  # TBD: remove, replaced by endpoints, organisations and entry_date
-    null_path=None,  # TBD: remove this
     issue_dir=None,
     operational_issue_dir="performance/operational_issue/",
     organisation_path=None,
@@ -244,37 +235,18 @@ def pipeline_run(
     if resource is None:
         resource = resource_from_path(input_path)
     dataset = dataset
-    schema = specification.pipeline[pipeline.name]["schema"]
-    intermediate_fieldnames = specification.intermediate_fieldnames(pipeline)
-    issue_log = IssueLog(dataset=dataset, resource=resource)
-    operational_issue_log = OperationalIssueLog(dataset=dataset, resource=resource)
-    column_field_log = ColumnFieldLog(dataset=dataset, resource=resource)
-    dataset_resource_log = DatasetResourceLog(dataset=dataset, resource=resource)
-    converted_resource_log = ConvertedResourceLog(dataset=dataset, resource=resource)
-    api = API(specification=specification)
-    entity_range_min = specification.get_dataset_entity_min(dataset)
-    entity_range_max = specification.get_dataset_entity_max(dataset)

-    # load pipeline configuration
-    skip_patterns = pipeline.skip_patterns(resource, endpoints)
-    columns = pipeline.columns(resource, endpoints=endpoints)
-    concats = pipeline.concatenations(resource, endpoints=endpoints)
-    patches = pipeline.patches(resource=resource, endpoints=endpoints)
-    lookups = pipeline.lookups(resource=resource)
-    default_fields = pipeline.default_fields(resource=resource, endpoints=endpoints)
-    default_values = pipeline.default_values(endpoints=endpoints)
-    combine_fields = pipeline.combine_fields(endpoints=endpoints)
-    redirect_lookups = pipeline.redirect_lookups()
-
-    # load config db
-    # TODO get more information from the config
+    # load config db and pass to Pipeline => TODO move to pipeline class and use more widely
     # TODO in future we need better way of making specification optional for config
     if Path(config_path).exists():
         config = Config(path=config_path, specification=specification)
     else:
-        logging.error("Config path does not exist")
+        logging.error("Config path does not exist")
         config = None

+    pipeline.config = config
+    pipeline.specification = specification
+
     # load organisations
     organisation = Organisation(
         organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
@@ -288,116 +260,40 @@ def pipeline_run(
     organisations = collection.resource_organisations(resource)
     entry_date = collection.resource_start_date(resource)

-    # Load valid category values
+    api = API(specification=specification)
     valid_category_values = api.get_valid_category_values(dataset, pipeline)

-    # resource specific default values
-    if len(organisations) == 1:
-        default_values["organisation"] = organisations[0]
-
-    # need an entry-date for all entries and for facts
-    # if a default entry-date isn't set through config then use the entry-date passed
-    # to this function
-    if entry_date:
-        if "entry-date" not in default_values:
-            default_values["entry-date"] = entry_date
-
-    # TODO Migrate all of this into a function in the Pipeline function
-    run_pipeline(
-        ConvertPhase(
-            path=input_path,
-            dataset_resource_log=dataset_resource_log,
-            converted_resource_log=converted_resource_log,
-            output_path=converted_path,
-        ),
-        NormalisePhase(skip_patterns=skip_patterns),
-        ParsePhase(),
-        ConcatFieldPhase(concats=concats, log=column_field_log),
-        FilterPhase(filters=pipeline.filters(resource)),
-        MapPhase(
-            fieldnames=intermediate_fieldnames,
-            columns=columns,
-            log=column_field_log,
-        ),
-        FilterPhase(filters=pipeline.filters(resource, endpoints=endpoints)),
-        PatchPhase(
-            issues=issue_log,
-            patches=patches,
-        ),
-        HarmonisePhase(
-            field_datatype_map=specification.get_field_datatype_map(),
-            issues=issue_log,
-            dataset=dataset,
-            valid_category_values=valid_category_values,
-        ),
-        DefaultPhase(
-            default_fields=default_fields,
-            default_values=default_values,
-            issues=issue_log,
-        ),
-        # TBD: move migrating columns to fields to be immediately after map
-        # this will simplify harmonisation and remove intermediate_fieldnames
-        # but effects brownfield-land and other pipelines which operate on columns
-        MigratePhase(
-            fields=specification.schema_field[schema],
-            migrations=pipeline.migrations(),
-        ),
-        OrganisationPhase(organisation=organisation, issues=issue_log),
-        FieldPrunePhase(fields=specification.current_fieldnames(schema)),
-        EntityReferencePhase(
-            dataset=dataset,
-            prefix=specification.dataset_prefix(dataset),
-            issues=issue_log,
-        ),
-        EntityPrefixPhase(dataset=dataset),
-        EntityLookupPhase(
-            lookups=lookups,
-            redirect_lookups=redirect_lookups,
-            issue_log=issue_log,
-            operational_issue_log=operational_issue_log,
-            entity_range=[entity_range_min, entity_range_max],
-        ),
-        SavePhase(
-            default_output_path("harmonised", input_path),
-            fieldnames=intermediate_fieldnames,
-            enabled=save_harmonised,
-        ),
-        EntityPrunePhase(dataset_resource_log=dataset_resource_log),
-        PriorityPhase(config=config, providers=organisations),
-        PivotPhase(),
-        FactCombinePhase(issue_log=issue_log, fields=combine_fields),
-        FactorPhase(),
-        FactReferencePhase(
-            field_typology_map=specification.get_field_typology_map(),
-            field_prefix_map=specification.get_field_prefix_map(),
-        ),
-        FactLookupPhase(
-            lookups=lookups,
-            redirect_lookups=redirect_lookups,
-            issue_log=issue_log,
-            odp_collections=specification.get_odp_collections(),
-        ),
-        FactPrunePhase(),
-        SavePhase(
-            output_path,
-            fieldnames=specification.factor_fieldnames(),
-        ),
+    # Transform the resource
+    issue_log = pipeline.transform(
+        input_path=input_path,
+        output_path=output_path,
+        organisation=organisation,
+        endpoints=endpoints,
+        organisations=organisations,
+        entry_date=entry_date,
+        resource=resource,
+        converted_path=converted_path,
+        harmonised_output_path=default_output_path("harmonised", input_path),
+        save_harmonised=save_harmonised,
+        valid_category_values=valid_category_values,
     )

-    # In the FactCombinePhase, when combine_fields has some values, we check for duplicates and combine values.
-    # If we have done this then we will not call duplicate_reference_check as we have already carried out a
-    # duplicate check and stop messages appearing in issues about reference values not being unique
-    if combine_fields == {}:
-        issue_log = duplicate_reference_check(issues=issue_log, csv_path=output_path)
+    # Save logs in pipeline
+    pipeline.save_logs(
+        issue_path=os.path.join(issue_dir, resource + ".csv"),
+        operational_issue_path=os.path.join(operational_issue_dir, resource + ".csv"),
+        column_field_path=os.path.join(column_field_dir, resource + ".csv"),
+        dataset_resource_path=os.path.join(dataset_resource_dir, resource + ".csv"),
+        converted_resource_path=os.path.join(converted_resource_dir, resource + ".csv"),
+    )

-    issue_log.apply_entity_map()
-    issue_log.save(os.path.join(issue_dir, resource + ".csv"))
+    # Parquet seperate save of issue log
     issue_log.save_parquet(os.path.join(output_log_dir, "issue/"))
-    operational_issue_log.save(output_dir=operational_issue_dir)
-    if column_field_dir:
-        column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
-    dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
-    converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
+
+    # create converted parquet in the var directory
+    cache_dir = Path(organisation_path).parent
+    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+
     # create converted parquet in the var director
     # TODO test without output_path conversation above to make sure we have a test that would've failed
     transformed_parquet_dir = output_path.parent
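
Read together, the removed and added lines above are the substance of this commit: pipeline_run no longer builds the ConvertPhase-to-SavePhase sequence, the per-phase configuration, or the individual log objects itself; it attaches the config and specification to the Pipeline object and delegates to it. A condensed sketch of the new call pattern, using only names that appear in the added lines (the bodies of Pipeline.transform and Pipeline.save_logs live in digital_land/pipeline and are not shown in this commit, so treat this as illustrative rather than the exact implementation):

# Sketch only: Pipeline.transform is assumed to run the phase sequence and
# return the issue log; Pipeline.save_logs is assumed to write the per-resource logs.
pipeline.config = config
pipeline.specification = specification

issue_log = pipeline.transform(
    input_path=input_path,
    output_path=output_path,
    organisation=organisation,
    resource=resource,
    converted_path=converted_path,
    # ... remaining keyword arguments as in the added lines above ...
)

pipeline.save_logs(
    issue_path=os.path.join(issue_dir, resource + ".csv"),
    # ... other per-resource log paths as in the added lines above ...
)

The duplicate_reference_check call and the combine_fields handling also disappear from commands.py here, so they are presumably carried out inside Pipeline.transform now rather than at the call site.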

notebooks/debug_resource_transformation.ipynb

Lines changed: 4 additions & 77 deletions
@@ -141,83 +141,10 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 92,
+  "execution_count": null,
   "metadata": {},
-  "outputs": [
-   {
-    "name": "stdout",
-    "output_type": "stream",
-    "text": [
-     "resource 1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836 for dataset article-4-direction-area transformed to data/debug_resource_transformation/transformed/article-4-direction-area/1c192f194a6d7cb044006bbe0d7bb7909eed3783eeb8a53026fc15b9fe31a836.csv\n"
-    ]
-   }
-  ],
-  "source": [
-   "\n",
-   "output_path = data_dir / 'transformed' / dataset / f'{resource_hash}.csv'\n",
-   "output_path.parent.mkdir(parents=True, exist_ok=True)\n",
-   "converted_path = data_dir / 'converted' / dataset / f'{resource_hash}.csv'\n",
-   "converted_path.parent.mkdir(parents=True, exist_ok=True)\n",
-   "\n",
-   "# create pipeline object\n",
-   "pipeline = Pipeline(pipeline_dir, dataset)\n",
-   "\n",
-   "# create logs\n",
-   "issue_dir = data_dir / 'issues' / dataset\n",
-   "issue_dir.mkdir(parents=True, exist_ok=True)\n",
-   "operational_issue_dir = data_dir / 'performance' / 'operational_issues'\n",
-   "operational_issue_dir.mkdir(parents=True, exist_ok=True)\n",
-   "column_field_dir = cache_dir / 'column_field' / dataset\n",
-   "column_field_dir.mkdir(parents=True, exist_ok=True)\n",
-   "dataset_resource_dir = cache_dir / 'dataset_resource' / dataset\n",
-   "dataset_resource_dir.mkdir(parents=True, exist_ok=True)\n",
-   "converted_resource_dir = cache_dir / 'converted_resource' / dataset\n",
-   "converted_resource_dir.mkdir(parents=True, exist_ok=True)\n",
-   "output_log_dir = data_dir / 'log'\n",
-   "output_log_dir.mkdir(parents=True, exist_ok=True)\n",
-   "\n",
-   "# get endpoints from the collection TODO include redirects\n",
-   "collection = Collection(directory = collection_dir)\n",
-   "collection.load()\n",
-   "endpoints = collection.resource_endpoints(resource_hash)\n",
-   "organisations = collection.resource_organisations(resource_hash)\n",
-   "entry_date = collection.resource_start_date(resource_hash)\n",
-   "\n",
-   "# build config from downloaded files \n",
-   "config_path = cache_dir / 'config.sqlite3'\n",
-   "config = Config(path=config_path, specification=spec)\n",
-   "config.create()\n",
-   "tables = {key: pipeline.path for key in config.tables.keys()}\n",
-   "config.load(tables)\n",
-   "\n",
-   "pipeline_run(\n",
-   "    dataset=dataset,\n",
-   "    pipeline=pipeline,\n",
-   "    specification=spec,\n",
-   "    input_path=resource_path,\n",
-   "    output_path=output_path,\n",
-   "    collection_dir=collection_dir, # TBD: remove, replaced by endpoints, organisations and entry_date\n",
-   "    null_path=None, # TBD: remove this\n",
-   "    issue_dir=issue_dir,\n",
-   "    operational_issue_dir=operational_issue_dir,\n",
-   "    organisation_path=org_path,\n",
-   "    save_harmonised=False,\n",
-   "    # TBD save all logs in a log directory, this will mean only one path passed in.\n",
-   "    column_field_dir=column_field_dir,\n",
-   "    dataset_resource_dir=dataset_resource_dir,\n",
-   "    converted_resource_dir=converted_resource_dir,\n",
-   "    cache_dir=cache_dir,\n",
-   "    endpoints=endpoints,\n",
-   "    organisations=organisations,\n",
-   "    entry_date=entry_date,\n",
-   "    config_path=config_path,\n",
-   "    resource=resource_hash,\n",
-   "    output_log_dir=output_log_dir,\n",
-   "    converted_path=converted_path,\n",
-   ")\n",
-   "\n",
-   "print(f'resource {resource_hash} for dataset {dataset} transformed to {output_path}')"
-  ]
+  "outputs": [],
+  "source": "\noutput_path = data_dir / 'transformed' / dataset / f'{resource_hash}.csv'\noutput_path.parent.mkdir(parents=True, exist_ok=True)\nconverted_path = data_dir / 'converted' / dataset / f'{resource_hash}.csv'\nconverted_path.parent.mkdir(parents=True, exist_ok=True)\n\n# create pipeline object\npipeline = Pipeline(pipeline_dir, dataset)\n\n# create logs\nissue_dir = data_dir / 'issues' / dataset\nissue_dir.mkdir(parents=True, exist_ok=True)\noperational_issue_dir = data_dir / 'performance' / 'operational_issues'\noperational_issue_dir.mkdir(parents=True, exist_ok=True)\ncolumn_field_dir = cache_dir / 'column_field' / dataset\ncolumn_field_dir.mkdir(parents=True, exist_ok=True)\ndataset_resource_dir = cache_dir / 'dataset_resource' / dataset\ndataset_resource_dir.mkdir(parents=True, exist_ok=True)\nconverted_resource_dir = cache_dir / 'converted_resource' / dataset\nconverted_resource_dir.mkdir(parents=True, exist_ok=True)\noutput_log_dir = data_dir / 'log'\noutput_log_dir.mkdir(parents=True, exist_ok=True)\n\n# get endpoints from the collection TODO include redirects\ncollection = Collection(directory = collection_dir)\ncollection.load()\nendpoints = collection.resource_endpoints(resource_hash)\norganisations = collection.resource_organisations(resource_hash)\nentry_date = collection.resource_start_date(resource_hash)\n\n# build config from downloaded files \nconfig_path = cache_dir / 'config.sqlite3'\nconfig = Config(path=config_path, specification=spec)\nconfig.create()\ntables = {key: pipeline.path for key in config.tables.keys()}\nconfig.load(tables)\n\npipeline_run(\n    dataset=dataset,\n    pipeline=pipeline,\n    specification=spec,\n    input_path=resource_path,\n    output_path=output_path,\n    collection_dir=collection_dir, # TBD: remove, replaced by endpoints, organisations and entry_date\n    issue_dir=issue_dir,\n    operational_issue_dir=operational_issue_dir,\n    organisation_path=org_path,\n    save_harmonised=False,\n    # TBD save all logs in a log directory, this will mean only one path passed in.\n    column_field_dir=column_field_dir,\n    dataset_resource_dir=dataset_resource_dir,\n    converted_resource_dir=converted_resource_dir,\n    cache_dir=cache_dir,\n    endpoints=endpoints,\n    organisations=organisations,\n    entry_date=entry_date,\n    config_path=config_path,\n    resource=resource_hash,\n    output_log_dir=output_log_dir,\n    converted_path=converted_path,\n)\n\nprint(f'resource {resource_hash} for dataset {dataset} transformed to {output_path}')"
  },
  {
   "cell_type": "code",
@@ -248,4 +175,4 @@
  },
 "nbformat": 4,
 "nbformat_minor": 2
-}
+}
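
The notebook change is mostly output hygiene: the stored stdout output is dropped, execution_count is reset to null, the source is collapsed into a single string, and the null_path argument is removed from the pipeline_run call to match the new signature in commands.py. For reference, a minimal sketch of how the outputs and execution-count part of this can be reproduced with nbformat (assuming nbformat is installed; this sketch does not collapse the source list into one string, which the tool that wrote this file also did):

import nbformat

# Clear stored outputs and execution counts, matching the "outputs": [] and
# "execution_count": null changes shown in the diff above.
path = "notebooks/debug_resource_transformation.ipynb"
nb = nbformat.read(path, as_version=4)
for cell in nb.cells:
    if cell.cell_type == "code":
        cell.outputs = []
        cell.execution_count = None
nbformat.write(nb, path)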
