11from __future__ import annotations
22
3+ import enum
34import logging
45import os
56import threading
67import time
78from contextvars import ContextVar
89from dataclasses import dataclass
910from hashlib import blake2b
10- from typing import TYPE_CHECKING , cast
11+ from typing import TYPE_CHECKING , Literal , cast
1112from typing_extensions import Self , override
1213
1314from duron .log import set_annotations
3435)
3536
3637
38+ class TracerState (enum .Enum ):
39+ INIT = "init"
40+ STARTED = "started"
41+ CLOSED = "closed"
42+
43+
3744class Tracer :
38- __slots__ : tuple [str , ...] = ("_events" , "_lock" , "instance_id" , "trace_id" )
45+ __slots__ : tuple [str , ...] = (
46+ "_events" ,
47+ "_init_buffer" ,
48+ "_lock" ,
49+ "_open_spans" ,
50+ "_state" ,
51+ "instance_id" ,
52+ "trace_id" ,
53+ )
3954
4055 def __init__ (
4156 self ,
@@ -48,10 +63,90 @@ def __init__(
4863 self .instance_id : str = instance_id or _trace_id ()
4964 self ._events : list [TraceEvent ] = []
5065 self ._lock = threading .Lock ()
66+ self ._state : TracerState = TracerState .INIT
67+ self ._init_buffer : list [TraceEvent ] = []
68+ self ._open_spans : dict [str , SpanStart ] = {}
5169
5270 def emit_event (self , event : TraceEvent ) -> None :
5371 with self ._lock :
54- self ._events .append (event )
72+ if self ._state == TracerState .CLOSED :
73+ return
74+
75+ if self ._state == TracerState .INIT :
76+ # Buffer events in INIT state
77+ self ._init_buffer .append (event )
78+ # Track open/closed spans
79+ if event ["type" ] == "span.start" :
80+ self ._open_spans [event ["span_id" ]] = event
81+ elif event ["type" ] == "span.end" :
82+ self ._open_spans .pop (event ["span_id" ], None )
83+ else : # STARTED state
84+ self ._events .append (event )
85+ # Track open spans even in STARTED state for close()
86+ if event ["type" ] == "span.start" :
87+ self ._open_spans [event ["span_id" ]] = event
88+ elif event ["type" ] == "span.end" :
89+ self ._open_spans .pop (event ["span_id" ], None )
90+
91+ def start (self ) -> None :
92+ """Transition to STARTED state, clear completed spans, emit remaining."""
93+ with self ._lock :
94+ if self ._state != TracerState .INIT :
95+ return
96+
97+ # Find all span IDs that have been completed (have both start and end)
98+ completed_span_ids : set [str ] = set ()
99+ span_ends : set [str ] = set ()
100+
101+ for event in self ._init_buffer :
102+ if event ["type" ] == "span.end" :
103+ span_ends .add (event ["span_id" ])
104+
105+ for event in self ._init_buffer :
106+ if event ["type" ] == "span.start" and event ["span_id" ] in span_ends :
107+ completed_span_ids .add (event ["span_id" ])
108+
109+ # Filter out completed spans (both start and end) and their related events
110+ # Keep only span.start events for incomplete spans and other events
111+ for event in self ._init_buffer :
112+ event_span_id = event .get ("span_id" )
113+
114+ # Skip completed span events (both start and end)
115+ if event_span_id in completed_span_ids :
116+ continue
117+
118+ # Skip span.end events for incomplete spans (keep only starts)
119+ if event ["type" ] == "span.end" :
120+ continue
121+
122+ # Emit everything else (span.start for incomplete spans, other events)
123+ self ._events .append (event )
124+
125+ # Clear the init buffer
126+ self ._init_buffer .clear ()
127+
128+ # Transition to STARTED state
129+ self ._state = TracerState .STARTED
130+
131+ def close (self ) -> None :
132+ """Transition to CLOSED state and mark all open spans as failed."""
133+ with self ._lock :
134+ if self ._state == TracerState .CLOSED or self ._state == TracerState .INIT :
135+ return
136+
137+ for span_id in list (self ._open_spans .keys ()):
138+ end_event : SpanEnd = {
139+ "type" : "span.end" ,
140+ "span_id" : span_id ,
141+ "ts" : time .time_ns () // 1000 ,
142+ "status" : "ERROR" ,
143+ "status_message" : "tracer closed" ,
144+ }
145+ self ._events .append (end_event )
146+
147+ self ._open_spans .clear ()
148+
149+ self ._state = TracerState .CLOSED
55150
56151 def pop_events (self , * , flush : bool ) -> list [dict [str , JSONValue ]]:
57152 with self ._lock :
@@ -156,6 +251,8 @@ class _TracerSpan:
156251 attributes : dict [str , JSONValue ] | None = None
157252 links : tuple [LinkRef , ...] | None = None
158253 _token : Token [_TracerSpan | None ] | None = None
254+ _status : Literal ["OK" , "ERROR" ] | None = None
255+ _status_message : str | None = None
159256
160257 def __enter__ (self ) -> Self :
161258 start_ns = time .time_ns ()
@@ -184,6 +281,12 @@ def record(self, key: str, value: JSONValue) -> None:
184281 else :
185282 self .attributes = {key : value }
186283
284+ def set_status (
285+ self , status : Literal ["OK" , "ERROR" ], message : str | None = None
286+ ) -> None :
287+ self ._status = status
288+ self ._status_message = message
289+
187290 def __exit__ (
188291 self ,
189292 exc_type : type [BaseException ] | None ,
@@ -199,6 +302,10 @@ def __exit__(
199302 if self .attributes :
200303 evnt ["attributes" ] = self .attributes
201304 self .attributes = None
305+ if self ._status :
306+ evnt ["status" ] = self ._status
307+ if self ._status_message :
308+ evnt ["status_message" ] = self ._status_message
202309
203310 self .tracer .emit_event (evnt )
204311 if self ._token :
0 commit comments