From d4987e6fa684e9ec1d2d20a64d955764c31ae32b Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Wed, 22 Apr 2026 15:17:45 +0200 Subject: [PATCH] Add invalidate_revision for data quality-driven refetch DatasetStore.invalidate_revision(dataset, reason) marks the current revision as VALIDATION_FAILED and resets last_modified_at. On the next run, should_refetch returns True for VALIDATION_FAILED revisions, causing ingestify to refetch and create a new revision. Emits RevisionInvalidated event for audit trail / alerting. --- ingestify/application/dataset_store.py | 18 +++- .../models/dataset/dataset_repository.py | 6 ++ ingestify/domain/models/dataset/events.py | 6 ++ ingestify/domain/models/fetch_policy.py | 3 + .../store/dataset/sqlalchemy/repository.py | 25 +++++ .../tests/test_refetch_validation_failed.py | 101 ++++++++++++++++++ 6 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 ingestify/tests/test_refetch_validation_failed.py diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 03ebade..738cfb5 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -20,7 +20,11 @@ ) from ingestify.domain.models.dataset.dataset import DatasetState -from ingestify.domain.models.dataset.events import RevisionAdded, MetadataUpdated +from ingestify.domain.models.dataset.events import ( + RevisionAdded, + MetadataUpdated, + RevisionInvalidated, +) from ingestify.domain.models.dataset.file import NotModifiedFile from ingestify.domain.models.dataset.file_collection import FileCollection from ingestify.domain.models.dataset.revision import RevisionSource @@ -491,6 +495,18 @@ def update_dataset( return revision + def invalidate_revision(self, dataset: Dataset, reason: str = ""): + """Mark the current revision as VALIDATION_FAILED and reset + last_modified_at so the dataset is refetched on the next run. + + Args: + dataset: Dataset whose current revision should be invalidated + reason: Human-readable reason for invalidation + """ + self.dataset_repository.invalidate_revision(dataset) + + self.dispatch(RevisionInvalidated(dataset=dataset, reason=reason)) + def destroy_dataset(self, dataset: Dataset): # TODO: remove files. Now we leave some orphaned files around self.dataset_repository.destroy(dataset) diff --git a/ingestify/domain/models/dataset/dataset_repository.py b/ingestify/domain/models/dataset/dataset_repository.py index 4076e78..2079812 100644 --- a/ingestify/domain/models/dataset/dataset_repository.py +++ b/ingestify/domain/models/dataset/dataset_repository.py @@ -36,6 +36,12 @@ def get_dataset_last_modified_at_map( dataset+revision+file graph.""" return {} + @abstractmethod + def invalidate_revision(self, dataset: Dataset): + """Mark the current revision as VALIDATION_FAILED and reset + last_modified_at on the dataset.""" + pass + @abstractmethod def destroy(self, dataset: Dataset): pass diff --git a/ingestify/domain/models/dataset/events.py b/ingestify/domain/models/dataset/events.py index 85befc2..63c2070 100644 --- a/ingestify/domain/models/dataset/events.py +++ b/ingestify/domain/models/dataset/events.py @@ -32,3 +32,9 @@ class SelectorSkipped(DomainEvent): class DatasetSkipped(DomainEvent): dataset: Dataset event_type: ClassVar[str] = "dataset_skipped" + + +class RevisionInvalidated(DomainEvent): + dataset: Dataset + reason: str + event_type: ClassVar[str] = "revision_invalidated" diff --git a/ingestify/domain/models/fetch_policy.py b/ingestify/domain/models/fetch_policy.py index 31522b3..ead637c 100644 --- a/ingestify/domain/models/fetch_policy.py +++ b/ingestify/domain/models/fetch_policy.py @@ -1,6 +1,7 @@ from datetime import timedelta from ingestify.domain import Dataset, Identifier, DatasetResource +from ingestify.domain.models.dataset.revision import RevisionState from ingestify.utils import utcnow @@ -22,6 +23,8 @@ def should_refetch( # TODO: this is weird? Dataset without any data. Fetch error? return True elif current_revision: + if current_revision.state == RevisionState.VALIDATION_FAILED: + return True files_last_modified = { file.file_id: file.last_modified for file in dataset_resource.files.values() diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 3be5e03..4b9b14c 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -26,6 +26,7 @@ from sqlalchemy.orm import Session, Query, sessionmaker, scoped_session from ingestify.domain import File, Revision +from ingestify.domain.models.dataset.revision import RevisionState from ingestify.domain.models import ( Dataset, DatasetCollection, @@ -675,6 +676,30 @@ def _save(self, datasets: list[Dataset]): else: connection.commit() + def invalidate_revision(self, dataset: Dataset): + current_revision = dataset.current_revision + with self.connect() as connection: + # Set revision state to VALIDATION_FAILED + connection.execute( + self.revision_table.update() + .where(self.revision_table.c.dataset_id == dataset.dataset_id) + .where( + self.revision_table.c.revision_id == current_revision.revision_id + ) + .values(state=RevisionState.VALIDATION_FAILED) + ) + # Reset last_modified_at so the pre-check cache doesn't skip it + connection.execute( + self.dataset_table.update() + .where(self.dataset_table.c.dataset_id == dataset.dataset_id) + .values(last_modified_at=None) + ) + connection.commit() + + # Update in-memory state + current_revision.state = RevisionState.VALIDATION_FAILED + dataset.last_modified_at = None + def destroy(self, dataset: Dataset): with self.connect() as connection: try: diff --git a/ingestify/tests/test_refetch_validation_failed.py b/ingestify/tests/test_refetch_validation_failed.py new file mode 100644 index 0000000..cb6eaf4 --- /dev/null +++ b/ingestify/tests/test_refetch_validation_failed.py @@ -0,0 +1,101 @@ +"""Tests for invalidating revisions and refetching.""" +from datetime import datetime, timezone + +from ingestify import Source, DatasetResource +from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector +from ingestify.domain.models.dataset.collection_metadata import ( + DatasetCollectionMetadata, +) +from ingestify.domain.models.dataset.revision import RevisionState +from ingestify.domain.models.fetch_policy import FetchPolicy +from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan + +# Fixed timestamp so last_modified doesn't change between runs +FIXED_TIME = datetime(2026, 1, 1, tzinfo=timezone.utc) + +call_count = 0 + + +def counting_loader(file_resource, current_file, **kwargs): + global call_count + call_count += 1 + return DraftFile.from_input(f"data-{call_count}", data_feed_key="f1") + + +class SimpleSource(Source): + provider = "test_provider" + + def find_datasets( + self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs + ): + r = DatasetResource( + dataset_resource_id={"item_id": 1}, + provider=self.provider, + dataset_type="test", + name="item-1", + ) + r.add_file( + last_modified=FIXED_TIME, + data_feed_key="f1", + data_spec_version="v1", + file_loader=counting_loader, + ) + yield r + + +def _setup(engine): + dsv = DataSpecVersionCollection.from_dict({"default": {"v1"}}) + engine.add_ingestion_plan( + IngestionPlan( + source=SimpleSource("s"), + fetch_policy=FetchPolicy(), + dataset_type="test", + selectors=[Selector.build({}, data_spec_versions=dsv)], + data_spec_versions=dsv, + ) + ) + + +def test_normal_second_run_skips(engine): + """Verify a second run with same last_modified does NOT refetch.""" + global call_count + call_count = 0 + _setup(engine) + + engine.run() + assert call_count == 1 + + engine.run() + assert call_count == 1, "Should NOT refetch when nothing changed" + + +def test_invalidate_revision_triggers_refetch(engine): + """Invalidating a revision causes ingestify to refetch on next run.""" + global call_count + call_count = 0 + _setup(engine) + + # First run: creates the dataset + engine.run() + assert call_count == 1 + + # Invalidate the current revision + datasets = list( + engine.store.get_dataset_collection( + provider="test_provider", dataset_type="test" + ) + ) + dataset = datasets[0] + engine.store.invalidate_revision(dataset, reason="Data quality check failed") + + # Verify state + datasets = list( + engine.store.get_dataset_collection( + provider="test_provider", dataset_type="test" + ) + ) + assert datasets[0].current_revision.state == RevisionState.VALIDATION_FAILED + + # Second run: should refetch + engine.run() + assert call_count == 2, "Dataset with invalidated revision should be refetched"