Skip to content

Commit f5da43e

Browse files
authored
Merge pull request #6 from Vectorworks/feature/optimize_get_next
Refactor "_get_next" to not use recursion
2 parents 4778e2c + f3c8602 commit f5da43e

3 files changed

Lines changed: 286 additions & 13 deletions

File tree

taskflow/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
__title__ = "Taskflow"
2-
__version__ = "0.3.0"
2+
__version__ = "0.4.0"
33

44
from .flow import * # noqa
55
from .tasks import * # noqa

taskflow/flow.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from collections import deque
12
from copy import deepcopy
23
from uuid import uuid4
34

@@ -63,18 +64,27 @@ def _get_next(self, task):
6364
if self.is_halted:
6465
return None
6566

66-
sub_tasks = task.get_all_tasks()
67-
for sub_task in sub_tasks:
68-
if sub_task == task:
69-
if sub_task.status == BaseTask.STATUS_PENDING:
70-
return task
71-
else:
72-
next_task = self._get_next(sub_task)
73-
if next_task:
74-
return next_task
75-
76-
if task.status == BaseTask.STATUS_COMPLETE and task.next:
77-
return self._get_next(task.next)
67+
stack = deque()
68+
stack.append(task)
69+
70+
while stack:
71+
current_task = stack.pop()
72+
current_status = current_task.status
73+
if current_status != BaseTask.STATUS_PENDING:
74+
if current_status == BaseTask.STATUS_COMPLETE and current_task.next:
75+
# if it has a dependent, add that to the stack
76+
stack.append(current_task.next)
77+
78+
# this task cannot be handled - move on.
79+
continue
80+
81+
if current_task.is_standalone:
82+
return current_task
83+
84+
sub_tasks = current_task.get_all_tasks()
85+
# trying to avoid copying
86+
for task_index in range(len(sub_tasks) - 1, -1, -1):
87+
stack.append(sub_tasks[task_index])
7888

