1414 overload ,
1515)
1616
17- from duron ._core .config import config
1817from duron ._core .context import Context
1918from duron ._core .ops import (
2019 Barrier ,
2928from duron ._loop import EventLoop , create_loop
3029from duron .codec import Codec , JSONValue
3130from duron .log import derive_id , is_entry , random_id
31+ from duron .tracing import current_tracer
3232from duron .typing import Unspecified , inspect_function
3333
3434if TYPE_CHECKING :
@@ -106,7 +106,6 @@ def get_init() -> InitParams:
106106 self ._log ,
107107 codec ,
108108 watchers = self ._watchers ,
109- debug = config .debug ,
110109 )
111110 await self ._run .resume ()
112111
@@ -122,7 +121,6 @@ def cb() -> InitParams:
122121 self ._log ,
123122 self ._fn .codec ,
124123 watchers = self ._watchers ,
125- debug = config .debug ,
126124 )
127125 await self ._run .resume ()
128126
@@ -287,6 +285,7 @@ class _InvokeRun:
287285 __slots__ = (
288286 "_codec" ,
289287 "_debug" ,
288+ "_lease" ,
290289 "_log" ,
291290 "_loop" ,
292291 "_now" ,
@@ -310,15 +309,13 @@ def __init__(
310309 tuple [Callable [[dict [str , JSONValue ]], bool ], StreamObserver [object ]]
311310 ]
312311 | None = None ,
313- debug : bool = False ,
314312 ) -> None :
315313 self ._loop = create_loop (asyncio .get_running_loop ())
316- if debug :
317- self ._loop .set_debug (True )
318314 self ._task = self ._loop .create_task (task )
319315 self ._log = log
320316 self ._codec = codec
321- self ._running : bytes | None = None
317+ self ._running : bool = False
318+ self ._lease : bytes | None = None
322319 self ._pending_msg : list [Entry ] = []
323320 self ._pending_task : dict [
324321 str ,
@@ -336,11 +333,15 @@ def __init__(
336333 ],
337334 ] = {}
338335 self ._watchers = watchers or []
336+ tracer = current_tracer ()
339337 self ._debug : dict [str , JSONValue ] | None = (
340- {"run .id" : random_id ()} if debug else None
338+ {"trace .id" : tracer . id ()} if tracer else None
341339 )
342340
343341 async def close (self ) -> None :
342+ if self ._lease :
343+ await self ._log .release_lease (self ._lease )
344+ self ._lease = None
344345 for task , _ in self ._tasks .values ():
345346 _ = task .cancel ()
346347 with contextlib .suppress (asyncio .CancelledError ):
@@ -359,6 +360,7 @@ def now(self) -> int:
359360 return self ._now
360361
361362 async def resume (self ) -> None :
363+ self ._lease = await self ._log .acquire_lease ()
362364 recvd_msgs : set [str ] = set ()
363365 async for o , entry in self ._log .stream (None , live = False ):
364366 ts = entry ["ts" ]
@@ -378,21 +380,17 @@ async def run(self) -> object:
378380 if self ._task .done ():
379381 return self ._task .result ()
380382
381- self ._running = await self ._log .acquire_lease ()
382- try :
383- for msg in self ._pending_msg :
384- await self .enqueue_log (msg )
385- self ._pending_msg .clear ()
386- for key , (task_fn , return_type ) in self ._pending_task .items ():
387- self ._tasks [key ] = (asyncio .create_task (task_fn ()), return_type )
388- self ._pending_task .clear ()
389-
390- while waitset := await self ._step ():
391- await waitset .block (self .now ())
392- return self ._task .result ()
393- finally :
394- await self ._log .release_lease (self ._running )
395- self ._running = None
383+ self ._running = True
384+ for msg in self ._pending_msg :
385+ await self .enqueue_log (msg )
386+ self ._pending_msg .clear ()
387+ for key , (task_fn , return_type ) in self ._pending_task .items ():
388+ self ._tasks [key ] = (asyncio .create_task (task_fn ()), return_type )
389+ self ._pending_task .clear ()
390+
391+ while waitset := await self ._step ():
392+ await waitset .block (self .now ())
393+ return self ._task .result ()
396394
397395 async def _step (self ) -> WaitSet | None :
398396 while True :
@@ -499,6 +497,9 @@ async def handle_message(
499497 async def enqueue_log (self , entry : Entry , * , flush : bool = False ) -> None :
500498 if not self ._running :
501499 self ._pending_msg .append (entry )
500+ elif self ._lease is None :
501+ # closed
502+ return
502503 else :
503504 if self ._debug :
504505 if "debug" in entry :
@@ -510,9 +511,9 @@ async def enqueue_log(self, entry: Entry, *, flush: bool = False) -> None:
510511 entry ["debug" ] = self ._debug
511512 else :
512513 _ = entry .pop ("debug" , None )
513- offset = await self ._log .append (self ._running , entry )
514+ offset = await self ._log .append (self ._lease , entry )
514515 if flush :
515- await self ._log .flush (self ._running )
516+ await self ._log .flush (self ._lease )
516517 await self .handle_message (offset , entry )
517518
518519 async def enqueue_op (self , id_ : str , fut : OpFuture [object ]) -> None :
0 commit comments