Skip to content

Commit dabf29c

Browse files
authored
[feat] add to_runnable decorator & lcc tool message add name and tool_call_id (#45)
* add to_runnable decorator & lcc tool message add name and tool_call_id * lcc tool message artifact * support set span finish time * modify get_default_client
1 parent 2a6eeb2 commit dabf29c

File tree

11 files changed

+236
-11
lines changed

11 files changed

+236
-11
lines changed

CHANGLOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## [0.1.25] - 2026-01-22
2+
### Added
3+
- add to_runnable decorator
4+
- lcc tool message add name and tool_call_id
5+
- support set finish time of span
6+
17
## [0.1.24] - 2026-01-16
28
### Added
39
- client init set default client if not exist

cozeloop/_client.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,14 +406,21 @@ def set_default_client(client: Client):
406406

407407
def get_default_client() -> Client:
408408
global _default_client
409-
if _default_client is None:
409+
temp_client = None
410+
with _client_lock:
411+
temp_client = _default_client
412+
if temp_client is None:
410413
with _client_lock:
411-
if _default_client is None:
412-
try:
413-
_default_client = new_client()
414-
atexit.register(_graceful_shutdown)
415-
except Exception as e:
416-
new_exception = e
414+
temp_client = _default_client
415+
if temp_client is None:
416+
try:
417+
temp_client = new_client()
418+
with _client_lock:
419+
_default_client = temp_client
420+
atexit.register(_graceful_shutdown)
421+
except Exception as e:
422+
new_exception = e
423+
with _client_lock:
417424
_default_client = _NoopClient(new_exception)
418425
return _default_client
419426

cozeloop/decorator/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55

66
coze_loop_decorator= CozeLoopDecorator()
77
observe = coze_loop_decorator.observe
8+
to_runnable = coze_loop_decorator.to_runnable

cozeloop/decorator/decorator.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,183 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any):
311311
else:
312312
return decorator(func)
313313

