Skip to content

Commit 74e9bba

Browse files
committed
feat: initial task runner
1 parent a9c8e39 commit 74e9bba

4 files changed

Lines changed: 135 additions & 0 deletions

File tree

src/duron/context.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import TYPE_CHECKING, Any, TypeVar, cast
5+
6+
from duron.event_loop import EventLoop
7+
from duron.ops import FnCall
8+
9+
if TYPE_CHECKING:
10+
from collections.abc import Callable, Coroutine
11+
12+
_T = TypeVar("_T")
13+
14+
15+
class Context:
16+
def __init__(self):
17+
loop = asyncio.get_event_loop()
18+
assert isinstance(loop, EventLoop)
19+
self._loop: EventLoop = loop
20+
pass
21+
22+
async def run(self, fn: Callable[..., Coroutine[Any, Any, _T] | _T]) -> _T:
23+
return cast("_T", await self._loop.create_op(FnCall(callable=fn)))

src/duron/ops.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from __future__ import annotations
2+
3+
from dataclasses import dataclass
4+
from typing import TYPE_CHECKING, Any
5+
6+
if TYPE_CHECKING:
7+
from collections.abc import Callable, Coroutine
8+
9+
10+
@dataclass(slots=True)
11+
class FnCall:
12+
callable: Callable[..., Coroutine[Any, Any, object] | object]
13+
14+
15+
Op = FnCall

src/duron/task_runner.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import time
5+
from collections.abc import Coroutine
6+
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, final
7+
8+
from duron.event_loop import create_loop
9+
from duron.ops import FnCall
10+
11+
if TYPE_CHECKING:
12+
from duron.ops import Op
13+
14+
_T = TypeVar("_T")
15+
16+
17+
class TaskRunner:
18+
def __init__(self):
19+
pass
20+
21+
async def run(self, task_id: bytes, task_co: Coroutine[Any, Any, _T]) -> _T:
22+
return await _Task[_T](task_id, task_co).run()
23+
24+
25+
@final
26+
class _Task(Generic[_T]):
27+
def __init__(self, id: bytes, task_co: Coroutine[Any, Any, _T]) -> None:
28+
self._loop = create_loop(id)
29+
self._task = self._loop.create_task(task_co)
30+
31+
def now(self) -> int:
32+
return time.time_ns()
33+
34+
async def run(self) -> _T:
35+
self._loop.tick(self.now())
36+
while (waitset := self._loop.poll_completion(self._task)) is not None:
37+
for op in waitset.ops:
38+
await self.enqueue_op(op.id, op.params)
39+
await waitset.wait(self.now())
40+
self._loop.tick(self.now())
41+
return self._task.result()
42+
43+
async def enqueue_op(self, id: bytes, op: Op | object) -> None:
44+
match op:
45+
case FnCall():
46+
try:
47+
result = op.callable()
48+
if isinstance(result, Coroutine):
49+
50+
def cb(t: asyncio.Future[object]) -> None:
51+
try:
52+
res = t.result()
53+
self._loop.post_completion_threadsafe(
54+
id,
55+
result=res,
56+
)
57+
except BaseException as e:
58+
self._loop.post_completion_threadsafe(
59+
id,
60+
exception=e,
61+
)
62+
63+
asyncio.create_task(
64+
cast("Coroutine[Any, Any, object]", result)
65+
).add_done_callback(cb)
66+
else:
67+
self._loop.post_completion_threadsafe(
68+
id,
69+
result=result,
70+
)
71+
except BaseException as e:
72+
self._loop.post_completion_threadsafe(
73+
id,
74+
exception=e,
75+
)
76+
77+
case _:
78+
raise NotImplementedError(f"Unsupported op: {op!r}")

tests/test_task.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import asyncio
2+
import uuid
3+
4+
import pytest
5+
6+
import duron.context
7+
import duron.task_runner
8+
9+
10+
@pytest.mark.asyncio
11+
async def test_task_runner():
12+
async def activity() -> str:
13+
ctx = duron.context.Context()
14+
x = await ctx.run(lambda: str(uuid.uuid4()))
15+
_ = await ctx.run(lambda: asyncio.sleep(0.1))
16+
return x
17+
18+
tr = duron.task_runner.TaskRunner()
19+
_ = await tr.run(b"123", activity())

0 commit comments

Comments
 (0)