diff --git a/CHANGELOG.md b/CHANGELOG.md index 81fdf90f..9c5569bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,19 @@ # Changelog +## Version 0.24.0 + +* Reverted "Added type safety to task decorator [302](https://github.com/closeio/tasktiger/issues/302)" due to buggy interaction with scheduled tasks. + ## Version 0.23.0 -* Added `Task.current_task_is_batch` +**Should not be used, use 0.24.0 and above instead** + +* Added `Task.current_task_is_batch` ## Version 0.22.0 + +**Should not be used, use 0.24.0 and above instead** + * Minimum python version is now 3.10 * Added type safety to task decorator [302](https://github.com/closeio/tasktiger/issues/302). Wrapped functions retain type information for params and return value, plus capture some config that is added to the function. diff --git a/tasktiger/__init__.py b/tasktiger/__init__.py index 3f4e3fdb..3624b005 100644 --- a/tasktiger/__init__.py +++ b/tasktiger/__init__.py @@ -12,7 +12,7 @@ from .tasktiger import TaskTiger, run_worker from .worker import Worker -__version__ = "0.23.0" +__version__ = "0.24.0" __all__ = [ "TaskTiger", "Worker", diff --git a/tasktiger/task.py b/tasktiger/task.py index 927cea4a..4c43547d 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -569,12 +569,7 @@ def tasks_from_queue( @classmethod def queue_from_function(cls, func: Any, tiger: "TaskTiger") -> str: """Get queue from function.""" - # Can't use the default here because the attribue might exist, but be - # None. In that case we would *not* use the default, which is wrong. - attr = getattr(func, "_task_queue", None) - - # If the attribute is not set or does not exist, use the default - return attr or tiger.config["DEFAULT_QUEUE"] + return getattr(func, "_task_queue", tiger.config["DEFAULT_QUEUE"]) def n_executions(self) -> int: """ diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index ef8d3dfd..aeb611b9 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -1,24 +1,18 @@ import datetime -import functools import importlib import logging from collections import defaultdict -from dataclasses import dataclass from typing import ( Any, Callable, Collection, Dict, - Generic, Iterable, List, Optional, - ParamSpec, Tuple, Type, - TypeVar, Union, - overload, ) import click @@ -95,41 +89,6 @@ STRING :qlock: (Legacy queue locks that are no longer used) """ -P = ParamSpec("P") -R = TypeVar("R") - - -@dataclass -class TaskCallable(Generic[P, R]): - _func: Callable[P, R] - _tiger: "TaskTiger" - - _task_hard_timeout: float | None = None - _task_queue: str | None = None - _task_unique: bool | None = None - _task_unique_key: Collection[str] | None = None - _task_lock: bool | None = None - _task_lock_key: Collection[str] | None = None - _task_retry: int | None = None - _task_retry_on: Collection[type[BaseException]] | None = None - _task_retry_method: ( - Callable[[int], float] | Tuple[Callable[..., float], Tuple] | None - ) = None - _task_batch: bool | None = None - _task_schedule: Callable | None = None - _task_max_queue_size: int | None = None - _task_max_stored_executions: int | None = None - _task_runner_class: type | None = None - - def __post_init__(self) -> None: - functools.update_wrapper(self, self._func) - - def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: - return self._func(*args, **kwargs) - - def delay(self, *args: P.args, **kwargs: P.kwargs) -> "Task": - return self._tiger.delay(self, args=args, kwargs=kwargs) - class TaskTiger: log: BoundLogger @@ -342,56 +301,9 @@ def _key(self, *parts: str) -> str: """ return ":".join([self.config["REDIS_PREFIX"]] + list(parts)) - @overload - def task( - self, - _fn: Callable[P, R], - *, - queue: Optional[str] = ..., - hard_timeout: Optional[float] = ..., - unique: Optional[bool] = ..., - unique_key: Optional[Collection[str]] = ..., - lock: Optional[bool] = ..., - lock_key: Optional[Collection[str]] = ..., - retry: Optional[bool] = ..., - retry_on: Optional[Collection[Type[BaseException]]] = ..., - retry_method: Optional[ - Union[Callable[[int], float], Tuple[Callable[..., float], Tuple]] - ] = ..., - schedule: Optional[Callable] = ..., - batch: bool = ..., - max_queue_size: Optional[int] = ..., - max_stored_executions: Optional[int] = ..., - runner_class: Optional[Type["BaseRunner"]] = ..., - ) -> TaskCallable[P, R]: ... - - @overload def task( self, - _fn: None = None, - *, - queue: Optional[str] = ..., - hard_timeout: Optional[float] = ..., - unique: Optional[bool] = ..., - unique_key: Optional[Collection[str]] = ..., - lock: Optional[bool] = ..., - lock_key: Optional[Collection[str]] = ..., - retry: Optional[bool] = ..., - retry_on: Optional[Collection[Type[BaseException]]] = ..., - retry_method: Optional[ - Union[Callable[[int], float], Tuple[Callable[..., float], Tuple]] - ] = ..., - schedule: Optional[Callable] = ..., - batch: bool = ..., - max_queue_size: Optional[int] = ..., - max_stored_executions: Optional[int] = ..., - runner_class: Optional[Type["BaseRunner"]] = ..., - ) -> Callable[[Callable[P, R]], TaskCallable[P, R]]: ... - - def task( - self, - _fn: Optional[Callable[P, R]] = None, - *, + _fn: Optional[Callable] = None, queue: Optional[str] = None, hard_timeout: Optional[float] = None, unique: Optional[bool] = None, @@ -408,7 +320,7 @@ def task( max_queue_size: Optional[int] = None, max_stored_executions: Optional[int] = None, runner_class: Optional[Type["BaseRunner"]] = None, - ) -> Callable[[Callable[P, R]], TaskCallable[P, R]] | TaskCallable[P, R]: + ) -> Callable: """ Function decorator that defines the behavior of the function when it is used as a task. To use the default behavior, tasks don't need to be @@ -417,38 +329,56 @@ def task( See README.rst for an explanation of the options. """ + def _delay(func: Callable) -> Callable: + def _delay_inner(*args: Any, **kwargs: Any) -> Task: + return self.delay(func, args=args, kwargs=kwargs) + + return _delay_inner + # Periodic tasks are unique. if schedule is not None: unique = True - def _wrap(func: Callable[P, R]) -> TaskCallable[P, R]: - tc = TaskCallable( - _func=func, - _tiger=self, - _task_hard_timeout=hard_timeout, - _task_queue=queue, - _task_unique=unique, - _task_unique_key=unique_key, - _task_lock=lock, - _task_lock_key=lock_key, - _task_retry=retry, - _task_retry_on=retry_on, - _task_retry_method=retry_method, - _task_batch=batch, - _task_schedule=schedule, - _task_max_queue_size=max_queue_size, - _task_max_stored_executions=max_stored_executions, - _task_runner_class=runner_class, - ) + def _wrap(func: Callable) -> Callable: + if hard_timeout is not None: + func._task_hard_timeout = hard_timeout # type: ignore[attr-defined] + if queue is not None: + func._task_queue = queue # type: ignore[attr-defined] + if unique is not None: + func._task_unique = unique # type: ignore[attr-defined] + if unique_key is not None: + func._task_unique_key = unique_key # type: ignore[attr-defined] + if lock is not None: + func._task_lock = lock # type: ignore[attr-defined] + if lock_key is not None: + func._task_lock_key = lock_key # type: ignore[attr-defined] + if retry is not None: + func._task_retry = retry # type: ignore[attr-defined] + if retry_on is not None: + func._task_retry_on = retry_on # type: ignore[attr-defined] + if retry_method is not None: + func._task_retry_method = retry_method # type: ignore[attr-defined] + if batch is not None: + func._task_batch = batch # type: ignore[attr-defined] + if schedule is not None: + func._task_schedule = schedule # type: ignore[attr-defined] + if max_queue_size is not None: + func._task_max_queue_size = max_queue_size # type: ignore[attr-defined] + if max_stored_executions is not None: + func._task_max_stored_executions = max_stored_executions # type: ignore[attr-defined] + if runner_class is not None: + func._task_runner_class = runner_class # type: ignore[attr-defined] + + func.delay = _delay(func) # type: ignore[attr-defined] if schedule is not None: serialized_func = serialize_func_name(func) assert serialized_func not in self.periodic_task_funcs, ( "attempted duplicate registration of periodic task" ) - self.periodic_task_funcs[serialized_func] = tc + self.periodic_task_funcs[serialized_func] = func - return tc + return func return _wrap if _fn is None else _wrap(_fn)