-
Notifications
You must be signed in to change notification settings - Fork 35
Fix mcp client rebuild bug && support labels for experiement && add metrics #70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bba78be
44f5d2d
0ff4502
51d86aa
75129b1
a04cf32
60211f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,6 +117,7 @@ def __init__( | |
| api_key: Optional[str] = None, | ||
| skip_for_healthy: bool = False, | ||
| owner: Optional[str] = None, | ||
| labels: Optional[Dict[str, str]] = None, | ||
| ): | ||
| """ | ||
| Initialize environment. | ||
|
|
@@ -141,6 +142,7 @@ def __init__( | |
| self.dummy_instance_ip = os.getenv("DUMMY_INSTANCE_IP") | ||
| self.skip_for_healthy = skip_for_healthy | ||
| self.owner = owner | ||
| self.labels = labels | ||
|
|
||
| if not aenv_url: | ||
| aenv_url = self.dummy_instance_ip or os.getenv( | ||
|
|
@@ -163,6 +165,7 @@ def __init__( | |
| self._initialized = False | ||
| self._client: Optional[AEnvSchedulerClient] = None | ||
| self._mcp_client: Optional[Client] = None | ||
| self._mcp_session_active: bool = False | ||
|
|
||
| def _log_prefix(self) -> str: | ||
| """Get log prefix with instance ID.""" | ||
|
|
@@ -257,6 +260,7 @@ async def release(self): | |
| ) | ||
| finally: | ||
| self._mcp_client = None | ||
| self._mcp_session_active = False | ||
|
|
||
| if self._client: | ||
| if self._instance and not self.dummy_instance_ip: | ||
|
|
@@ -298,23 +302,22 @@ async def list_tools(self) -> List[Dict[str, Any]]: | |
| await self._ensure_initialized() | ||
|
|
||
| try: | ||
| client = await self._get_mcp_client() | ||
| async with client: | ||
| tools = await client.list_tools() | ||
| logger.info( | ||
| f"{self._log_prefix()} Found {len(tools)} tools in environment {self.env_name}" | ||
| ) | ||
| client = await self._ensure_mcp_session() | ||
| tools = await client.list_tools() | ||
| logger.info( | ||
| f"{self._log_prefix()} Found {len(tools)} tools in environment {self.env_name}" | ||
| ) | ||
|
|
||
| formatted_tools = [ | ||
| { | ||
| "name": f"{self.env_name}/{tool.name}", | ||
| "description": tool.description, | ||
| "inputSchema": tool.inputSchema, | ||
| } | ||
| for tool in tools | ||
| ] | ||
| formatted_tools = [ | ||
| { | ||
| "name": f"{self.env_name}/{tool.name}", | ||
| "description": tool.description, | ||
| "inputSchema": tool.inputSchema, | ||
| } | ||
| for tool in tools | ||
| ] | ||
|
|
||
| return formatted_tools | ||
| return formatted_tools | ||
| except Exception as e: | ||
| logger.error( | ||
| f"{self._log_prefix()} Failed to list tools for {self.env_name}: {str(e)} | " | ||
|
|
@@ -459,6 +462,7 @@ async def _call_function( | |
| method: str = "POST", | ||
| timeout: Optional[float] = None, | ||
| ensure_initialized: bool = True, | ||
| quiet: bool = False, | ||
| ) -> Dict[str, Any]: | ||
| """ | ||
| Execute a registered function via HTTP endpoint. | ||
|
|
@@ -467,6 +471,7 @@ async def _call_function( | |
| function_url: url of the registered function | ||
| arguments: Arguments to pass to the function | ||
| timeout: Override default timeout | ||
| quiet: If True, log at debug level instead of error on transient issues | ||
|
|
||
| Returns: | ||
| Function execution result | ||
|
|
@@ -538,28 +543,30 @@ async def _call_function( | |
| if server_error: | ||
| error_msg = f"{error_msg} | Server error: {server_error}" | ||
|
|
||
| logger.error( | ||
| f"{self._log_prefix()} Function '{function_url}' execution http request failed: {error_msg} | " | ||
| _log = logger.debug if quiet else logger.error | ||
| _log( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| f"{self._log_prefix()} Function '{function_url}' execution http request not ready: {error_msg} | " | ||
| f"Type: {type(e).__name__} | " | ||
| f"Environment: {self.env_name} | " | ||
| f"Arguments: {arguments} | " | ||
| f"Timeout: {timeout or self.timeout}s | " | ||
| f"Function URL: {function_url}" | ||
| ) | ||
| raise EnvironmentError( | ||
| f"Function '{function_url}' execution failed: {error_msg}" | ||
| f"Function '{function_url}' execution not ready: {error_msg}" | ||
| ) | ||
| except Exception as e: | ||
| logger.error( | ||
| f"{self._log_prefix()} Function '{function_url}' execution failed: {str(e)} | " | ||
| _log = logger.debug if quiet else logger.error | ||
| _log( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| f"{self._log_prefix()} Function '{function_url}' execution encountered an issue: {str(e)} | " | ||
| f"Type: {type(e).__name__} | " | ||
| f"Environment: {self.env_name} | " | ||
| f"Arguments: {arguments} | " | ||
| f"Timeout: {timeout or self.timeout}s | " | ||
| f"Function URL: {function_url}" | ||
| ) | ||
| raise EnvironmentError( | ||
| f"Function '{function_url}' execution failed: {str(e)}" | ||
| f"Function '{function_url}' execution encountered an issue: {str(e)}" | ||
| ) | ||
|
|
||
| async def check_health( | ||
|
|
@@ -639,40 +646,41 @@ async def call_tool( | |
| actual_tool_name = tool_name | ||
|
|
||
| logger.info( | ||
| f"{self._log_prefix()} Executing tool: {actual_tool_name} in environment {self.env_name}" | ||
| f"{self._log_prefix()} Executing tool: {actual_tool_name} in environment {self.env_name}, arguments={arguments}" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| ) | ||
|
|
||
| try: | ||
| client = await self._get_mcp_client() | ||
| async with client: | ||
| result = await client.call_tool_mcp( | ||
| name=actual_tool_name, arguments=arguments, timeout=timeout | ||
| ) | ||
| client = await self._ensure_mcp_session() | ||
| result = await client.call_tool_mcp( | ||
| name=actual_tool_name, arguments=arguments, timeout=timeout | ||
| ) | ||
|
|
||
| # Convert FastMCP result to ToolResult | ||
| content = [] | ||
| if result.content: | ||
| for item in result.content: | ||
| if hasattr(item, "text") and item.text: | ||
| content.append({"type": "text", "text": item.text}) | ||
| elif hasattr(item, "type") and hasattr(item, "data"): | ||
| content.append({"type": item.type, "data": item.data}) | ||
| else: | ||
| content.append({"type": "text", "text": str(item)}) | ||
| # Convert FastMCP result to ToolResult | ||
| content = [] | ||
| if result.content: | ||
| for item in result.content: | ||
| if hasattr(item, "text") and item.text: | ||
| content.append({"type": "text", "text": item.text}) | ||
| elif hasattr(item, "type") and hasattr(item, "data"): | ||
| content.append({"type": item.type, "data": item.data}) | ||
| else: | ||
| content.append({"type": "text", "text": str(item)}) | ||
|
|
||
| return ToolResult(content=content, is_error=result.isError) | ||
| return ToolResult(content=content, is_error=result.isError) | ||
|
|
||
| except Exception as e: | ||
| logger.error( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| f"{self._log_prefix()} Tool execution failed: {str(e)} | " | ||
| f"{self._log_prefix()} Tool execution encountered an issue: {str(e)} | " | ||
| f"Type: {type(e).__name__} | " | ||
| f"Environment: {self.env_name} | " | ||
| f"Tool: {actual_tool_name} | " | ||
| f"Arguments: {arguments} | " | ||
| f"Timeout: {timeout or self.timeout}s | " | ||
| f"MCP URL: {self.aenv_data_url}" | ||
| ) | ||
| raise ToolError(f"Tool '{actual_tool_name}' execution failed: {str(e)}") | ||
| raise ToolError( | ||
| f"Tool '{actual_tool_name}' execution encountered an issue: {str(e)}" | ||
| ) | ||
|
|
||
| async def get_env_info(self) -> Dict[str, Any]: | ||
| """Get environment information.""" | ||
|
|
@@ -734,6 +742,7 @@ async def _wait_for_healthy(self, timeout: float = 300.0) -> None: | |
| timeout=3.0, | ||
| method="GET", | ||
| ensure_initialized=False, | ||
| quiet=True, | ||
| ) | ||
|
|
||
| logger.debug( | ||
|
|
@@ -751,7 +760,7 @@ async def _wait_for_healthy(self, timeout: float = 300.0) -> None: | |
|
|
||
| except Exception as e: | ||
| logger.debug( | ||
| f"{self._log_prefix()} Health check failed: {str(e)}, retrying..." | ||
| f"{self._log_prefix()} Health check attempt {times + 1}: {str(e)}, retrying..." | ||
| ) | ||
|
|
||
| if asyncio.get_event_loop().time() - start_time > timeout: | ||
|
|
@@ -829,6 +838,7 @@ async def _create_env_instance(self): | |
| arguments=self.arguments, | ||
| ttl=self.ttl, | ||
| owner=self.owner, | ||
| labels=self.labels, | ||
| ) | ||
| logger.info( | ||
| f"{self._log_prefix()} Environment instance created with ID: {self._instance.id}" | ||
|
|
@@ -908,3 +918,60 @@ async def _get_mcp_client(self) -> Client: | |
| f"Timeout: {self.timeout}s " | ||
| ) | ||
| raise EnvironmentError(f"Failed to create MCP client: {str(e)}") | ||
|
|
||
| async def _ensure_mcp_session(self) -> Client: | ||
| """ | ||
| Ensure MCP client exists and its session is active. | ||
|
|
||
| Lazily creates the Client and enters its async context (establishing | ||
| the MCP session) on first call. Subsequent calls return the same | ||
| connected client. The session is only torn down in release(). | ||
|
|
||
| Returns: | ||
| Connected Client with an active MCP session. | ||
| """ | ||
| # Fast path: session already active and connected | ||
| if self._mcp_session_active and self._mcp_client is not None: | ||
| if self._mcp_client.is_connected(): | ||
| return self._mcp_client | ||
| # Session died unexpectedly; will reconnect below | ||
| logger.warning(f"{self._log_prefix()} MCP session lost, reconnecting...") | ||
| self._mcp_session_active = False | ||
|
|
||
| # Lazy-init the lock | ||
| if not hasattr(self, "_mcp_session_lock"): | ||
| self._mcp_session_lock = asyncio.Lock() | ||
|
|
||
| async with self._mcp_session_lock: | ||
| # Double-check after acquiring lock | ||
| if self._mcp_session_active and self._mcp_client is not None: | ||
| if self._mcp_client.is_connected(): | ||
| return self._mcp_client | ||
| self._mcp_session_active = False | ||
|
|
||
| # Close stale client if any | ||
| if self._mcp_client is not None: | ||
| try: | ||
| await self._mcp_client.close() | ||
| except Exception as e: | ||
| logger.debug( | ||
| f"{self._log_prefix()} Error closing stale MCP client: {e}" | ||
| ) | ||
| self._mcp_client = None | ||
|
|
||
| # Create fresh client and establish session | ||
| client = await self._get_mcp_client() | ||
| try: | ||
| await client.__aenter__() | ||
| self._mcp_session_active = True | ||
| logger.info( | ||
| f"{self._log_prefix()} MCP session established and will be reused" | ||
| ) | ||
| return client | ||
| except Exception as e: | ||
| self._mcp_client = None | ||
| self._mcp_session_active = False | ||
| logger.error( | ||
| f"{self._log_prefix()} Failed to establish MCP session: {e}" | ||
| ) | ||
| raise EnvironmentError(f"Failed to establish MCP session: {e}") from e | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
create_env_instancemethod logs the entireenvironment_variablesandlabelsdictionaries at theINFOlevel. These dictionaries are highly likely to contain sensitive information such as API keys, passwords, or Personally Identifiable Information (PII) required for the environment setup. Logging this information can lead to sensitive data leakage in log files.