Skip to content

Commit ff5f14e

Browse files
committed
feat: delta waitset
1 parent 04a6556 commit ff5f14e

3 files changed

Lines changed: 20 additions & 21 deletions

File tree

src/duron/_core/invoke.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,6 @@ class _InvokeRun:
299299
"_loop",
300300
"_now",
301301
"_pending_msg",
302-
"_prev_ops",
303302
"_running",
304303
"_stream_manager",
305304
"_task",
@@ -322,7 +321,6 @@ def __init__(
322321
self._running: bool = False
323322
self._lease: bytes | None = None
324323
self._pending_msg: list[Entry] = []
325-
self._prev_ops: set[str] = set()
326324
self._now = 0
327325
self._stream_manager = StreamManager(watchers)
328326
self._tracer: Tracer | None = Tracer.current()
@@ -409,20 +407,12 @@ async def _step(self) -> WaitSet | None:
409407

410408
while True:
411409
result = self._loop.poll_completion(self._task)
412-
if result is None:
410+
if result is None or not result.added:
413411
return result
414412

415-
new_ops = False
416-
for s in result.ops:
413+
for s in result.added:
417414
sid = s.id
418-
if sid not in self._prev_ops:
419-
await self.enqueue_op(sid, s)
420-
new_ops = True
421-
if new_ops or len(self._prev_ops) != len(result.ops):
422-
self._prev_ops.clear()
423-
self._prev_ops.update(s.id for s in result.ops)
424-
if not new_ops:
425-
return result
415+
await self.enqueue_op(sid, s)
426416

427417
async def _send_traces(self, *, flush: bool = False) -> None:
428418
if not self._tracer:

src/duron/_loop.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
if TYPE_CHECKING:
1818
import sys
1919
from asyncio.futures import Future
20-
from collections.abc import Callable, Coroutine, Generator
20+
from collections.abc import Callable, Coroutine, Generator, Sequence
2121
from contextvars import Context
2222

2323
_T = TypeVar("_T")
@@ -46,7 +46,7 @@ def __init__(
4646

4747
@dataclass(slots=True)
4848
class WaitSet:
49-
ops: list[OpFuture]
49+
added: Sequence[OpFuture]
5050
timer: int | None
5151
event: asyncio.Event
5252

@@ -74,6 +74,7 @@ class _TaskCtx:
7474

7575
class EventLoop(asyncio.AbstractEventLoop):
7676
__slots__: tuple[str, ...] = (
77+
"_added",
7778
"_closed",
7879
"_ctx",
7980
"_event",
@@ -98,6 +99,7 @@ def __init__(self, host: asyncio.AbstractEventLoop) -> None:
9899
self._closed: bool = False
99100
self._event: asyncio.Event = asyncio.Event() # loop = _host
100101
self._timers: list[asyncio.TimerHandle] = []
102+
self._added: list[OpFuture] = []
101103

102104
def set_key(self, key: bytes) -> None:
103105
# Derive a fixed-length key from the provided key
@@ -223,12 +225,17 @@ def poll_completion(self, task: Future[_T]) -> WaitSet | None:
223225

224226
if task.done():
225227
return None
226-
return WaitSet(ops=[*self._ops.values()], timer=deadline, event=self._event)
228+
229+
added, self._added = self._added, []
230+
return WaitSet(added=added, timer=deadline, event=self._event)
227231
finally:
228232
events._set_running_loop(self._host) # noqa: SLF001
229233
if prev_task:
230234
tasks._enter_task(self._host, prev_task) # noqa: SLF001
231235

236+
def pending_ops(self) -> Sequence[OpFuture]:
237+
return tuple(self._ops.values())
238+
232239
def create_op(self, params: object, *, external: bool = False) -> OpFuture:
233240
if external:
234241
id_ = random_id()
@@ -237,6 +244,7 @@ def create_op(self, params: object, *, external: bool = False) -> OpFuture:
237244
id_ = self.generate_op_id()
238245
op_fut = OpFuture(id_, params, self)
239246
self._ops[id_] = op_fut
247+
self._added.append(op_fut)
240248
return op_fut
241249

242250
@overload
@@ -276,7 +284,7 @@ def close(self) -> None:
276284
p = self.poll_completion(op)
277285
if p is not None:
278286
logger.warning(
279-
"Event loop closed with pending operations: %r", p.ops
287+
"Event loop closed with pending operations: %r", self._ops
280288
)
281289
while self._timers:
282290
th = heappop(self._timers)

tests/test_loop.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,11 @@ def tick(n: int, expect: set[int] | None) -> list[str] | None:
5757
waitset = loop.poll_completion(tsk)
5858
if expect:
5959
assert waitset
60-
assert len(waitset.ops) == n
61-
assert {o.params for o in waitset.ops}.issubset(expect)
62-
ids.update(o.id for o in waitset.ops)
63-
return [o.id for o in waitset.ops]
60+
pending = loop.pending_ops()
61+
assert len(pending) == n
62+
assert {o.params for o in pending}.issubset(expect)
63+
ids.update(o.id for o in pending)
64+
return [o.id for o in pending]
6465
assert waitset is None
6566
return None
6667

0 commit comments

Comments
 (0)