From 5fa5a2597ca8f9e1de2b050f5f3def0f776276ed Mon Sep 17 00:00:00 2001 From: eveleighoj <35256612+eveleighoj@users.noreply.github.com> Date: Mon, 23 Feb 2026 18:45:42 +0000 Subject: [PATCH 1/2] update dataset_resource log to include spec, config and code data to see if they need reprocessing --- digital_land/log.py | 9 +++++ digital_land/pipeline/main.py | 17 ++++++++ digital_land/specification.py | 1 + digital_land/utils/hash_utils.py | 47 ++++++++++++++++++++++ tests/integration/test_hash_utils.py | 59 ++++++++++++++++++++++++++++ tests/unit/test_log.py | 26 +++++++++++- tests/unit/test_pipeline.py | 53 +++++++++++++++++++++++++ 7 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 digital_land/utils/hash_utils.py create mode 100644 tests/integration/test_hash_utils.py diff --git a/digital_land/log.py b/digital_land/log.py index 918828168..85a92e3ac 100644 --- a/digital_land/log.py +++ b/digital_land/log.py @@ -265,6 +265,9 @@ class DatasetResourceLog(Log): "mime-type", "internal-path", "internal-mime-type", + "code-version", + "config-hash", + "specification-hash", ] def __init__(self, *args, **kwargs): @@ -274,6 +277,9 @@ def __init__(self, *args, **kwargs): self.mime_type = "" self.internal_path = "" self.internal_mime_type = "" + self.code_version = "" + self.config_hash = "" + self.specification_hash = "" def add(self): self.rows.append( @@ -285,6 +291,9 @@ def add(self): "mime-type": self.mime_type, "internal-path": self.internal_path, "internal-mime-type": self.internal_mime_type, + "code-version": self.code_version, + "config-hash": self.config_hash, + "specification-hash": self.specification_hash, } ) diff --git a/digital_land/pipeline/main.py b/digital_land/pipeline/main.py index 01a46ddab..4c15cc3a8 100644 --- a/digital_land/pipeline/main.py +++ b/digital_land/pipeline/main.py @@ -7,6 +7,9 @@ from pathlib import Path from typing import Dict, List, Optional +from digital_land import __version__ +from digital_land.utils.hash_utils import 
hash_directory + from digital_land.organisation import Organisation from digital_land.phase.map import normalise @@ -424,6 +427,20 @@ def init_logs(self, dataset, resource): self.dataset_resource_log = DatasetResourceLog( dataset=dataset, resource=resource ) + self.dataset_resource_log.code_version = __version__ + try: + self.dataset_resource_log.config_hash = hash_directory(self.path) + except Exception: + logging.warning(f"Could not hash pipeline config directory: {self.path}") + if self.specification: + try: + self.dataset_resource_log.specification_hash = hash_directory( + self.specification.path + ) + except Exception: + logging.warning( + f"Could not hash specification directory: {self.specification.path}" + ) self.converted_resource_log = ConvertedResourceLog( dataset=dataset, resource=resource ) diff --git a/digital_land/specification.py b/digital_land/specification.py index 53121710f..fdffa09e5 100644 --- a/digital_land/specification.py +++ b/digital_land/specification.py @@ -28,6 +28,7 @@ class Specification: def __init__(self, path="specification"): + self.path = path self.dataset = {} self.dataset_names = [] self.schema = {} diff --git a/digital_land/utils/hash_utils.py b/digital_land/utils/hash_utils.py new file mode 100644 index 000000000..e47ca85d8 --- /dev/null +++ b/digital_land/utils/hash_utils.py @@ -0,0 +1,47 @@ +import os +import hashlib + +# Read files in 32MB chunks +_chunk_size = 32 * 1024 * 1024 + + +def hash_directory(dir, exclude=[]): + """Returns a SHA1 hex digest of the contents of a directory. + + Files are sorted before hashing so the result is stable regardless + of filesystem ordering. File names are included in the hash so + renames are detected. + + Args: + dir: Path to the directory to hash. + exclude: List of path prefixes (relative to dir) to exclude. + + Raises: + RuntimeError: If dir does not exist or is not a directory. 
+ """ + if not os.path.isdir(dir): + raise RuntimeError( + f"Can't hash {dir} as it doesn't exist or is not a directory" + ) + + hash = hashlib.sha1() + all_files = [] + for root, _, files in os.walk(dir): + rel = os.path.relpath(root, dir) + if rel == ".": + rel = "" + rel_files = [os.path.join(rel, file) for file in files] + all_files += [f for f in rel_files if not f.startswith(tuple(exclude))] + + all_files.sort() + + for file in all_files: + hash.update(file.encode() + b"\n") + with open(os.path.join(dir, file), "rb") as f: + while True: + data = f.read(_chunk_size) + if not data: + break + hash.update(data) + + return hash.digest().hex() diff --git a/tests/integration/test_hash_utils.py b/tests/integration/test_hash_utils.py new file mode 100644 index 000000000..d76a88a08 --- /dev/null +++ b/tests/integration/test_hash_utils.py @@ -0,0 +1,59 @@ +import pytest +from digital_land.utils.hash_utils import hash_directory + + +def test_hash_directory_returns_hex_string(tmp_path): + (tmp_path / "file.txt").write_text("hello") + result = hash_directory(str(tmp_path)) + assert isinstance(result, str) + assert len(result) == 40 # SHA1 produces a 40-character hex digest + + +def test_hash_directory_is_stable(tmp_path): + (tmp_path / "file.txt").write_text("hello") + assert hash_directory(str(tmp_path)) == hash_directory(str(tmp_path)) + + +def test_hash_directory_changes_on_content_change(tmp_path): + f = tmp_path / "file.txt" + f.write_text("hello") + hash_before = hash_directory(str(tmp_path)) + f.write_text("world") + assert hash_directory(str(tmp_path)) != hash_before + + +def test_hash_directory_changes_on_new_file(tmp_path): + (tmp_path / "file.txt").write_text("hello") + hash_before = hash_directory(str(tmp_path)) + (tmp_path / "new_file.txt").write_text("new") + assert hash_directory(str(tmp_path)) != hash_before + + +def test_hash_directory_changes_on_rename(tmp_path): + f = tmp_path / "file.txt" + f.write_text("hello") + hash_before = 
hash_directory(str(tmp_path)) + f.rename(tmp_path / "renamed.txt") + assert hash_directory(str(tmp_path)) != hash_before + + +def test_hash_directory_raises_for_nonexistent_dir(): + with pytest.raises(RuntimeError): + hash_directory("/nonexistent/path/that/does/not/exist") + + +def test_hash_directory_exclude_omits_matching_files(tmp_path): + (tmp_path / "include.txt").write_text("included") + (tmp_path / "exclude.txt").write_text("excluded") + hash_all = hash_directory(str(tmp_path)) + hash_excluded = hash_directory(str(tmp_path), exclude=["exclude"]) + assert hash_all != hash_excluded + + +def test_hash_directory_exclude_is_stable_when_excluded_file_changes(tmp_path): + (tmp_path / "include.txt").write_text("included") + excluded = tmp_path / "exclude.txt" + excluded.write_text("original") + hash_before = hash_directory(str(tmp_path), exclude=["exclude"]) + excluded.write_text("changed") + assert hash_directory(str(tmp_path), exclude=["exclude"]) == hash_before diff --git a/tests/unit/test_log.py b/tests/unit/test_log.py index b9b9cfaed..61bcf974c 100644 --- a/tests/unit/test_log.py +++ b/tests/unit/test_log.py @@ -1,5 +1,5 @@ import pytest -from digital_land.log import IssueLog, OperationalIssueLog +from digital_land.log import DatasetResourceLog, IssueLog, OperationalIssueLog from unittest.mock import patch, mock_open import pandas as pd @@ -105,3 +105,27 @@ def test_IssueLog_entity_map(): assert issue_log.rows[1]["entity"] == "100002" assert issue_log.rows[2]["entity"] == "100100" assert issue_log.rows[3]["entity"] is None + + +def test_dataset_resource_log_new_fields_in_fieldnames(): + assert "code-version" in DatasetResourceLog.fieldnames + assert "config-hash" in DatasetResourceLog.fieldnames + assert "specification-hash" in DatasetResourceLog.fieldnames + + +def test_dataset_resource_log_new_fields_default_empty(): + log = DatasetResourceLog(dataset="test-dataset", resource="test-resource") + assert log.code_version == "" + assert log.config_hash == "" + 
assert log.specification_hash == "" + + +def test_dataset_resource_log_new_fields_included_in_add(): + log = DatasetResourceLog(dataset="test-dataset", resource="test-resource") + log.code_version = "1.2.3" + log.config_hash = "abc123" + log.specification_hash = "def456" + log.add() + assert log.rows[0]["code-version"] == "1.2.3" + assert log.rows[0]["config-hash"] == "abc123" + assert log.rows[0]["specification-hash"] == "def456" diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 169477b74..9d57c6abb 100755 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -413,6 +413,59 @@ def test_return_only_default_filter(self, pipeline): filters = pipeline.filters() assert filters["field2"] == "default_pattern" + def test_init_logs_sets_code_version(self, mocker): + from digital_land import __version__ + + mocker.patch("digital_land.pipeline.Pipeline.file_reader", lambda self, f: []) + mocker.patch("digital_land.pipeline.main.hash_directory", return_value="abc123") + + p = Pipeline("any_path", "test-dataset") + p.specification = None + p.init_logs("test-dataset", "test-resource") + + assert p.dataset_resource_log.code_version == __version__ + + def test_init_logs_sets_config_hash(self, mocker): + mocker.patch("digital_land.pipeline.Pipeline.file_reader", lambda self, f: []) + mocker.patch( + "digital_land.pipeline.main.hash_directory", + return_value="config-hash-value", + ) + + p = Pipeline("any_path", "test-dataset") + p.specification = None + p.init_logs("test-dataset", "test-resource") + + assert p.dataset_resource_log.config_hash == "config-hash-value" + + def test_init_logs_sets_specification_hash(self, mocker): + mocker.patch("digital_land.pipeline.Pipeline.file_reader", lambda self, f: []) + mocker.patch( + "digital_land.pipeline.main.hash_directory", return_value="spec-hash-value" + ) + + spec = mocker.MagicMock() + spec.path = "/some/spec/path" + + p = Pipeline("any_path", "test-dataset") + p.specification = spec + 
p.init_logs("test-dataset", "test-resource") + + assert p.dataset_resource_log.specification_hash == "spec-hash-value" + + def test_init_logs_handles_hash_error_gracefully(self, mocker): + mocker.patch("digital_land.pipeline.Pipeline.file_reader", lambda self, f: []) + mocker.patch( + "digital_land.pipeline.main.hash_directory", + side_effect=RuntimeError("dir not found"), + ) + + p = Pipeline("any_path", "test-dataset") + p.specification = None + p.init_logs("test-dataset", "test-resource") # should not raise + + assert p.dataset_resource_log.config_hash == "" + if __name__ == "__main__": pytest.main() From 3203fa1c946038e1b19a6246ca7b32affbf0dc1f Mon Sep 17 00:00:00 2001 From: eveleighoj <35256612+eveleighoj@users.noreply.github.com> Date: Tue, 24 Feb 2026 11:55:05 +0000 Subject: [PATCH 2/2] add code to use dataset-resource directory for processing --- digital_land/cli.py | 16 ++++ digital_land/collection.py | 2 + digital_land/commands.py | 4 + digital_land/makerules.py | 38 ++++++++ digital_land/state.py | 94 ++++++++++++++++---- digital_land/utils/dataset_resource_utils.py | 49 ++++++++++ 6 files changed, 188 insertions(+), 15 deletions(-) create mode 100644 digital_land/utils/dataset_resource_utils.py diff --git a/digital_land/cli.py b/digital_land/cli.py index 2ed6e11b4..847625a87 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -143,6 +143,12 @@ def collection_list_resources_cmd(collection_dir): default=None, help="path of the output state file", ) +@click.option( + "--dataset-resource-dir", + type=click.Path(), + default=None, + help="directory of existing dataset resource logs; when provided only resources with changed config, specification, or code version are included", +) def collection_pipeline_makerules_cmd( collection_dir, specification_dir, @@ -150,6 +156,7 @@ def collection_pipeline_makerules_cmd( resource_dir, incremental_loading_override, state_path, + dataset_resource_dir, ): return collection_pipeline_makerules( collection_dir, @@ 
-158,6 +165,7 @@ def collection_pipeline_makerules_cmd( resource_dir, incremental_loading_override, state_path=state_path, + dataset_resource_dir=dataset_resource_dir, ) @@ -751,6 +759,12 @@ def config_load_cmd(ctx, config_path): default="state.json", help="path of the output state file", ) +@click.option( + "--dataset-resource-dir", + type=click.Path(), + default=None, + help="directory of existing dataset resource logs; when provided only resources with changed config, specification, or code version are counted", +) def save_state_cmd( specification_dir, collection_dir, @@ -758,6 +772,7 @@ def save_state_cmd( resource_dir, incremental_loading_override, output_path, + dataset_resource_dir, ): save_state( specification_dir, @@ -766,6 +781,7 @@ def save_state_cmd( resource_dir, incremental_loading_override, output_path, + dataset_resource_dir=dataset_resource_dir, ) diff --git a/digital_land/collection.py b/digital_land/collection.py index bf5b769c3..377ab0466 100644 --- a/digital_land/collection.py +++ b/digital_land/collection.py @@ -445,6 +445,7 @@ def pipeline_makerules( resource_dir, incremental_loading_override, state_path=None, + dataset_resource_dir=None, ): pipeline_makerules( self, @@ -453,6 +454,7 @@ def pipeline_makerules( resource_dir, incremental_loading_override, state_path=state_path, + dataset_resource_dir=dataset_resource_dir, ) def dataset_resource_map(self): diff --git a/digital_land/commands.py b/digital_land/commands.py index 2527143ec..37166354a 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -110,6 +110,7 @@ def collection_pipeline_makerules( resource_dir, incremental_loading_override, state_path=None, + dataset_resource_dir=None, ): collection = Collection(name=None, directory=collection_dir) collection.load() @@ -119,6 +120,7 @@ def collection_pipeline_makerules( resource_dir, incremental_loading_override, state_path=state_path, + dataset_resource_dir=dataset_resource_dir, ) @@ -1622,6 +1624,7 @@ def save_state( 
resource_dir, incremental_loading_override, output_path, + dataset_resource_dir=None, ): state = State.build( specification_dir=specification_dir, @@ -1629,6 +1632,7 @@ def save_state( pipeline_dir=pipeline_dir, resource_dir=resource_dir, incremental_loading_override=incremental_loading_override, + dataset_resource_dir=dataset_resource_dir, ) state.save( output_path=output_path, diff --git a/digital_land/makerules.py b/digital_land/makerules.py index 1f76d1ed8..2fba14bad 100644 --- a/digital_land/makerules.py +++ b/digital_land/makerules.py @@ -8,6 +8,10 @@ from enum import Enum +from digital_land import __version__ +from digital_land.utils.hash_utils import hash_directory +from digital_land.utils.dataset_resource_utils import resource_needs_processing + class ProcessingOption(Enum): PROCESS_ALL = "all" @@ -73,8 +77,42 @@ def pipeline_makerules( resource_dir, incremental_loading_override, state_path=None, + dataset_resource_dir=None, ): dataset_resource = collection.dataset_resource_map() + + if dataset_resource_dir is not None: + current_code_version = __version__ + try: + current_config_hash = hash_directory(pipeline_dir) + except Exception: + current_config_hash = "" + try: + current_specification_hash = hash_directory(specification_dir) + except Exception: + current_specification_hash = "" + + dataset_resource = { + dataset: { + resource + for resource in resources + if resource_needs_processing( + dataset_resource_dir, + dataset, + resource, + current_code_version, + current_config_hash, + current_specification_hash, + ) + } + for dataset, resources in dataset_resource.items() + } + # Drop datasets with nothing left to process + dataset_resource = { + dataset: resources + for dataset, resources in dataset_resource.items() + if resources + } # process = get_processing_option( # collection, # specification_dir, diff --git a/digital_land/state.py b/digital_land/state.py index 50227b9c5..ebdc0fed5 100644 --- a/digital_land/state.py +++ b/digital_land/state.py @@ 
-4,6 +4,8 @@ import hashlib from datetime import date from digital_land.collection import Collection +from digital_land import __version__ +from digital_land.utils.dataset_resource_utils import resource_needs_processing # Read the file in 32MB chunks _chunk_size = 32 * 1024 * 1024 @@ -20,6 +22,7 @@ def build( pipeline_dir, resource_dir, incremental_loading_override, + dataset_resource_dir=None, ): """Build a state object from the current configuration and code""" @@ -27,20 +30,34 @@ def build( collection = Collection(directory=collection_dir) collection.load(directory=collection_dir) + # Pre-compute once so counts and state dict share the same values + specification_hash = State.get_dir_hash(specification_dir) + pipeline_hash = State.get_dir_hash(pipeline_dir) + return State( { "code": State.get_code_hash(), - "specification": State.get_dir_hash(specification_dir), + "specification": specification_hash, "collection": State.get_dir_hash( collection_dir, ["log/", "log.csv", "pipeline.mk", "resource/"] ), "resource": State.get_dir_hash(resource_dir), - "pipeline": State.get_dir_hash(pipeline_dir), + "pipeline": pipeline_hash, "incremental_loading_override": incremental_loading_override, "last_updated_date": date.today().isoformat(), # date in YYYY-MM-DD format - "transform_count": State.get_transform_count(collection), + "transform_count": State.get_transform_count( + collection, + dataset_resource_dir=dataset_resource_dir, + current_code_version=__version__, + current_config_hash=pipeline_hash, + current_specification_hash=specification_hash, + ), "transform_count_by_dataset": State.get_transform_count_by_dataset( - collection + collection, + dataset_resource_dir=dataset_resource_dir, + current_code_version=__version__, + current_config_hash=pipeline_hash, + current_specification_hash=specification_hash, ), } ) @@ -92,23 +109,70 @@ def get_code_hash(): commit = repo.revparse_single("HEAD") return str(commit.id) - def get_transform_count(collection: Collection): - 
"""Calculate the number of transformations that need to be completed""" + def get_transform_count( + collection: Collection, + dataset_resource_dir=None, + current_code_version=None, + current_config_hash=None, + current_specification_hash=None, + ): + """Calculate the number of transformations that need to be completed. + + When dataset_resource_dir is provided, only resources whose existing log + differs from the current code version, config hash, or specification hash + are counted. If None, all resources are counted. + """ dataset_resource = collection.dataset_resource_map() - # Count total number of transformations (resources across all datasets) - return sum(len(resources) for resources in dataset_resource.values()) + if dataset_resource_dir is None: + return sum(len(resources) for resources in dataset_resource.values()) + + return sum( + 1 + for dataset, resources in dataset_resource.items() + for resource in resources + if resource_needs_processing( + dataset_resource_dir, + dataset, + resource, + current_code_version, + current_config_hash, + current_specification_hash, + ) + ) - def get_transform_count_by_dataset(collection: Collection): - """Calculate the number of transformations that need to be completed""" + def get_transform_count_by_dataset( + collection: Collection, + dataset_resource_dir=None, + current_code_version=None, + current_config_hash=None, + current_specification_hash=None, + ): + """Calculate the number of transformations needed per dataset. + When dataset_resource_dir is provided, only resources whose existing log + differs from the current code version, config hash, or specification hash + are counted. If None, all resources are counted. 
+ """ dataset_resource = collection.dataset_resource_map() - tranform_count_by_dataset = {} + transform_count_by_dataset = {} for dataset, resources in dataset_resource.items(): - tranform_count_by_dataset[dataset] = len(resources) - - # Count total number of transformations (resources across all datasets) - return tranform_count_by_dataset + if dataset_resource_dir is None: + transform_count_by_dataset[dataset] = len(resources) + else: + transform_count_by_dataset[dataset] = sum( + 1 + for resource in resources + if resource_needs_processing( + dataset_resource_dir, + dataset, + resource, + current_code_version, + current_config_hash, + current_specification_hash, + ) + ) + return transform_count_by_dataset def compare_state( diff --git a/digital_land/utils/dataset_resource_utils.py b/digital_land/utils/dataset_resource_utils.py new file mode 100644 index 000000000..ca7887b24 --- /dev/null +++ b/digital_land/utils/dataset_resource_utils.py @@ -0,0 +1,49 @@ +import csv +import os + + +def read_dataset_resource_log(dataset_resource_dir, dataset, resource): + """Read an existing DatasetResourceLog CSV for a given dataset/resource. + + Returns a dict with code-version, config-hash, and specification-hash, + or None if the file doesn't exist or can't be read. 
+ + Expected path: {dataset_resource_dir}/{dataset}/{resource}.csv + """ + path = os.path.join(dataset_resource_dir, dataset, f"{resource}.csv") + if not os.path.isfile(path): + return None + try: + with open(path, newline="") as f: + reader = csv.DictReader(f) + for row in reader: + return { + "code-version": row.get("code-version", ""), + "config-hash": row.get("config-hash", ""), + "specification-hash": row.get("specification-hash", ""), + } + except Exception: + return None + return None + + +def resource_needs_processing( + dataset_resource_dir, + dataset, + resource, + current_code_version, + current_config_hash, + current_specification_hash, +): + """Check whether a resource needs processing by comparing its log against current state. + + Returns True if there is no existing log or if any of the three values differ. + """ + log = read_dataset_resource_log(dataset_resource_dir, dataset, resource) + if log is None: + return True + return ( + log["code-version"] != current_code_version + or log["config-hash"] != current_config_hash + or log["specification-hash"] != current_specification_hash + )