314+
def to_runnable(
315+
self,
316+
func: Callable = None,
317+
) -> Callable:
318+
"""
319+
Decorator to be RunnableLambda.
320+
321+
:param func: The function to be decorated, Requirements are as follows:
322+
1. When the func is called, parameter config(RunnableConfig) is required, you must use the config containing cozeloop callback handler of 'current request', otherwise, the trace may be lost!
323+
324+
Examples:
325+
@to_runnable
326+
def runnable_func(my_input: dict) -> str:
327+
return input
328+
329+
async def scorer_leader(state: MyState) -> dict | str:
330+
await runnable_func({"a": "111", "b": 222, "c": "333"}, config=state.config) # config is required
331+
"""
332+
333+
def decorator(func: Callable):
334+
from langchain_core.runnables import RunnableLambda, RunnableConfig
335+
336+
@wraps(func)
337+
def sync_wrapper(*args: Any, **kwargs: Any):
338+
config = kwargs.pop("config", None)
339+
config = _convert_config(config)
340+
res = None
341+
try:
342+
extra = {}
343+
if len(args) > 0 and is_class_func(func):
344+
extra = {"_inner_class_self": args[0]}
345+
args = args[1:]
346+
inp = {}
347+
if len(args) > 0:
348+
inp['args'] = args
349+
if len(kwargs) > 0:
350+
inp['kwargs'] = kwargs
351+
res = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, **extra)
352+
if hasattr(res, "__iter__"):
353+
return res
354+
except StopIteration:
355+
pass
356+
except Exception as e:
357+
raise e
358+
finally:
359+
if res is not None:
360+
return res
361+
362+
@wraps(func)
363+
async def async_wrapper(*args: Any, **kwargs: Any):
364+
config = kwargs.pop("config", None)
365+
config = _convert_config(config)
366+
res = None
367+
try:
368+
extra = {}
369+
if len(args) > 0 and is_class_func(func):
370+
extra = {"_inner_class_self": args[0]}
371+
args = args[1:]
372+
inp = {}
373+
if len(args) > 0:
374+
inp['args'] = args
375+
if len(kwargs) > 0:
376+
inp['kwargs'] = kwargs
377+
res = await RunnableLambda(_param_wrapped_func_async).ainvoke(input=inp, config=config, **extra)
378+
if hasattr(res, "__aiter__"):
379+
return res
380+
except StopIteration:
381+
pass
382+
except StopAsyncIteration:
383+
pass
384+
except Exception as e:
385+
if e.args and e.args[0] == 'coroutine raised StopIteration': # coroutine StopIteration
386+
pass
387+
else:
388+
raise e
389+
finally:
390+
if res is not None:
391+
return res
392+
393+
@wraps(func)
394+
def gen_wrapper(*args: Any, **kwargs: Any):
395+
config = kwargs.pop("config", None)
396+
config = _convert_config(config)
397+
try:
398+
extra = {}
399+
if len(args) > 0 and is_class_func(func):
400+
extra = {"_inner_class_self": args[0]}
401+
args = args[1:]
402+
inp = {}
403+
if len(args) > 0:
404+
inp['args'] = args
405+
if len(kwargs) > 0:
406+
inp['kwargs'] = kwargs
407+
gen = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, *extra)
408+
try:
409+
for item in gen:
410+
yield item
411+
except StopIteration:
412+
pass
413+
except Exception as e:
414+
raise e
415+
416+
@wraps(func)
417+
async def async_gen_wrapper(*args: Any, **kwargs: Any):
418+
config = kwargs.pop("config", None)
419+
config = _convert_config(config)
420+
try:
421+
extra = {}
422+
if len(args) > 0 and is_class_func(func):
423+
extra = {"_inner_class_self": args[0]}
424+
args = args[1:]
425+
inp = {}
426+
if len(args) > 0:
427+
inp['args'] = args
428+
if len(kwargs) > 0:
429+
inp['kwargs'] = kwargs
430+
gen = RunnableLambda(_param_wrapped_func_async).invoke(input=inp, config=config, **extra)
431+
items = []
432+
try:
433+
async for item in gen:
434+
items.append(item)
435+
yield item
436+
finally:
437+
pass
438+
except StopIteration:
439+
pass
440+
except StopAsyncIteration:
441+
pass
442+
except Exception as e:
443+
if e.args and e.args[0] == 'coroutine raised StopIteration':
444+
pass
445+
else:
446+
raise e
447+
448+
# for convert parameter
449+
def _param_wrapped_func(input_dict: dict, **kwargs) -> Any:
450+
real_args = input_dict.get("args", ())
451+
real_kwargs = input_dict.get("kwargs", {})
452+
453+
inner_class_self = kwargs.get("_inner_class_self", None)
454+
if inner_class_self is not None:
455+
real_args = (inner_class_self, *real_args)
456+
457+
return func(*real_args, **real_kwargs)
458+
459+
async def _param_wrapped_func_async(input_dict: dict, **kwargs) -> Any:
460+
real_args = input_dict.get("args", ())
461+
real_kwargs = input_dict.get("kwargs", {})
462+
463+
inner_class_self = kwargs.get("_inner_class_self", None)
464+
if inner_class_self is not None:
465+
real_args = (inner_class_self, *real_args)
466+
467+
return await func(*real_args, **real_kwargs)
468+
469+
def _convert_config(config: RunnableConfig = None) -> RunnableConfig | None:
470+
if config is None:
471+
config = RunnableConfig(run_name=func.__name__)
472+
config['run_name'] = func.__name__
473+
elif isinstance(config, dict):
474+
config['run_name'] = func.__name__
475+
return config
476+
477+
if is_async_gen_func(func):
478+
return async_gen_wrapper
479+
if is_gen_func(func):
480+
return gen_wrapper
481+
elif is_async_func(func):
482+
return async_wrapper
483+
else:
484+
return sync_wrapper
485+
486+
if func is None:
487+
return decorator
488+
else:
489+
return decorator(func)
490+
314491

