From 18a11c9b6bee69175b424b266ef20d888b9ad3eb Mon Sep 17 00:00:00 2001 From: Alan Hamlett Date: Mon, 13 Aug 2018 21:44:03 -0700 Subject: [PATCH 1/2] fix pep8 issues --- tasktiger/task.py | 50 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index 1e3c25df..f89575c5 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -3,11 +3,26 @@ import redis import time -from ._internal import * +from ._internal import ( + ACTIVE, + QUEUED, + ERROR, + TaskImportError, + SCHEDULED, + g, + gen_id, + gen_unique_id, + get_timestamp, + import_attribute, + serialize_func_name, + serialize_retry_method, +) from .exceptions import TaskNotFound + __all__ = ['Task'] + class Task(object): def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None, hard_timeout=None, unique=None, lock=None, lock_key=None, @@ -209,27 +224,36 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None): if not when: when = time.time() if mode: - scripts.zadd(_key(to_state, queue), when, self.id, - mode, client=pipeline) + scripts.zadd( + _key(to_state, queue), + when, + self.id, + mode, + client=pipeline, + ) else: pipeline.zadd(_key(to_state, queue), self.id, when) pipeline.sadd(_key(to_state), queue) pipeline.zrem(_key(from_state, queue), self.id) - if not to_state: # Remove the task if necessary + if not to_state: # Remove the task if necessary if self.unique: # Only delete if it's not in any other queue check_states = set([ACTIVE, QUEUED, ERROR, SCHEDULED]) check_states.remove(from_state) # TODO: Do the following two in one call. - scripts.delete_if_not_in_zsets(_key('task', self.id, 'executions'), - self.id, [ - _key(state, queue) for state in check_states - ], client=pipeline) - scripts.delete_if_not_in_zsets(_key('task', self.id), - self.id, [ - _key(state, queue) for state in check_states - ], client=pipeline) + scripts.delete_if_not_in_zsets( + _key('task', self.id, 'executions'), + self.id, + [_key(state, queue) for state in check_states], + client=pipeline, + ) + scripts.delete_if_not_in_zsets( + _key('task', self.id), + self.id, + [_key(state, queue) for state in check_states], + client=pipeline, + ) else: # Safe to remove pipeline.delete(_key('task', self.id, 'executions')) @@ -370,7 +394,7 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000, key = tiger._key(state, queue) pipeline = tiger.connection.pipeline() pipeline.zcard(key) - pipeline.zrange(key, -limit-skip, -1-skip, withscores=True) + pipeline.zrange(key, -limit - skip, -1 - skip, withscores=True) n, items = pipeline.execute() tasks = [] From 10b5fe7057f43eb1dd5cbdc2035b1ab4c3cfc1d4 Mon Sep 17 00:00:00 2001 From: Alan Hamlett Date: Mon, 13 Aug 2018 21:44:36 -0700 Subject: [PATCH 2/2] prevent TypeError when serialized task json data missing --- tasktiger/task.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index f89575c5..4bb0f1df 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -409,20 +409,20 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000, results = pipeline.execute() for serialized_data, serialized_executions, ts in zip(results[0], results[1:], tss): - data = json.loads(serialized_data) - executions = [json.loads(e) for e in serialized_executions if e] - - task = Task(tiger, queue=queue, _data=data, _state=state, - _ts=ts, _executions=executions) - - tasks.append(task) + if serialized_data: + data = json.loads(serialized_data) + executions = [json.loads(e) for e in serialized_executions if e] + task = Task(tiger, queue=queue, _data=data, _state=state, + _ts=ts, _executions=executions) + tasks.append(task) else: data = tiger.connection.mget([tiger._key('task', item[0]) for item in items]) for serialized_data, ts in zip(data, tss): - data = json.loads(serialized_data) - task = Task(tiger, queue=queue, _data=data, _state=state, - _ts=ts) - tasks.append(task) + if serialized_data: + data = json.loads(serialized_data) + task = Task(tiger, queue=queue, _data=data, _state=state, + _ts=ts) + tasks.append(task) return n, tasks