Add pprof & mcp metrics in api-service and request retry in aenv sdk #71
Merged
19 commits
fbd1906 (JacksonMei) feat(metrics): add Prometheus metrics and instance labels support
700a941 (JacksonMei) feat(metrics): add metrics middleware to MCP proxy (8081)
a11e4f4 (JacksonMei) refactor(metrics): remove /health error filter per linter
b23f209 (JacksonMei) fix(api-service): remove unused mock types in labels test
1b0c9fb (JacksonMei) fix(aenv): keep MCP session alive across Environment lifecycle
f74dff5 (JacksonMei) fix(api-service): add pprof, fix fd leaks, add MCP metrics
cae590d (JacksonMei) support retry in aenv sdk
352eaf9 (JacksonMei) fix(aenv): add circuit breaker and session retry for MCP resilience
9ed56cf (JacksonMei) fix(api-service): share http.Transport in MCP proxy to prevent socket…
a98ce18 (JacksonMei) fix(api-service): prevent log disk bloat with body truncation and rot…
de97a06 (JacksonMei) fix(api-service): harden MCPMetricsMiddleware for safety and metrics …
b998193 (JacksonMei) update metrics feature
0df2eaa (JacksonMei) refactor(api-service): rename label "env" to "envName" for clarity
7290eee (JacksonMei) test(aenv): add e2e resilience tests for pod crash and network jitter
385a022 (JacksonMei) fix(api-service): revert instance label key to "env", only rename met…
8aa3e43 (JacksonMei) fix(aenv,api-service): increase session retry to 3 and cap MCP body read
e9ef659 (JacksonMei) optimize instance list
84b50f6 (JacksonMei) remove unused cleanup
ba84480 (JacksonMei) upgrade sdk to 0.1.6
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| import asyncio | ||
| import json | ||
| import os | ||
| import random | ||
| import traceback | ||
| from datetime import datetime, timezone | ||
| from typing import Any, Dict, List, Optional, Tuple | ||
|
|
@@ -33,7 +34,11 @@ | |
| from fastmcp.client.transports import StreamableHttpTransport | ||
|
|
||
| from aenv.client.scheduler_client import AEnvSchedulerClient | ||
| from aenv.core.exceptions import EnvironmentError, ToolError | ||
| from aenv.core.exceptions import ( | ||
| EnvironmentError, | ||
| ToolError, | ||
| UnrecoverableEnvironmentError, | ||
| ) | ||
| from aenv.core.logging import getLogger | ||
| from aenv.core.models import EnvInstance, EnvStatus | ||
| from aenv.core.tool import Tool | ||
|
|
@@ -166,21 +171,59 @@ def __init__( | |
| self._client: Optional[AEnvSchedulerClient] = None | ||
| self._mcp_client: Optional[Client] = None | ||
| self._mcp_session_active: bool = False | ||
| self._consecutive_tool_errors: int = 0 | ||
| self._CIRCUIT_BREAKER_THRESHOLD = 5 | ||
|
|
||
| def _log_prefix(self) -> str: | ||
| """Get log prefix with instance ID.""" | ||
| """Get log prefix with instance ID and SDK version.""" | ||
| from aenv import __version__ | ||
|
|
||
| instance_id = ( | ||
| getattr(self._instance, "id", "None") if self._instance else "None" | ||
| ) | ||
| return f"[ENV:{instance_id}]" | ||
| return f"[ENV:{instance_id}][sdk:v{__version__}]" | ||
|
|
||
| async def _backoff(self, attempt: int, base: float = 2.0) -> None: | ||
| """Exponential backoff with jitter.""" | ||
| wait = base**attempt + random.uniform(0, 1) | ||
| logger.debug(f"{self._log_prefix()} Backoff {wait:.1f}s (attempt {attempt})") | ||
| await asyncio.sleep(wait) | ||
|
|
||
| async def _rebuild_mcp_client(self) -> None: | ||
| """Destroy and recreate MCP client for session recovery.""" | ||
| logger.warning(f"{self._log_prefix()} Full MCP client rebuild") | ||
| if self._mcp_client is not None: | ||
| try: | ||
| await asyncio.wait_for(self._mcp_client.close(), timeout=2.0) | ||
| except Exception as e: | ||
| logger.debug(f"{self._log_prefix()} Error closing MCP client: {e}") | ||
| self._mcp_client = None | ||
| self._mcp_session_active = False | ||
| await asyncio.sleep(0.5) | ||
|
|
||
| async def __aenter__(self): | ||
| """Async context manager entry.""" | ||
| await self.initialize() | ||
| max_attempts = 3 | ||
| for attempt in range(max_attempts): | ||
| try: | ||
| await self._ensure_mcp_session() | ||
| return self | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| if attempt < max_attempts - 1: | ||
| logger.warning( | ||
| f"{self._log_prefix()} Initial session failed " | ||
| f"({attempt+1}/{max_attempts}), retrying: {type(e).__name__}: {e}" | ||
| ) | ||
| await self._rebuild_mcp_client() | ||
| await asyncio.sleep(1.0) | ||
| else: | ||
| raise | ||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type, exc_val, exc_tb): | ||
| """Async context manager exit.""" | ||
| await self._close_mcp_session() | ||
| await self.release() | ||
|
|
||
| async def initialize(self) -> bool: | ||
|
|
@@ -244,6 +287,17 @@ async def initialize(self) -> bool: | |
| f"Failed to initialize environment '{self.env_name}': {str(e)}" | ||
| ) from e | ||
|
|
||
| async def _close_mcp_session(self): | ||
| """Close the persistent MCP session if one is active.""" | ||
| if self._mcp_client and self._mcp_session_active: | ||
| logger.info(f"{self._log_prefix()} Closing persistent MCP session") | ||
| try: | ||
| await self._mcp_client.__aexit__(None, None, None) | ||
| except Exception as e: | ||
| logger.warning(f"{self._log_prefix()} Error closing MCP session: {e}") | ||
| finally: | ||
| self._mcp_session_active = False | ||
|
|
||
| async def release(self): | ||
| """Release environment resources.""" | ||
| logger.info( | ||
|
|
@@ -254,7 +308,7 @@ async def release(self): | |
| try: | ||
| await self._mcp_client.close() | ||
| logger.debug(f"{self._log_prefix()} MCP client closed") | ||
| except Exception as e: | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| logger.warning( | ||
| f"{self._log_prefix()} Failed to close MCP client: {str(e)}" | ||
| ) | ||
|
|
@@ -621,6 +675,12 @@ async def call_tool( | |
| """ | ||
| Execute a tool with given arguments using MCP client. | ||
|
|
||
| Retry strategy (idempotent-safe): | ||
| - Session establishment failures are retried (tool was never sent to server) | ||
| - Once call_tool_mcp() is invoked, the tool MAY have executed on the server. | ||
| On failure after invocation, we rebuild the session but do NOT re-invoke the | ||
| tool, because the tool may be non-idempotent (e.g. bash "echo x >> file"). | ||
|
|
||
| Args: | ||
| tool_name: Name of the tool (format: "env_name/tool_name" or just "tool_name") | ||
| arguments: Tool arguments | ||
|
|
@@ -630,11 +690,18 @@ async def call_tool( | |
| Tool execution result | ||
|
|
||
| Raises: | ||
| ToolError: If tool execution fails | ||
| ToolTimeoutError: If execution times out | ||
| ToolError: If tool execution fails after invocation | ||
| EnvironmentError: If session cannot be established | ||
| """ | ||
| await self._ensure_initialized() | ||
|
|
||
| # Circuit breaker: fail fast if too many consecutive tool errors | ||
| if self._consecutive_tool_errors >= self._CIRCUIT_BREAKER_THRESHOLD: | ||
| raise UnrecoverableEnvironmentError( | ||
| f"Circuit breaker open: {self._consecutive_tool_errors} consecutive failures", | ||
| env_name=self.env_name, | ||
| ) | ||
|
|
||
| # Parse tool name | ||
| if "/" in tool_name: | ||
| env_name, actual_tool_name = tool_name.split("/", 1) | ||
|
|
@@ -649,8 +716,39 @@ async def call_tool( | |
| f"{self._log_prefix()} Executing tool: {actual_tool_name} in environment {self.env_name}, arguments={arguments}" | ||
| ) | ||
|
|
||
| # Establish session (safe to retry) | ||
| max_session_attempts = 3 | ||
| client = None | ||
|
|
||
| for attempt in range(max_session_attempts): | ||
| try: | ||
| client = await self._ensure_mcp_session() | ||
| break | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| if ( | ||
| isinstance(e, httpx.HTTPStatusError) | ||
| and 400 <= e.response.status_code < 500 | ||
| ): | ||
| raise EnvironmentError( | ||
| f"Non-retryable error establishing session: HTTP {e.response.status_code}" | ||
| ) from e | ||
|
|
||
| if attempt < max_session_attempts - 1: | ||
| logger.warning( | ||
| f"{self._log_prefix()} Session establish failed ({attempt+1}/{max_session_attempts}), " | ||
| f"retrying: {type(e).__name__}: {e}" | ||
| ) | ||
| await self._backoff(attempt, base=1.5) | ||
| await self._rebuild_mcp_client() | ||
| continue | ||
|
|
||
| raise EnvironmentError( | ||
| f"Failed to establish session after {max_session_attempts} attempts: {e}", | ||
| env_name=self.env_name, | ||
| ) from e | ||
|
|
||
| # Execute tool (not safe to retry — may be non-idempotent) | ||
| try: | ||
| client = await self._ensure_mcp_session() | ||
| result = await client.call_tool_mcp( | ||
| name=actual_tool_name, arguments=arguments, timeout=timeout | ||
| ) | ||
|
|
@@ -666,20 +764,28 @@ async def call_tool( | |
| else: | ||
| content.append({"type": "text", "text": str(item)}) | ||
|
|
||
| self._consecutive_tool_errors = 0 | ||
| return ToolResult(content=content, is_error=result.isError) | ||
|
|
||
| except Exception as e: | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| self._mcp_session_active = False | ||
| self._consecutive_tool_errors += 1 | ||
| # Rebuild client to break ClosedResourceError cascade | ||
| await self._rebuild_mcp_client() | ||
|
|
||
| # Tool may have executed — do NOT retry | ||
| logger.error( | ||
| f"{self._log_prefix()} Tool execution encountered an issue: {str(e)} | " | ||
| f"Type: {type(e).__name__} | " | ||
| f"{self._log_prefix()} Tool '{actual_tool_name}' failed after invocation " | ||
| f"({type(e).__name__}: {e}). Tool may have executed on server — " | ||
| f"NOT retrying (non-idempotent safety). | " | ||
| f"Environment: {self.env_name} | " | ||
| f"Tool: {actual_tool_name} | " | ||
| f"Arguments: {arguments} | " | ||
| f"Timeout: {timeout or self.timeout}s | " | ||
| f"MCP URL: {self.aenv_data_url}" | ||
| ) | ||
| raise ToolError( | ||
| f"Tool '{actual_tool_name}' execution encountered an issue: {str(e)}" | ||
| f"Tool '{actual_tool_name}' execution failed: {e}. " | ||
| f"Tool may have already executed on server — not retried for safety." | ||
| ) | ||
|
|
||
| async def get_env_info(self) -> Dict[str, Any]: | ||
|
|
@@ -953,7 +1059,7 @@ async def _ensure_mcp_session(self) -> Client: | |
| if self._mcp_client is not None: | ||
| try: | ||
| await self._mcp_client.close() | ||
| except Exception as e: | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| logger.debug( | ||
| f"{self._log_prefix()} Error closing stale MCP client: {e}" | ||
| ) | ||
|
|
@@ -968,10 +1074,7 @@ async def _ensure_mcp_session(self) -> Client: | |
| f"{self._log_prefix()} MCP session established and will be reused" | ||
| ) | ||
| return client | ||
| except Exception as e: | ||
| except (Exception, asyncio.CancelledError) as e: | ||
| self._mcp_client = None | ||
| self._mcp_session_active = False | ||
| logger.error( | ||
| f"{self._log_prefix()} Failed to establish MCP session: {e}" | ||
| ) | ||
| raise EnvironmentError(f"Failed to establish MCP session: {e}") from e | ||
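The retry strategy in the `call_tool` docstring above (retry session establishment, invoke the tool at most once, fail fast after repeated failures) can be sketched in isolation. This is a simplified illustration, not the SDK's code: `ResilientCaller`, `CircuitOpenError`, the `connect`/`invoke` callables, and the injectable `sleep` parameter are hypothetical names, and the sketch leaves out the client rebuild and the HTTP 4xx short-circuit that the diff performs.

```python
import asyncio
import random


class CircuitOpenError(RuntimeError):
    """Hypothetical stand-in for the diff's UnrecoverableEnvironmentError."""


class ResilientCaller:
    """Illustrative only: retries connect(), never retries invoke()."""

    def __init__(self, connect, invoke, max_attempts=3, threshold=5,
                 base=1.5, sleep=asyncio.sleep):
        self._connect = connect          # async callable, safe to retry
        self._invoke = invoke            # async callable, may have side effects
        self._max_attempts = max_attempts
        self._threshold = threshold      # consecutive failures before failing fast
        self._base = base
        self._sleep = sleep              # injectable so tests need not wait
        self.consecutive_errors = 0

    async def _backoff(self, attempt):
        # Exponential backoff plus up to 1s of jitter, mirroring _backoff() in the diff.
        await self._sleep(self._base ** attempt + random.uniform(0, 1))

    async def call(self, *args):
        # Circuit breaker: refuse to do any work once the threshold is reached.
        if self.consecutive_errors >= self._threshold:
            raise CircuitOpenError(
                f"{self.consecutive_errors} consecutive failures"
            )

        # Phase 1: establish the session. Nothing has reached the server yet,
        # so repeating this step cannot duplicate a side effect.
        for attempt in range(self._max_attempts):
            try:
                session = await self._connect()
                break
            except Exception:
                if attempt == self._max_attempts - 1:
                    raise
                await self._backoff(attempt)

        # Phase 2: invoke exactly once. A failure here is reported, not retried,
        # because the call may already have executed remotely.
        try:
            result = await self._invoke(session, *args)
        except Exception:
            self.consecutive_errors += 1
            raise
        self.consecutive_errors = 0
        return result
```

Splitting the call into two phases makes the idempotency boundary explicit: everything before `invoke` can be repeated freely, and everything after it is treated as possibly committed. Note that the counter only resets on a successful invocation, so in this sketch an open circuit stays open until the caller creates a new instance.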
The `arguments` dictionary, which may contain sensitive information such as API keys, passwords, or tokens, is logged in plain text when a tool execution is initiated or fails. This can expose sensitive data in the logs.
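One possible mitigation is to mask sensitive-looking keys before the dictionary reaches the logger. The sketch below is an assumption about how that could look, not something taken from the PR: `redact_arguments`, `SENSITIVE_KEY_PARTS`, and `REDACTED` are hypothetical names, and the substring list would need tuning for the tools an environment actually exposes.

```python
from typing import Any

# Hypothetical substrings that mark a key as sensitive (case-insensitive match).
SENSITIVE_KEY_PARTS = ("key", "token", "secret", "password", "authorization")
REDACTED = "***REDACTED***"


def redact_arguments(value: Any) -> Any:
    """Return a copy of `value` with sensitive-looking dict entries masked.

    Dicts and lists are walked recursively; tuples are returned as lists.
    The input is never modified.
    """
    if isinstance(value, dict):
        return {
            k: (
                REDACTED
                if isinstance(k, str)
                and any(part in k.lower() for part in SENSITIVE_KEY_PARTS)
                else redact_arguments(v)
            )
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact_arguments(v) for v in value]
    return value
```

The log calls in `call_tool` could then format `redact_arguments(arguments)` instead of `arguments`. Two limits are worth keeping in mind: substring matching over-redacts harmless keys such as `keyboard`, and key-based masking does not catch secrets embedded in values, for example a token inside a shell command string.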