315492
class _CozeLoopTraceStream(Generic[S]):
316493
def __init__(

cozeloop/integration/langchain/trace_callback.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from langchain_core.outputs import LLMResult, ChatGeneration
1515
from langchain_core.agents import AgentFinish, AgentAction
1616
from langchain_core.prompt_values import PromptValue, ChatPromptValue
17-
from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage
17+
from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage, ToolMessage
1818
from langchain_core.prompts import AIMessagePromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate
1919
from langchain_core.outputs import ChatGenerationChunk, GenerationChunk
2020

@@ -581,6 +581,15 @@ def _convert_inputs(inputs: Any) -> Any:
581581
if inputs.content != '':
582582
format_inputs['content'] = inputs.content
583583
return format_inputs
584+
if isinstance(inputs, ToolMessage):
585+
"""
586+
Must be before BaseMessage.
587+
"""
588+
content = {"content": inputs.content}
589+
if inputs.artifact is not None:
590+
content['artifact'] = _convert_inputs(inputs.artifact) # artifact is existed when response_format="content_and_artifact".
591+
message = Message(role=inputs.type, content=content)
592+
return message
584593
if isinstance(inputs, BaseMessage):
585594
message = Message(role=inputs.type, content=inputs.content,
586595
tool_calls=inputs.additional_kwargs.get('tool_calls', []))

cozeloop/integration/langchain/trace_model/llm_model.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class Message:
5353
tool_calls: List[ToolCall] = None
5454
metadata: Optional[dict] = None
5555
reasoning_content: Optional[str] = None
56+
name: Optional[str] = None
57+
tool_call_id: Optional[str] = None
5658

5759
def __post_init__(self):
5860
if self.role is not None and (self.role == 'AIMessageChunk' or self.role == 'ai'):
@@ -155,7 +157,7 @@ def __init__(self, messages: List[Union[BaseMessage, List[BaseMessage]]], invoca
155157
if message.additional_kwargs is not None and message.additional_kwargs.get('name', ''):
156158
name = message.additional_kwargs.get('name', '')
157159
tool_call = ToolCall(id=message.tool_call_id, type=message.type, function=ToolFunction(name=name))
158-
self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call]))
160+
self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call], name=name, tool_call_id=message.tool_call_id))
159161
else:
160162
self._messages.append(Message(role=message.type, content=message.content))
161163

cozeloop/internal/trace/noop_span.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
125125
def set_deployment_env(self, deployment_env: str) -> None:
126126
pass
127127

128+
def set_finish_time(self, finish_time: datetime) -> None:
129+
pass
130+
128131
def __enter__(self):
129132
return self
130133

cozeloop/internal/trace/span.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac
7878
self.space_id = space_id
7979
self.parent_span_id = parent_span_id
8080
self.start_time = start_time if start_time else datetime.now()
81+
self.finish_time: datetime = None
8182
self.duration = duration
8283
self.tag_map = tag_map if tag_map else {}
8384
self.system_tag_map = system_tag_map if system_tag_map else {}
@@ -396,6 +397,13 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
396397
def set_deployment_env(self, deployment_env: str) -> None:
397398
self.set_tags({DEPLOYMENT_ENV: deployment_env})
398399

400+
def set_finish_time(self, finish_time: datetime) -> None:
401+
"""
402+
Set the finish time of the span. DO NOT use this method unless you know what you are doing.
403+
"""
404+
self.finish_time = finish_time
405+
406+
399407
def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[str], int):
400408
validate_map = {}
401409
cut_off_keys = []
@@ -541,7 +549,10 @@ def set_stat_info(self):
541549
if input_tokens > 0 or output_tokens > 0:
542550
self.set_tags({TOKENS: int(input_tokens) + int(output_tokens)})
543551

544-
duration = int((datetime.now().timestamp() - self.start_time.timestamp()) * 1000000)
552+
finish_time_stamp = datetime.now().timestamp()
553+
if self.finish_time is not None:
554+
finish_time_stamp = self.finish_time.timestamp()
555+
duration = int((finish_time_stamp - self.start_time.timestamp()) * 1000000)
545556
with self.lock:
546557
self.duration = duration
547558

cozeloop/span.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,18 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
186186
Set system tags. DO NOT use this method unless you know what you are doing.
187187
"""
188188

189+
@abstractmethod
189190
def set_deployment_env(self, deployment_env: str) -> None:
190191
"""
191192
Set the deployment environment of the span, identify custom environments.
192193
"""
193194

195+
@abstractmethod
196+
def set_finish_time(self, finish_time: datetime) -> None:
197+
"""
198+
Set the finish time of the span. DO NOT use this method unless you know what you are doing.
199+
"""
200+
194201

195202
class Span(CommonSpanSetter, SpanContext):
196203
"""

examples/trace/simple.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import os
66
import time
7+
from datetime import datetime, timedelta
78

89
import cozeloop
910

@@ -100,6 +101,7 @@ def do_simple_demo():
100101
span.set_error(str(e))
101102

102103
# 3. span finish
104+
span.set_finish_time(datetime.now() + timedelta(seconds=3)) # set finish time as your need to change duration
103105
span.finish()
104106

105107
# 4. (optional) flush or close

0 commit comments

Comments
 (0)