4040_metadata : ContextVar [dict [str , JSONValue ] | None ] = ContextVar (
4141 "duron.metadata" , default = None
4242)
43+ _labels : ContextVar [dict [str , str ] | None ] = ContextVar ("duron.labels" , default = None )
4344
4445
4546@final
@@ -125,6 +126,7 @@ async def run(
125126 kwargs = kwargs ,
126127 return_type = return_type ,
127128 metadata = self ._get_metadata (metadata ),
129+ labels = self ._get_labels (None ),
128130 ),
129131 )
130132 return cast ("_T" , await op )
@@ -163,6 +165,7 @@ async def create_stream(
163165 dtype ,
164166 external = external ,
165167 metadata = self ._get_metadata (None ),
168+ labels = self ._get_labels (None ),
166169 )
167170
168171 async def create_signal (
@@ -172,7 +175,12 @@ async def create_signal(
172175 if asyncio .get_running_loop () is not self ._loop :
173176 msg = "Context time can only be used in the context loop"
174177 raise RuntimeError (msg )
175- return await create_signal (self ._loop , dtype , metadata = self ._get_metadata (None ))
178+ return await create_signal (
179+ self ._loop ,
180+ dtype ,
181+ metadata = self ._get_metadata (None ),
182+ labels = self ._get_labels (None ),
183+ )
176184
177185 async def create_promise (
178186 self ,
@@ -183,7 +191,11 @@ async def create_promise(
183191 raise RuntimeError (msg )
184192 fut = create_op (
185193 self ._loop ,
186- ExternalPromiseCreate (metadata = self ._get_metadata (None ), return_type = dtype ),
194+ ExternalPromiseCreate (
195+ metadata = self ._get_metadata (None ),
196+ return_type = dtype ,
197+ labels = self ._get_labels (None ),
198+ ),
187199 )
188200 return (
189201 fut .id ,
@@ -231,6 +243,23 @@ def metadata(self, metadata: dict[str, JSONValue]) -> Generator[None, None, None
231243 finally :
232244 _metadata .reset (token )
233245
246+ @contextmanager
247+ def labels (self , labels : dict [str , str ]) -> Generator [None , None , None ]:
248+ if asyncio .get_running_loop () is not self ._loop :
249+ msg = "Context labels can only be used in the context loop"
250+ raise RuntimeError (msg )
251+ if not labels :
252+ yield
253+ return
254+
255+ current = _labels .get ()
256+ merged = {** current , ** labels } if current is not None else labels
257+ token = _labels .set (merged )
258+ try :
259+ yield
260+ finally :
261+ _labels .reset (token )
262+
234263 @staticmethod
235264 def _get_metadata (
236265 merge : dict [str , JSONValue ] | None ,
@@ -241,3 +270,14 @@ def _get_metadata(
241270 if current is None :
242271 return merge
243272 return {** current , ** merge }
273+
274+ @staticmethod
275+ def _get_labels (
276+ merge : dict [str , str ] | None ,
277+ ) -> dict [str , str ] | None :
278+ current = _labels .get ()
279+ if merge is None :
280+ return current
281+ if current is None :
282+ return merge
283+ return {** current , ** merge }
0 commit comments