Skip to content

Commit 65ec83c

Browse files
committed
feat: initial event loop
1 parent d3f47b0 commit 65ec83c

6 files changed

Lines changed: 356 additions & 12 deletions

File tree

flake.nix

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,11 @@
3030
default = pkgs.mkShell {
3131
buildInputs = with pkgs; [
3232
gnumake
33-
(python313.withPackages (
34-
p: with p; [
35-
uv
36-
]
37-
))
33+
uv
3834
];
35+
env = {
36+
UV_PYTHON = pkgs.python310.interpreter;
37+
};
3938
};
4039
};
4140

main.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ readme = "README.md"
66
requires-python = ">=3.10"
77
dependencies = [
88
"pydantic>=2.11.7",
9+
"typing-extensions>=4.15.0",
910
]
1011

1112
[build-system]

src/duron/event_loop.py

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import contextvars
5+
import heapq
6+
import logging
7+
import threading
8+
from asyncio import AbstractEventLoop, Handle, Task, TimerHandle, events
9+
from collections import deque
10+
from dataclasses import dataclass
11+
from typing import TYPE_CHECKING
12+
13+
from typing_extensions import (
14+
Any,
15+
TypeVar,
16+
TypeVarTuple,
17+
Unpack,
18+
override,
19+
)
20+
21+
from duron.utils import mix_id
22+
23+
if TYPE_CHECKING:
24+
from collections.abc import Callable, Coroutine, Generator
25+
from contextvars import Context
26+
T = TypeVar("T")
27+
Ts = TypeVarTuple("Ts")
28+
29+
logger = logging.getLogger(__name__)
30+
31+
32+
_task_id_ctx: contextvars.ContextVar[TaskCtx] = contextvars.ContextVar("task_id")
33+
34+
35+
@dataclass(slots=True)
36+
class SyscallFuture:
37+
id: str
38+
params: object
39+
_future: asyncio.Future[Any]
40+
41+
def __await__(self) -> Generator[Any, None, Any]:
42+
return self._future.__await__()
43+
44+
@override
45+
def __hash__(self) -> int:
46+
return hash(self.id)
47+
48+
@override
49+
def __eq__(self, other: object) -> bool:
50+
if not isinstance(other, SyscallFuture):
51+
return NotImplemented
52+
return self.id == other.id
53+
54+
55+
@dataclass(eq=True, frozen=True, slots=True)
56+
class WaitSet:
57+
syscalls: frozenset[SyscallFuture]
58+
timer: float | None
59+
60+
61+
@dataclass(slots=True)
62+
class TaskCtx:
63+
parent_id: str
64+
value: int
65+
66+
67+
class EventLoop(AbstractEventLoop):
68+
def __init__(self, event: asyncio.Event) -> None:
69+
self._ready: deque[Handle] = deque()
70+
self._timers: list[TimerHandle] = []
71+
self._cv: threading.Condition = threading.Condition()
72+
self._stopping: bool = False
73+
self._closed: bool = False
74+
self._debug: bool = False
75+
self._exc_handler: (
76+
Callable[[AbstractEventLoop, dict[str, Any]], object] | None
77+
) = None
78+
self._blocked: dict[str, SyscallFuture] = {}
79+
self._root_seq: int = 0
80+
self._now: int = 0
81+
self._event: asyncio.Event = event
82+
83+
def syscall(self, params: object) -> SyscallFuture:
84+
id = self._generate_id()
85+
fut = self.create_future()
86+
s = SyscallFuture(_future=fut, id=id, params=params)
87+
self._blocked[id] = s
88+
return s
89+
90+
def _generate_id(self) -> str:
91+
ctx = _task_id_ctx.get(None)
92+
if not ctx:
93+
parent_id = ""
94+
seq = self._root_seq
95+
self._root_seq += 1
96+
else:
97+
parent_id = ctx.parent_id
98+
seq = ctx.value
99+
ctx.value += 1
100+
101+
return mix_id(parent_id.encode(), int(seq).to_bytes(8, 'big'))
102+
103+
@override
104+
def call_soon(
105+
self,
106+
callback: Callable[[*Ts], Any],
107+
*args: Unpack[Ts],
108+
context: Context | None = None,
109+
) -> Handle:
110+
h = Handle(
111+
callback,
112+
args,
113+
self,
114+
context=context,
115+
)
116+
with self._cv:
117+
self._ready.append(h)
118+
self._cv.notify()
119+
return h
120+
121+
@override
122+
def call_soon_threadsafe(
123+
self,
124+
callback: Callable[[*Ts], Any],
125+
*args: Unpack[Ts],
126+
context: Context | None = None,
127+
task_id: str | None = None,
128+
) -> Handle:
129+
self._event.set()
130+
h = Handle(
131+
callback,
132+
args,
133+
self,
134+
context=self._context_with_task_id(context, task_id=task_id),
135+
)
136+
with self._cv:
137+
self._ready.append(h)
138+
self._cv.notify()
139+
return h
140+
141+
@override
142+
def call_at(
143+
self,
144+
when: float,
145+
callback: Callable[[*Ts], Any],
146+
*args: Unpack[Ts],
147+
context: Context | None = None,
148+
) -> TimerHandle:
149+
th = TimerHandle(
150+
when,
151+
callback,
152+
args,
153+
loop=self,
154+
context=self._context_with_task_id(context),
155+
)
156+
with self._cv:
157+
heapq.heappush(self._timers, th)
158+
self._cv.notify()
159+
return th
160+
161+
@override
162+
def call_later(
163+
self,
164+
delay: float,
165+
callback: Callable[[*Ts], Any],
166+
*args: Unpack[Ts],
167+
context: Context | None = None,
168+
) -> TimerHandle:
169+
return self.call_at(self.time() + delay, callback, *args, context=context)
170+
171+
@override
172+
def time(self) -> float:
173+
return self._now / 1000.0
174+
175+
def tick(self, time: int) -> None:
176+
self._now = time
177+
178+
@override
179+
def create_future(self) -> asyncio.Future[Any]:
180+
return asyncio.Future(loop=self)
181+
182+
@override
183+
def create_task(
184+
self,
185+
coro: Generator[Any, None, T] | Coroutine[Any, Any, T],
186+
*,
187+
name: str | None = None,
188+
context: Context | None = None,
189+
**kwargs: Any,
190+
) -> Task[T]:
191+
ctx = self._context_with_task_id(context, task_id=self._generate_id())
192+
return ctx.run(Task[T], coro, name=name, loop=self, **kwargs)
193+
194+
def _run_once(self) -> None:
195+
now = self.time()
196+
# promote due timers
197+
with self._cv:
198+
while self._timers and self._timers[0].when() <= now:
199+
ht = heapq.heappop(self._timers)
200+
if not ht.cancelled():
201+
self._ready.append(ht)
202+
203+
# drain ready queue in batches for better performance
204+
batch_size = min(len(self._ready), 100) # Process up to 100 at a time
205+
for _ in range(batch_size):
206+
if not self._ready:
207+
break
208+
h = self._ready.popleft()
209+
if h.cancelled():
210+
continue
211+
try:
212+
h._run()
213+
except BaseException as exc:
214+
self.call_exception_handler(
215+
{"message": "exception in callback", "exception": exc, "handle": h}
216+
)
217+
218+
@override
219+
def stop(self) -> None:
220+
with self._cv:
221+
self._stopping = True
222+
self._cv.notify_all()
223+
224+
def poll_completion(self, task: Task[T]) -> None | WaitSet:
225+
old = events._get_running_loop()
226+
events._set_running_loop(self)
227+
try:
228+
while not task.done():
229+
self._run_once()
230+
231+
with self._cv:
232+
if self._ready:
233+
continue
234+
if self._timers and self._timers[0].when() <= self.time():
235+
# timer is due
236+
continue
237+
238+
break
239+
240+
if task.done():
241+
return None
242+
243+
return WaitSet(
244+
syscalls=frozenset(self._blocked.values()),
245+
timer=self._timers[0].when() if self._timers else None,
246+
)
247+
finally:
248+
events._set_running_loop(old)
249+
250+
def complete_syscall(
251+
self,
252+
syscall_id: str,
253+
*,
254+
result: Any = None,
255+
exception: BaseException | None = None,
256+
) -> None:
257+
sc = self._blocked.pop(syscall_id, None)
258+
if sc is None:
259+
return
260+
261+
fut = sc._future # pyright: ignore[reportPrivateUsage]
262+
tid = mix_id(sc.id.encode(), b"end")
263+
if exception is not None:
264+
_ = self.call_soon_threadsafe(fut.set_exception, exception, task_id=tid)
265+
else:
266+
_ = self.call_soon_threadsafe(fut.set_result, result, task_id=tid)
267+
268+
@override
269+
def is_running(self) -> bool:
270+
return not self._stopping
271+
272+
@override
273+
def is_closed(self) -> bool:
274+
return self._closed
275+
276+
@override
277+
def close(self) -> None:
278+
self._closed = True
279+
280+
@override
281+
def get_debug(self) -> bool:
282+
return self._debug
283+
284+
@override
285+
def set_debug(self, enabled: bool) -> None:
286+
self._debug = enabled
287+
288+
@override
289+
def default_exception_handler(self, context: dict[str, Any]) -> None:
290+
msg = context.get("message", "Unhandled exception")
291+
exc = context.get("exception")
292+
if exc:
293+
logger.error("%s: %r", msg, exc)
294+
else:
295+
logger.error("%s", msg)
296+
297+
@override
298+
def set_exception_handler(
299+
self, handler: Callable[[AbstractEventLoop, dict[str, Any]], object] | None
300+
) -> None:
301+
self._exc_handler = handler
302+
303+
@override
304+
def call_exception_handler(self, context: dict[str, Any]) -> None:
305+
if self._exc_handler is None:
306+
self.default_exception_handler(context)
307+
else:
308+
_ = self._exc_handler(self, context)
309+
310+
@override
311+
async def shutdown_asyncgens(self):
312+
pass
313+
314+
@override
315+
async def shutdown_default_executor(self):
316+
pass
317+
318+
def _timer_handle_cancelled(self, th: TimerHandle) -> None:
319+
try:
320+
self._timers.remove(th)
321+
heapq.heapify(self._timers)
322+
except ValueError:
323+
pass
324+
325+
def _context_with_task_id(
326+
self, context: Context | None, task_id: str | None = None
327+
) -> Context:
328+
base = context or contextvars.copy_context()
329+
if task_id is None:
330+
task_id = self._generate_id()
331+
_ = base.run(_task_id_ctx.set, TaskCtx(parent_id=task_id, value=0))
332+
return base
333+
334+
335+
def create_loop(event: asyncio.Event) -> EventLoop:
336+
return EventLoop(event) # type: ignore[abstract]

src/duron/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import base64
2+
import hashlib
3+
4+
_SEP = b"\x00"
5+
6+
7+
def mix_id(*parts: bytes) -> str:
8+
buf = _SEP.join(parts)
9+
h = hashlib.blake2b(buf, digest_size=12)
10+
return base64.b64encode(h.digest()).decode("ascii")

uv.lock

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)