1010 TypedDict ,
1111 TypeVar ,
1212 assert_never ,
13+ assert_type ,
1314 final ,
1415 overload ,
1516)
1617
17- from duron ._core .config import config
1818from duron ._core .context import Context
1919from duron ._core .ops import (
2020 Barrier ,
2828from duron ._core .stream import ObserverStream , Stream , StreamWriter
2929from duron ._loop import EventLoop , create_loop
3030from duron .codec import Codec , JSONValue
31- from duron .log import derive_id , is_entry , random_id
31+ from duron .log import derive_id , is_entry , random_id , set_metadata
32+ from duron .tracing import current_tracer
3233from duron .typing import Unspecified , inspect_function
3334
3435if TYPE_CHECKING :
5354 StreamCreateEntry ,
5455 StreamEmitEntry ,
5556 )
57+ from duron .tracing import Tracer
5658 from duron .typing import FunctionType , TypeHint
5759
5860
@@ -106,7 +108,6 @@ def get_init() -> InitParams:
106108 self ._log ,
107109 codec ,
108110 watchers = self ._watchers ,
109- debug = config .debug ,
110111 )
111112 await self ._run .resume ()
112113
@@ -122,7 +123,6 @@ def cb() -> InitParams:
122123 self ._log ,
123124 self ._fn .codec ,
124125 watchers = self ._watchers ,
125- debug = config .debug ,
126126 )
127127 await self ._run .resume ()
128128
@@ -286,7 +286,7 @@ async def _invoke_prelude(
286286class _InvokeRun :
287287 __slots__ = (
288288 "_codec" ,
289- "_debug " ,
289+ "_lease " ,
290290 "_log" ,
291291 "_loop" ,
292292 "_now" ,
@@ -297,6 +297,7 @@ class _InvokeRun:
297297 "_streams" ,
298298 "_task" ,
299299 "_tasks" ,
300+ "_tracer" ,
300301 "_watchers" ,
301302 )
302303
@@ -310,15 +311,13 @@ def __init__(
310311 tuple [Callable [[dict [str , JSONValue ]], bool ], StreamObserver [object ]]
311312 ]
312313 | None = None ,
313- debug : bool = False ,
314314 ) -> None :
315315 self ._loop = create_loop (asyncio .get_running_loop ())
316- if debug :
317- self ._loop .set_debug (True )
318316 self ._task = self ._loop .create_task (task )
319317 self ._log = log
320318 self ._codec = codec
321- self ._running : bytes | None = None
319+ self ._running : bool = False
320+ self ._lease : bytes | None = None
322321 self ._pending_msg : list [Entry ] = []
323322 self ._pending_task : dict [
324323 str ,
@@ -336,11 +335,12 @@ def __init__(
336335 ],
337336 ] = {}
338337 self ._watchers = watchers or []
339- self ._debug : dict [str , JSONValue ] | None = (
340- {"run.id" : random_id ()} if debug else None
341- )
338+ self ._tracer : Tracer | None = current_tracer ()
342339
343340 async def close (self ) -> None :
341+ if self ._lease :
342+ await self ._log .release_lease (self ._lease )
343+ self ._lease = None
344344 for task , _ in self ._tasks .values ():
345345 _ = task .cancel ()
346346 with contextlib .suppress (asyncio .CancelledError ):
@@ -359,6 +359,7 @@ def now(self) -> int:
359359 return self ._now
360360
361361 async def resume (self ) -> None :
362+ self ._lease = await self ._log .acquire_lease ()
362363 recvd_msgs : set [str ] = set ()
363364 async for o , entry in self ._log .stream (None , live = False ):
364365 ts = entry ["ts" ]
@@ -378,21 +379,17 @@ async def run(self) -> object:
378379 if self ._task .done ():
379380 return self ._task .result ()
380381
381- self ._running = await self ._log .acquire_lease ()
382- try :
383- for msg in self ._pending_msg :
384- await self .enqueue_log (msg )
385- self ._pending_msg .clear ()
386- for key , (task_fn , return_type ) in self ._pending_task .items ():
387- self ._tasks [key ] = (asyncio .create_task (task_fn ()), return_type )
388- self ._pending_task .clear ()
389-
390- while waitset := await self ._step ():
391- await waitset .block (self .now ())
392- return self ._task .result ()
393- finally :
394- await self ._log .release_lease (self ._running )
395- self ._running = None
382+ self ._running = True
383+ for msg in self ._pending_msg :
384+ await self .enqueue_log (msg )
385+ self ._pending_msg .clear ()
386+ for key , (task_fn , return_type ) in self ._pending_task .items ():
387+ self ._tasks [key ] = (asyncio .create_task (task_fn ()), return_type )
388+ self ._pending_task .clear ()
389+
390+ while waitset := await self ._step ():
391+ await waitset .block (self .now ())
392+ return self ._task .result ()
396393
397394 async def _step (self ) -> WaitSet | None :
398395 while True :
@@ -495,24 +492,23 @@ async def handle_message(
495492 id_ = e ["id" ]
496493 self ._loop .post_completion (id_ , result = offset )
497494 self ._pending_ops .discard (id_ )
495+ elif e ["type" ] == "trace" :
496+ pass
497+ else :
498+ assert_type (e ["type" ], Literal ["promise/create" ])
498499
499500 async def enqueue_log (self , entry : Entry , * , flush : bool = False ) -> None :
500501 if not self ._running :
501502 self ._pending_msg .append (entry )
503+ elif self ._lease is None :
504+ # closed
505+ return
502506 else :
503- if self ._debug :
504- if "debug" in entry :
505- entry ["debug" ] = {
506- ** entry ["debug" ],
507- ** self ._debug ,
508- }
509- else :
510- entry ["debug" ] = self ._debug
511- else :
512- _ = entry .pop ("debug" , None )
513- offset = await self ._log .append (self ._running , entry )
507+ if self ._tracer :
508+ self ._tracer .attach_metadata (entry )
509+ offset = await self ._log .append (self ._lease , entry )
514510 if flush :
515- await self ._log .flush (self ._running )
511+ await self ._log .flush (self ._lease )
516512 await self .handle_message (offset , entry )
517513
518514 async def enqueue_op (self , id_ : str , fut : OpFuture [object ]) -> None :
@@ -524,14 +520,18 @@ async def enqueue_op(self, id_: str, fut: OpFuture[object]) -> None:
524520 "id" : id_ ,
525521 "type" : "promise/create" ,
526522 }
527- if op .metadata :
528- promise_create_entry ["metadata" ] = op .metadata
529- if self ._debug :
530- promise_create_entry ["debug" ] = {
523+
524+ set_metadata (
525+ promise_create_entry ,
526+ op .metadata ,
527+ {
531528 "fn.name" : str (
532529 getattr (op .callable , "__qualname__" , op .callable )
533530 ),
534531 }
532+ if self ._tracer
533+ else None ,
534+ )
535535 await self .enqueue_log (promise_create_entry )
536536
537537 async def cb () -> None :
@@ -590,8 +590,7 @@ def done(f: OpFuture[object]) -> None:
590590 "id" : stream_id ,
591591 "type" : "stream/create" ,
592592 }
593- if op .metadata :
594- stream_create_entry ["metadata" ] = op .metadata
593+ set_metadata (stream_create_entry , op .metadata )
595594 await self .enqueue_log (stream_create_entry )
596595
597596 case StreamEmit ():
@@ -634,8 +633,7 @@ def done(f: OpFuture[object]) -> None:
634633 "id" : id_ ,
635634 "type" : "promise/create" ,
636635 }
637- if op .metadata :
638- promise_create_entry ["metadata" ] = op .metadata
636+ set_metadata (promise_create_entry , op .metadata )
639637 self ._tasks [id_ ] = (asyncio .Future (), op .return_type )
640638 await self .enqueue_log (promise_create_entry )
641639 case _:
0 commit comments