Skip to content

Commit e5eb9be

Browse files
committed
feat: initial event loop
1 parent 9aca336 commit e5eb9be

9 files changed

Lines changed: 391 additions & 19 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@ permissions:
1212
jobs:
1313
python:
1414
runs-on: ubuntu-latest
15+
strategy:
16+
matrix:
17+
python-version: ["3.10", "3.11", "3.12", "3.13"]
18+
1519
steps:
1620
- uses: actions/checkout@v5
1721
- name: "Set up Python"
1822
uses: actions/setup-python@v6
1923
with:
20-
python-version-file: "pyproject.toml"
24+
python-version: ${{ matrix.python-version }}
2125
- name: Install uv
2226
uses: astral-sh/setup-uv@v6
2327
with:

flake.nix

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,11 @@
3030
default = pkgs.mkShell {
3131
buildInputs = with pkgs; [
3232
gnumake
33-
(python313.withPackages (
34-
p: with p; [
35-
uv
36-
]
37-
))
33+
uv
3834
];
35+
env = {
36+
UV_PYTHON = pkgs.python310.interpreter;
37+
};
3938
};
4039
};
4140

main.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ readme = "README.md"
66
requires-python = ">=3.10"
77
dependencies = [
88
"pydantic>=2.11.7",
9+
"typing-extensions>=4.15.0",
910
]
1011

1112
[build-system]
@@ -53,3 +54,4 @@ venv = ".venv"
5354
venvPath = "."
5455
reportAny = false
5556
reportExplicitAny = false
57+
reportUnreachable = false

