Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion ingestify/application/dataset_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
)

from ingestify.domain.models.dataset.dataset import DatasetState
from ingestify.domain.models.dataset.events import RevisionAdded, MetadataUpdated
from ingestify.domain.models.dataset.events import (
RevisionAdded,
MetadataUpdated,
RevisionInvalidated,
)
from ingestify.domain.models.dataset.file import NotModifiedFile
from ingestify.domain.models.dataset.file_collection import FileCollection
from ingestify.domain.models.dataset.revision import RevisionSource
Expand Down Expand Up @@ -491,6 +495,18 @@ def update_dataset(

return revision

def invalidate_revision(self, dataset: Dataset, reason: str = ""):
    """Invalidate the dataset's current revision and announce it.

    The repository is asked to invalidate the revision first (marking it
    VALIDATION_FAILED and resetting last_modified_at so the dataset is
    refetched on the next run). Once that call returns, a
    ``RevisionInvalidated`` event is dispatched.

    Args:
        dataset: Dataset whose current revision should be invalidated
        reason: Human-readable reason for invalidation, forwarded on the event
    """
    repository = self.dataset_repository
    repository.invalidate_revision(dataset)

    # Only announce the invalidation after the repository call succeeded.
    invalidated = RevisionInvalidated(dataset=dataset, reason=reason)
    self.dispatch(invalidated)

def destroy_dataset(self, dataset: Dataset):
# TODO: remove files. Now we leave some orphaned files around
self.dataset_repository.destroy(dataset)
Expand Down
6 changes: 6 additions & 0 deletions ingestify/domain/models/dataset/dataset_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ def get_dataset_last_modified_at_map(
dataset+revision+file graph."""
return {}

@abstractmethod
def invalidate_revision(self, dataset: Dataset) -> None:
    """Mark the current revision as VALIDATION_FAILED and reset
    last_modified_at on the dataset.

    Abstract: every concrete repository has to provide this operation.

    Args:
        dataset: Dataset whose current revision should be invalidated.
    """
    pass

@abstractmethod
def destroy(self, dataset: Dataset):
    # Abstract hook: concrete repositories implement the removal of `dataset`.
    # NOTE(review): what exactly is removed (rows, related files) is not
    # visible from this interface -- confirm per implementation.
    pass
Expand Down
6 changes: 6 additions & 0 deletions ingestify/domain/models/dataset/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ class SelectorSkipped(DomainEvent):
class DatasetSkipped(DomainEvent):
dataset: Dataset
event_type: ClassVar[str] = "dataset_skipped"


class RevisionInvalidated(DomainEvent):
    """Domain event describing the invalidation of a dataset's revision."""

    # Dataset whose revision was invalidated.
    dataset: Dataset
    # Free-form explanation; declared without a default here.
    reason: str
    # Identifier string for this event class.
    event_type: ClassVar[str] = "revision_invalidated"
3 changes: 3 additions & 0 deletions ingestify/domain/models/fetch_policy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import timedelta

from ingestify.domain import Dataset, Identifier, DatasetResource
from ingestify.domain.models.dataset.revision import RevisionState
from ingestify.utils import utcnow


Expand All @@ -22,6 +23,8 @@ def should_refetch(
# TODO: this is weird? Dataset without any data. Fetch error?
return True
elif current_revision:
if current_revision.state == RevisionState.VALIDATION_FAILED:
return True
files_last_modified = {
file.file_id: file.last_modified
for file in dataset_resource.files.values()
Expand Down
25 changes: 25 additions & 0 deletions ingestify/infra/store/dataset/sqlalchemy/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from sqlalchemy.orm import Session, Query, sessionmaker, scoped_session

from ingestify.domain import File, Revision
from ingestify.domain.models.dataset.revision import RevisionState
from ingestify.domain.models import (
Dataset,
DatasetCollection,
Expand Down Expand Up @@ -675,6 +676,30 @@ def _save(self, datasets: list[Dataset]):
else:
connection.commit()

def invalidate_revision(self, dataset: Dataset):
    """Flag the dataset's current revision as VALIDATION_FAILED and clear
    the dataset's last_modified_at.

    Both updates run on the same connection and are committed together;
    if either statement fails the transaction is rolled back explicitly
    and the exception is re-raised. The in-memory ``dataset`` object is
    only updated after the commit succeeded, so it never gets ahead of
    the persisted state.

    A dataset without a current revision (``current_revision`` is None)
    has no revision row to flag. In that case only last_modified_at is
    reset, instead of failing with an AttributeError halfway through.

    Args:
        dataset: Dataset whose current revision should be invalidated.
    """
    current_revision = dataset.current_revision
    with self.connect() as connection:
        try:
            if current_revision is not None:
                # Set revision state to VALIDATION_FAILED
                connection.execute(
                    self.revision_table.update()
                    .where(self.revision_table.c.dataset_id == dataset.dataset_id)
                    .where(
                        self.revision_table.c.revision_id
                        == current_revision.revision_id
                    )
                    .values(state=RevisionState.VALIDATION_FAILED)
                )
            # Reset last_modified_at so the pre-check cache doesn't skip it
            connection.execute(
                self.dataset_table.update()
                .where(self.dataset_table.c.dataset_id == dataset.dataset_id)
                .values(last_modified_at=None)
            )
        except Exception:
            # Never leave a half-applied invalidation behind.
            connection.rollback()
            raise
        else:
            connection.commit()

    # Update in-memory state (reached only when the commit went through)
    if current_revision is not None:
        current_revision.state = RevisionState.VALIDATION_FAILED
    dataset.last_modified_at = None

def destroy(self, dataset: Dataset):
with self.connect() as connection:
try:
Expand Down
101 changes: 101 additions & 0 deletions ingestify/tests/test_refetch_validation_failed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Tests for invalidating revisions and refetching."""
from datetime import datetime, timezone

from ingestify import Source, DatasetResource
from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector
from ingestify.domain.models.dataset.collection_metadata import (
DatasetCollectionMetadata,
)
from ingestify.domain.models.dataset.revision import RevisionState
from ingestify.domain.models.fetch_policy import FetchPolicy
from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan

# Fixed timestamp so last_modified doesn't change between runs
FIXED_TIME = datetime(2026, 1, 1, tzinfo=timezone.utc)

# Number of times counting_loader has been invoked; each test resets it to 0
# before running so the assertions count loads from a clean slate.
call_count = 0


def counting_loader(file_resource, current_file, **kwargs):
    """File loader that counts its invocations.

    Bumps the module-level ``call_count`` and returns a draft file whose
    content embeds the new count, so every load produces different data.
    """
    global call_count
    call_count = call_count + 1
    payload = f"data-{call_count}"
    return DraftFile.from_input(payload, data_feed_key="f1")


class SimpleSource(Source):
    """Minimal source for the tests: always yields one dataset resource
    containing a single file with a fixed last_modified timestamp."""

    provider = "test_provider"

    def find_datasets(
        self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs
    ):
        # The arguments are ignored; the same resource is produced every time.
        resource_id = {"item_id": 1}
        resource = DatasetResource(
            dataset_resource_id=resource_id,
            provider=self.provider,
            dataset_type="test",
            name="item-1",
        )
        # FIXED_TIME keeps last_modified stable, so a refetch can only be
        # triggered by something other than a changed timestamp.
        resource.add_file(
            last_modified=FIXED_TIME,
            data_feed_key="f1",
            data_spec_version="v1",
            file_loader=counting_loader,
        )
        yield resource


def _setup(engine):
    """Register a single ingestion plan for ``SimpleSource`` on the engine."""
    versions = DataSpecVersionCollection.from_dict({"default": {"v1"}})

    source = SimpleSource("s")
    policy = FetchPolicy()
    # One catch-all selector (no attributes) bound to the spec versions.
    selector = Selector.build({}, data_spec_versions=versions)

    plan = IngestionPlan(
        source=source,
        fetch_policy=policy,
        dataset_type="test",
        selectors=[selector],
        data_spec_versions=versions,
    )
    engine.add_ingestion_plan(plan)


def test_normal_second_run_skips(engine):
    """A second run with an unchanged last_modified must not load again."""
    global call_count
    call_count = 0
    _setup(engine)

    # Initial run: the single file is loaded exactly once.
    engine.run()
    loads_after_first_run = call_count
    assert loads_after_first_run == 1

    # Nothing changed at the source, so the loader must stay untouched.
    engine.run()
    loads_after_second_run = call_count
    assert loads_after_second_run == 1, "Should NOT refetch when nothing changed"


def test_invalidate_revision_triggers_refetch(engine):
    """After invalidating the current revision, the next run loads again."""
    global call_count
    call_count = 0
    _setup(engine)

    def stored_datasets():
        # Fresh read from the store on every call.
        collection = engine.store.get_dataset_collection(
            provider="test_provider", dataset_type="test"
        )
        return list(collection)

    # First run: creates the dataset
    engine.run()
    assert call_count == 1

    # Invalidate the current revision of the only dataset
    target = stored_datasets()[0]
    engine.store.invalidate_revision(target, reason="Data quality check failed")

    # Re-read from the store to verify the persisted state
    reloaded = stored_datasets()[0]
    assert reloaded.current_revision.state == RevisionState.VALIDATION_FAILED

    # Second run: should refetch
    engine.run()
    assert call_count == 2, "Dataset with invalidated revision should be refetched"
Loading