[FEATURE] Support daily/periodic task scheduling #246
Description
Is your feature request related to a problem? Please describe.
Repeating a task every x seconds is useful, but it is a poor fit for daily tasks: the hour at which the task runs is determined by the time of deployment plus whatever delay was added.
Describe the solution you'd like
Include a decorator for periodically repeating a task.
As an example, I adapted the current decorator to schedule a task daily:
import asyncio
import datetime
import logging
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Union

from starlette.concurrency import run_in_threadpool

# The original snippet referenced an undefined `logger`; a module-level logger is assumed here.
logger = logging.getLogger(__name__)

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT]


def repeat_every_day(*, time: datetime.time, raise_exceptions: bool = False,
                     max_repetitions: int | None = None) -> NoArgsNoReturnDecorator:
    def decorator(func: NoArgsNoReturnAsyncFuncT | NoArgsNoReturnFuncT) -> NoArgsNoReturnAsyncFuncT:
        is_coroutine = asyncio.iscoroutinefunction(func)

        @wraps(func)
        async def wrapped() -> None:
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                while max_repetitions is None or repetitions < max_repetitions:
                    # Recompute the target on every iteration so delays do not accumulate.
                    now = datetime.datetime.now()
                    target_datetime = datetime.datetime.combine(now.date(), time)
                    if now.time() > time:
                        # Today's slot has already passed: schedule for tomorrow.
                        target_datetime += datetime.timedelta(days=1)
                    sleep_seconds = (target_datetime - now).total_seconds()
                    print(f"Sleeping for {sleep_seconds} seconds")
                    await asyncio.sleep(sleep_seconds)
                    try:
                        if is_coroutine:
                            await func()
                        else:
                            # Run blocking functions in a worker thread.
                            await run_in_threadpool(func)
                    except Exception as exc:
                        if logger is not None:
                            formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
                            logger.error(formatted_exception)
                        if raise_exceptions:
                            raise exc
                    repetitions += 1

            await loop()

        return wrapped

    return decorator


# Example Usage:
@repeat_every_day(time=datetime.time(16, 28))
async def daily_task():
    print("Running daily task at", datetime.datetime.now())

Note that very similar behavior can be obtained with the current decorator, by making it wait the number of seconds left until the target hour and then repeat every 60 * 60 * 24 seconds. That naive approach would have failed today, though (the clocks changed in Europe). Also, if func() takes long, it may delay the execution by a small amount each day, and that delay could accumulate over time.
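
To make the two failure modes of the fixed-interval approach concrete, here is a minimal sketch using only the standard library. `seconds_until_next` is a hypothetical helper written for this illustration (it is not part of the library), the dates are arbitrary, and the fixed UTC offsets stand in for CET/CEST around a spring clock change rather than a real timezone database.

```python
import datetime as dt

DAY_SECONDS = 60 * 60 * 24


def seconds_until_next(now: dt.datetime, at: dt.time) -> float:
    """Seconds from `now` to the next wall-clock occurrence of `at` (naive datetimes)."""
    target = dt.datetime.combine(now.date(), at)
    if target <= now:
        # Today's slot has already passed (or is right now): aim for tomorrow.
        target += dt.timedelta(days=1)
    return (target - now).total_seconds()


# Drift: a task scheduled for 16:28 finishes 90 seconds later.
finished = dt.datetime(2024, 1, 10, 16, 29, 30)
recomputed_delay = seconds_until_next(finished, dt.time(16, 28))
next_recomputed = finished + dt.timedelta(seconds=recomputed_delay)
# A fixed 24 h sleep after the task finishes carries the 90 s run time forward.
next_fixed = finished + dt.timedelta(seconds=DAY_SECONDS)

# Clock change: assumed fixed offsets for CET (+01:00) and CEST (+02:00).
cet = dt.timezone(dt.timedelta(hours=1))
cest = dt.timezone(dt.timedelta(hours=2))
before = dt.datetime(2024, 3, 30, 16, 28, tzinfo=cet)
after = dt.datetime(2024, 3, 31, 16, 28, tzinfo=cest)
# Real elapsed time between the two 16:28 wall-clock slots.
real_gap = (after - before).total_seconds()

print(next_recomputed, next_fixed, real_gap)
```

In this sketch the recomputed schedule lands back on 16:28, while the fixed interval lands on 16:29:30, and the gap between the two 16:28 slots across the clock change is 23 hours rather than 24. Recomputing the target from the wall clock on every iteration, as the proposed decorator does, avoids both problems, assuming the server's local time follows the clock change.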