-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathflask_tasktiger.py
More file actions
80 lines (66 loc) · 2.67 KB
/
flask_tasktiger.py
File metadata and controls
80 lines (66 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from flask import current_app, _app_ctx_stack
import tasktiger
class TaskTiger(object):
    """Flask extension that exposes a ``tasktiger.TaskTiger`` client.

    The underlying client is built lazily from the current application's
    ``TASKTIGER_*`` config keys and cached on the active Flask application
    context, so every public entry point must be used inside an app context.
    """

    # Config keys carrying this prefix are passed to tasktiger with the
    # prefix stripped.
    _CONFIG_PREFIX = 'TASKTIGER_'

    def __init__(self, redis, app=None):
        """Store the redis connection and optionally bind to ``app``.

        :param redis: connection object handed to ``tasktiger.TaskTiger``.
        :param app: optional Flask application; when given, ``init_app`` is
            called on it immediately.
        """
        self.redis = redis
        if app:
            self.init_app(app)

    def init_app(self, app):
        """Default ``TASKTIGER_ALWAYS_EAGER`` to ``app.debug`` if it is unset."""
        app.config.setdefault('TASKTIGER_ALWAYS_EAGER', app.debug)

    def _create(self):
        """Build a new tasktiger client from the current app's config."""
        return tasktiger.TaskTiger(
            self.redis, self._config, setup_structlog=True)

    @property
    def _config(self):
        """Return the ``TASKTIGER_*`` config entries with the prefix removed."""
        prefix = self._CONFIG_PREFIX
        return {
            key[len(prefix):]: value
            for key, value in current_app.config.items()
            if key.startswith(prefix)
        }

    def _tiger_for(self, ctx):
        """Return the client cached on ``ctx``, creating it on first use.

        The cache is tagged with the extension instance that created it, so
        two ``TaskTiger`` extensions sharing one app context never hand out
        each other's client.
        """
        owned = getattr(ctx, '_tiger_owner', None) is self
        if not owned or not hasattr(ctx, 'tiger'):
            ctx.tiger = self._create()
            ctx._tiger_owner = self
        return ctx.tiger

    def _get_tiger(self):
        """Return the client for the active app context.

        :raises RuntimeError: when no Flask application context is active.
        """
        # NOTE(review): `_app_ctx_stack` is deprecated in recent Flask
        # releases; consider migrating to `flask.g` - confirm against the
        # Flask versions this project supports.
        ctx = _app_ctx_stack.top
        if ctx is None:
            raise RuntimeError(
                "You need to use this from a flask app context")
        return self._tiger_for(ctx)

    def delay(self, *args, **kwargs):
        """Forward to the client's ``delay`` and return its result.

        :raises RuntimeError: when no Flask application context is active.
        """
        return self._get_tiger().delay(*args, **kwargs)

    def run_worker_with_args(self, *args, **kwargs):
        """Forward to the client's ``run_worker_with_args``.

        :raises RuntimeError: when no Flask application context is active.
        """
        return self._get_tiger().run_worker_with_args(*args, **kwargs)

    def task(self, queue=None, hard_timeout=None, unique=None, lock=None,
             lock_key=None, retry=None, retry_on=None, retry_method=None,
             batch=False):
        """Return a decorator that marks a function as a task.

        Every option that is not ``None`` is recorded on the function as a
        ``_task_<option>`` attribute (``batch`` defaults to ``False`` and is
        therefore always recorded). The decorated function is returned
        unchanged apart from those attributes and a new ``delay`` attribute:
        ``func.delay(*a, **kw)`` calls ``self.delay(func, args=a, kwargs=kw)``
        and returns its result.
        """
        options = (
            ('_task_hard_timeout', hard_timeout),
            ('_task_queue', queue),
            ('_task_unique', unique),
            ('_task_lock', lock),
            ('_task_lock_key', lock_key),
            ('_task_retry', retry),
            ('_task_retry_on', retry_on),
            ('_task_retry_method', retry_method),
            ('_task_batch', batch),
        )

        def _wrap(func):
            for attr_name, value in options:
                if value is not None:
                    setattr(func, attr_name, value)

            def _delay_inner(*args, **kwargs):
                # Propagate whatever the client returns instead of dropping it.
                return self.delay(func, args=args, kwargs=kwargs)

            func.delay = _delay_inner
            return func

        return _wrap

    @property
    def log(self):
        """Return the client's ``log`` attribute.

        :raises RuntimeError: when no Flask application context is active.
        """
        return self._get_tiger().log