Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `tilebox-storage`: Added `quicklook` and `download_quicklook` methods to the `CopernicusStorageClient` to download and
display preview images for Sentinel data.

### Fixed

- `tilebox-workflows`: Registering duplicate task identifiers with a task runner now raises a `ValueError` instead of
overwriting the existing task.

## [0.41.0] - 2025-08-01

### Added
Expand Down
63 changes: 61 additions & 2 deletions tilebox-workflows/tests/runner/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
from pathlib import Path
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

from _tilebox.grpc.replay import open_recording_channel, open_replay_channel
from tilebox.workflows import ExecutionContext, Task
from tilebox.workflows.cache import InMemoryCache, JobCache
from tilebox.workflows.client import Client
from tilebox.workflows.data import JobState
from tilebox.workflows.data import JobState, RunnerContext
from tilebox.workflows.runner.task_runner import TaskRunner


def int_to_bytes(n: int) -> bytes:
Expand Down Expand Up @@ -143,3 +146,59 @@ def record_client(recording_file: str) -> Client:
open_channel_mock.assert_called_once()

return client


class ExplicitIdentifierTaskV1(Task):
@classmethod
def identifier(cls) -> tuple[str, str]:
return "tilebox.com/explicit", "v1.0"

def execute(self, context: ExecutionContext) -> None:
pass


class ExplicitIdentifierTaskV2(Task):
@classmethod
def identifier(cls) -> tuple[str, str]:
return "tilebox.com/explicit", "v2.0"

def execute(self, context: ExecutionContext) -> None:
pass


def test_runner_disallow_duplicate_task_identifiers() -> None:
runner = TaskRunner(
MagicMock(),
"dummy-cluster",
InMemoryCache(),
None,
None,
MagicMock(),
RunnerContext(),
)

runner.register(FlakyTask)
with pytest.raises(
ValueError, match="Duplicate task identifier: A task 'FlakyTask' with version 'v0.0' is already registered."
):
runner.register(FlakyTask)

runner.register(SumResultTask)
with pytest.raises(
ValueError, match="Duplicate task identifier: A task 'SumResultTask' with version 'v0.0' is already registered."
):
runner.register(SumResultTask)

runner.register(ExplicitIdentifierTaskV1)
with pytest.raises(
ValueError,
match="Duplicate task identifier: A task 'tilebox.com/explicit' with version 'v1.0' is already registered.",
):
runner.register(ExplicitIdentifierTaskV1)

runner.register(ExplicitIdentifierTaskV2) # this one has a different version, so it's fine
with pytest.raises(
ValueError,
match="Duplicate task identifier: A task 'tilebox.com/explicit' with version 'v2.0' is already registered.",
):
runner.register(ExplicitIdentifierTaskV2)
5 changes: 5 additions & 0 deletions tilebox-workflows/tilebox/workflows/runner/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ def register(self, task: type) -> None:
f"Task {task_repr} is not executable. It must have an execute method in order to "
f"register it with a task runner."
)
if meta.identifier in self.tasks_to_run.identifiers:
raise ValueError(
f"Duplicate task identifier: A task '{meta.identifier.name}' with version '{meta.identifier.version}' "
f"is already registered."
)
self.tasks_to_run.identifiers[meta.identifier] = task

def add_interceptor(self, interceptor: InterceptorType) -> None:
Expand Down
Loading