src/duron/event_loop.py

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import contextlib
5+
import contextvars
6+
import heapq
7+
import logging
8+
import threading
9+
from asyncio import AbstractEventLoop, Handle, Task, TimerHandle, events
10+
from collections import deque
11+
from dataclasses import dataclass
12+
from typing import TYPE_CHECKING, cast, overload
13+
14+
from typing_extensions import (
15+
Any,
16+
TypeVar,
17+
TypeVarTuple,
18+
Unpack,
19+
override,
20+
)
21+
22+
from duron.utils import mix_id
23+
24+
if TYPE_CHECKING:
25+
import sys
26+
from asyncio.futures import Future
27+
from collections.abc import Callable, Coroutine, Generator
28+
from contextvars import Context
29+
30+
_T = TypeVar("_T")
31+
_Ts = TypeVarTuple("_Ts")
32+
33+
if sys.version_info >= (3, 12):
34+
_TaskCompatibleCoro = Coroutine[Any, Any, _T]
35+
else:
36+
_TaskCompatibleCoro = Generator[Any, None, _T] | Coroutine[Any, Any, _T]
37+
38+
39+
logger = logging.getLogger(__name__)
40+
41+
42+
_task_id_ctx: contextvars.ContextVar[TaskCtx] = contextvars.ContextVar("task_id")
43+
44+
45+
class OpFuture(asyncio.Future[object]):
46+
id: str
47+
params: object
48+
49+
def __init__(self, id: str, params: object, loop: EventLoop) -> None:
50+
super().__init__(loop=loop)
51+
self.id = id
52+
self.params = params
53+
54+
55+
@dataclass(eq=True, frozen=True, slots=True)
56+
class WaitSet:
57+
ops: frozenset[OpFuture]
58+
timer: float | None
59+
event: asyncio.Event
60+
61+
async def wait(self, now_ns: int) -> None:
62+
if self.timer is None:
63+
_ = await self.event.wait()
64+
return
65+
with contextlib.suppress(asyncio.TimeoutError):
66+
t = self.timer - (now_ns / 1e9)
67+
if t <= 0:
68+
return
69+
_ = await asyncio.wait_for(self.event.wait(), timeout=t)
70+
71+
72+
@dataclass(slots=True)
73+
class TaskCtx:
74+
parent_id: str
75+
value: int
76+
77+
78+
class EventLoop(AbstractEventLoop):
79+
def __init__(self) -> None:
80+
self._ready: deque[Handle] = deque()
81+
self._debug: bool = False
82+
self._exc_handler: (
83+
Callable[[AbstractEventLoop, dict[str, object]], object] | None
84+
) = None
85+
self._blocked: dict[str, OpFuture] = {}
86+
self._root_seq: int = 0
87+
self._now_ns: int = 0
88+
self._closed: bool = False
89+
self._event: asyncio.Event = asyncio.Event()
90+
91+
self._lock: threading.Lock = threading.Lock()
92+
self._timers: list[TimerHandle] = []
93+
94+
def _generate_id(self) -> str:
95+
ctx = _task_id_ctx.get(None)
96+
if not ctx:
97+
parent_id = ""
98+
seq = self._root_seq
99+
self._root_seq += 1
100+
else:
101+
parent_id = ctx.parent_id
102+
seq = ctx.value
103+
ctx.value += 1
104+
105+
return mix_id(parent_id.encode(), int(seq).to_bytes(8, "big"))
106+
107+
@override
108+
def call_soon(
109+
self,
110+
callback: Callable[[Unpack[_Ts]], object],
111+
*args: Unpack[_Ts],
112+
context: Context | None = None,
113+
) -> Handle:
114+
h = Handle(
115+
callback,
116+
args,
117+
self,
118+
context=context,
119+
)
120+
self._ready.append(h)
121+
return h
122+
123+
@override
124+
def call_soon_threadsafe(
125+
self,
126+
callback: Callable[[Unpack[_Ts]], object],
127+
*args: Unpack[_Ts],
128+
context: Context | None = None,
129+
task_id: str | None = None,
130+
) -> Handle:
131+
self._event.set()
132+
h = TimerHandle(
133+
0,
134+
callback,
135+
args,
136+
self,
137+
context=self._context_with_task_id(context, task_id=task_id),
138+
)
139+
with self._lock:
140+
heapq.heappush(self._timers, h)
141+
return h
142+
143+
@override
144+
def call_at(
145+
self,
146+
when: float,
147+
callback: Callable[[Unpack[_Ts]], object],
148+
*args: Unpack[_Ts],
149+
context: Context | None = None,
150+
) -> TimerHandle:
151+
th = TimerHandle(
152+
when,
153+
callback,
154+
args,
155+
loop=self,
156+
context=self._context_with_task_id(context),
157+
)
158+
with self._lock:
159+
heapq.heappush(self._timers, th)
160+
return th
161+
162+
@override
163+
def call_later(
164+
self,
165+
delay: float,
166+
callback: Callable[[Unpack[_Ts]], object],
167+
*args: Unpack[_Ts],
168+
context: Context | None = None,
169+
) -> TimerHandle:
170+
return self.call_at(self.time() + delay, callback, *args, context=context)
171+
172+
@override
173+
def time(self) -> float:
174+
return self._now_ns / 1e9
175+
176+
def tick(self, time: int) -> None:
177+
self._now_ns = time
178+
179+
@override
180+
def create_future(self) -> asyncio.Future[object]:
181+
return asyncio.Future(loop=self)
182+
183+
@override
184+
def create_task(
185+
self,
186+
coro: _TaskCompatibleCoro[_T],
187+
*,
188+
name: str | None = None,
189+
context: Context | None = None,
190+
**kwargs: object,
191+
) -> Task[_T]:
192+
ctx = self._context_with_task_id(context, task_id=self._generate_id())
193+
return ctx.run(
194+
cast("type[Task[_T]]", Task), coro, name=name, loop=self, **kwargs
195+
)
196+
197+
def _run_once(self) -> bool:
198+
found = False
199+
now = self.time()
200+
with self._lock:
201+
while self._timers and (
202+
self._timers[0].when() <= now or self._timers[0].cancelled()
203+
):
204+
found = True
205+
ht = heapq.heappop(self._timers)
206+
if not ht.cancelled():
207+
self._ready.append(ht)
208+
209+
while self._ready:
210+
h = self._ready.popleft()
211+
found = True
212+
if h.cancelled():
213+
continue
214+
try:
215+
h._run()
216+
except BaseException as exc:
217+
self.call_exception_handler(
218+
{"message": "exception in callback", "exception": exc, "handle": h}
219+
)
220+
return found
221+
222+
def poll_completion(self, task: Future[_T]) -> None | WaitSet:
223+
old = events._get_running_loop()
224+
events._set_running_loop(self)
225+
try:
226+
self._event.clear()
227+
while self._run_once():
228+
if task.done():
229+
return None
230+
231+
with self._lock:
232+
return WaitSet(
233+
ops=frozenset(self._blocked.values()),
234+
timer=self._timers[0].when() if self._timers else None,
235+
event=self._event,
236+
)
237+
finally:
238+
events._set_running_loop(old)
239+
240+
def create_op(self, params: object) -> OpFuture:
241+
id = self._generate_id()
242+
s = OpFuture(id, params, self)
243+
self._blocked[id] = s
244+
return s
245+
246+
@overload
247+
def post_completion_threadsafe(
248+
self,
249+
id: str,
250+
*,
251+
result: object,
252+
) -> None: ...
253+
@overload
254+
def post_completion_threadsafe(
255+
self,
256+
id: str,
257+
*,
258+
exception: BaseException,
259+
) -> None: ...
260+
def post_completion_threadsafe(
261+
self,
262+
id: str,
263+
*,
264+
result: object = None,
265+
exception: BaseException | None = None,
266+
) -> None:
267+
sc = self._blocked.pop(id, None)
268+
if sc is None:
269+
return
270+
271+
tid = mix_id(sc.id.encode(), b"end")
272+
if exception is not None:
273+
_ = self.call_soon_threadsafe(sc.set_exception, exception, task_id=tid)
274+
else:
275+
_ = self.call_soon_threadsafe(sc.set_result, result, task_id=tid)
276+
277+
@override
278+
def is_closed(self) -> bool:
279+
return self._closed
280+
281+
@override
282+
def close(self) -> None:
283+
self._ready.clear()
284+
self._timers.clear()
285+
self._closed = True
286+
287+
@override
288+
def get_debug(self) -> bool:
289+
return self._debug
290+
291+
@override
292+
def set_debug(self, enabled: bool) -> None:
293+
self._debug = enabled
294+
295+
@override
296+
def default_exception_handler(self, context: dict[str, object]) -> None:
297+
msg = context.get("message", "Unhandled exception")
298+
exc = context.get("exception")
299+
if exc:
300+
logger.error("%s: %r", msg, exc)
301+
else:
302+
logger.error("%s", msg)
303+
304+
@override
305+
def set_exception_handler(
306+
self, handler: Callable[[AbstractEventLoop, dict[str, object]], object] | None
307+
) -> None:
308+
self._exc_handler = handler
309+
310+
@override
311+
def call_exception_handler(self, context: dict[str, object]) -> None:
312+
if self._exc_handler is None:
313+
self.default_exception_handler(context)
314+
else:
315+
_ = self._exc_handler(self, context)
316+
317+
@override
318+
async def shutdown_asyncgens(self):
319+
pass
320+
321+
@override
322+
async def shutdown_default_executor(self):
323+
pass
324+
325+
def _timer_handle_cancelled(self, _th: TimerHandle) -> None:
326+
pass
327+
328+
def _context_with_task_id(
329+
self, context: Context | None, task_id: str | None = None
330+
) -> Context:
331+
base = context or contextvars.copy_context()
332+
if task_id is None:
333+
task_id = self._generate_id()
334+
_ = base.run(_task_id_ctx.set, TaskCtx(parent_id=task_id, value=0))
335+
return base
336+
337+
338+
def create_loop() -> EventLoop:
339+
return EventLoop() # type: ignore[abstract]

0 commit comments

Comments
 (0)