Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 60 additions & 37 deletions backend/app/agent/listen_chat_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
set_process_task,
)
from app.utils.event_loop_utils import _schedule_async_task
from app.utils.perf_timer import PerfTimer

# Logger for agent tracking
logger = logging.getLogger("agent")
Expand Down Expand Up @@ -249,7 +250,10 @@ def step(
f"Agent {self.agent_name} starting step with message: {msg}"
)
try:
res = super().step(input_message, response_format)
with PerfTimer(
"agent_step", agent_name=self.agent_name, agent_id=self.agent_id
):
res = super().step(input_message, response_format)
except ModelProcessingError as e:
res = None
error_info = e
Expand Down Expand Up @@ -348,7 +352,12 @@ async def astep(
)

try:
res = await super().astep(input_message, response_format)
async with PerfTimer(
"agent_astep",
agent_name=self.agent_name,
agent_id=self.agent_id,
):
res = await super().astep(input_message, response_format)
if isinstance(res, AsyncStreamingChatAgentResponse):
# Use reusable async stream wrapper to send chunks to frontend
return AsyncStreamingChatAgentResponse(
Expand Down Expand Up @@ -474,7 +483,13 @@ def _execute_tool(
)
# Set process_task context for all tool executions
with set_process_task(self.process_task_id):
raw_result = tool(**args)
with PerfTimer(
"_execute_tool",
tool_name=func_name,
toolkit=toolkit_name,
agent_name=self.agent_name,
):
raw_result = tool(**args)
logger.debug(f"Tool {func_name} executed successfully")
if self.mask_tool_output:
self._secure_result_store[tool_call_id] = raw_result
Expand Down Expand Up @@ -606,43 +621,51 @@ async def _aexecute_tool(
try:
# Set process_task context for all tool executions
with set_process_task(self.process_task_id):
# Try different invocation paths in order of preference
if hasattr(tool, "func") and hasattr(tool.func, "async_call"):
# MCP FunctionTool: always use async_call (sync wrapper can timeout)
result = await tool.func.async_call(**args)

elif hasattr(tool, "async_call") and callable(tool.async_call):
# Case: tool itself has async_call
# Check if this is a sync tool to avoid run_in_executor
# (which breaks ContextVar)
if hasattr(tool, "is_async") and not tool.is_async:
# Sync tool: call directly to preserve ContextVar
# in same thread
async with PerfTimer(
"_aexecute_tool",
tool_name=func_name,
toolkit=toolkit_name,
agent_name=self.agent_name,
):
# Try different invocation paths in order of preference
if hasattr(tool, "func") and hasattr(
tool.func, "async_call"
):
# MCP FunctionTool: always use async_call (sync wrapper can timeout)
result = await tool.func.async_call(**args)

elif hasattr(tool, "async_call") and callable(tool.async_call):
# Case: tool itself has async_call
# Check if this is a sync tool to avoid run_in_executor
# (which breaks ContextVar)
if hasattr(tool, "is_async") and not tool.is_async:
# Sync tool: call directly to preserve ContextVar
# in same thread
result = tool(**args)
# Handle case where sync call returns a coroutine
if asyncio.iscoroutine(result):
result = await result
else:
# Async tool: use async_call
result = await tool.async_call(**args)

elif hasattr(tool, "func") and asyncio.iscoroutinefunction(
tool.func
):
# Case: tool wraps a direct async function
result = await tool.func(**args)

elif asyncio.iscoroutinefunction(tool):
# Case: tool is itself a coroutine function
result = await tool(**args)

else:
# Fallback: sync call - call directly in current context
# DO NOT use run_in_executor to preserve ContextVar
result = tool(**args)
# Handle case where sync call returns a coroutine
# Handle case where synchronous call returns a coroutine
if asyncio.iscoroutine(result):
result = await result
else:
# Async tool: use async_call
result = await tool.async_call(**args)

elif hasattr(tool, "func") and asyncio.iscoroutinefunction(
tool.func
):
# Case: tool wraps a direct async function
result = await tool.func(**args)

elif asyncio.iscoroutinefunction(tool):
# Case: tool is itself a coroutine function
result = await tool(**args)

else:
# Fallback: sync call - call directly in current context
# DO NOT use run_in_executor to preserve ContextVar
result = tool(**args)
# Handle case where synchronous call returns a coroutine
if asyncio.iscoroutine(result):
result = await result

except Exception as e:
# Capture the error message to prevent framework crash
Expand Down
39 changes: 28 additions & 11 deletions backend/app/service/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
from app.agent.tools import get_mcp_tools, get_toolkits
from app.model.chat import Chat, NewAgent, Status, TaskContent, sse_json
from app.utils.perf_timer import PerfTimer
from app.service.task import (
Action,
ActionDecomposeProgressData,
Expand Down Expand Up @@ -371,6 +372,7 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
"🚀 [LIFECYCLE] step_solve STARTED",
extra={"project_id": options.project_id, "task_id": options.task_id},
)
session_start_time = datetime.datetime.now().timestamp()
logger.info("=" * 80)
logger.debug(
"Step solve options",
Expand Down Expand Up @@ -537,9 +539,14 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
", treating as complex task"
)
else:
is_complex_task = await question_confirm(
question_agent, question, task_lock
)
async with PerfTimer(
"question_confirm",
project_id=options.project_id,
task_id=options.task_id,
):
is_complex_task = await question_confirm(
question_agent, question, task_lock
)
logger.info(
"[NEW-QUESTION] question_confirm"
" result: is_complex="
Expand Down Expand Up @@ -664,7 +671,12 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
logger.info(
"[NEW-QUESTION] Creating NEW workforce instance"
)
(workforce, mcp) = await construct_workforce(options)
async with PerfTimer(
"construct_workforce",
project_id=options.project_id,
task_id=options.task_id,
):
(workforce, mcp) = await construct_workforce(options)
for new_agent in options.new_agents:
workforce.add_single_agent_worker(
format_agent_description(new_agent),
Expand Down Expand Up @@ -747,13 +759,18 @@ def on_stream_text(chunk):
async def run_decomposition():
nonlocal summary_task_content
try:
sub_tasks = await asyncio.to_thread(
workforce.eigent_make_sub_tasks,
camel_task,
context_for_coordinator,
on_stream_batch,
on_stream_text,
)
with PerfTimer(
"eigent_make_sub_tasks",
project_id=options.project_id,
task_id=options.task_id,
):
sub_tasks = await asyncio.to_thread(
workforce.eigent_make_sub_tasks,
camel_task,
context_for_coordinator,
on_stream_batch,
on_stream_text,
)

if stream_state["subtasks"]:
sub_tasks = stream_state["subtasks"]
Expand Down
135 changes: 135 additions & 0 deletions backend/app/utils/perf_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# ========= Copyright 2025 @ EIGENT.AI. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025 @ EIGENT.AI. All Rights Reserved. =========

"""Performance timing utilities for measuring setup and task execution durations.

Provides a context manager and decorator for capturing wall-clock timing of
critical code paths. All timing data is emitted through the traceroot logger
so it appears in production logs without extra dependencies.

Usage::

from app.utils.perf_timer import PerfTimer, perf_measure

# Context manager
with PerfTimer("backend_startup") as t:
do_startup()
print(t.duration_ms) # 1234.56

# Decorator
@perf_measure("my_function")
async def my_function():
...
"""

import asyncio
import functools
import time
from typing import Any

import logging

logger = logging.getLogger("perf")

# Attribute names reserved by logging.LogRecord. Passing any of these through
# the ``extra`` mapping makes ``Logger.makeRecord`` raise
# ``KeyError: "Attempt to overwrite ... in LogRecord"``, which would crash the
# timed code path from inside ``__exit__``. Derived from a throwaway record so
# the set tracks whatever attributes this Python version defines.
_RESERVED_LOG_KEYS = frozenset(
    vars(logging.LogRecord("", 0, "", 0, "", (), None))
) | {"message", "asctime"}


class PerfTimer:
    """High-resolution wall-clock timer usable as a sync or async context manager.

    Attributes:
        operation: Human-readable label for the timed block.
        start_time: Monotonic timestamp when the block was entered.
        end_time: Monotonic timestamp when the block was exited.
        duration_ms: Elapsed wall-clock time in milliseconds.
        context: Arbitrary key-value metadata included in the log line.
    """

    def __init__(self, operation: str, **context: Any) -> None:
        self.operation = operation
        self.context = context
        self.start_time: float = 0.0
        self.end_time: float = 0.0
        self.duration_ms: float = 0.0

    def __enter__(self) -> "PerfTimer":
        # perf_counter is monotonic and immune to system clock adjustments.
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.end_time = time.perf_counter()
        self.duration_ms = (self.end_time - self.start_time) * 1000.0

        extra = {
            "perf_operation": self.operation,
            "perf_duration_ms": round(self.duration_ms, 2),
        }
        # Remap caller-supplied context keys that collide with LogRecord's
        # reserved attributes (e.g. ``name``, ``module``); otherwise the
        # logging call itself raises KeyError and masks the timed block's
        # real outcome.
        for key, value in self.context.items():
            safe_key = f"ctx_{key}" if key in _RESERVED_LOG_KEYS else key
            extra[safe_key] = value

        if exc_type is not None:
            extra["perf_error"] = str(exc_val)
            logger.warning(
                f"[PERF] {self.operation} completed with error in {self.duration_ms:.2f}ms",
                extra=extra,
            )
        else:
            logger.info(
                f"[PERF] {self.operation} completed in {self.duration_ms:.2f}ms",
                extra=extra,
            )
        # Do not suppress exceptions raised inside the timed block.
        return None

    async def __aenter__(self) -> "PerfTimer":
        self.start_time = time.perf_counter()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Timing and logging are identical for async blocks; delegate to the
        # sync exit handler.
        self.__exit__(exc_type, exc_val, exc_tb)


def perf_measure(_func=None, *, operation: str | None = None, **extra_context: Any):
"""Decorator that logs execution duration for sync and async functions.

Supports both ``@perf_measure`` and ``@perf_measure(operation="label")``
usage patterns.

Args:
_func: Internal; set automatically when used as ``@perf_measure``
without parentheses.
operation: Label for the timed operation. Defaults to the function name.
**extra_context: Additional key-value pairs included in the log output.

Returns:
Decorated function that logs its execution time.
"""

def decorator(func):
label = operation or func.__qualname__

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
with PerfTimer(label, **extra_context):
return await func(*args, **kwargs)

@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
with PerfTimer(label, **extra_context):
return func(*args, **kwargs)

if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper

if _func is not None:
# Called as @perf_measure without parentheses
return decorator(_func)
return decorator
Loading