diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f058c8..cbd9d85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `tilebox-storage`: Added `quicklook` and `download_quicklook` methods to the `CopernicusStorageClient` to download and display preview images for Sentinel data. +### Fixed + +- `tilebox-workflows`: Registering duplicate task identifiers with a task runner now raises a `ValueError` instead of + overwriting the existing task. + ## [0.41.0] - 2025-08-01 ### Added diff --git a/tilebox-workflows/tests/runner/test_runner.py b/tilebox-workflows/tests/runner/test_runner.py index 96a7449..53367c2 100644 --- a/tilebox-workflows/tests/runner/test_runner.py +++ b/tilebox-workflows/tests/runner/test_runner.py @@ -1,12 +1,15 @@ import os from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch + +import pytest from _tilebox.grpc.replay import open_recording_channel, open_replay_channel from tilebox.workflows import ExecutionContext, Task from tilebox.workflows.cache import InMemoryCache, JobCache from tilebox.workflows.client import Client -from tilebox.workflows.data import JobState +from tilebox.workflows.data import JobState, RunnerContext +from tilebox.workflows.runner.task_runner import TaskRunner def int_to_bytes(n: int) -> bytes: @@ -143,3 +146,59 @@ def record_client(recording_file: str) -> Client: open_channel_mock.assert_called_once() return client + + +class ExplicitIdentifierTaskV1(Task): + @classmethod + def identifier(cls) -> tuple[str, str]: + return "tilebox.com/explicit", "v1.0" + + def execute(self, context: ExecutionContext) -> None: + pass + + +class ExplicitIdentifierTaskV2(Task): + @classmethod + def identifier(cls) -> tuple[str, str]: + return "tilebox.com/explicit", "v2.0" + + def execute(self, context: ExecutionContext) -> None: + pass + + +def test_runner_disallow_duplicate_task_identifiers() -> None: + runner = TaskRunner( + MagicMock(), + "dummy-cluster", + InMemoryCache(), + None, + None, + MagicMock(), + RunnerContext(), + ) + + runner.register(FlakyTask) + with pytest.raises( + ValueError, match="Duplicate task identifier: A task 'FlakyTask' with version 'v0.0' is already registered." + ): + runner.register(FlakyTask) + + runner.register(SumResultTask) + with pytest.raises( + ValueError, match="Duplicate task identifier: A task 'SumResultTask' with version 'v0.0' is already registered." + ): + runner.register(SumResultTask) + + runner.register(ExplicitIdentifierTaskV1) + with pytest.raises( + ValueError, + match="Duplicate task identifier: A task 'tilebox.com/explicit' with version 'v1.0' is already registered.", + ): + runner.register(ExplicitIdentifierTaskV1) + + runner.register(ExplicitIdentifierTaskV2) # this one has a different version, so it's fine + with pytest.raises( + ValueError, + match="Duplicate task identifier: A task 'tilebox.com/explicit' with version 'v2.0' is already registered.", + ): + runner.register(ExplicitIdentifierTaskV2) diff --git a/tilebox-workflows/tilebox/workflows/runner/task_runner.py b/tilebox-workflows/tilebox/workflows/runner/task_runner.py index c4db898..fc1eeeb 100644 --- a/tilebox-workflows/tilebox/workflows/runner/task_runner.py +++ b/tilebox-workflows/tilebox/workflows/runner/task_runner.py @@ -317,6 +317,11 @@ def register(self, task: type) -> None: f"Task {task_repr} is not executable. It must have an execute method in order to " f"register it with a task runner." ) + if meta.identifier in self.tasks_to_run.identifiers: + raise ValueError( + f"Duplicate task identifier: A task '{meta.identifier.name}' with version '{meta.identifier.version}' " + f"is already registered." + ) self.tasks_to_run.identifiers[meta.identifier] = task def add_interceptor(self, interceptor: InterceptorType) -> None: