Conversation
|
Would be awesome to get this merged in. Do you need any help reviewing it?
|
Hey guys, sorry about the delay. And thanks for submitting this, @lsaavedr.
I'm also curious about what celery does with the task ID on retry. Does it re-queue with a new ID or re-use the same ID?
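For what it's worth, as far as I can tell a retry re-publishes the message with the same task id; only request.retries increments. A minimal sketch to check this empirically (the app name and broker URL are placeholders):

    import logging

    from celery import Celery

    app = Celery('probe', broker='redis://localhost:6379/0')  # placeholder broker

    @app.task(bind=True, max_retries=1)
    def probe(self):
        # Logs the same id on the first run and on the retry; only `retries` changes.
        logging.info('id=%s retries=%s', self.request.id, self.request.retries)
        if self.request.retries < 1:
            raise self.retry(countdown=1)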
|
Any progress here? I need this functionality but I do not want to maintain a separate package just for this fix.
|
Progress?
|
Sorry all, I've been neglecting this library as I don't use celery much anymore. I'm not sure about this PR. I would like to see some reasoning for the change. Tests would be appreciated as well.
|
Thanks for the response. Can you explain a bit more about how this would introduce a race condition? If you were to implement retry, how would you do it?
|
Maybe racing is not such an issue; I haven't looked deeply into how retrying works in celery. This might be an issue as well: celery-singleton/celery_singleton/singleton.py, lines 120 to 124 at 527d227. With this approach the lock would not be removed when …
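For background on the race question: celery-singleton's redis backend acquires its lock atomically, so two producers cannot both get it. A rough sketch of that pattern, not necessarily the library's exact code:

    import redis

    client = redis.Redis()  # placeholder connection

    def acquire(lock_key, task_id, expiry=300):
        # SET key value NX EX is atomic: only the first caller gets True,
        # and the key expires after `expiry` seconds if never released.
        return bool(client.set(lock_key, task_id, nx=True, ex=expiry))

    def release(lock_key):
        client.delete(lock_key)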
|
Thanks. After trying out this MR and hacking around it, I could not get it to work. This project does the same as … I was able to modify this library to provide locking post … This sample code should be enough to build off of if anyone is interested:

    import logging

    from celery import Task, signals, states
    from celery_singleton.backends.redis import RedisBackend  # import path may vary by version

    REDIS_URI = 'redis://localhost:6379/0'  # placeholder

    logger = logging.getLogger(__name__)


    class Singleton(Task):
        def __init__(self, *args, **kwargs):
            self.singleton_backend = RedisBackend(REDIS_URI)
            self._lock_key = None
            self.max_retries = None  # try forever, change if you must
            self.__run = None

        @property
        def lock_key(self):
            if self._lock_key:
                return self._lock_key
            # Generate your lock key however you like, e.g. from task name and args.
            return self._lock_key

        def lock(self):
            lock = self.singleton_backend.lock(self.lock_key, self.request.id, expiry=60 * 5)
            logger.info(f'Attempted lock for {self.lock_key} = {lock}')
            if not lock:
                # Override the task function so it is not called but retried.
                def terminated(*args, **kwargs):
                    self.retry(countdown=60)

                # may need to do the same for __call__
                self.__run = self.run
                self.run = terminated
            elif self.__run:
                # Lock acquired again: restore the original task function.
                self.run = self.__run
            return lock

        def unlock(self):
            unlock = self.singleton_backend.unlock(self.lock_key)
            logger.info(f'Attempted unlock for {self.lock_key} = {unlock}')

    ...

    @signals.task_prerun.connect
    def connect_task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **e):
        if isinstance(task, Singleton):
            task.lock()

    ...

    @signals.task_postrun.connect
    def connect_task_postrun(sender=None, task_id=None, task=None, args=None,
                             kwargs=None, retval=None, state=None, **e):
        # Only release the lock when the task is not being retried.
        if isinstance(task, Singleton) and state != states.RETRY:
            task.unlock()
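To use the class above, point a task's base at it; a hypothetical example, assuming a configured Celery app named app:

    @app.task(base=Singleton, bind=True)
    def process(self, item_id):
        # Runs at most once concurrently per lock key; duplicates retry.
        ...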
|
Any progress? Maybe it would help to test this fix.
|
We actually found a simpler way: … Happy to submit a PR if needed.
|
Is this project abandoned?
|
Discovered this independently, and it was hard to debug. But you already have it here; this is definitely the simplest fix.
|
This definitively closes pull requests GH-8 and GH-9 and issues GH-10 and GH-20. I think that with something like this, chains can work as well.
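For reference, a chain pipes each task's result into the next; a minimal sketch with hypothetical Singleton-based tasks fetch and store:

    from celery import chain

    # `fetch` and `store` stand in for Singleton-based tasks defined elsewhere.
    chain(fetch.s(42), store.s()).apply_async()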