Skip to content

Commit aa3f974

Browse files
committed
feat: add benchmark
1 parent 6d1de93 commit aa3f974

8 files changed

Lines changed: 249 additions & 97 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
/site
33
/.venv
44
/.nox
5+
/.codspeed
56
/logs
67
__pycache__

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ dev = [
1818
"pydantic>=2.11.9",
1919
"pytest>=7.0",
2020
"pytest-asyncio>=0.21",
21+
"pytest-codspeed>=4.1.1",
2122
]
2223
type-checking = [
2324
"basedmypy>=2.10.0",

src/duron/_core/context.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4-
import base64
4+
import binascii
55
from contextvars import ContextVar
66
from random import Random
77
from typing import (
@@ -192,7 +192,10 @@ async def create_promise(
192192
self._loop,
193193
ExternalPromiseCreate(metadata=metadata, return_type=dtype),
194194
)
195-
return (base64.b64encode(fut.id).decode(), cast("asyncio.Future[_T]", fut))
195+
return (
196+
binascii.b2a_base64(fut.id, newline=False).decode(),
197+
cast("asyncio.Future[_T]", fut),
198+
)
196199

197200
async def barrier(self) -> int:
198201
if asyncio.get_running_loop() is not self._loop:

src/duron/_core/invoke.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def __init__(
289289
]
290290
| None = None,
291291
) -> None:
292-
self._loop = create_loop(asyncio.get_event_loop(), b"")
292+
self._loop = create_loop(asyncio.get_running_loop(), b"")
293293
self._task = self._loop.create_task(task)
294294
self._log = log
295295
self._codec = codec
@@ -370,14 +370,18 @@ async def _step(self) -> WaitSet | None:
370370
while True:
371371
self._loop.tick(self.now())
372372
result = self._loop.poll_completion(self._task)
373-
if result is None or self._pending_ops.issuperset(o.id for o in result.ops):
373+
if result is None:
374374
return result
375375

376+
new_ops = False
376377
for s in result.ops:
377378
sid = s.id
378379
if sid not in self._pending_ops:
379380
self._pending_ops.add(sid)
380381
await self.enqueue_op(sid, s)
382+
new_ops = True
383+
if not new_ops:
384+
return result
381385

382386
async def handle_message(
383387
self,
@@ -489,7 +493,7 @@ async def enqueue_op(self, id_: bytes, fut: OpFuture[object]) -> None:
489493
async def cb() -> None:
490494
entry: PromiseCompleteEntry = {
491495
"ts": self.now(),
492-
"id": _encode_id(id_, 1),
496+
"id": _encode_id(id_, ack=True),
493497
"type": "promise/complete",
494498
"promise_id": _encode_id(id_),
495499
}
@@ -604,7 +608,7 @@ async def complete_external_promise(
604608
raise ValueError(msg)
605609
entry: PromiseCompleteEntry = {
606610
"ts": self.now(),
607-
"id": _encode_id(_decode_id(id_), 1),
611+
"id": _encode_id(_decode_id(id_), ack=True),
608612
"type": "promise/complete",
609613
"promise_id": id_,
610614
}
@@ -672,10 +676,10 @@ async def close_stream(
672676
return cnt
673677

674678

675-
def _encode_id(id_: bytes, flag: int = 0) -> str:
676-
if flag != 0:
679+
def _encode_id(id_: bytes, *, ack: bool = False) -> str:
680+
if ack:
677681
id_ = blake2b(
678-
flag.to_bytes(4, "little", signed=True) + id_,
682+
b"\x01\x00\x00\x00" + id_,
679683
digest_size=12,
680684
).digest()
681685
return base64.b64encode(id_).decode()

0 commit comments

Comments
 (0)