CHANGELOG.md (11 changes: 10 additions & 1 deletion)
@@ -1,10 +1,19 @@
 # Changelog
 
+## Version 0.24.0
+
+* Reverted "Added type safety to task decorator [302](https://github.com/closeio/tasktiger/issues/302)" due to a buggy interaction with scheduled tasks.
+
 ## Version 0.23.0
-* Added `Task.current_task_is_batch`
+
+**Should not be used, use 0.24.0 and above instead**
+
+* Added `Task.current_task_is_batch`
 
 ## Version 0.22.0
 
+**Should not be used, use 0.24.0 and above instead**
+
 * Minimum python version is now 3.10
 * Added type safety to task decorator [302](https://github.com/closeio/tasktiger/issues/302). Wrapped functions retain type information for their parameters and return value, and also capture the task configuration that the decorator attaches to them.
 
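For context on the reverted feature: it wrapped each decorated function in a generic callable parameterized with `ParamSpec`/`TypeVar`, so that direct calls and `.delay()` kept the original signature for type checkers. Below is a minimal, standalone sketch of that pattern, simplified from the `TaskCallable` dataclass removed in this PR; `TypedTask`, `typed_task`, and `add` are illustrative names, not TaskTiger API.

```python
import functools
from dataclasses import dataclass
from typing import Callable, Generic, ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")


@dataclass
class TypedTask(Generic[P, R]):
    """Wrapper that preserves the wrapped function's parameter and return types."""

    _func: Callable[P, R]

    def __post_init__(self) -> None:
        # Copy __name__, __doc__, etc. from the wrapped function.
        functools.update_wrapper(self, self._func)

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        return self._func(*args, **kwargs)


def typed_task(func: Callable[P, R]) -> TypedTask[P, R]:
    return TypedTask(func)


@typed_task
def add(x: int, y: int) -> int:
    return x + y


result: int = add(1, 2)  # type checkers still see (int, int) -> int
```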
tasktiger/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -12,7 +12,7 @@
 from .tasktiger import TaskTiger, run_worker
 from .worker import Worker
 
-__version__ = "0.23.0"
+__version__ = "0.24.0"
 __all__ = [
     "TaskTiger",
     "Worker",
tasktiger/task.py (7 changes: 1 addition & 6 deletions)
@@ -569,12 +569,7 @@ def tasks_from_queue(
     @classmethod
     def queue_from_function(cls, func: Any, tiger: "TaskTiger") -> str:
         """Get queue from function."""
-        # Can't use the default here because the attribute might exist, but be
-        # None. In that case we would *not* use the default, which is wrong.
-        attr = getattr(func, "_task_queue", None)
-
-        # If the attribute is not set or does not exist, use the default
-        return attr or tiger.config["DEFAULT_QUEUE"]
+        return getattr(func, "_task_queue", tiger.config["DEFAULT_QUEUE"])
 
     def n_executions(self) -> int:
         """
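The comment removed above documents a subtle difference between `getattr` with a default and an explicit `or` fallback: the default only applies when the attribute is missing, not when it exists and is `None`. With the reverted decorator, `_task_queue` is only set when a queue was actually passed, so the plain `getattr` default is sufficient again. A small standalone illustration of the difference (the class and names are hypothetical, not TaskTiger code):

```python
class _Func:
    """Stand-in for a task function whose decorator set _task_queue to None."""

    _task_queue = None


DEFAULT_QUEUE = "default"

# getattr's default is used only when the attribute is *missing* entirely.
print(getattr(_Func, "_task_queue", DEFAULT_QUEUE))  # prints: None

# The `or` fallback also covers the attribute-exists-but-is-None case.
print(getattr(_Func, "_task_queue", None) or DEFAULT_QUEUE)  # prints: default
```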
tasktiger/tasktiger.py (152 changes: 41 additions & 111 deletions)
@@ -1,24 +1,18 @@
 import datetime
-import functools
 import importlib
 import logging
 from collections import defaultdict
-from dataclasses import dataclass
 from typing import (
     Any,
     Callable,
     Collection,
     Dict,
-    Generic,
     Iterable,
     List,
     Optional,
-    ParamSpec,
     Tuple,
     Type,
-    TypeVar,
     Union,
-    overload,
 )
 
 import click
@@ -95,41 +89,6 @@
 STRING <prefix>:qlock:<queue> (Legacy queue locks that are no longer used)
 """
 
-P = ParamSpec("P")
-R = TypeVar("R")
-
-
-@dataclass
-class TaskCallable(Generic[P, R]):
-    _func: Callable[P, R]
-    _tiger: "TaskTiger"
-
-    _task_hard_timeout: float | None = None
-    _task_queue: str | None = None
-    _task_unique: bool | None = None
-    _task_unique_key: Collection[str] | None = None
-    _task_lock: bool | None = None
-    _task_lock_key: Collection[str] | None = None
-    _task_retry: int | None = None
-    _task_retry_on: Collection[type[BaseException]] | None = None
-    _task_retry_method: (
-        Callable[[int], float] | Tuple[Callable[..., float], Tuple] | None
-    ) = None
-    _task_batch: bool | None = None
-    _task_schedule: Callable | None = None
-    _task_max_queue_size: int | None = None
-    _task_max_stored_executions: int | None = None
-    _task_runner_class: type | None = None
-
-    def __post_init__(self) -> None:
-        functools.update_wrapper(self, self._func)
-
-    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
-        return self._func(*args, **kwargs)
-
-    def delay(self, *args: P.args, **kwargs: P.kwargs) -> "Task":
-        return self._tiger.delay(self, args=args, kwargs=kwargs)
-
 
 class TaskTiger:
     log: BoundLogger
@@ -342,56 +301,9 @@ def _key(self, *parts: str) -> str:
         """
         return ":".join([self.config["REDIS_PREFIX"]] + list(parts))
 
-    @overload
-    def task(
-        self,
-        _fn: Callable[P, R],
-        *,
-        queue: Optional[str] = ...,
-        hard_timeout: Optional[float] = ...,
-        unique: Optional[bool] = ...,
-        unique_key: Optional[Collection[str]] = ...,
-        lock: Optional[bool] = ...,
-        lock_key: Optional[Collection[str]] = ...,
-        retry: Optional[bool] = ...,
-        retry_on: Optional[Collection[Type[BaseException]]] = ...,
-        retry_method: Optional[
-            Union[Callable[[int], float], Tuple[Callable[..., float], Tuple]]
-        ] = ...,
-        schedule: Optional[Callable] = ...,
-        batch: bool = ...,
-        max_queue_size: Optional[int] = ...,
-        max_stored_executions: Optional[int] = ...,
-        runner_class: Optional[Type["BaseRunner"]] = ...,
-    ) -> TaskCallable[P, R]: ...
-
-    @overload
-    def task(
-        self,
-        _fn: None = None,
-        *,
-        queue: Optional[str] = ...,
-        hard_timeout: Optional[float] = ...,
-        unique: Optional[bool] = ...,
-        unique_key: Optional[Collection[str]] = ...,
-        lock: Optional[bool] = ...,
-        lock_key: Optional[Collection[str]] = ...,
-        retry: Optional[bool] = ...,
-        retry_on: Optional[Collection[Type[BaseException]]] = ...,
-        retry_method: Optional[
-            Union[Callable[[int], float], Tuple[Callable[..., float], Tuple]]
-        ] = ...,
-        schedule: Optional[Callable] = ...,
-        batch: bool = ...,
-        max_queue_size: Optional[int] = ...,
-        max_stored_executions: Optional[int] = ...,
-        runner_class: Optional[Type["BaseRunner"]] = ...,
-    ) -> Callable[[Callable[P, R]], TaskCallable[P, R]]: ...
-
     def task(
         self,
-        _fn: Optional[Callable[P, R]] = None,
-        *,
+        _fn: Optional[Callable] = None,
         queue: Optional[str] = None,
         hard_timeout: Optional[float] = None,
         unique: Optional[bool] = None,
@@ -408,7 +320,7 @@ def task(
         max_queue_size: Optional[int] = None,
         max_stored_executions: Optional[int] = None,
         runner_class: Optional[Type["BaseRunner"]] = None,
-    ) -> Callable[[Callable[P, R]], TaskCallable[P, R]] | TaskCallable[P, R]:
+    ) -> Callable:
         """
         Function decorator that defines the behavior of the function when it is
         used as a task. To use the default behavior, tasks don't need to be
@@ -417,38 +329,56 @@ def task(
         See README.rst for an explanation of the options.
         """
 
+        def _delay(func: Callable) -> Callable:
+            def _delay_inner(*args: Any, **kwargs: Any) -> Task:
+                return self.delay(func, args=args, kwargs=kwargs)
+
+            return _delay_inner
+
         # Periodic tasks are unique.
         if schedule is not None:
             unique = True
 
-        def _wrap(func: Callable[P, R]) -> TaskCallable[P, R]:
-            tc = TaskCallable(
-                _func=func,
-                _tiger=self,
-                _task_hard_timeout=hard_timeout,
-                _task_queue=queue,
-                _task_unique=unique,
-                _task_unique_key=unique_key,
-                _task_lock=lock,
-                _task_lock_key=lock_key,
-                _task_retry=retry,
-                _task_retry_on=retry_on,
-                _task_retry_method=retry_method,
-                _task_batch=batch,
-                _task_schedule=schedule,
-                _task_max_queue_size=max_queue_size,
-                _task_max_stored_executions=max_stored_executions,
-                _task_runner_class=runner_class,
-            )
+        def _wrap(func: Callable) -> Callable:
+            if hard_timeout is not None:
+                func._task_hard_timeout = hard_timeout  # type: ignore[attr-defined]
+            if queue is not None:
+                func._task_queue = queue  # type: ignore[attr-defined]
+            if unique is not None:
+                func._task_unique = unique  # type: ignore[attr-defined]
+            if unique_key is not None:
+                func._task_unique_key = unique_key  # type: ignore[attr-defined]
+            if lock is not None:
+                func._task_lock = lock  # type: ignore[attr-defined]
+            if lock_key is not None:
+                func._task_lock_key = lock_key  # type: ignore[attr-defined]
+            if retry is not None:
+                func._task_retry = retry  # type: ignore[attr-defined]
+            if retry_on is not None:
+                func._task_retry_on = retry_on  # type: ignore[attr-defined]
+            if retry_method is not None:
+                func._task_retry_method = retry_method  # type: ignore[attr-defined]
+            if batch is not None:
+                func._task_batch = batch  # type: ignore[attr-defined]
+            if schedule is not None:
+                func._task_schedule = schedule  # type: ignore[attr-defined]
+            if max_queue_size is not None:
+                func._task_max_queue_size = max_queue_size  # type: ignore[attr-defined]
+            if max_stored_executions is not None:
+                func._task_max_stored_executions = max_stored_executions  # type: ignore[attr-defined]
+            if runner_class is not None:
+                func._task_runner_class = runner_class  # type: ignore[attr-defined]
+
+            func.delay = _delay(func)  # type: ignore[attr-defined]
+
             if schedule is not None:
                 serialized_func = serialize_func_name(func)
                 assert serialized_func not in self.periodic_task_funcs, (
                     "attempted duplicate registration of periodic task"
                 )
-                self.periodic_task_funcs[serialized_func] = tc
+                self.periodic_task_funcs[serialized_func] = func
 
-            return tc
+            return func
 
         return _wrap if _fn is None else _wrap(_fn)
 
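Taken together, the reverted `task` decorator stores its configuration as `_task_*` attributes on the original function and attaches a `.delay()` helper instead of returning a typed wrapper. A usage sketch, assuming a local Redis server; the `tiger` setup, queue name, and `send_email` function are illustrative:

```python
from redis import Redis
from tasktiger import TaskTiger

tiger = TaskTiger(connection=Redis(db=0, decode_responses=True))


@tiger.task(queue="email", retry=True)
def send_email(address: str) -> None:
    print(f"sending to {address}")


send_email("user@example.com")  # still an ordinary function call
task = send_email.delay("user@example.com")  # enqueues a Task on the "email" queue
print(send_email._task_queue)  # "email"
```

Because the decorator now returns the original function, static type checkers no longer see `.delay()` or the `_task_*` attributes; restoring that visibility was the goal of the reverted change in [302](https://github.com/closeio/tasktiger/issues/302).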
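The `schedule` branch in `_wrap` forces periodic tasks to be unique and registers them in `periodic_task_funcs` under their serialized name, asserting against duplicate registration. A sketch of how that is typically driven, assuming the `periodic` helper from `tasktiger.schedule` and the `tiger` instance from the previous example; the interval, queue, and task body are illustrative:

```python
from tasktiger.schedule import periodic


# Registered in tiger.periodic_task_funcs at decoration time; registering the
# same function twice as a periodic task would trip the assertion above.
@tiger.task(schedule=periodic(minutes=5), queue="maintenance")
def cleanup() -> None:
    print("pruning expired sessions")
```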