7989
return None
8090

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
"""
2+
Comprehensive test suite for Flow._get_next method.
3+
4+
Covers:
5+
- Simple chained tasks with different states (pending, running, complete, halted)
6+
- Composite tasks with various child states
7+
- Chained composite tasks with different states
8+
"""
9+
10+
from taskflow.flow import Flow
11+
from taskflow.tasks import BaseTask, CompositeTask, Task
12+
13+
from .fixtures import Handlers
14+
15+
16+
def _make_simple_chain(n=3):
17+
"""Create a simple chain: task1 -> task2 -> task3 (or more)."""
18+
tasks = [Task(Handlers.repeat, args=(i,)) for i in range(1, n + 1)]
19+
for i in range(len(tasks) - 1):
20+
tasks[i].then(tasks[i + 1])
21+
return tasks
22+
23+
24+
def _make_flow_with_composite():
25+
"""Create flow: task1 -> task2 -> Composite(task31, task32) -> task4."""
26+
task1 = Task(Handlers.repeat, args=(1,))
27+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
28+
task31 = Task(Handlers.repeat, args=(31,))
29+
task32 = Task(Handlers.repeat, args=(32,))
30+
task3 = task2.then(CompositeTask(task31, task32))
31+
task4 = task3.then(Task(Handlers.repeat, args=(4,)))
32+
flow = Flow(task1)
33+
return flow, task1, task2, task31, task32, task3, task4
34+
35+
36+
def _reset_task(task, status=BaseTask.STATUS_PENDING, result=None):
37+
"""Reset a task's state."""
38+
task._status = status
39+
task._result = result
40+
41+
42+
class TestGetNextSimpleChain:
43+
"""Tests for _get_next with simple chained tasks (no composites)."""
44+
45+
def test_all_pending_returns_first_task(self):
46+
task1, _, _ = _make_simple_chain()
47+
flow = Flow(task1)
48+
assert flow._get_next(task1) == task1
49+
50+
def test_first_pending_returns_first(self):
51+
task1, task2, task3 = _make_simple_chain()
52+
flow = Flow(task1)
53+
_reset_task(task2, BaseTask.STATUS_PENDING)
54+
_reset_task(task3, BaseTask.STATUS_PENDING)
55+
assert flow._get_next(task1) == task1
56+
57+
def test_first_complete_second_pending_returns_second(self):
58+
task1, task2, _ = _make_simple_chain()
59+
flow = Flow(task1)
60+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
61+
assert flow._get_next(task1) == task2
62+
63+
def test_first_running_returns_none(self):
64+
task1, _, _ = _make_simple_chain()
65+
flow = Flow(task1)
66+
_reset_task(task1, BaseTask.STATUS_RUNNING)
67+
assert flow._get_next(task1) is None
68+
69+
def test_first_halted_returns_none(self):
70+
task1, _, _ = _make_simple_chain()
71+
flow = Flow(task1)
72+
_reset_task(task1, BaseTask.STATUS_HALTED)
73+
assert flow._get_next(task1) is None
74+
75+
def test_second_running_returns_none(self):
76+
task1, task2, _ = _make_simple_chain()
77+
flow = Flow(task1)
78+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
79+
_reset_task(task2, BaseTask.STATUS_RUNNING)
80+
assert flow._get_next(task1) is None
81+
82+
def test_second_halted_returns_none(self):
83+
task1, task2, _ = _make_simple_chain()
84+
flow = Flow(task1)
85+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
86+
_reset_task(task2, BaseTask.STATUS_HALTED)
87+
assert flow._get_next(task1) is None
88+
89+
def test_all_complete_returns_none(self):
90+
task1, task2, task3 = _make_simple_chain()
91+
flow = Flow(task1)
92+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
93+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
94+
_reset_task(task3, BaseTask.STATUS_COMPLETE, 3)
95+
assert flow._get_next(task1) is None
96+
97+
def test_chain_of_four_progression(self):
98+
tasks = _make_simple_chain(4)
99+
task1, task2, task3, task4 = tasks
100+
flow = Flow(task1)
101+
102+
assert flow._get_next(task1) == task1
103+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
104+
assert flow._get_next(task1) == task2
105+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
106+
assert flow._get_next(task1) == task3
107+
_reset_task(task3, BaseTask.STATUS_COMPLETE, 3)
108+
assert flow._get_next(task1) == task4
109+
_reset_task(task4, BaseTask.STATUS_COMPLETE, 4)
110+
assert flow._get_next(task1) is None
111+
112+
113+
class TestGetNextSingleComposite:
114+
"""Tests for _get_next with a single CompositeTask and various child states."""
115+
116+
def test_both_children_pending_returns_first_child(self):
117+
flow, task1, task2, task31, _, _, _ = _make_flow_with_composite()
118+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
119+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
120+
result = flow._get_next(task1)
121+
assert result == task31
122+
123+
def test_first_child_complete_second_pending_returns_second(self):
124+
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
125+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
126+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
127+
_reset_task(task31, BaseTask.STATUS_COMPLETE, 31)
128+
assert flow._get_next(task1) == task32
129+
130+
def test_one_child_running_one_pending_returns_pending(self):
131+
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
132+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
133+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
134+
_reset_task(task31, BaseTask.STATUS_RUNNING)
135+
assert flow._get_next(task1) == task32
136+
137+
def test_one_child_running_one_complete_returns_none(self):
138+
"""Regression: avoids infinite loop when no pending task exists."""
139+
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
140+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
141+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
142+
_reset_task(task31, BaseTask.STATUS_RUNNING)
143+
_reset_task(task32, BaseTask.STATUS_COMPLETE, 32)
144+
assert flow._get_next(task1) is None
145+
146+
def test_both_children_running_returns_none(self):
147+
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
148+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
149+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
150+
_reset_task(task31, BaseTask.STATUS_RUNNING)
151+
_reset_task(task32, BaseTask.STATUS_RUNNING)
152+
assert flow._get_next(task1) is None
153+
154+
def test_one_child_halted_returns_none(self):
155+
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
156+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
157+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
158+
_reset_task(task31, BaseTask.STATUS_HALTED)
159+
assert flow._get_next(task1) is None
160+
161+
def test_both_children_complete_returns_next_in_chain(self):
162+
flow, task1, task2, task31, task32, _, task4 = _make_flow_with_composite()
163+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
164+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
165+
_reset_task(task31, BaseTask.STATUS_COMPLETE, 31)
166+
_reset_task(task32, BaseTask.STATUS_COMPLETE, 32)
167+
assert flow._get_next(task1) == task4
168+
169+
def test_composite_with_three_children_returns_first_pending(self):
170+
task1 = Task(Handlers.repeat, args=(1,))
171+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
172+
t31 = Task(Handlers.repeat, args=(31,))
173+
t32 = Task(Handlers.repeat, args=(32,))
174+
t33 = Task(Handlers.repeat, args=(33,))
175+
task2.then(CompositeTask(t31, t32, t33))
176+
flow = Flow(task1)
177+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
178+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
179+
assert flow._get_next(task1) == t31
180+
_reset_task(t31, BaseTask.STATUS_COMPLETE, 31)
181+
assert flow._get_next(task1) == t32
182+
_reset_task(t32, BaseTask.STATUS_COMPLETE, 32)
183+
assert flow._get_next(task1) == t33
184+
185+
186+
class TestGetNextChainedComposites:
187+
"""Tests for _get_next with multiple composites chained together."""
188+
189+
def test_first_composite_both_pending_returns_first_child(self):
190+
task1 = Task(Handlers.repeat, args=(1,))
191+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
192+
c1a = Task(Handlers.repeat, args=(101,))
193+
c1b = Task(Handlers.repeat, args=(102,))
194+
comp1 = task2.then(CompositeTask(c1a, c1b))
195+
c2a = Task(Handlers.repeat, args=(201,))
196+
c2b = Task(Handlers.repeat, args=(202,))
197+
comp1.then(CompositeTask(c2a, c2b))
198+
flow = Flow(task1)
199+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
200+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
201+
assert flow._get_next(task1) == c1a
202+
203+
def test_first_composite_complete_second_has_pending_returns_second_composite_child(
204+
self,
205+
):
206+
task1 = Task(Handlers.repeat, args=(1,))
207+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
208+
c1a = Task(Handlers.repeat, args=(101,))
209+
c1b = Task(Handlers.repeat, args=(102,))
210+
comp1 = task2.then(CompositeTask(c1a, c1b))
211+
c2a = Task(Handlers.repeat, args=(201,))
212+
c2b = Task(Handlers.repeat, args=(202,))
213+
comp1.then(CompositeTask(c2a, c2b))
214+
flow = Flow(task1)
215+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
216+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
217+
_reset_task(c1a, BaseTask.STATUS_COMPLETE, 101)
218+
_reset_task(c1b, BaseTask.STATUS_COMPLETE, 102)
219+
assert flow._get_next(task1) == c2a
220+
221+
def test_first_composite_one_running_second_composite_pending_returns_none(self):
222+
"""First composite blocks; second composite's children not yet reachable."""
223+
task1 = Task(Handlers.repeat, args=(1,))
224+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
225+
c1a = Task(Handlers.repeat, args=(101,))
226+
c1b = Task(Handlers.repeat, args=(102,))
227+
comp1 = task2.then(CompositeTask(c1a, c1b))
228+
c2a = Task(Handlers.repeat, args=(201,))
229+
c2b = Task(Handlers.repeat, args=(202,))
230+
comp1.then(CompositeTask(c2a, c2b))
231+
flow = Flow(task1)
232+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
233+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
234+
_reset_task(c1a, BaseTask.STATUS_RUNNING)
235+
_reset_task(c1b, BaseTask.STATUS_COMPLETE, 102)
236+
assert flow._get_next(task1) is None
237+
238+
def test_first_composite_one_running_one_pending_returns_pending(self):
239+
task1 = Task(Handlers.repeat, args=(1,))
240+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
241+
c1a = Task(Handlers.repeat, args=(101,))
242+
c1b = Task(Handlers.repeat, args=(102,))
243+
comp1 = task2.then(CompositeTask(c1a, c1b))
244+
comp1.then(Task(Handlers.repeat, args=(4,)))
245+
flow = Flow(task1)
246+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
247+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
248+
_reset_task(c1a, BaseTask.STATUS_RUNNING)
249+
assert flow._get_next(task1) == c1b
250+
251+
def test_composite_complete_returns_next_task_in_chain(self):
252+
task1 = Task(Handlers.repeat, args=(1,))
253+
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
254+
c1a = Task(Handlers.repeat, args=(101,))
255+
c1b = Task(Handlers.repeat, args=(102,))
256+
comp1 = task2.then(CompositeTask(c1a, c1b))
257+
task4 = comp1.then(Task(Handlers.repeat, args=(4,)))
258+
flow = Flow(task1)
259+
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
260+
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
261+
_reset_task(c1a, BaseTask.STATUS_COMPLETE, 101)
262+
_reset_task(c1b, BaseTask.STATUS_COMPLETE, 102)
263+
assert flow._get_next(task1) == task4

0 commit comments

Comments
 (0)