Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions digital_land/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,20 @@ def collection_list_resources_cmd(collection_dir):
default=None,
help="path of the output state file",
)
@click.option(
"--dataset-resource-dir",
type=click.Path(),
default=None,
help="directory of existing dataset resource logs; when provided only resources with changed config, specification, or code version are included",
)
def collection_pipeline_makerules_cmd(
collection_dir,
specification_dir,
pipeline_dir,
resource_dir,
incremental_loading_override,
state_path,
dataset_resource_dir,
):
return collection_pipeline_makerules(
collection_dir,
Expand All @@ -158,6 +165,7 @@ def collection_pipeline_makerules_cmd(
resource_dir,
incremental_loading_override,
state_path=state_path,
dataset_resource_dir=dataset_resource_dir,
)


Expand Down Expand Up @@ -751,13 +759,20 @@ def config_load_cmd(ctx, config_path):
default="state.json",
help="path of the output state file",
)
@click.option(
"--dataset-resource-dir",
type=click.Path(),
default=None,
help="directory of existing dataset resource logs; when provided only resources with changed config, specification, or code version are counted",
)
def save_state_cmd(
specification_dir,
collection_dir,
pipeline_dir,
resource_dir,
incremental_loading_override,
output_path,
dataset_resource_dir,
):
save_state(
specification_dir,
Expand All @@ -766,6 +781,7 @@ def save_state_cmd(
resource_dir,
incremental_loading_override,
output_path,
dataset_resource_dir=dataset_resource_dir,
)


Expand Down
2 changes: 2 additions & 0 deletions digital_land/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def pipeline_makerules(
resource_dir,
incremental_loading_override,
state_path=None,
dataset_resource_dir=None,
):
pipeline_makerules(
self,
Expand All @@ -453,6 +454,7 @@ def pipeline_makerules(
resource_dir,
incremental_loading_override,
state_path=state_path,
dataset_resource_dir=dataset_resource_dir,
)

def dataset_resource_map(self):
Expand Down
4 changes: 4 additions & 0 deletions digital_land/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def collection_pipeline_makerules(
resource_dir,
incremental_loading_override,
state_path=None,
dataset_resource_dir=None,
):
collection = Collection(name=None, directory=collection_dir)
collection.load()
Expand All @@ -119,6 +120,7 @@ def collection_pipeline_makerules(
resource_dir,
incremental_loading_override,
state_path=state_path,
dataset_resource_dir=dataset_resource_dir,
)


Expand Down Expand Up @@ -1622,13 +1624,15 @@ def save_state(
resource_dir,
incremental_loading_override,
output_path,
dataset_resource_dir=None,
):
state = State.build(
specification_dir=specification_dir,
collection_dir=collection_dir,
pipeline_dir=pipeline_dir,
resource_dir=resource_dir,
incremental_loading_override=incremental_loading_override,
dataset_resource_dir=dataset_resource_dir,
)
state.save(
output_path=output_path,
Expand Down
9 changes: 9 additions & 0 deletions digital_land/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ class DatasetResourceLog(Log):
"mime-type",
"internal-path",
"internal-mime-type",
"code-version",
"config-hash",
"specification-hash",
]

def __init__(self, *args, **kwargs):
Expand All @@ -274,6 +277,9 @@ def __init__(self, *args, **kwargs):
self.mime_type = ""
self.internal_path = ""
self.internal_mime_type = ""
self.code_version = ""
self.config_hash = ""
self.specification_hash = ""

def add(self):
self.rows.append(
Expand All @@ -285,6 +291,9 @@ def add(self):
"mime-type": self.mime_type,
"internal-path": self.internal_path,
"internal-mime-type": self.internal_mime_type,
"code-version": self.code_version,
"config-hash": self.config_hash,
"specification-hash": self.specification_hash,
}
)

Expand Down
38 changes: 38 additions & 0 deletions digital_land/makerules.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

from enum import Enum

from digital_land import __version__
from digital_land.utils.hash_utils import hash_directory
from digital_land.utils.dataset_resource_utils import resource_needs_processing


class ProcessingOption(Enum):
PROCESS_ALL = "all"
Expand Down Expand Up @@ -73,8 +77,42 @@ def pipeline_makerules(
resource_dir,
incremental_loading_override,
state_path=None,
dataset_resource_dir=None,
):
dataset_resource = collection.dataset_resource_map()

if dataset_resource_dir is not None:
current_code_version = __version__
try:
current_config_hash = hash_directory(pipeline_dir)
except Exception:
current_config_hash = ""
try:
current_specification_hash = hash_directory(specification_dir)
except Exception:
current_specification_hash = ""

dataset_resource = {
dataset: {
resource
for resource in resources
if resource_needs_processing(
dataset_resource_dir,
dataset,
resource,
current_code_version,
current_config_hash,
current_specification_hash,
)
}
for dataset, resources in dataset_resource.items()
}
# Drop datasets with nothing left to process
dataset_resource = {
dataset: resources
for dataset, resources in dataset_resource.items()
if resources
}
# process = get_processing_option(
# collection,
# specification_dir,
Expand Down
17 changes: 17 additions & 0 deletions digital_land/pipeline/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from pathlib import Path
from typing import Dict, List, Optional

from digital_land import __version__
from digital_land.utils.hash_utils import hash_directory

from digital_land.organisation import Organisation

from digital_land.phase.map import normalise
Expand Down Expand Up @@ -424,6 +427,20 @@ def init_logs(self, dataset, resource):
self.dataset_resource_log = DatasetResourceLog(
dataset=dataset, resource=resource
)
self.dataset_resource_log.code_version = __version__
try:
self.dataset_resource_log.config_hash = hash_directory(self.path)
except Exception:
logging.warning(f"Could not hash pipeline config directory: {self.path}")
if self.specification:
try:
self.dataset_resource_log.specification_hash = hash_directory(
self.specification.path
)
except Exception:
logging.warning(
f"Could not hash specification directory: {self.specification.path}"
)
self.converted_resource_log = ConvertedResourceLog(
dataset=dataset, resource=resource
)
Expand Down
1 change: 1 addition & 0 deletions digital_land/specification.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

class Specification:
def __init__(self, path="specification"):
self.path = path
self.dataset = {}
self.dataset_names = []
self.schema = {}
Expand Down
94 changes: 79 additions & 15 deletions digital_land/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import hashlib
from datetime import date
from digital_land.collection import Collection
from digital_land import __version__
from digital_land.utils.dataset_resource_utils import resource_needs_processing

# Read the file in 32MB chunks
_chunk_size = 32 * 1024 * 1024
Expand All @@ -20,27 +22,42 @@ def build(
pipeline_dir,
resource_dir,
incremental_loading_override,
dataset_resource_dir=None,
):
"""Build a state object from the current configuration and code"""

# build collection to get counts
collection = Collection(directory=collection_dir)
collection.load(directory=collection_dir)

# Pre-compute once so counts and state dict share the same values
specification_hash = State.get_dir_hash(specification_dir)
pipeline_hash = State.get_dir_hash(pipeline_dir)

return State(
{
"code": State.get_code_hash(),
"specification": State.get_dir_hash(specification_dir),
"specification": specification_hash,
"collection": State.get_dir_hash(
collection_dir, ["log/", "log.csv", "pipeline.mk", "resource/"]
),
"resource": State.get_dir_hash(resource_dir),
"pipeline": State.get_dir_hash(pipeline_dir),
"pipeline": pipeline_hash,
"incremental_loading_override": incremental_loading_override,
"last_updated_date": date.today().isoformat(), # date in YYYY-MM-DD format
"transform_count": State.get_transform_count(collection),
"transform_count": State.get_transform_count(
collection,
dataset_resource_dir=dataset_resource_dir,
current_code_version=__version__,
current_config_hash=pipeline_hash,
current_specification_hash=specification_hash,
),
"transform_count_by_dataset": State.get_transform_count_by_dataset(
collection
collection,
dataset_resource_dir=dataset_resource_dir,
current_code_version=__version__,
current_config_hash=pipeline_hash,
current_specification_hash=specification_hash,
),
}
)
Expand Down Expand Up @@ -92,23 +109,70 @@ def get_code_hash():
commit = repo.revparse_single("HEAD")
return str(commit.id)

def get_transform_count(
    collection: Collection,
    dataset_resource_dir=None,
    current_code_version=None,
    current_config_hash=None,
    current_specification_hash=None,
):
    """Return how many resource transformations are outstanding.

    When dataset_resource_dir is None, every resource in the collection's
    dataset/resource map is counted.  Otherwise a resource is counted only
    when resource_needs_processing reports that its existing log differs
    from the current code version, config hash, or specification hash.
    """
    dataset_resource = collection.dataset_resource_map()

    if dataset_resource_dir is None:
        # No existing logs to compare against: everything needs processing.
        total = 0
        for resources in dataset_resource.values():
            total += len(resources)
        return total

    total = 0
    for dataset, resources in dataset_resource.items():
        for resource in resources:
            if resource_needs_processing(
                dataset_resource_dir,
                dataset,
                resource,
                current_code_version,
                current_config_hash,
                current_specification_hash,
            ):
                total += 1
    return total

def get_transform_count_by_dataset(
    collection: Collection,
    dataset_resource_dir=None,
    current_code_version=None,
    current_config_hash=None,
    current_specification_hash=None,
):
    """Calculate the number of transformations needed per dataset.

    When dataset_resource_dir is provided, only resources whose existing
    log differs from the current code version, config hash, or
    specification hash (as judged by resource_needs_processing) are
    counted.  When it is None, all resources are counted.

    Returns a dict mapping each dataset name to its outstanding
    transformation count (0 when a dataset has nothing to process).
    """
    dataset_resource = collection.dataset_resource_map()

    # dataset_resource_dir is loop-invariant: decide the counting strategy
    # once instead of re-testing it for every dataset.
    if dataset_resource_dir is None:
        return {
            dataset: len(resources)
            for dataset, resources in dataset_resource.items()
        }

    return {
        dataset: sum(
            1
            for resource in resources
            if resource_needs_processing(
                dataset_resource_dir,
                dataset,
                resource,
                current_code_version,
                current_config_hash,
                current_specification_hash,
            )
        )
        for dataset, resources in dataset_resource.items()
    }


def compare_state(
Expand Down
Loading