Skip to content

Commit e27994a

Browse files
committed
feat: str id throughout
1 parent 2f7deed commit e27994a

11 files changed

Lines changed: 75 additions & 83 deletions

File tree

examples/agent.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from rich.console import Console
2121

2222
import duron
23-
from duron import Deferred, Signal, SignalInterrupt, Stream, StreamWriter
23+
from duron import Defer, Signal, SignalInterrupt, Stream, StreamWriter
2424
from duron.codec import Codec
2525
from duron.contrib.storage import FileLogStorage
2626

@@ -53,9 +53,9 @@ def decode_json(self, encoded: JSONValue, expected_type: TypeHint[Any]) -> objec
5353
@duron.fn(codec=PydanticCodec())
5454
async def agent_fn(
5555
ctx: duron.Context,
56-
input_: Stream[str] = Deferred,
57-
signal: Signal[None] = Deferred,
58-
output: StreamWriter[tuple[str, str]] = Deferred,
56+
input_: Stream[str] = Defer,
57+
signal: Signal[None] = Defer,
58+
output: StreamWriter[tuple[str, str]] = Defer,
5959
) -> None:
6060
history: list[ChatCompletionMessageParam] = [
6161
{

src/duron/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@
88
from ._core.stream import StreamWriter as StreamWriter
99
from ._decorator.fn import fn as fn
1010
from ._decorator.op import op as op
11-
from .typing import Deferred as Deferred
11+
from .typing import Defer as Defer

src/duron/_core/context.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from duron._core.signal import create_signal
1919
from duron._core.stream import create_stream, run_stream
2020
from duron._decorator.op import CheckpointOp, Op
21-
from duron.log import encode_id
2221
from duron.typing import inspect_function
2322

2423
if TYPE_CHECKING:
@@ -185,7 +184,7 @@ async def create_promise(
185184
ExternalPromiseCreate(metadata=self._get_metadata(None), return_type=dtype),
186185
)
187186
return (
188-
encode_id(fut.id),
187+
fut.id,
189188
cast("asyncio.Future[_T]", fut),
190189
)
191190

src/duron/_core/invoke.py

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import asyncio
44
import contextlib
55
import time
6-
from typing import TYPE_CHECKING, Generic, Literal, cast
6+
from typing import TYPE_CHECKING, Final, Generic, Literal, cast
77
from typing_extensions import (
88
Any,
99
ParamSpec,
@@ -28,7 +28,7 @@
2828
from duron._core.stream import ObserverStream, Stream, StreamWriter
2929
from duron._loop import EventLoop, create_loop
3030
from duron.codec import Codec, JSONValue
31-
from duron.log import decode_id, derive_id, encode_id, is_entry, random_id
31+
from duron.log import derive_id, is_entry, random_id
3232
from duron.typing import Unspecified, inspect_function
3333

3434
if TYPE_CHECKING:
@@ -60,7 +60,7 @@
6060
_T = TypeVar("_T")
6161
_P = ParamSpec("_P")
6262

63-
_CURRENT_VERSION = 0
63+
_CURRENT_VERSION: Final = 0
6464

6565

6666
@final
@@ -95,7 +95,7 @@ def get_init() -> InitParams:
9595
"version": _CURRENT_VERSION,
9696
"args": [codec.encode_json(arg) for arg in args],
9797
"kwargs": {k: codec.encode_json(v) for k, v in kwargs.items()},
98-
"seed": _generate_id(),
98+
"nonce": random_id(),
9999
}
100100

101101
codec = self._fn.codec
@@ -238,7 +238,7 @@ class InitParams(TypedDict):
238238
version: int
239239
args: list[JSONValue]
240240
kwargs: dict[str, JSONValue]
241-
seed: str
241+
nonce: str
242242

243243

244244
async def _invoke_prelude(
@@ -254,7 +254,7 @@ async def _invoke_prelude(
254254
if init_params["version"] != _CURRENT_VERSION:
255255
msg = "version mismatch"
256256
raise RuntimeError(msg)
257-
loop.set_key(decode_id(init_params["seed"]))
257+
loop.set_key(init_params["nonce"].encode())
258258
extra_kwargs: dict[str, object] = {}
259259
for name, type_, dtype in job_fn.inject:
260260
with ctx.metadata({"param.name": name}):
@@ -324,7 +324,7 @@ def __init__(
324324
str,
325325
tuple[Callable[[], Coroutine[Any, Any, None]], TypeHint[Any]],
326326
] = {}
327-
self._pending_ops: set[bytes] = set()
327+
self._pending_ops: set[str] = set()
328328
self._now = 0
329329
self._tasks: dict[str, tuple[asyncio.Future[None], TypeHint[Any]]] = {}
330330
self._streams: dict[
@@ -337,7 +337,7 @@ def __init__(
337337
] = {}
338338
self._watchers = watchers or []
339339
self._debug: dict[str, JSONValue] | None = (
340-
{"run.id": _generate_id()} if debug else None
340+
{"run.id": random_id()} if debug else None
341341
)
342342

343343
async def close(self) -> None:
@@ -420,7 +420,7 @@ async def handle_message(
420420
pending_info = self._pending_task.pop(e["promise_id"], None)
421421
task_info = self._tasks.get(e["promise_id"], None)
422422

423-
id_ = decode_id(e["promise_id"])
423+
id_ = e["promise_id"]
424424

425425
return_type: TypeHint[Any] = Unspecified
426426
if pending_info is not None:
@@ -451,7 +451,7 @@ async def handle_message(
451451
msg = f"Invalid promise/complete entry: {e!r}"
452452
raise ValueError(msg)
453453
elif e["type"] == "stream/create":
454-
id_ = decode_id(e["id"])
454+
id_ = e["id"]
455455
if e["id"] not in self._streams:
456456
self._loop.post_completion(
457457
id_, exception=ValueError("Stream not found")
@@ -460,7 +460,7 @@ async def handle_message(
460460
self._loop.post_completion(id_, result=e["id"])
461461
self._pending_ops.discard(id_)
462462
elif e["type"] == "stream/emit":
463-
id_ = decode_id(e["id"])
463+
id_ = e["id"]
464464
if e["stream_id"] not in self._streams:
465465
self._loop.post_completion(
466466
id_, exception=ValueError("Stream not found")
@@ -475,7 +475,7 @@ async def handle_message(
475475
self._loop.post_completion(id_, result=None)
476476
self._pending_ops.discard(id_)
477477
elif e["type"] == "stream/complete":
478-
id_ = decode_id(e["id"])
478+
id_ = e["id"]
479479
if e["stream_id"] not in self._streams:
480480
self._loop.post_completion(
481481
id_, exception=ValueError("Stream not found")
@@ -492,7 +492,7 @@ async def handle_message(
492492
_ = self._streams.pop(e["stream_id"], None)
493493
self._pending_ops.discard(id_)
494494
elif e["type"] == "barrier":
495-
id_ = decode_id(e["id"])
495+
id_ = e["id"]
496496
self._loop.post_completion(id_, result=offset)
497497
self._pending_ops.discard(id_)
498498

@@ -515,13 +515,13 @@ async def enqueue_log(self, entry: Entry, *, flush: bool = False) -> None:
515515
await self._log.flush(self._running)
516516
await self.handle_message(offset, entry)
517517

518-
async def enqueue_op(self, id_: bytes, fut: OpFuture[object]) -> None:
518+
async def enqueue_op(self, id_: str, fut: OpFuture[object]) -> None:
519519
op = cast("Op", fut.params)
520520
match op:
521521
case FnCall():
522522
promise_create_entry: PromiseCreateEntry = {
523523
"ts": self.now(),
524-
"id": encode_id(id_),
524+
"id": id_,
525525
"type": "promise/create",
526526
}
527527
if op.metadata:
@@ -538,9 +538,9 @@ async def cb() -> None:
538538
now_us = self.now()
539539
entry: PromiseCompleteEntry = {
540540
"ts": now_us,
541-
"id": encode_id(derive_id(id_)),
541+
"id": derive_id(id_),
542542
"type": "promise/complete",
543-
"promise_id": encode_id(id_),
543+
"promise_id": id_,
544544
}
545545
try:
546546
result = op.callable(*op.args, **op.kwargs)
@@ -556,7 +556,7 @@ async def cb() -> None:
556556

557557
def done(f: OpFuture[object]) -> None:
558558
if f.cancelled():
559-
sid = encode_id(f.id)
559+
sid = f.id
560560
if self._pending_task.get(sid, None):
561561
# pending task cancelled
562562
pass
@@ -566,14 +566,14 @@ def done(f: OpFuture[object]) -> None:
566566
_ = task.get_loop().call_soon(task.cancel)
567567

568568
fut.add_done_callback(done)
569-
sid = encode_id(id_)
569+
sid = id_
570570
if self._running:
571571
self._tasks[sid] = (asyncio.create_task(cb()), op.return_type)
572572
else:
573573
self._pending_task[sid] = (cb, op.return_type)
574574

575575
case StreamCreate():
576-
stream_id = encode_id(id_)
576+
stream_id = id_
577577

578578
# Determine which observer to use
579579
ob = [op.observer] if op.observer else []
@@ -597,7 +597,7 @@ def done(f: OpFuture[object]) -> None:
597597
case StreamEmit():
598598
stream_emit_entry: StreamEmitEntry = {
599599
"ts": self.now(),
600-
"id": encode_id(id_),
600+
"id": id_,
601601
"stream_id": op.stream_id,
602602
"type": "stream/emit",
603603
"value": self._codec.encode_json(op.value),
@@ -607,7 +607,7 @@ def done(f: OpFuture[object]) -> None:
607607
if op.exception:
608608
stream_close_entry_err: StreamCompleteEntry = {
609609
"ts": self.now(),
610-
"id": encode_id(id_),
610+
"id": id_,
611611
"stream_id": op.stream_id,
612612
"type": "stream/complete",
613613
"error": _encode_error(op.exception),
@@ -616,27 +616,27 @@ def done(f: OpFuture[object]) -> None:
616616
else:
617617
stream_close_entry: StreamCompleteEntry = {
618618
"ts": self.now(),
619-
"id": encode_id(id_),
619+
"id": id_,
620620
"stream_id": op.stream_id,
621621
"type": "stream/complete",
622622
}
623623
await self.enqueue_log(stream_close_entry)
624624
case Barrier():
625625
barrier_entry: BarrierEntry = {
626626
"ts": self.now(),
627-
"id": encode_id(id_),
627+
"id": id_,
628628
"type": "barrier",
629629
}
630630
await self.enqueue_log(barrier_entry, flush=True)
631631
case ExternalPromiseCreate():
632632
promise_create_entry = {
633633
"ts": self.now(),
634-
"id": encode_id(id_),
634+
"id": id_,
635635
"type": "promise/create",
636636
}
637637
if op.metadata:
638638
promise_create_entry["metadata"] = op.metadata
639-
self._tasks[encode_id(id_)] = (asyncio.Future(), op.return_type)
639+
self._tasks[id_] = (asyncio.Future(), op.return_type)
640640
await self.enqueue_log(promise_create_entry)
641641
case _:
642642
assert_never(op)
@@ -654,7 +654,7 @@ async def complete_external_promise(
654654
now_us = self.now()
655655
entry: PromiseCompleteEntry = {
656656
"ts": now_us,
657-
"id": encode_id(derive_id(decode_id(id_))),
657+
"id": derive_id(id_),
658658
"type": "promise/complete",
659659
"promise_id": id_,
660660
}
@@ -678,7 +678,7 @@ async def send_stream(
678678
if predicate(md or {}):
679679
entry: StreamEmitEntry = {
680680
"ts": ts,
681-
"id": _generate_id(),
681+
"id": random_id(),
682682
"type": "stream/emit",
683683
"stream_id": stream_id,
684684
"value": self._codec.encode_json(value),
@@ -705,15 +705,15 @@ async def close_stream(
705705
if error:
706706
entry: StreamCompleteEntry = {
707707
"ts": ts,
708-
"id": _generate_id(),
708+
"id": random_id(),
709709
"type": "stream/complete",
710710
"stream_id": stream_id,
711711
"error": _encode_error(error),
712712
}
713713
else:
714714
entry = {
715715
"ts": ts,
716-
"id": _generate_id(),
716+
"id": random_id(),
717717
"type": "stream/complete",
718718
"stream_id": stream_id,
719719
}
@@ -722,10 +722,6 @@ async def close_stream(
722722
return cnt
723723

724724

725-
def _generate_id() -> str:
726-
return encode_id(random_id())
727-
728-
729725
def _encode_error(error: BaseException) -> ErrorInfo:
730726
if type(error) is asyncio.CancelledError:
731727
return {

src/duron/_loop.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from asyncio import events, tasks
88
from collections import deque
99
from dataclasses import dataclass
10+
from hashlib import blake2b
1011
from heapq import heappop, heappush
1112
from typing import TYPE_CHECKING, Generic
1213
from typing_extensions import (
@@ -45,11 +46,11 @@
4546
class OpFuture(asyncio.Future[_T], Generic[_T]):
4647
__slots__: tuple[str, ...] = ("id", "params")
4748

48-
id: bytes
49+
id: str
4950
params: object
5051

5152
def __init__(
52-
self, id_: bytes, params: object, loop: asyncio.AbstractEventLoop
53+
self, id_: str, params: object, loop: asyncio.AbstractEventLoop
5354
) -> None:
5455
super().__init__(loop=loop)
5556
self.id = id_
@@ -74,7 +75,7 @@ async def block(self, now_us: int) -> None:
7475

7576
@dataclass(slots=True)
7677
class _TaskCtx:
77-
parent_id: bytes
78+
parent_id: str
7879
seq: int = 0
7980

8081

@@ -99,18 +100,19 @@ def __init__(self, host: asyncio.AbstractEventLoop) -> None:
99100
self._exc_handler: (
100101
Callable[[asyncio.AbstractEventLoop, dict[str, object]], object] | None
101102
) = None
102-
self._ops: dict[bytes, OpFuture[object]] = {}
103-
self._ctx: _TaskCtx = _TaskCtx(parent_id=b"")
103+
self._ops: dict[str, OpFuture[object]] = {}
104+
self._ctx: _TaskCtx = _TaskCtx(parent_id="")
104105
self._key: bytes = b""
105106
self._now_us: int = 0
106107
self._closed: bool = False
107108
self._event: asyncio.Event = asyncio.Event() # loop = _host
108109
self._timers: list[asyncio.TimerHandle] = []
109110

110111
def set_key(self, key: bytes) -> None:
111-
self._key = derive_id(key, key=self._key)
112+
# Derive a fixed-length key from the provided key
113+
self._key = blake2b(key, digest_size=16, key=self._key).digest()
112114

113-
def generate_op_id(self) -> bytes:
115+
def generate_op_id(self) -> str:
114116
ctx = _task_ctx.get(self._ctx)
115117
ctx.seq += 1
116118
return derive_id(
@@ -266,20 +268,20 @@ def create_op(self, params: object, *, external: bool = False) -> OpFuture[objec
266268
@overload
267269
def post_completion(
268270
self,
269-
id_: bytes,
271+
id_: str,
270272
*,
271273
result: object,
272274
) -> None: ...
273275
@overload
274276
def post_completion(
275277
self,
276-
id_: bytes,
278+
id_: str,
277279
*,
278280
exception: BaseException,
279281
) -> None: ...
280282
def post_completion(
281283
self,
282-
id_: bytes,
284+
id_: str,
283285
*,
284286
result: object = None,
285287
exception: BaseException | None = None,

0 commit comments

Comments
 (0)