From ec6a4e1b272a1ac90b82cd375d2a867c44202f27 Mon Sep 17 00:00:00 2001 From: Gautam Prajapati Date: Mon, 2 Jun 2025 14:46:10 +0530 Subject: [PATCH 1/3] Add MCP support --- README.md | 43 ++- mcp.toml.example | 111 ++++++++ mxtoai/MCP_MIGRATION_GUIDE.md | 226 ++++++++++++++++ mxtoai/agents/email_agent.py | 442 +++++++++++++++++-------------- mxtoai/mcp/__init__.py | 51 ++++ mxtoai/mcp/client.py | 270 +++++++++++++++++++ mxtoai/mcp/tool_adapter.py | 197 ++++++++++++++ mxtoai/mcp/tool_collection.py | 165 ++++++++++++ mxtoai/run_github_mcp_example.py | 101 +++++++ mxtoai/test_custom_mcp_client.py | 146 ++++++++++ poetry.lock | 133 +++++++++- pyproject.toml | 2 +- 12 files changed, 1682 insertions(+), 205 deletions(-) create mode 100644 mcp.toml.example create mode 100644 mxtoai/MCP_MIGRATION_GUIDE.md create mode 100644 mxtoai/mcp/__init__.py create mode 100644 mxtoai/mcp/client.py create mode 100644 mxtoai/mcp/tool_adapter.py create mode 100644 mxtoai/mcp/tool_collection.py create mode 100644 mxtoai/run_github_mcp_example.py create mode 100644 mxtoai/test_custom_mcp_client.py diff --git a/README.md b/README.md index f1d40f4..6979958 100644 --- a/README.md +++ b/README.md @@ -106,20 +106,25 @@ Copy the `.env.example` file to `.env` and update with your specific configurati ```env LITELLM_CONFIG_PATH=model.config.toml +# Optional: Path to your MCP server configuration file +# MCP_CONFIG_PATH=mcp.toml # Redis configuration REDIS_HOST=localhost REDIS_PORT=6379 -# Optional for research functionality -JINA_API_KEY=your-jina-api-key +# JINA API Key +# JINA_API_KEY=your-jina-api-key # Uncomment and set if using deep research # For image processing -AZURE_VISION_ENDPOINT=your-azure-vision-endpoint -AZURE_VISION_KEY=your-azure-vision-key +# AZURE_VISION_ENDPOINT=your-azure-vision-endpoint # Uncomment and set if using Azure vision +# AZURE_VISION_KEY=your-azure-vision-key # Uncomment and set if using Azure vision # For web search functionality -SERPAPI_API_KEY=your-serpapi-api-key +# SERPAPI_API_KEY=your-serpapi-api-key # Uncomment and set for Google Search via SerpAPI +# SERPER_API_KEY=your-serper-api-key # Uncomment and set for Google Search via Serper + +SENDER_EMAIL=ai-assistant@mxtoai.com ``` This project supports load balancing and routing across multiple models, so you can define as many models as you'd like. Copy `model.config.example.toml` to a toml file and update it with your preferred configuration. Update `.env` with the path your toml relative to root. @@ -257,6 +262,34 @@ The system now supports: - Fallback responses for partial failures - Comprehensive error logging +### MCP Server Integration (Optional) + +The system supports integration with Model Context Protocol (MCP) servers, allowing the EmailAgent to leverage additional tools and data sources. + +**Configuration:** + +1. Create or copy `mcp.toml.example` to `mcp.toml` in the project root. +2. Edit `mcp.toml` to define your MCP server configurations. Refer to the comments within `mcp.toml.example` for detailed instructions and examples for both Stdio and SSE based servers. +3. Ensure each server configuration you want to use has `enabled = true`. +4. You can specify a custom path for this configuration file by setting the `MCP_CONFIG_PATH` environment variable in your `.env` file. + +**Example `mcp.toml` entry:** +```toml +[mcp_servers.my_filesystem_reader] +type = "stdio" +command = "npx" +args = [ + "-y", + "@modelcontextprotocol/server-filesystem", + "/path/to/readable/directory" +] +env = { "SOME_VAR" = "some_value" } +enabled = true +``` + +**Security Note:** +Using MCP servers, especially Stdio-based ones, involves running external commands and code. The system uses `trust_remote_code=True` when loading these tools via `smolagents`, which is often necessary for their functionality but carries inherent security risks. **Only configure and enable MCP servers from sources you explicitly trust.** + ## Load Testing The project uses Locust for load testing various email processing scenarios. diff --git a/mcp.toml.example b/mcp.toml.example new file mode 100644 index 0000000..12ac47f --- /dev/null +++ b/mcp.toml.example @@ -0,0 +1,111 @@ +# mcp.toml.example +# Configuration file for Model Context Protocol (MCP) servers. +# +# This file allows the EmailAgent to connect to various MCP servers +# and utilize the tools they provide. +# +# Security Warning: +# Enabling MCP servers, especially Stdio-based ones, involves running external commands +# and potentially arbitrary code. The `trust_remote_code=True` flag is used when loading +# these tools with smolagents, which is necessary for their operation but carries inherent risks. +# ALWAYS ENSURE YOU TRUST THE SOURCE AND IMPLEMENTATION OF ANY MCP SERVER YOU CONFIGURE. +# For Stdio-based servers, the commands are executed on the machine where the agent is running. +# For SSE-based servers, while the server itself runs remotely, ensure the endpoint is trusted. + +# [[mcp_servers]] # Use a list of tables for multiple servers of the same type or for clarity +# name = "filesystem_example" +# type = "stdio" # "stdio" or "sse" +# command = "npx" +# args = [ +# "-y", +# "@modelcontextprotocol/server-filesystem", +# "/path/to/your/Desktop", # Replace with actual accessible paths +# "/path/to/your/Downloads" # Replace with actual accessible paths +# ] +# # Optional environment variables for the command +# # env = { "SOME_VARIABLE" = "some_value" } + +# [[mcp_servers]] +# name = "github_stdio_example" +# type = "stdio" +# command = "npx" +# args = [ +# "-y", +# "@modelcontextprotocol/server-github" +# ] +# env = { GITHUB_PERSONAL_ACCESS_TOKEN = "" } # Replace with your token + +# [[mcp_servers]] +# name = "github_docker_example" +# type = "stdio" +# command = "docker" +# args = [ +# "run", +# "-i", # For interactive processes +# "--rm", # Automatically remove the container when it exits +# "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", # Pass the environment variable to the container +# "mcp/github" # The Docker image for the GitHub MCP server +# ] +# env = { GITHUB_PERSONAL_ACCESS_TOKEN = "" } # Replace with your token + +# [[mcp_servers]] +# name = "remote_sse_example" +# type = "sse" +# url = "http://127.0.0.1:8000/sse" # Replace with the actual URL of your SSE MCP server +# # Optional: any other parameters required by the mcp.client.sse.sse_client +# # extra_params = { "some_other_sse_param" = "value" } + + +# More structured way using TOML tables: +# Each key under [mcp_servers] will be treated as a server configuration. +# The name of the server will be the key itself (e.g., "filesystem", "github_service"). + +[mcp_servers.filesystem] +type = "stdio" # "stdio" or "sse" +command = "npx" +args = [ + "-y", + "@modelcontextprotocol/server-filesystem", + "/path/to/your/Desktop", # IMPORTANT: Replace with actual, accessible paths + "/path/to/your/Downloads" # IMPORTANT: Replace with actual, accessible paths +] +# Optional environment variables for the command +# env = { "SOME_VARIABLE" = "some_value" } +# enabled = true # Optional: defaults to true if not specified. Set to false to disable. + +[mcp_servers.github_stdio] +type = "stdio" +command = "npx" +args = [ + "-y", + "@modelcontextprotocol/server-github" +] +env = { GITHUB_PERSONAL_ACCESS_TOKEN = "" } # IMPORTANT: Replace with your token +# enabled = true + +[mcp_servers.github_docker] +type = "stdio" +command = "docker" +args = [ + "run", + "-i", # For interactive processes + "--rm", # Automatically remove the container when it exits + "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", # Pass the environment variable to the container + "mcp/github" # The Docker image for the GitHub MCP server +] +env = { GITHUB_PERSONAL_ACCESS_TOKEN = "" } # IMPORTANT: Replace with your token +# enabled = true + +[mcp_servers.pubmed_example] +type = "stdio" +command = "uvx" # Using uvx as per smolagents documentation example +args = ["--quiet", "pubmedmcp@0.1.3"] +env = {"UV_PYTHON" = "3.12"} # Ensure this matches your environment if needed, os.environ will be merged. +# enabled = true + +[mcp_servers.remote_sse_service] +type = "sse" +url = "http://127.0.0.1:8000/sse" # IMPORTANT: Replace with the actual URL of your SSE MCP server +# Optional: any other parameters required by the mcp.client.sse.sse_client for SSE connections +# extra_params = { "some_other_sse_param" = "value" } +# enabled = true \ No newline at end of file diff --git a/mxtoai/MCP_MIGRATION_GUIDE.md b/mxtoai/MCP_MIGRATION_GUIDE.md new file mode 100644 index 0000000..8366e04 --- /dev/null +++ b/mxtoai/MCP_MIGRATION_GUIDE.md @@ -0,0 +1,226 @@ +# MCP Migration Guide: From MCPAdapt to Custom Implementation + +## Overview + +This document describes the migration from the `MCPAdapt` library dependency to a custom Model Context Protocol (MCP) client implementation for the mxtoai project. + +## Background + +The original implementation used: +- `smolagents.ToolCollection.from_mcp()` which relied on the `MCPAdapt` library +- External dependency on `mcpadapt.smolagents_adapter.SmolAgentsAdapter` + +**Issues with the original approach:** +- MCPAdapt library wasn't working reliably +- Limited control over MCP client behavior +- Difficult to debug and customize +- Dependency on external library for core functionality + +## New Custom Implementation + +### Architecture + +``` +mxtoai/mcp/ +├── __init__.py # Module exports +├── client.py # Core MCP client implementation +├── tool_adapter.py # MCP-to-smolagents tool conversion +└── tool_collection.py # Tool collection with context managers +``` + +### Key Components + +#### 1. MCP Client (`client.py`) +- **BaseMCPClient**: Abstract base class for MCP clients +- **StdioMCPClient**: Handles stdio-based MCP servers (e.g., Docker containers) +- **SSEMCPClient**: Handles Server-Sent Events MCP servers +- **Synchronous Interface**: Wraps async MCP operations for smolagents compatibility + +```python +# Factory function for creating MCP clients +with create_mcp_client(server_name, server_config) as client: + tools = client.list_tools() + result = client.call_tool("tool_name", {"arg": "value"}) +``` + +#### 2. Tool Adapter (`tool_adapter.py`) +- **MCPToolAdapter**: Converts MCP tools to smolagents Tool instances +- **Function Name Sanitization**: Handles Python keyword conflicts and naming issues +- **Schema Conversion**: Maps MCP JSON schemas to smolagents input formats + +```python +adapter = MCPToolAdapter() +smolagents_tools = adapter.adapt_all_tools(mcp_client) +``` + +#### 3. Tool Collection (`tool_collection.py`) +- **CustomMCPToolCollection**: Replacement for smolagents ToolCollection.from_mcp +- **Context Managers**: Proper resource management for MCP connections +- **Configuration Loading**: Direct integration with mcp.toml format + +```python +# Load from configuration +with load_mcp_tools_from_config(mcp_servers_config) as tools: + agent = ToolCallingAgent(tools=tools, ...) + +# Load from parameters (backward compatibility) +with load_mcp_tools_from_stdio_params(server_params) as tools: + agent = CodeAgent(tools=tools, ...) +``` + +### Features + +#### ✅ Synchronous Operation +- Compatible with smolagents (no async support) +- Works with Dramatiq workers +- Proper event loop management + +#### ✅ Dual Server Support +- **Stdio servers**: Docker containers, local processes +- **SSE servers**: HTTP-based MCP servers + +#### ✅ Robust Error Handling +- Connection timeouts and retries +- Graceful fallback to base tools +- Detailed logging and debugging + +#### ✅ Resource Management +- Automatic connection cleanup +- Context managers for safe resource handling +- Thread pool management for async operations + +#### ✅ Configuration Integration +- Native mcp.toml support +- Environment variable merging +- Server enable/disable controls + +## Migration Changes + +### EmailAgent Updates + +**Before (MCPAdapt):** +```python +from smolagents import ToolCollection + +with ToolCollection.from_mcp(params, trust_remote_code=True) as tool_collection: + all_tools.extend(tool_collection.tools) +``` + +**After (Custom Implementation):** +```python +from mxtoai.mcp import load_mcp_tools_from_config + +mcp_servers_config = self._get_mcp_servers_config() +with load_mcp_tools_from_config(mcp_servers_config) as mcp_tools: + all_tools.extend(mcp_tools) +``` + +### Configuration Format + +The `mcp.toml` format remains the same, but now uses native parsing: + +```toml +[mcp_servers.github_stdio] +type = "stdio" +command = "docker" +args = ["run", "-i", "--rm", "-e", "GITHUB_PERSONAL_ACCESS_TOKEN", "ghcr.io/github/github-mcp-server"] +env = { GITHUB_PERSONAL_ACCESS_TOKEN = "your_token" } +enabled = true + +[mcp_servers.remote_sse_service] +type = "sse" +url = "http://127.0.0.1:8000/sse" +enabled = true +``` + +## Benefits of Custom Implementation + +### 🚀 Performance +- Reduced dependency overhead +- Optimized for synchronous operation +- Better resource management + +### 🔧 Control & Debugging +- Full control over MCP client behavior +- Detailed logging at every step +- Easy to extend and customize + +### 🛡️ Reliability +- No external library dependency issues +- Robust error handling and recovery +- Proper connection lifecycle management + +### 🔄 Maintainability +- Clean, documented codebase +- Modular architecture +- Easy to test and modify + +## Testing + +### Test Script +Run the migration test: +```bash +python mxtoai/test_custom_mcp_client.py +``` + +### Integration Testing +The custom implementation is fully integrated into: +- EmailAgent.process_email() +- MCP server configuration loading +- Tool adaptation and execution + +## Security Considerations + +The custom implementation maintains the same security model: +- **Trust Remote Code**: Still required for MCP tool execution +- **Environment Isolation**: Docker containers for stdio servers +- **Configuration Validation**: Server configs are validated before use + +## Backward Compatibility + +The migration maintains backward compatibility: +- Same mcp.toml configuration format +- Same smolagents Tool interface +- Same agent execution patterns + +## Migration Checklist + +- [x] Implement core MCP client classes +- [x] Create tool adaptation layer +- [x] Build tool collection with context managers +- [x] Update EmailAgent to use custom implementation +- [x] Remove MCPAdapt dependency usage +- [x] Create test and validation scripts +- [x] Document migration process +- [x] Maintain configuration compatibility + +## Next Steps + +1. **Remove MCPAdapt Dependency**: Update pyproject.toml to remove mcpadapt requirement +2. **Performance Testing**: Test with various MCP servers to ensure performance +3. **Documentation Updates**: Update README and API documentation +4. **Production Deployment**: Deploy and monitor in production environment + +## Troubleshooting + +### Common Issues + +**Connection Timeouts** +```python +# Increase timeout in client.py if needed +future.result(timeout=60.0) # Adjust as needed +``` + +**Tool Name Conflicts** +```python +# Function names are automatically sanitized +# Check logs for name mappings: "tool-name" -> "tool_name" +``` + +**Environment Variables** +```python +# Ensure MCP server env vars are properly merged +server_config["env"] = {**os.environ, **server_config["env"]} +``` + +This custom implementation provides a robust, maintainable, and performant replacement for the MCPAdapt library dependency. diff --git a/mxtoai/agents/email_agent.py b/mxtoai/agents/email_agent.py index 85ea630..e04b5fc 100644 --- a/mxtoai/agents/email_agent.py +++ b/mxtoai/agents/email_agent.py @@ -1,15 +1,14 @@ import ast import os import re +import tomllib # For Python 3.11+ from datetime import datetime from typing import Any, Optional, Union +import contextlib # Added import from dotenv import load_dotenv - -# Update imports to use proper classes from smolagents +from mcp import StdioServerParameters from smolagents import Tool, ToolCallingAgent - -# Add imports for the new default tools from smolagents.default_tools import ( GoogleSearchTool, PythonInterpreterTool, @@ -19,6 +18,7 @@ ) from mxtoai._logging import get_logger +from mxtoai.mcp import load_mcp_tools_from_config from mxtoai.models import ProcessingInstructions from mxtoai.prompts.base_prompts import ( LIST_FORMATTING_REQUIREMENTS, @@ -46,8 +46,6 @@ from mxtoai.tools.attachment_processing_tool import AttachmentProcessingTool from mxtoai.tools.deep_research_tool import DeepResearchTool from mxtoai.tools.schedule_tool import ScheduleTool - -# Import the refactored fallback search tool from mxtoai.tools.search_with_fallback_tool import SearchWithFallbackTool # Import the new Brave Search tool from mxtoai.tools.brave_search_tool import initialize_brave_search_tool @@ -80,37 +78,36 @@ class EmailAgent: """ def __init__( - self, attachment_dir: str = "email_attachments", verbose: bool = False, enable_deep_research: bool = False + self, + attachment_dir: str = "email_attachments", + verbose: bool = False, + enable_deep_research: bool = False, ): """ Initialize the email agent with tools for different operations. Args: - attachment_dir: Directory to store email attachments - verbose: Whether to enable verbose logging - enable_deep_research: Whether to enable Jina AI deep research functionality (uses API tokens) - + attachment_dir: Directory to store email attachments. + verbose: Whether to enable verbose logging. + enable_deep_research: Whether to enable Jina AI deep research functionality (uses API tokens). """ - # Set up logging if verbose: - # Consider configuring logging level via environment variables or central logging setup logger.debug("Verbose logging potentially enabled (actual level depends on logger config).") self.attachment_dir = attachment_dir os.makedirs(self.attachment_dir, exist_ok=True) + # Initialize base tools (non-MCP) self.attachment_tool = AttachmentProcessingTool() self.report_formatter = ReportFormatter() self.schedule_tool = ScheduleTool() self.visit_webpage_tool = VisitWebpageTool() self.python_tool = PythonInterpreterTool(authorized_imports=ALLOWED_PYTHON_IMPORTS) self.wikipedia_search_tool = WikipediaSearchTool() - - # Initialize complex tools using helper methods self.search_with_fallback_tool = self._initialize_search_tools() self.research_tool = self._initialize_deep_research_tool(enable_deep_research) - self.available_tools: list[Tool] = [ + self.base_tools: list[Tool] = [ self.attachment_tool, self.schedule_tool, self.visit_webpage_tool, @@ -120,30 +117,57 @@ def __init__( azure_visualizer, ] if self.research_tool: - self.available_tools.append(self.research_tool) + self.base_tools.append(self.research_tool) + + self.routed_model = RoutedLiteLLMModel() # Keep routed model initialization - logger.info(f"Agent tools initialized: {[tool.name for tool in self.available_tools]}") - self._init_agent() - logger.info("Email agent initialized successfully") + tool_names = [tool.name for tool in self.base_tools] + logger.info(f"Base agent tools initialized: {tool_names}") + logger.info("Email agent initialized (MCP tools will be loaded dynamically per request)") - def _init_agent(self): + def _get_mcp_servers_config(self) -> dict[str, dict[str, Any]]: """ - Initialize the ToolCallingAgent with Azure OpenAI. + Parses the mcp.toml file and returns server configurations + for enabled MCP servers. """ - # Initialize the routed model with the default model group - self.routed_model = RoutedLiteLLMModel() - - self.agent = ToolCallingAgent( - model=self.routed_model, - tools=self.available_tools, - max_steps=12, - verbosity_level=2, - planning_interval=4, - name="email_processing_agent", - description="An agent that processes emails, generates summaries, replies, and conducts research with advanced capabilities including web search, web browsing, and code execution.", - provide_run_summary=True, - ) - logger.debug("Agent initialized with routed model configuration") + effective_mcp_config_path = os.getenv("MCP_CONFIG_PATH", "mcp.toml") + + if not os.path.exists(effective_mcp_config_path): + logger.debug(f"MCP configuration file not found at '{effective_mcp_config_path}'. No MCP tools will be loaded.") + return {} + + try: + with open(effective_mcp_config_path, "rb") as f: + config = tomllib.load(f) + except tomllib.TOMLDecodeError as e: + logger.error(f"Error decoding MCP TOML file at '{effective_mcp_config_path}': {e}") + return {} + except OSError as e: + logger.error(f"Error reading MCP TOML file at '{effective_mcp_config_path}': {e}") + return {} + + mcp_servers_config = config.get("mcp_servers", {}) + if not mcp_servers_config: + logger.info(f"No 'mcp_servers' table found in '{effective_mcp_config_path}'. No MCP tools will be loaded.") + return {} + + # Filter for enabled servers and merge environment variables + enabled_servers = {} + for server_name, server_details in mcp_servers_config.items(): + if not server_details.get("enabled", True): # Default to enabled if not specified + logger.debug(f"MCP server '{server_name}' is disabled in config. Skipping.") + continue + + # Create a copy and merge environment variables for stdio servers + server_config = server_details.copy() + if server_config.get("type") == "stdio" and "env" in server_config: + # Merge with current environment + server_config["env"] = {**os.environ, **server_config["env"]} + + enabled_servers[server_name] = server_config + + logger.info(f"Found {len(enabled_servers)} enabled MCP servers in configuration.") + return enabled_servers def _initialize_search_tools(self) -> SearchWithFallbackTool: """ @@ -179,7 +203,9 @@ def _initialize_search_tools(self) -> SearchWithFallbackTool: primary_names = [getattr(p, "name", "UnknownTool") for p in primary_search_engines] fallback_name = getattr(google_search_fallback_tool, "name", "None") if google_search_fallback_tool else "None" - logger.info(f"Initialized SearchWithFallbackTool. Primary engines: {primary_names}, Fallback: {fallback_name}") + logger.info( + f"Initialized SearchWithFallbackTool. Primary engines: {primary_names}, Fallback: {fallback_name}" + ) return search_tool def _get_required_actions(self, mode: str) -> list[str]: @@ -187,11 +213,10 @@ def _get_required_actions(self, mode: str) -> list[str]: Get list of required actions based on mode. Args: - mode: The mode of operation (e.g., "summary", "reply", "research", "full") + mode: The mode of operation (e.g., "summary", "reply", "research", "full"). Returns: - List[str]: List of actions to be performed by the agent - + List of actions to be performed by the agent. """ actions = [] if mode in ["summary", "full"]: @@ -207,8 +232,7 @@ def _initialize_google_search_tool(self) -> Optional[GoogleSearchTool]: Initialize Google search tool with either SerpAPI or Serper provider. Returns: - Optional[GoogleSearchTool]: Initialized GoogleSearchTool instance or None if initialization fails - + Optional[GoogleSearchTool]: Initialized GoogleSearchTool instance or None if fails. """ if os.getenv("SERPAPI_API_KEY"): try: @@ -216,19 +240,16 @@ def _initialize_google_search_tool(self) -> Optional[GoogleSearchTool]: logger.debug("Initialized GoogleSearchTool with SerpAPI for fallback.") return tool except ValueError as e: - logger.warning(f"Failed to initialize GoogleSearchTool with SerpAPI for fallback: {e}") + logger.warning(f"Failed to initialize GoogleSearchTool with SerpAPI: {e}") elif os.getenv("SERPER_API_KEY"): try: tool = GoogleSearchTool(provider="serper") logger.debug("Initialized GoogleSearchTool with Serper for fallback.") return tool except ValueError as e: - logger.warning(f"Failed to initialize GoogleSearchTool with Serper for fallback: {e}") + logger.warning(f"Failed to initialize GoogleSearchTool with Serper: {e}") else: - logger.warning( - "GoogleSearchTool (for fallback) not initialized. Missing SERPAPI_API_KEY or SERPER_API_KEY." - ) - + logger.warning("GoogleSearchTool (fallback) not initialized: SERPAPI_API_KEY or SERPER_API_KEY missing.") return None def _initialize_deep_research_tool(self, enable_deep_research: bool) -> Optional[DeepResearchTool]: @@ -236,27 +257,24 @@ def _initialize_deep_research_tool(self, enable_deep_research: bool) -> Optional Initializes the DeepResearchTool if API key is available. Args: - enable_deep_research: Flag to enable deep research functionality + enable_deep_research: Flag to enable deep research functionality. Returns: - Optional[DeepResearchTool]: Initialized DeepResearchTool instance or None if API key is not found - + Optional[DeepResearchTool]: Initialized DeepResearchTool instance or None. """ - research_tool: Optional[DeepResearchTool] = None - if os.getenv("JINA_API_KEY"): - research_tool = DeepResearchTool() - if enable_deep_research: - # Assuming DeepResearchTool is enabled by its presence and API key. - # If specific enabling logic is needed in DeepResearchTool, it should be called here. - logger.debug( - "DeepResearchTool instance created; deep research functionality is active if enable_deep_research is true." - ) - else: - logger.debug( - "DeepResearchTool instance created, but deep research is not explicitly enabled via agent config (enable_deep_research=False). Tool may operate in a basic mode or not be used by agent logic if dependent on this flag." - ) - else: + if not os.getenv("JINA_API_KEY"): logger.info("JINA_API_KEY not found. DeepResearchTool not initialized.") + return None + + research_tool = DeepResearchTool() + if enable_deep_research: + logger.debug( + "DeepResearchTool instance created; deep research functionality is active." + ) + else: + logger.debug( + "DeepResearchTool instance created, but deep research is not explicitly enabled via agent config." + ) return research_tool def _create_task(self, email_request: EmailRequest, email_instructions: ProcessingInstructions) -> str: @@ -264,19 +282,15 @@ def _create_task(self, email_request: EmailRequest, email_instructions: Processi Create a task description for the agent based on email handle instructions. Args: - email_request: EmailRequest instance containing email data - email_instructions: EmailHandleInstructions object containing processing configuration + email_request: EmailRequest instance containing email data. + email_instructions: EmailHandleInstructions object with processing configuration. Returns: - str: The task description for the agent - + The task description for the agent. """ - # process attachments if specified - attachments = ( - self._format_attachments(email_request.attachments) - if email_instructions.process_attachments and email_request.attachments - else [] - ) + attachments = [] + if email_instructions.process_attachments and email_request.attachments: + attachments = self._format_attachments(email_request.attachments) return self._create_task_template( handle=email_instructions.handle, @@ -292,28 +306,26 @@ def _format_attachments(self, attachments: list[EmailAttachment]) -> list[str]: Format attachment details for inclusion in the task. Args: - attachments: List of EmailAttachment objects + attachments: List of EmailAttachment objects. Returns: - List[str]: Formatted attachment details - + List of formatted attachment details. """ return [ f'- {att.filename} (Type: {att.contentType}, Size: {att.size} bytes)\n EXACT FILE PATH: "{att.path}"' for att in attachments ] - def _create_email_context(self, email_request: EmailRequest, attachment_details=None) -> str: + def _create_email_context(self, email_request: EmailRequest, attachment_details: Optional[list[str]] = None) -> str: """ Generate context information from the email request. Args: - email_request: EmailRequest instance containing email data - attachment_details: List of formatted attachment details + email_request: EmailRequest instance containing email data. + attachment_details: List of formatted attachment details. Returns: - str: The context information for the agent - + The context information for the agent. """ recipients = ", ".join(email_request.recipients) if email_request.recipients else "N/A" attachments_info = ( @@ -321,6 +333,7 @@ def _create_email_context(self, email_request: EmailRequest, attachment_details= if attachment_details else "No attachments provided." ) + body_content = email_request.textContent or email_request.htmlContent or "" return f"""Email Content: Subject: {email_request.subject} From: {email_request.from_email} @@ -328,7 +341,7 @@ def _create_email_context(self, email_request: EmailRequest, attachment_details= Recipients: {recipients} CC: {email_request.cc or "N/A"} BCC: {email_request.bcc or "N/A"} - Body: {email_request.textContent or email_request.htmlContent or ""} + Body: {body_content} {attachments_info} """ @@ -338,11 +351,10 @@ def _create_attachment_task(self, attachment_details: list[str]) -> str: Return instructions for processing attachments, if any. Args: - attachment_details: List of formatted attachment details + attachment_details: List of formatted attachment details. Returns: - str: Instructions for processing attachments - + Instructions for processing attachments. """ return f"Process these attachments:\n{chr(10).join(attachment_details)}" if attachment_details else "" @@ -357,24 +369,12 @@ def _create_task_template( ) -> str: """ Combine all task components into the final task description. - - Args: - handle: The email handle being processed. - email_context: The context information extracted from the email. - handle_specific_template: Any specific template for the handle. - attachment_task: Instructions for processing attachments. - deep_research_mandatory: Flag indicating if deep research is mandatory. - output_template: The output template to use. - - Returns: - str: The complete task description for the agent. - """ - # Merge the task components into a single string by listing the sections + research_guideline = RESEARCH_GUIDELINES["mandatory"] if deep_research_mandatory else RESEARCH_GUIDELINES["optional"] sections = [ f"Process this email according to the '{handle}' instruction type.\n", email_context, - RESEARCH_GUIDELINES["mandatory"] if deep_research_mandatory else RESEARCH_GUIDELINES["optional"], + research_guideline, attachment_task, handle_specific_template, output_template, @@ -389,21 +389,16 @@ def _process_agent_result( ) -> DetailedEmailProcessingResult: processed_at_time = datetime.now().isoformat() - # Initialize schema components errors_list: list[ProcessingError] = [] email_sent_status = EmailSentStatus(status="pending", timestamp=processed_at_time) - - attachment_proc_summary: Union[str, None] = None + attachment_proc_summary: Optional[str] = None processed_attachment_details: list[ProcessedAttachmentDetail] = [] - - calendar_result_data: Union[CalendarResult, None] = None - - research_output_findings: Union[str, None] = None - research_output_metadata: Union[AgentResearchMetadata, None] = None - - final_answer_from_llm: Union[str, None] = None - email_text_content: Union[str, None] = None - email_html_content: Union[str, None] = None + calendar_result_data: Optional[CalendarResult] = None + research_output_findings: Optional[str] = None + research_output_metadata: Optional[AgentResearchMetadata] = None + final_answer_from_llm: Optional[str] = None + email_text_content: Optional[str] = None + email_html_content: Optional[str] = None try: logger.debug(f"Processing final answer object type: {type(final_answer_obj)}") @@ -411,17 +406,14 @@ def _process_agent_result( for i, step in enumerate(agent_steps): logger.debug(f"[Memory Step {i + 1}] Type: {type(step)}") + tool_name: Optional[str] = None + tool_output: Any = None - tool_name = None - tool_output = None - - if hasattr(step, "tool_calls") and isinstance(step.tool_calls, list) and len(step.tool_calls) > 0: + if hasattr(step, "tool_calls") and isinstance(step.tool_calls, list) and step.tool_calls: first_tool_call = step.tool_calls[0] tool_name = getattr(first_tool_call, "name", None) if not tool_name: logger.warning(f"[Memory Step {i + 1}] Could not extract tool name from first call.") - tool_name = None - action_out = getattr(step, "action_output", None) obs_out = getattr(step, "observations", None) tool_output = action_out if action_out is not None else obs_out @@ -432,24 +424,29 @@ def _process_agent_result( try: tool_output = ast.literal_eval(tool_output) except (ValueError, SyntaxError) as e: - logger.error( - f"[Memory Step {i + 1}] Failed to parse '{tool_name}' output: {e!s}. Content: {tool_output[:200]}..." + msg = ( + f"[Memory Step {i + 1}] Failed to parse '{tool_name}' output: {e!s}. " + f"Content: {tool_output[:200]}..." ) + logger.error(msg) errors_list.append( ProcessingError(message=f"Failed to parse {tool_name} output", details=str(e)) ) continue - except Exception as e: - logger.error( - f"[Memory Step {i + 1}] Unexpected error parsing '{tool_name}' output: {e!s}. Content: {tool_output[:200]}..." + except Exception as e: # pylint: disable=broad-except + msg = ( + f"[Memory Step {i + 1}] Unexpected error parsing '{tool_name}' output: {e!s}. " + f"Content: {tool_output[:200]}..." ) + logger.error(msg) errors_list.append( ProcessingError(message=f"Unexpected error parsing {tool_name} output", details=str(e)) ) continue logger.debug( - f"[Memory Step {i + 1}] Processing tool call: '{tool_name}', Output Type: '{type(tool_output)}'" + f"[Memory Step {i + 1}] Processing tool call: '{tool_name}', " + f"Output Type: '{type(tool_output)}'" ) if tool_name == "attachment_processor" and isinstance(tool_output, dict): @@ -495,14 +492,15 @@ def _process_agent_result( errors_list.append(ProcessingError(message="Schedule Tool Error", details=error_msg)) else: logger.debug( - f"[Memory Step {i + 1}] Tool '{tool_name}' output processed (no specific handler). Output: {str(tool_output)[:200]}..." + f"[Memory Step {i + 1}] Tool '{tool_name}' output processed (no specific handler). " + f"Output: {str(tool_output)[:200]}..." ) else: logger.debug( - f"[Memory Step {i + 1}] Skipping step (Type: {type(step)}), not a relevant ActionStep or missing output." + f"[Memory Step {i + 1}] Skipping step (Type: {type(step)}), " + "not a relevant ActionStep or missing output." ) - # Extract final answer from LLM if hasattr(final_answer_obj, "text"): final_answer_from_llm = str(final_answer_obj.text).strip() logger.debug("Extracted final answer from AgentResponse.text") @@ -512,12 +510,10 @@ def _process_agent_result( elif hasattr(final_answer_obj, "_value"): # Check for older AgentText structure final_answer_from_llm = str(final_answer_obj._value).strip() logger.debug("Extracted final answer from AgentText._value") - elif hasattr(final_answer_obj, "answer"): # Handle final_answer tool call argument - # Check if the argument itself is the content string + elif hasattr(final_answer_obj, "answer"): if isinstance(getattr(final_answer_obj, "answer", None), str): final_answer_from_llm = str(final_answer_obj.answer).strip() logger.debug("Extracted final answer from final_answer tool argument string") - # Or if it's nested in arguments (less likely for final_answer but check) elif ( isinstance(getattr(final_answer_obj, "arguments", None), dict) and "answer" in final_answer_obj.arguments @@ -527,15 +523,16 @@ def _process_agent_result( else: final_answer_from_llm = str(final_answer_obj).strip() logger.warning( - f"Could not find specific answer attribute in final_answer object, using str(). Result: {final_answer_from_llm[:100]}..." + f"Could not find specific answer attribute in final_answer object, using str(). " + f"Result: {final_answer_from_llm[:100]}..." ) else: final_answer_from_llm = str(final_answer_obj).strip() logger.warning( - f"Could not find specific answer attribute in final_answer object, using str(). Result: {final_answer_from_llm[:100]}..." + f"Could not find specific answer attribute in final_answer object, using str(). " + f"Result: {final_answer_from_llm[:100]}..." ) - # Determine email body content email_body_content_source = research_output_findings if research_output_findings else final_answer_from_llm if email_body_content_source: @@ -550,7 +547,10 @@ def _process_agent_result( temp_content = email_body_content_source for marker in signature_markers: temp_content = re.sub( - r"^[\s\n]*" + re.escape(marker) + r".*$", "", temp_content, flags=re.IGNORECASE | re.MULTILINE + r"^[\s\n]*" + re.escape(marker) + r".*$", + "", + temp_content, + flags=re.IGNORECASE | re.MULTILINE, ).strip() email_text_content = self.report_formatter.format_report( @@ -560,7 +560,10 @@ def _process_agent_result( temp_content, format_type="html", include_signature=True ) else: - fallback_msg = "I apologize, but I encountered an issue generating the detailed response. Please try again later or contact support if this issue persists." + fallback_msg = ( + "I apologize, but I encountered an issue generating the detailed response. " + "Please try again later or contact support if this issue persists." + ) email_text_content = self.report_formatter.format_report( fallback_msg, format_type="text", include_signature=True ) @@ -571,18 +574,16 @@ def _process_agent_result( email_sent_status.status = "error" email_sent_status.error = "No reply text was generated" - # Construct the final Pydantic model INSIDE the try block return DetailedEmailProcessingResult( metadata=ProcessingMetadata( processed_at=processed_at_time, - mode=current_email_handle, # Use the passed handle for mode + mode=current_email_handle, errors=errors_list, email_sent=email_sent_status, ), email_content=EmailContentDetails( text=email_text_content, html=email_html_content, - # Assuming enhanced content is same as base for now enhanced={"text": email_text_content, "html": email_html_content}, ), attachments=AttachmentsProcessingResult( @@ -596,40 +597,28 @@ def _process_agent_result( else None, ) - except Exception as e: + except Exception as e: # pylint: disable=broad-except logger.exception(f"Critical error in _process_agent_result: {e!s}") - # Ensure errors_list and email_sent_status are updated - # If these were initialized outside and before this try-except, they might already exist. - # Re-initialize or ensure they are correctly formed for the error state. - # This part already handles populating errors_list and setting email_sent_status. - - # Ensure basic structure for fallback if critical error happened early - if not errors_list: # If the error happened before any specific error was added + if not errors_list: errors_list.append(ProcessingError(message="Critical error in _process_agent_result", details=str(e))) - current_timestamp = datetime.now().isoformat() # Use a fresh timestamp - if email_sent_status.status != "error": # If not already set to error by prior logic + current_timestamp = datetime.now().isoformat() + if email_sent_status.status != "error": email_sent_status.status = "error" email_sent_status.error = f"Critical error in _process_agent_result: {e!s}" email_sent_status.timestamp = current_timestamp - # Fallback email content if not already set fb_text = "I encountered a critical error processing your request during result generation." - final_email_text = ( - email_text_content - if email_text_content - else self.report_formatter.format_report(fb_text, format_type="text", include_signature=True) + final_email_text = email_text_content or self.report_formatter.format_report( + fb_text, format_type="text", include_signature=True ) - final_email_html = ( - email_html_content - if email_html_content - else self.report_formatter.format_report(fb_text, format_type="html", include_signature=True) + final_email_html = email_html_content or self.report_formatter.format_report( + fb_text, format_type="html", include_signature=True ) - # Construct and return an error-state DetailedEmailProcessingResult return DetailedEmailProcessingResult( metadata=ProcessingMetadata( - processed_at=processed_at_time, # or current_timestamp, consider consistency + processed_at=processed_at_time, mode=current_email_handle, errors=errors_list, email_sent=email_sent_status, @@ -637,16 +626,14 @@ def _process_agent_result( email_content=EmailContentDetails( text=final_email_text, html=final_email_html, - enhanced={"text": final_email_text, "html": final_email_html}, # ensure enhanced also has fallback + enhanced={"text": final_email_text, "html": final_email_html}, ), attachments=AttachmentsProcessingResult( - summary=attachment_proc_summary - if attachment_proc_summary - else None, # Keep any partial data if available - processed=processed_attachment_details if processed_attachment_details else [], + summary=attachment_proc_summary, + processed=processed_attachment_details, ), - calendar_data=calendar_result_data, # Keep any partial data - research=AgentResearchOutput( # Keep any partial data + calendar_data=calendar_result_data, + research=AgentResearchOutput( findings_content=research_output_findings, metadata=research_output_metadata ) if research_output_findings or research_output_metadata @@ -657,29 +644,100 @@ def process_email( self, email_request: EmailRequest, email_instructions: ProcessingInstructions, - ) -> DetailedEmailProcessingResult: # Updated return type annotation + ) -> DetailedEmailProcessingResult: """ Process an email using the agent based on the provided email handle instructions. Args: - email_request: EmailRequest instance containing email data - email_instructions: ProcessingInstructions object containing processing configuration + email_request: EmailRequest instance containing email data. + email_instructions: ProcessingInstructions object with processing configuration. Returns: DetailedEmailProcessingResult: Pydantic model with structured processing results. - """ try: self.routed_model.current_handle = email_instructions task = self._create_task(email_request, email_instructions) + logger.info("Starting agent execution setup...") + + # Get MCP server configurations + mcp_servers_config = self._get_mcp_servers_config() + + all_tools = list(self.base_tools) # Start with base tools + agent_steps = [] + final_answer_obj = None + + agent_description = ( + "An agent that processes emails, generates summaries, replies, and conducts research " + "with advanced capabilities including web search, web browsing, code execution, and MCP tools." + ) - logger.info("Starting agent execution...") - final_answer_obj = self.agent.run(task) - logger.info("Agent execution completed.") - - agent_steps = list(self.agent.memory.steps) - logger.info(f"Captured {len(agent_steps)} steps from agent memory.") + # Load MCP tools using our custom implementation + if mcp_servers_config: + try: + logger.info(f"Loading MCP tools from {len(mcp_servers_config)} configured servers") + + with load_mcp_tools_from_config(mcp_servers_config) as mcp_tools: + all_tools.extend(mcp_tools) + mcp_tools_count = len(mcp_tools) + + logger.info(f"Successfully loaded {mcp_tools_count} MCP tools") + logger.info(f"Total tools available: {len(all_tools)} (Base: {len(self.base_tools)}, MCP: {mcp_tools_count})") + + # Initialize ToolCallingAgent with all tools + agent = ToolCallingAgent( + model=self.routed_model, + tools=all_tools, + max_steps=12, + verbosity_level=2, + planning_interval=4, + name="email_processing_agent_with_mcp", + description=agent_description, + provide_run_summary=True, + ) + logger.debug("Initialized ToolCallingAgent with MCP tools") + + logger.info("Starting agent.run() with MCP tools...") + final_answer_obj = agent.run(task) + logger.info("Agent.run() execution completed.") + + agent_steps = list(agent.memory.steps) + logger.debug(f"Captured {len(agent_steps)} steps from agent memory.") + + except Exception as e: + logger.error(f"Failed to load MCP tools, falling back to base tools only: {e}") + # Fall back to base tools only + agent = ToolCallingAgent( + model=self.routed_model, + tools=all_tools, # Just base tools at this point + max_steps=12, + verbosity_level=2, + planning_interval=4, + name="email_processing_agent_base_only", + description=agent_description, + provide_run_summary=True, + ) + logger.info("Starting agent.run() with base tools only...") + final_answer_obj = agent.run(task) + agent_steps = list(agent.memory.steps) + else: + # No MCP servers configured, use base tools only + logger.info("No MCP servers configured, using base tools only") + agent = ToolCallingAgent( + model=self.routed_model, + tools=all_tools, + max_steps=12, + verbosity_level=2, + planning_interval=4, + name="email_processing_agent_base_only", + description=agent_description, + provide_run_summary=True, + ) + logger.info("Starting agent.run() with base tools only...") + final_answer_obj = agent.run(task) + agent_steps = list(agent.memory.steps) + # Process the results processed_result = self._process_agent_result(final_answer_obj, agent_steps, email_instructions.handle) if not processed_result.email_content or not processed_result.email_content.text: @@ -688,19 +746,27 @@ def process_email( processed_result.metadata.errors.append(ProcessingError(message=msg)) processed_result.metadata.email_sent.status = "error" processed_result.metadata.email_sent.error = msg - logger.info(f"Email processed (but no reply text generated) with handle: {email_instructions.handle}") return processed_result logger.info(f"Email processed successfully with handle: {email_instructions.handle}") - return processed_result # Added return for the successful case + return processed_result - except Exception as e: + except Exception as e: # pylint: disable=broad-except error_msg = f"Critical error in email processing: {e!s}" logger.error(error_msg, exc_info=True) - # Construct a DetailedEmailProcessingResult for error cases now_iso = datetime.now().isoformat() + error_email_text = self.report_formatter.format_report( + "I encountered a critical error processing your request.", + format_type="text", + include_signature=True, + ) + error_email_html = self.report_formatter.format_report( + "I encountered a critical error processing your request.", + format_type="html", + include_signature=True, + ) return DetailedEmailProcessingResult( metadata=ProcessingMetadata( processed_at=now_iso, @@ -709,17 +775,9 @@ def process_email( email_sent=EmailSentStatus(status="error", error=error_msg, timestamp=now_iso), ), email_content=EmailContentDetails( - text=self.report_formatter.format_report( - "I encountered a critical error processing your request.", - format_type="text", - include_signature=True, - ), - html=self.report_formatter.format_report( - "I encountered a critical error processing your request.", - format_type="html", - include_signature=True, - ), - enhanced={"text": None, "html": None}, + text=error_email_text, + html=error_email_html, + enhanced={"text": error_email_text, "html": error_email_html}, ), attachments=AttachmentsProcessingResult(processed=[]), calendar_data=None, diff --git a/mxtoai/mcp/__init__.py b/mxtoai/mcp/__init__.py new file mode 100644 index 0000000..a57ee30 --- /dev/null +++ b/mxtoai/mcp/__init__.py @@ -0,0 +1,51 @@ +""" +Custom MCP (Model Context Protocol) client implementation for mxtoai. + +This module provides a synchronous MCP client that works with smolagents and dramatiq workers. +It replaces the dependency on MCPAdapt library with a custom implementation. +""" + +from .client import ( + BaseMCPClient, + StdioMCPClient, + SSEMCPClient, + MCPClientError, + MCPConnectionError, + MCPToolExecutionError, + create_mcp_client +) + +from .tool_adapter import ( + MCPToolAdapter, + sanitize_function_name, + create_smolagents_tools_from_mcp_client +) + +from .tool_collection import ( + CustomMCPToolCollection, + load_mcp_tools_from_config, + load_mcp_tools_from_stdio_params +) + +__all__ = [ + # Client classes and functions + "BaseMCPClient", + "StdioMCPClient", + "SSEMCPClient", + "create_mcp_client", + + # Exceptions + "MCPClientError", + "MCPConnectionError", + "MCPToolExecutionError", + + # Tool adapter + "MCPToolAdapter", + "sanitize_function_name", + "create_smolagents_tools_from_mcp_client", + + # Tool collection + "CustomMCPToolCollection", + "load_mcp_tools_from_config", + "load_mcp_tools_from_stdio_params" +] diff --git a/mxtoai/mcp/client.py b/mxtoai/mcp/client.py new file mode 100644 index 0000000..d50d7a4 --- /dev/null +++ b/mxtoai/mcp/client.py @@ -0,0 +1,270 @@ +import asyncio +import json +import logging +import threading +import time +from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from typing import Any, Dict, List, Optional, Union + +import mcp.types as mcp_types +from mcp import ClientSession, StdioServerParameters +from mcp.client.session import ClientSession +from mcp.client.stdio import stdio_client +from mcp.client.sse import sse_client +from smolagents import Tool + +logger = logging.getLogger(__name__) + + +class MCPClientError(Exception): + """Base exception for MCP client errors.""" + pass + + +class MCPConnectionError(MCPClientError): + """Raised when MCP connection fails.""" + pass + + +class MCPToolExecutionError(MCPClientError): + """Raised when MCP tool execution fails.""" + pass + + +class BaseMCPClient(ABC): + """Base class for MCP clients.""" + + def __init__(self, server_name: str): + self.server_name = server_name + self.session: Optional[ClientSession] = None + self.is_connected = False + self._tools: List[mcp_types.Tool] = [] + self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=f"mcp-{server_name}") + + @abstractmethod + async def _create_session(self) -> ClientSession: + """Create and return a new MCP session.""" + pass + + def connect(self) -> bool: + """Connect to the MCP server synchronously.""" + try: + future = self._executor.submit(self._run_async_connect) + return future.result(timeout=30.0) # 30 second timeout + except Exception as e: + logger.error(f"Failed to connect to MCP server {self.server_name}: {e}") + return False + + def _run_async_connect(self) -> bool: + """Run async connect in a new event loop.""" + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(self._async_connect()) + finally: + loop.close() + except Exception as e: + logger.error(f"Error in async connect wrapper: {e}") + return False + + async def _async_connect(self) -> bool: + """Async implementation of connect.""" + try: + self.session = await self._create_session() + + # Initialize the session + init_result = await self.session.initialize() + logger.info(f"MCP server {self.server_name} initialized: {init_result}") + + # List available tools + tools_result = await self.session.list_tools() + self._tools = tools_result.tools + logger.info(f"Found {len(self._tools)} tools from MCP server {self.server_name}") + + self.is_connected = True + return True + + except Exception as e: + logger.error(f"Error connecting to MCP server {self.server_name}: {e}") + self.is_connected = False + return False + + def list_tools(self) -> List[mcp_types.Tool]: + """List available tools from the MCP server.""" + if not self.is_connected: + logger.warning(f"MCP server {self.server_name} is not connected") + return [] + return self._tools.copy() + + def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: + """Execute a tool synchronously and return the result.""" + if not self.is_connected: + raise MCPConnectionError(f"MCP server {self.server_name} is not connected") + + try: + future = self._executor.submit(self._run_async_call_tool, tool_name, arguments) + return future.result(timeout=60.0) # 60 second timeout for tool execution + except Exception as e: + logger.error(f"Error calling tool {tool_name} on server {self.server_name}: {e}") + raise MCPToolExecutionError(f"Tool execution failed: {e}") from e + + def _run_async_call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: + """Run async tool call in a new event loop.""" + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(self._async_call_tool(tool_name, arguments)) + finally: + loop.close() + except Exception as e: + logger.error(f"Error in async tool call wrapper: {e}") + raise + + async def _async_call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: + """Async implementation of tool calling.""" + if not self.session: + raise MCPConnectionError("No active session") + + try: + result = await self.session.call_tool(tool_name, arguments) + + if not result.content: + return "" + + # Extract text content from the result + text_parts = [] + for content in result.content: + if isinstance(content, mcp_types.TextContent): + text_parts.append(content.text) + else: + # Handle other content types if needed + text_parts.append(str(content)) + + return "\n".join(text_parts) + + except Exception as e: + logger.error(f"Error in async tool call {tool_name}: {e}") + raise + + def disconnect(self): + """Disconnect from the MCP server.""" + if self.session: + try: + # Run cleanup in executor to handle async cleanup + future = self._executor.submit(self._async_disconnect) + future.result(timeout=10.0) + except Exception as e: + logger.error(f"Error during disconnect from {self.server_name}: {e}") + + self.is_connected = False + self._tools = [] + + # Shutdown the executor + self._executor.shutdown(wait=True, timeout=5.0) + + async def _async_disconnect(self): + """Async implementation of disconnect.""" + if self.session: + try: + # Close the session if it has a close method + if hasattr(self.session, 'close'): + await self.session.close() + except Exception as e: + logger.error(f"Error closing session for {self.server_name}: {e}") + finally: + self.session = None + + +class StdioMCPClient(BaseMCPClient): + """MCP client for stdio-based servers.""" + + def __init__(self, server_name: str, server_params: StdioServerParameters): + super().__init__(server_name) + self.server_params = server_params + + async def _create_session(self) -> ClientSession: + """Create a stdio-based MCP session.""" + try: + read, write = await stdio_client(self.server_params) + session = ClientSession(read, write) + return session + except Exception as e: + logger.error(f"Failed to create stdio session for {self.server_name}: {e}") + raise MCPConnectionError(f"Failed to create stdio session: {e}") from e + + +class SSEMCPClient(BaseMCPClient): + """MCP client for SSE-based servers.""" + + def __init__(self, server_name: str, url: str, **kwargs): + super().__init__(server_name) + self.url = url + self.sse_kwargs = kwargs + + async def _create_session(self) -> ClientSession: + """Create an SSE-based MCP session.""" + try: + read, write = await sse_client(self.url, **self.sse_kwargs) + session = ClientSession(read, write) + return session + except Exception as e: + logger.error(f"Failed to create SSE session for {self.server_name}: {e}") + raise MCPConnectionError(f"Failed to create SSE session: {e}") from e + + +@contextmanager +def create_mcp_client(server_name: str, server_config: Dict[str, Any]): + """Factory function to create appropriate MCP client based on configuration.""" + client = None + + try: + server_type = server_config.get("type") + + if server_type == "stdio": + # Create StdioServerParameters + command = server_config.get("command") + args = server_config.get("args", []) + env = server_config.get("env", {}) + + if not command: + raise ValueError(f"Stdio server {server_name} missing 'command'") + + server_params = StdioServerParameters( + command=command, + args=args, + env=env + ) + client = StdioMCPClient(server_name, server_params) + + elif server_type == "sse": + url = server_config.get("url") + if not url: + raise ValueError(f"SSE server {server_name} missing 'url'") + + extra_params = server_config.get("extra_params", {}) + client = SSEMCPClient(server_name, url, **extra_params) + + else: + raise ValueError(f"Unknown server type: {server_type}") + + # Connect to the server + if not client.connect(): + raise MCPConnectionError(f"Failed to connect to MCP server {server_name}") + + logger.info(f"Successfully connected to MCP server {server_name}") + yield client + + except Exception as e: + logger.error(f"Error with MCP server {server_name}: {e}") + raise + finally: + if client: + try: + client.disconnect() + logger.info(f"Disconnected from MCP server {server_name}") + except Exception as e: + logger.error(f"Error disconnecting from {server_name}: {e}") diff --git a/mxtoai/mcp/tool_adapter.py b/mxtoai/mcp/tool_adapter.py new file mode 100644 index 0000000..9ebcbea --- /dev/null +++ b/mxtoai/mcp/tool_adapter.py @@ -0,0 +1,197 @@ +import json +import keyword +import logging +import re +from typing import Any, Dict, List, Optional + +import jsonref +import mcp.types as mcp_types +from smolagents import Tool + +from .client import BaseMCPClient, MCPToolExecutionError + +logger = logging.getLogger(__name__) + + +def sanitize_function_name(name: str) -> str: + """ + Sanitize function names to be valid Python identifiers. + Based on MCPAdapt's implementation but with improvements. + """ + # Replace dashes and other non-alphanumeric chars with underscores + name = re.sub(r"[^\w]", "_", name) + + # Remove consecutive underscores + name = re.sub(r"_{2,}", "_", name) + + # Ensure it doesn't start with a number + if name and name[0].isdigit(): + name = f"tool_{name}" + + # Check if it's a Python keyword + if keyword.iskeyword(name): + name = f"{name}_tool" + + # Ensure it's not empty + if not name: + name = "unnamed_tool" + + return name + + +class MCPToolAdapter: + """Adapter to convert MCP tools to smolagents Tools.""" + + def __init__(self): + self.adapted_tools: Dict[str, Tool] = {} + + def adapt_mcp_tool(self, mcp_tool: mcp_types.Tool, mcp_client: BaseMCPClient) -> Tool: + """Convert an MCP tool to a smolagents Tool.""" + + sanitized_name = sanitize_function_name(mcp_tool.name) + + class MCPAdaptedTool(Tool): + def __init__(self, original_name: str, mcp_client: BaseMCPClient): + self.original_name = original_name + self.mcp_client = mcp_client + self.name = sanitized_name + self.description = mcp_tool.description or f"MCP tool: {original_name}" + self.inputs = self._convert_input_schema(mcp_tool.inputSchema) + self.output_type = "string" # MCP tools return text content + self.is_initialized = True + self.skip_forward_signature_validation = True + + def _convert_input_schema(self, input_schema: Dict[str, Any]) -> Dict[str, Dict[str, str]]: + """Convert MCP input schema to smolagents format.""" + if not input_schema: + return {} + + # Resolve JSON references + try: + resolved_schema = jsonref.replace_refs(input_schema) + except Exception as e: + logger.warning(f"Failed to resolve JSON refs for {mcp_tool.name}: {e}") + resolved_schema = input_schema + + properties = resolved_schema.get("properties", {}) + converted_inputs = {} + + for param_name, param_info in properties.items(): + param_type = param_info.get("type", "string") + param_description = param_info.get("description", "Parameter for MCP tool") + + # Map JSON schema types to smolagents types + smolagents_type = self._map_json_type_to_smolagents(param_type) + + converted_inputs[param_name] = { + "type": smolagents_type, + "description": param_description + } + + # Handle nullable parameters + if param_info.get("nullable", False): + converted_inputs[param_name]["nullable"] = True + + return converted_inputs + + def _map_json_type_to_smolagents(self, json_type: str) -> str: + """Map JSON schema types to smolagents types.""" + type_mapping = { + "string": "string", + "integer": "integer", + "number": "number", + "boolean": "boolean", + "array": "array", + "object": "object" + } + return type_mapping.get(json_type, "string") + + def forward(self, *args, **kwargs) -> str: + """Execute the MCP tool.""" + try: + # Handle arguments - convert positional args to kwargs if needed + tool_args = {} + + if len(args) == 1 and isinstance(args[0], dict) and not kwargs: + # Single dict argument + tool_args = args[0] + elif args and not kwargs: + # Multiple positional arguments - map to input parameters + input_keys = list(self.inputs.keys()) + for i, arg in enumerate(args): + if i < len(input_keys): + tool_args[input_keys[i]] = arg + else: + # Use keyword arguments + tool_args = kwargs + + # Call the MCP tool through the client + result = self.mcp_client.call_tool(self.original_name, tool_args) + return result + + except Exception as e: + error_msg = f"Error executing MCP tool {self.original_name}: {e}" + logger.error(error_msg) + raise MCPToolExecutionError(error_msg) from e + + # Create and return the adapted tool + adapted_tool = MCPAdaptedTool(mcp_tool.name, mcp_client) + self.adapted_tools[sanitized_name] = adapted_tool + + logger.debug(f"Adapted MCP tool '{mcp_tool.name}' -> '{sanitized_name}'") + return adapted_tool + + def adapt_all_tools(self, mcp_client: BaseMCPClient) -> List[Tool]: + """Adapt all tools from an MCP client.""" + adapted_tools = [] + + try: + mcp_tools = mcp_client.list_tools() + logger.info(f"Adapting {len(mcp_tools)} tools from MCP server {mcp_client.server_name}") + + for mcp_tool in mcp_tools: + try: + adapted_tool = self.adapt_mcp_tool(mcp_tool, mcp_client) + adapted_tools.append(adapted_tool) + logger.debug(f"Successfully adapted tool: {mcp_tool.name}") + except Exception as e: + logger.error(f"Failed to adapt tool {mcp_tool.name}: {e}") + continue + + logger.info(f"Successfully adapted {len(adapted_tools)} tools from {mcp_client.server_name}") + return adapted_tools + + except Exception as e: + logger.error(f"Error adapting tools from {mcp_client.server_name}: {e}") + return [] + + def get_tool_info(self, tool_name: str) -> Optional[Dict[str, Any]]: + """Get information about an adapted tool.""" + if tool_name in self.adapted_tools: + tool = self.adapted_tools[tool_name] + return { + "name": tool.name, + "description": tool.description, + "inputs": tool.inputs, + "output_type": tool.output_type, + "original_name": getattr(tool, "original_name", tool_name) + } + return None + + def list_adapted_tools(self) -> List[str]: + """List all adapted tool names.""" + return list(self.adapted_tools.keys()) + + +def create_smolagents_tools_from_mcp_client(mcp_client: BaseMCPClient) -> List[Tool]: + """ + Convenience function to create smolagents tools from an MCP client. + + Args: + mcp_client: Connected MCP client + + Returns: + List of smolagents Tool instances + """ + adapter = MCPToolAdapter() + return adapter.adapt_all_tools(mcp_client) diff --git a/mxtoai/mcp/tool_collection.py b/mxtoai/mcp/tool_collection.py new file mode 100644 index 0000000..37fb132 --- /dev/null +++ b/mxtoai/mcp/tool_collection.py @@ -0,0 +1,165 @@ +import os +import logging +from contextlib import contextmanager +from typing import Dict, List, Any, Union + +from smolagents import Tool +from mcp import StdioServerParameters + +from .client import create_mcp_client, MCPConnectionError +from .tool_adapter import create_smolagents_tools_from_mcp_client + +logger = logging.getLogger(__name__) + + +class CustomMCPToolCollection: + """ + Custom MCP tool collection that replaces smolagents ToolCollection.from_mcp + """ + + def __init__(self, tools: List[Tool]): + self.tools = tools + + def __len__(self) -> int: + return len(self.tools) + + def __iter__(self): + return iter(self.tools) + + def __getitem__(self, index): + return self.tools[index] + + @classmethod + @contextmanager + def from_mcp_config(cls, mcp_servers_config: Dict[str, Dict[str, Any]]): + """ + Create a tool collection from MCP server configurations. + + Args: + mcp_servers_config: Dictionary of server configurations from mcp.toml + + Yields: + CustomMCPToolCollection: Collection of adapted MCP tools + """ + active_clients = [] + all_tools = [] + + try: + for server_name, server_config in mcp_servers_config.items(): + if not server_config.get("enabled", True): + logger.debug(f"MCP server '{server_name}' is disabled, skipping") + continue + + try: + logger.info(f"Connecting to MCP server '{server_name}'") + + # Create and connect to MCP client + client_ctx = create_mcp_client(server_name, server_config) + mcp_client = client_ctx.__enter__() + active_clients.append((mcp_client, client_ctx)) + + # Adapt tools from this client + adapted_tools = create_smolagents_tools_from_mcp_client(mcp_client) + all_tools.extend(adapted_tools) + + logger.info(f"Successfully loaded {len(adapted_tools)} tools from '{server_name}'") + + except Exception as e: + logger.error(f"Failed to connect to MCP server '{server_name}': {e}") + continue + + logger.info(f"Total MCP tools loaded: {len(all_tools)} from {len(active_clients)} servers") + + # Yield the tool collection + yield cls(all_tools) + + except Exception as e: + logger.error(f"Error in MCP tool collection context manager: {e}") + raise + finally: + # Cleanup all active clients + for mcp_client, client_ctx in active_clients: + try: + client_ctx.__exit__(None, None, None) + logger.debug(f"Cleaned up MCP client: {mcp_client.server_name}") + except Exception as e: + logger.error(f"Error cleaning up MCP client {mcp_client.server_name}: {e}") + logger.debug("MCP tool collection context manager cleanup completed") + + @classmethod + @contextmanager + def from_single_server(cls, server_name: str, server_config: Dict[str, Any]): + """ + Create a tool collection from a single MCP server. + + Args: + server_name: Name of the MCP server + server_config: Server configuration dictionary + + Yields: + CustomMCPToolCollection: Collection of adapted MCP tools + """ + try: + logger.info(f"Connecting to single MCP server '{server_name}'") + + with create_mcp_client(server_name, server_config) as mcp_client: + adapted_tools = create_smolagents_tools_from_mcp_client(mcp_client) + logger.info(f"Successfully loaded {len(adapted_tools)} tools from '{server_name}'") + + yield cls(adapted_tools) + + except Exception as e: + logger.error(f"Failed to connect to MCP server '{server_name}': {e}") + # Yield empty collection on failure + yield cls([]) + + +@contextmanager +def load_mcp_tools_from_config(mcp_servers_config: Dict[str, Dict[str, Any]]) -> List[Tool]: + """ + Convenience function to load MCP tools from configuration. + + Args: + mcp_servers_config: Dictionary of server configurations + + Yields: + List[Tool]: List of smolagents Tool instances + """ + with CustomMCPToolCollection.from_mcp_config(mcp_servers_config) as tool_collection: + yield tool_collection.tools + + +@contextmanager +def load_mcp_tools_from_stdio_params(server_params: Union[StdioServerParameters, Dict[str, Any]], server_name: str = "mcp_server") -> List[Tool]: + """ + Load MCP tools from StdioServerParameters or SSE config dict. + This provides compatibility with the original smolagents interface. + + Args: + server_params: StdioServerParameters or SSE config dict + server_name: Name for the server (for logging) + + Yields: + List[Tool]: List of smolagents Tool instances + """ + # Convert to our config format + if isinstance(server_params, StdioServerParameters): + server_config = { + "type": "stdio", + "command": server_params.command, + "args": server_params.args, + "env": server_params.env, + "enabled": True + } + elif isinstance(server_params, dict): + # Assume it's SSE config + server_config = { + "type": "sse", + "enabled": True, + **server_params + } + else: + raise ValueError(f"Unsupported server_params type: {type(server_params)}") + + with CustomMCPToolCollection.from_single_server(server_name, server_config) as tool_collection: + yield tool_collection.tools diff --git a/mxtoai/run_github_mcp_example.py b/mxtoai/run_github_mcp_example.py new file mode 100644 index 0000000..d385aa1 --- /dev/null +++ b/mxtoai/run_github_mcp_example.py @@ -0,0 +1,101 @@ +import os +import logging +from dotenv import load_dotenv + +from mcp import StdioServerParameters +from smolagents import CodeAgent, AzureOpenAIServerModel # Changed import from LiteLLMModel +from mcpadapt.core import MCPAdapt +from mcpadapt.smolagents_adapter import SmolAgentsAdapter + +# Configure basic logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def run_github_agent_example(): + """ + Runs a SmolAgent ToolCallingAgent with GitHub MCP tools via MCPAdapt + to list repositories and issue counts for a user, using Azure OpenAI. + """ + # Load environment variables from .env file + load_dotenv() + + github_pat = os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN") + if not github_pat: + logger.error("GITHUB_PERSONAL_ACCESS_TOKEN not found in environment variables.") + logger.error("Please set it in your .env file or environment.") + return + + # Azure OpenAI environment variables + azure_openai_model = os.getenv("AZURE_OPENAI_MODEL", "o3-mini-deep-research") + azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "https://gauta-m45ryzo6-eastus2.cognitiveservices.azure.com/") + azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY", "3MmGJJxhTpDBUYykqxUZoe4UAVMxsBnY6R9rRdpvnlOLhb3vngk6JQQJ99ALACHYHv6XJ3w3AAAAACOG67Gz") + openai_api_version = os.getenv("OPENAI_API_VERSION", "2024-12-01-preview") + + if not all([azure_openai_model, azure_openai_endpoint, azure_openai_api_key, openai_api_version]): + logger.error("One or more Azure OpenAI environment variables are missing.") + logger.error("Please ensure AZURE_OPENAI_MODEL, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, and OPENAI_API_VERSION are set.") + return + + logger.info(f"Using Azure OpenAI Model: {azure_openai_model} at endpoint: {azure_openai_endpoint}") + + # Configure MCP StdioServerParameters for the GitHub Docker runner + mcp_server_params = StdioServerParameters( + command="docker", + args=[ + "run", + "-i", # Interactive mode to allow stdio communication + "--rm", # Remove container after exit + "-e", f"GITHUB_PERSONAL_ACCESS_TOKEN={github_pat}", # Pass PAT to the container + "ghcr.io/github/github-mcp-server" + ], + env=os.environ.copy() # Pass current environment to the Docker command itself + ) + logger.info(f"Configured MCP server with command: docker and image: ghcr.io/github/github-mcp-server") + + query = "List down all the github repositories of satwikkansal along with number of issues in top 3 of them" + logger.info(f"Agent Query: {query}") + + try: + # Use MCPAdapt to make MCP tools available + with MCPAdapt( + mcp_server_params, + SmolAgentsAdapter(), + ) as mcp_tools: + if not mcp_tools: + logger.error("No tools loaded from MCP. Check MCP server and configuration.") + return + + logger.info(f"Successfully loaded {len(mcp_tools)} tools from GitHub MCP server.") + + # Initialize the AzureOpenAIServerModel + model = AzureOpenAIServerModel( + model_id=azure_openai_model, + azure_endpoint=azure_openai_endpoint, + api_key=azure_openai_api_key, + api_version=openai_api_version + ) + logger.info("AzureOpenAIServerModel initialized.") + + # Initialize ToolCallingAgent + agent = CodeAgent( + tools=mcp_tools, # Tools from MCPAdapt + model=model, + max_steps=10, # Adjust as needed + verbosity_level=1 # 0 for quiet, 1 for tool calls, 2 for thoughts + ) + logger.info("ToolCallingAgent initialized.") + + # Run the agent + logger.info("Running agent...") + result = agent.run(query) + + logger.info("Agent execution finished.") + print("\n--- Agent Result ---") + print(result) + print("--- End of Agent Result ---") + + except Exception as e: + logger.error(f"An error occurred: {e}", exc_info=True) + +if __name__ == "__main__": + run_github_agent_example() \ No newline at end of file diff --git a/mxtoai/test_custom_mcp_client.py b/mxtoai/test_custom_mcp_client.py new file mode 100644 index 0000000..44015c5 --- /dev/null +++ b/mxtoai/test_custom_mcp_client.py @@ -0,0 +1,146 @@ +""" +Test script to demonstrate the custom MCP client implementation. +This replaces the dependency on MCPAdapt library. +""" + +import os +import logging +from dotenv import load_dotenv + +from smolagents import CodeAgent, AzureOpenAIServerModel +from mxtoai.mcp import load_mcp_tools_from_stdio_params +from mcp import StdioServerParameters + +# Configure basic logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def test_custom_mcp_github_client(): + """ + Test our custom MCP client with GitHub MCP tools. + This demonstrates the replacement of MCPAdapt with our custom implementation. + """ + # Load environment variables + load_dotenv() + + github_pat = os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN") + if not github_pat: + logger.error("GITHUB_PERSONAL_ACCESS_TOKEN not found in environment variables.") + logger.error("Please set it in your .env file or environment.") + return + + # Azure OpenAI environment variables + azure_openai_model = os.getenv("AZURE_OPENAI_MODEL", "o3-mini-deep-research") + azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") + azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY") + openai_api_version = os.getenv("OPENAI_API_VERSION", "2024-12-01-preview") + + if not all([azure_openai_model, azure_openai_endpoint, azure_openai_api_key]): + logger.error("One or more Azure OpenAI environment variables are missing.") + logger.error("Please ensure AZURE_OPENAI_MODEL, AZURE_OPENAI_ENDPOINT, and AZURE_OPENAI_API_KEY are set.") + return + + logger.info(f"Using Azure OpenAI Model: {azure_openai_model} at endpoint: {azure_openai_endpoint}") + + # Configure MCP StdioServerParameters for the GitHub server + mcp_server_params = StdioServerParameters( + command="docker", + args=[ + "run", + "-i", # Interactive mode to allow stdio communication + "--rm", # Remove container after exit + "-e", f"GITHUB_PERSONAL_ACCESS_TOKEN={github_pat}", # Pass PAT to the container + "ghcr.io/github/github-mcp-server" + ], + env=os.environ.copy() # Pass current environment to the Docker command itself + ) + logger.info(f"Configured MCP server with command: docker and image: ghcr.io/github/github-mcp-server") + + query = "List the repositories of satwikkansal and show the number of issues for the top 3 repositories" + logger.info(f"Agent Query: {query}") + + try: + # Use our custom MCP implementation + logger.info("Testing custom MCP client implementation...") + + with load_mcp_tools_from_stdio_params(mcp_server_params, "github_server") as mcp_tools: + if not mcp_tools: + logger.error("No tools loaded from MCP. Check MCP server and configuration.") + return + + logger.info(f"Successfully loaded {len(mcp_tools)} tools from GitHub MCP server using custom client.") + + # Log the available tools + for tool in mcp_tools: + logger.info(f"Available tool: {tool.name} - {tool.description}") + + # Initialize the AzureOpenAIServerModel + model = AzureOpenAIServerModel( + model_id=azure_openai_model, + azure_endpoint=azure_openai_endpoint, + api_key=azure_openai_api_key, + api_version=openai_api_version + ) + logger.info("AzureOpenAIServerModel initialized.") + + # Initialize CodeAgent with custom MCP tools + agent = CodeAgent( + tools=mcp_tools, + model=model, + max_steps=10, + verbosity_level=1 + ) + logger.info("CodeAgent initialized with custom MCP tools.") + + # Run the agent + logger.info("Running agent with custom MCP client...") + result = agent.run(query) + + logger.info("Agent execution finished.") + print("\n--- Custom MCP Client Result ---") + print(result) + print("--- End of Custom MCP Client Result ---") + + except Exception as e: + logger.error(f"An error occurred: {e}", exc_info=True) + + +def test_custom_mcp_config_loading(): + """ + Test loading MCP tools from configuration file. + """ + logger.info("Testing MCP configuration loading...") + + # This would use the mcp.toml file if it exists + from mxtoai.agents.email_agent import EmailAgent + + try: + agent = EmailAgent(verbose=True) + mcp_config = agent._get_mcp_servers_config() + + if mcp_config: + logger.info(f"Found {len(mcp_config)} MCP servers in configuration:") + for server_name, config in mcp_config.items(): + logger.info(f"- {server_name}: {config.get('type', 'unknown')} server") + else: + logger.info("No MCP servers found in configuration") + + except Exception as e: + logger.error(f"Error testing MCP config loading: {e}", exc_info=True) + + +if __name__ == "__main__": + print("Testing Custom MCP Client Implementation") + print("=" * 50) + + # Test 1: Direct MCP client usage + print("\n1. Testing direct MCP client with GitHub server...") + test_custom_mcp_github_client() + + print("\n" + "=" * 50) + + # Test 2: Configuration loading + print("\n2. Testing MCP configuration loading...") + test_custom_mcp_config_loading() + + print("\nCustom MCP client tests completed!") diff --git a/poetry.lock b/poetry.lock index 3f3dfd4..b2128c8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1571,12 +1571,6 @@ files = [ {file = "geventhttpclient-2.3.3-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:447fc2d49a41449684154c12c03ab80176a413e9810d974363a061b71bdbf5a0"}, {file = "geventhttpclient-2.3.3-pp310-pypy310_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4598c2aa14c866a10a07a2944e2c212f53d0c337ce211336ad68ae8243646216"}, {file = "geventhttpclient-2.3.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:69d2bd7ab7f94a6c73325f4b88fd07b0d5f4865672ed7a519f2d896949353761"}, - {file = "geventhttpclient-2.3.3-pp311-pypy311_pp73-macosx_10_15_x86_64.whl", hash = "sha256:45a3f7e3531dd2650f5bb840ed11ce77d0eeb45d0f4c9cd6985eb805e17490e6"}, - {file = "geventhttpclient-2.3.3-pp311-pypy311_pp73-macosx_11_0_arm64.whl", hash = "sha256:73b427e0ea8c2750ee05980196893287bfc9f2a155a282c0f248b472ea7ae3e7"}, - {file = "geventhttpclient-2.3.3-pp311-pypy311_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c2959ef84271e4fa646c3dbaad9e6f2912bf54dcdfefa5999c2ef7c927d92127"}, - {file = "geventhttpclient-2.3.3-pp311-pypy311_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a800fcb8e53a8f4a7c02b4b403d2325a16cad63a877e57bd603aa50bf0e475b"}, - {file = "geventhttpclient-2.3.3-pp311-pypy311_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:528321e9aab686435ba09cc6ff90f12e577ace79762f74831ec2265eeab624a8"}, - {file = "geventhttpclient-2.3.3-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:034be44ff3318359e3c678cb5c4ed13efd69aeb558f2981a32bd3e3fb5355700"}, {file = "geventhttpclient-2.3.3-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:7a3182f1457599c2901c48a1def37a5bc4762f696077e186e2050fcc60b2fbdf"}, {file = "geventhttpclient-2.3.3-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:86b489238dc2cbfa53cdd5621e888786a53031d327e0a8509529c7568292b0ce"}, {file = "geventhttpclient-2.3.3-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c4c8aca6ab5da4211870c1d8410c699a9d543e86304aac47e1558ec94d0da97a"}, @@ -1843,6 +1837,18 @@ files = [ aiohttp = ">=3,<4" httpx = ">=0.28.1,<1" +[[package]] +name = "httpx-sse" +version = "0.4.0" +description = "Consume Server-Sent Event (SSE) messages with HTTPX." +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpx-sse-0.4.0.tar.gz", hash = "sha256:1e81a3a3070ce322add1d3529ed42eb5f70817f45ed6ec915ab753f961139721"}, + {file = "httpx_sse-0.4.0-py3-none-any.whl", hash = "sha256:f329af6eae57eaa2bdfd962b42524764af68075ea87370a2de920af5341e318f"}, +] + [[package]] name = "huggingface-hub" version = "0.32.0" @@ -2191,6 +2197,18 @@ files = [ {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, ] +[[package]] +name = "jsonref" +version = "1.1.0" +description = "jsonref is a library for automatic dereferencing of JSON Reference objects for Python." +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "jsonref-1.1.0-py3-none-any.whl", hash = "sha256:590dc7773df6c21cbf948b5dac07a72a251db28b0238ceecce0a2abfa8ec30a9"}, + {file = "jsonref-1.1.0.tar.gz", hash = "sha256:32fe8e1d85af0fdefbebce950af85590b22b60f9e95443176adbde4e1ecea552"}, +] + [[package]] name = "jsonschema" version = "4.23.0" @@ -2686,6 +2704,60 @@ files = [ {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, ] +[[package]] +name = "mcp" +version = "1.9.2" +description = "Model Context Protocol SDK" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "mcp-1.9.2-py3-none-any.whl", hash = "sha256:bc29f7fd67d157fef378f89a4210384f5fecf1168d0feb12d22929818723f978"}, + {file = "mcp-1.9.2.tar.gz", hash = "sha256:3c7651c053d635fd235990a12e84509fe32780cd359a5bbef352e20d4d963c05"}, +] + +[package.dependencies] +anyio = ">=4.5" +httpx = ">=0.27" +httpx-sse = ">=0.4" +pydantic = ">=2.7.2,<3.0.0" +pydantic-settings = ">=2.5.2" +python-multipart = ">=0.0.9" +sse-starlette = ">=1.6.1" +starlette = ">=0.27" +uvicorn = {version = ">=0.23.1", markers = "sys_platform != \"emscripten\""} + +[package.extras] +cli = ["python-dotenv (>=1.0.0)", "typer (>=0.12.4)"] +rich = ["rich (>=13.9.4)"] +ws = ["websockets (>=15.0.1)"] + +[[package]] +name = "mcpadapt" +version = "0.1.9" +description = "Adapt MCP servers to many agentic framework." +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "mcpadapt-0.1.9-py3-none-any.whl", hash = "sha256:9f2a6ad1155efdf1a43c11e8449ae9258295c4e140c3c6ff672983a8ac8bde33"}, + {file = "mcpadapt-0.1.9.tar.gz", hash = "sha256:03e601c4c083f3f4eb178e6a6bcd157bcb45e25c140ea0895567bab346b67645"}, +] + +[package.dependencies] +jsonref = ">=1.1.0" +mcp = ">=1.9.0" +pydantic = ">=2.10.6" +python-dotenv = ">=1.0.1" + +[package.extras] +crewai = ["crewai (>=0.108.0)"] +google-genai = ["google-genai (>=1.2.0)"] +langchain = ["langchain (>=0.3.14)", "langchain-anthropic (>=0.3.1)", "langgraph (>=0.2.62)"] +llamaindex = ["llama-index (>=0.12.14)"] +smolagents = ["smolagents (>=1.2.2)"] +test = ["crewai (>=0.108.0)", "google-genai (>=1.2.0)", "langchain (>=0.3.14)", "langchain-anthropic (>=0.3.1)", "langgraph (>=0.2.62)", "pytest (>=8.3.4)", "pytest-asyncio (>=0.25.2)", "smolagents (>=1.2.2)"] + [[package]] name = "mdurl" version = "0.1.2" @@ -3891,6 +3963,30 @@ files = [ [package.dependencies] typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" +[[package]] +name = "pydantic-settings" +version = "2.9.1" +description = "Settings management using Pydantic" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "pydantic_settings-2.9.1-py3-none-any.whl", hash = "sha256:59b4f431b1defb26fe620c71a7d3968a710d719f5f4cdbbdb7926edeb770f6ef"}, + {file = "pydantic_settings-2.9.1.tar.gz", hash = "sha256:c509bf79d27563add44e8446233359004ed85066cd096d8b510f715e6ef5d268"}, +] + +[package.dependencies] +pydantic = ">=2.7.0" +python-dotenv = ">=0.21.0" +typing-inspection = ">=0.4.0" + +[package.extras] +aws-secrets-manager = ["boto3 (>=1.35.0)", "boto3-stubs[secretsmanager]"] +azure-key-vault = ["azure-identity (>=1.16.0)", "azure-keyvault-secrets (>=4.8.0)"] +gcp-secret-manager = ["google-cloud-secret-manager (>=2.23.1)"] +toml = ["tomli (>=2.0.1)"] +yaml = ["pyyaml (>=6.0.1)"] + [[package]] name = "pydub" version = "0.25.1" @@ -4858,6 +4954,8 @@ files = [ [package.dependencies] huggingface-hub = ">=0.31.2" jinja2 = ">=3.1.4" +mcp = {version = "*", optional = true, markers = "extra == \"mcp\""} +mcpadapt = {version = ">=0.0.19", optional = true, markers = "extra == \"mcp\""} pillow = ">=10.0.1" python-dotenv = "*" requests = ">=2.32.3" @@ -4948,6 +5046,27 @@ openai = ["httpx (<0.28)", "openai"] pocketsphinx = ["pocketsphinx"] whisper-local = ["openai-whisper", "soundfile"] +[[package]] +name = "sse-starlette" +version = "2.3.6" +description = "SSE plugin for Starlette" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "sse_starlette-2.3.6-py3-none-any.whl", hash = "sha256:d49a8285b182f6e2228e2609c350398b2ca2c36216c2675d875f81e93548f760"}, + {file = "sse_starlette-2.3.6.tar.gz", hash = "sha256:0382336f7d4ec30160cf9ca0518962905e1b69b72d6c1c995131e0a703b436e3"}, +] + +[package.dependencies] +anyio = ">=4.7.0" + +[package.extras] +daphne = ["daphne (>=4.2.0)"] +examples = ["aiosqlite (>=0.21.0)", "fastapi (>=0.115.12)", "sqlalchemy[asyncio,examples] (>=2.0.41)", "starlette (>=0.41.3)", "uvicorn (>=0.34.0)"] +granian = ["granian (>=2.3.1)"] +uvicorn = ["uvicorn (>=0.34.0)"] + [[package]] name = "stack-data" version = "0.6.3" @@ -5922,4 +6041,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"] [metadata] lock-version = "2.1" python-versions = ">=3.12,<3.14" -content-hash = "f20bb21c7868b75d071485498c990b81aef3656cb5d779cbf19b2d0792dd8ff7" +content-hash = "33caa871696f7ad667f3993b71aa8d481342ab8b9df102492070cfab808fe16a" diff --git a/pyproject.toml b/pyproject.toml index 560ecbd..6e4d361 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "loguru (>=0.7.3,<0.8.0)", "boto3 (>=1.34.69,<2.0.0)", "aiohttp (>=3.11.14,<4.0.0)", - "smolagents (>=1.12.0,<2.0.0)", + "smolagents[mcp] (>=1.16.1,<2.0.0)", "mammoth (>=1.9.0,<2.0.0)", "pdfminer-six (>=20240706,<20240707)", "python-pptx (>=1.0.2,<2.0.0)", From 67e39fe979be351821146b85c9b66423d4171eab Mon Sep 17 00:00:00 2001 From: Giridhar M Date: Thu, 5 Jun 2025 00:52:51 +0530 Subject: [PATCH 2/3] refactoring on mcp, github server test --- mxtoai/agents/email_agent.py | 2 +- mxtoai/{mcp => mcp_support}/__init__.py | 0 mxtoai/{mcp => mcp_support}/client.py | 198 ++++++++++++++---- mxtoai/{mcp => mcp_support}/tool_adapter.py | 2 +- .../{mcp => mcp_support}/tool_collection.py | 0 mxtoai/test_custom_mcp_client.py | 10 +- 6 files changed, 165 insertions(+), 47 deletions(-) rename mxtoai/{mcp => mcp_support}/__init__.py (100%) rename mxtoai/{mcp => mcp_support}/client.py (52%) rename mxtoai/{mcp => mcp_support}/tool_adapter.py (99%) rename mxtoai/{mcp => mcp_support}/tool_collection.py (100%) diff --git a/mxtoai/agents/email_agent.py b/mxtoai/agents/email_agent.py index e04b5fc..753766e 100644 --- a/mxtoai/agents/email_agent.py +++ b/mxtoai/agents/email_agent.py @@ -18,7 +18,7 @@ ) from mxtoai._logging import get_logger -from mxtoai.mcp import load_mcp_tools_from_config +from mxtoai.mcp_support import load_mcp_tools_from_config from mxtoai.models import ProcessingInstructions from mxtoai.prompts.base_prompts import ( LIST_FORMATTING_REQUIREMENTS, diff --git a/mxtoai/mcp/__init__.py b/mxtoai/mcp_support/__init__.py similarity index 100% rename from mxtoai/mcp/__init__.py rename to mxtoai/mcp_support/__init__.py diff --git a/mxtoai/mcp/client.py b/mxtoai/mcp_support/client.py similarity index 52% rename from mxtoai/mcp/client.py rename to mxtoai/mcp_support/client.py index d50d7a4..0a91990 100644 --- a/mxtoai/mcp/client.py +++ b/mxtoai/mcp_support/client.py @@ -1,19 +1,16 @@ import asyncio -import json import logging -import threading -import time from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional +# Import from the official MCP library using specific paths to avoid local package conflict import mcp.types as mcp_types -from mcp import ClientSession, StdioServerParameters +from mcp import StdioServerParameters from mcp.client.session import ClientSession from mcp.client.stdio import stdio_client from mcp.client.sse import sse_client -from smolagents import Tool logger = logging.getLogger(__name__) @@ -42,6 +39,7 @@ def __init__(self, server_name: str): self.is_connected = False self._tools: List[mcp_types.Tool] = [] self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix=f"mcp-{server_name}") + self._event_loop = None # Store the event loop used for connection @abstractmethod async def _create_session(self) -> ClientSession: @@ -50,45 +48,68 @@ async def _create_session(self) -> ClientSession: def connect(self) -> bool: """Connect to the MCP server synchronously.""" + logger.info(f"Attempting to connect to MCP server {self.server_name}...") try: future = self._executor.submit(self._run_async_connect) - return future.result(timeout=30.0) # 30 second timeout + result = future.result(timeout=30.0) # 30 second timeout + if result: + logger.info(f"Successfully connected to MCP server {self.server_name}") + else: + logger.error(f"Failed to connect to MCP server {self.server_name}") + return result except Exception as e: - logger.error(f"Failed to connect to MCP server {self.server_name}: {e}") + logger.error(f"Failed to connect to MCP server {self.server_name}: {e}", exc_info=True) return False def _run_async_connect(self) -> bool: - """Run async connect in a new event loop.""" + """Run async connect in a new event loop and store it for reuse.""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + self._event_loop = loop # Store the loop for reuse try: return loop.run_until_complete(self._async_connect()) - finally: - loop.close() + except Exception as e: + logger.error(f"Error in _async_connect: {e}", exc_info=True) + return False + # Note: We don't close the loop here since we need to reuse it for tool calls except Exception as e: - logger.error(f"Error in async connect wrapper: {e}") + logger.error(f"Error in async connect wrapper for {self.server_name}: {e}", exc_info=True) return False async def _async_connect(self) -> bool: """Async implementation of connect.""" try: + logger.info(f"Creating session for MCP server {self.server_name}...") self.session = await self._create_session() + logger.info(f"Session created successfully for {self.server_name}") - # Initialize the session - init_result = await self.session.initialize() + # Initialize the session with timeout + logger.info(f"Initializing MCP server {self.server_name}...") + init_result = await asyncio.wait_for( + self.session.initialize(), + timeout=30.0 # 30 second timeout for initialization + ) logger.info(f"MCP server {self.server_name} initialized: {init_result}") # List available tools - tools_result = await self.session.list_tools() + logger.info(f"Listing tools from MCP server {self.server_name}...") + tools_result = await asyncio.wait_for( + self.session.list_tools(), + timeout=10.0 # 10 second timeout for listing tools + ) self._tools = tools_result.tools logger.info(f"Found {len(self._tools)} tools from MCP server {self.server_name}") self.is_connected = True return True + except asyncio.TimeoutError as e: + logger.error(f"Timeout connecting to MCP server {self.server_name}: {e}") + self.is_connected = False + return False except Exception as e: - logger.error(f"Error connecting to MCP server {self.server_name}: {e}") + logger.error(f"Error connecting to MCP server {self.server_name}: {e}", exc_info=True) self.is_connected = False return False @@ -112,16 +133,16 @@ def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: raise MCPToolExecutionError(f"Tool execution failed: {e}") from e def _run_async_call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: - """Run async tool call in a new event loop.""" + """Run async tool call in the stored event loop.""" + if not self._event_loop: + raise MCPConnectionError("No event loop available - connection not established") + try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(self._async_call_tool(tool_name, arguments)) - finally: - loop.close() + # Set the stored event loop as current and run the async call + asyncio.set_event_loop(self._event_loop) + return self._event_loop.run_until_complete(self._async_call_tool(tool_name, arguments)) except Exception as e: - logger.error(f"Error in async tool call wrapper: {e}") + logger.error(f"Error in async tool call wrapper: {e}", exc_info=True) raise async def _async_call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: @@ -152,20 +173,31 @@ async def _async_call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> s def disconnect(self): """Disconnect from the MCP server.""" - if self.session: + logger.info(f"Disconnecting from MCP server {self.server_name}...") + if self.session and self._event_loop: try: - # Run cleanup in executor to handle async cleanup - future = self._executor.submit(self._async_disconnect) - future.result(timeout=10.0) + # Run cleanup in the stored event loop + asyncio.set_event_loop(self._event_loop) + self._event_loop.run_until_complete(self._async_disconnect()) except Exception as e: - logger.error(f"Error during disconnect from {self.server_name}: {e}") + logger.error(f"Error during disconnect from {self.server_name}: {e}", exc_info=True) + + # Clean up event loop + if self._event_loop: + try: + self._event_loop.close() + except Exception as e: + logger.error(f"Error closing event loop for {self.server_name}: {e}") + finally: + self._event_loop = None self.is_connected = False self._tools = [] # Shutdown the executor - self._executor.shutdown(wait=True, timeout=5.0) - + self._executor.shutdown(wait=True) + logger.info(f"Disconnected from MCP server {self.server_name}") + async def _async_disconnect(self): """Async implementation of disconnect.""" if self.session: @@ -185,16 +217,68 @@ class StdioMCPClient(BaseMCPClient): def __init__(self, server_name: str, server_params: StdioServerParameters): super().__init__(server_name) self.server_params = server_params + self.stdio_context = None + self.session_context = None + self.read_stream = None + self.write_stream = None async def _create_session(self) -> ClientSession: - """Create a stdio-based MCP session.""" + """Create a stdio-based MCP session using proper async context managers.""" try: - read, write = await stdio_client(self.server_params) - session = ClientSession(read, write) + logger.info(f"Creating stdio client for {self.server_name} with command: {self.server_params.command} {' '.join(self.server_params.args)}") + + # Create and enter the stdio context manager + self.stdio_context = stdio_client(self.server_params) + logger.info(f"Stdio context manager created for {self.server_name}") + + logger.info(f"Entering stdio context for {self.server_name}...") + self.read_stream, self.write_stream = await self.stdio_context.__aenter__() + logger.info(f"Stdio streams established for {self.server_name}") + + # Create and enter the session context manager + self.session_context = ClientSession(self.read_stream, self.write_stream) + session = await self.session_context.__aenter__() + logger.info(f"ClientSession context entered for {self.server_name}") return session except Exception as e: - logger.error(f"Failed to create stdio session for {self.server_name}: {e}") + logger.error(f"Failed to create stdio session for {self.server_name}: {e}", exc_info=True) + # Clean up on failure + await self._cleanup_contexts(e) raise MCPConnectionError(f"Failed to create stdio session: {e}") from e + + async def _cleanup_contexts(self, exception=None): + """Clean up both session and stdio contexts.""" + # Clean up session context first + if self.session_context: + try: + if exception: + await self.session_context.__aexit__(type(exception), exception, exception.__traceback__) + else: + await self.session_context.__aexit__(None, None, None) + except Exception as cleanup_error: + logger.error(f"Error closing session context for {self.server_name}: {cleanup_error}") + finally: + self.session_context = None + + # Then clean up stdio context + if self.stdio_context: + try: + if exception: + await self.stdio_context.__aexit__(type(exception), exception, exception.__traceback__) + else: + await self.stdio_context.__aexit__(None, None, None) + except Exception as cleanup_error: + logger.error(f"Error closing stdio context for {self.server_name}: {cleanup_error}") + finally: + self.stdio_context = None + self.read_stream = None + self.write_stream = None + + async def _async_disconnect(self): + """Async implementation of disconnect with proper context cleanup.""" + # Note: We don't need to call session.close() since we're using context managers + self.session = None + await self._cleanup_contexts() class SSEMCPClient(BaseMCPClient): @@ -204,16 +288,54 @@ def __init__(self, server_name: str, url: str, **kwargs): super().__init__(server_name) self.url = url self.sse_kwargs = kwargs + self.sse_context = None + self.read_stream = None + self.write_stream = None async def _create_session(self) -> ClientSession: """Create an SSE-based MCP session.""" try: - read, write = await sse_client(self.url, **self.sse_kwargs) - session = ClientSession(read, write) + # Create the SSE context manager + self.sse_context = sse_client(self.url, **self.sse_kwargs) + # Enter the context manager + self.read_stream, self.write_stream = await self.sse_context.__aenter__() + # Create session with the streams + session = ClientSession(self.read_stream, self.write_stream) return session except Exception as e: logger.error(f"Failed to create SSE session for {self.server_name}: {e}") + # Clean up on failure + if self.sse_context: + try: + await self.sse_context.__aexit__(type(e), e, e.__traceback__) + except: + pass # Ignore cleanup errors + self.sse_context = None raise MCPConnectionError(f"Failed to create SSE session: {e}") from e + + async def _async_disconnect(self): + """Async implementation of disconnect with proper SSE cleanup.""" + # Close the session first + if self.session: + try: + # Close the session if it has a close method + if hasattr(self.session, 'close'): + await self.session.close() + except Exception as e: + logger.error(f"Error closing session for {self.server_name}: {e}") + finally: + self.session = None + + # Clean up SSE context + if self.sse_context: + try: + await self.sse_context.__aexit__(None, None, None) + except Exception as e: + logger.error(f"Error closing SSE context for {self.server_name}: {e}") + finally: + self.sse_context = None + self.read_stream = None + self.write_stream = None @contextmanager @@ -267,4 +389,4 @@ def create_mcp_client(server_name: str, server_config: Dict[str, Any]): client.disconnect() logger.info(f"Disconnected from MCP server {server_name}") except Exception as e: - logger.error(f"Error disconnecting from {server_name}: {e}") + logger.error(f"Error disconnecting from {server_name}: {e}") \ No newline at end of file diff --git a/mxtoai/mcp/tool_adapter.py b/mxtoai/mcp_support/tool_adapter.py similarity index 99% rename from mxtoai/mcp/tool_adapter.py rename to mxtoai/mcp_support/tool_adapter.py index 9ebcbea..2236c81 100644 --- a/mxtoai/mcp/tool_adapter.py +++ b/mxtoai/mcp_support/tool_adapter.py @@ -5,7 +5,7 @@ from typing import Any, Dict, List, Optional import jsonref -import mcp.types as mcp_types +from mcp import types as mcp_types from smolagents import Tool from .client import BaseMCPClient, MCPToolExecutionError diff --git a/mxtoai/mcp/tool_collection.py b/mxtoai/mcp_support/tool_collection.py similarity index 100% rename from mxtoai/mcp/tool_collection.py rename to mxtoai/mcp_support/tool_collection.py diff --git a/mxtoai/test_custom_mcp_client.py b/mxtoai/test_custom_mcp_client.py index 44015c5..29ca6cc 100644 --- a/mxtoai/test_custom_mcp_client.py +++ b/mxtoai/test_custom_mcp_client.py @@ -8,7 +8,7 @@ from dotenv import load_dotenv from smolagents import CodeAgent, AzureOpenAIServerModel -from mxtoai.mcp import load_mcp_tools_from_stdio_params +from mxtoai.mcp_support import load_mcp_tools_from_stdio_params from mcp import StdioServerParameters # Configure basic logging @@ -56,8 +56,8 @@ def test_custom_mcp_github_client(): ) logger.info(f"Configured MCP server with command: docker and image: ghcr.io/github/github-mcp-server") - query = "List the repositories of satwikkansal and show the number of issues for the top 3 repositories" - logger.info(f"Agent Query: {query}") + query = "List the repositories of XLander03 and tell me the number of issues for the top 3 repositories" + logger.info(f"Agent Query: {query}") try: # Use our custom MCP implementation @@ -70,10 +70,6 @@ def test_custom_mcp_github_client(): logger.info(f"Successfully loaded {len(mcp_tools)} tools from GitHub MCP server using custom client.") - # Log the available tools - for tool in mcp_tools: - logger.info(f"Available tool: {tool.name} - {tool.description}") - # Initialize the AzureOpenAIServerModel model = AzureOpenAIServerModel( model_id=azure_openai_model, From 058e1dc464d9fa94a56449bb0b948cd12c7cca3b Mon Sep 17 00:00:00 2001 From: Giridhar M Date: Tue, 10 Jun 2025 13:22:01 +0530 Subject: [PATCH 3/3] further refactor/debugging --- mxtoai/agents/email_agent.py | 5 +- mxtoai/mcp_support/client.py | 34 +++++--- mxtoai/mcp_support/tool_collection.py | 94 ++++++++++++++-------- mxtoai/test_custom_mcp_client.py | 107 +++++++++++++++++++++++++- 4 files changed, 194 insertions(+), 46 deletions(-) diff --git a/mxtoai/agents/email_agent.py b/mxtoai/agents/email_agent.py index 753766e..c4a17d3 100644 --- a/mxtoai/agents/email_agent.py +++ b/mxtoai/agents/email_agent.py @@ -18,7 +18,7 @@ ) from mxtoai._logging import get_logger -from mxtoai.mcp_support import load_mcp_tools_from_config +from mxtoai.mcp_support import load_mcp_tools_from_stdio_params from mxtoai.models import ProcessingInstructions from mxtoai.prompts.base_prompts import ( LIST_FORMATTING_REQUIREMENTS, @@ -662,6 +662,7 @@ def process_email( # Get MCP server configurations mcp_servers_config = self._get_mcp_servers_config() + print(f"MCP servers config: {mcp_servers_config}") all_tools = list(self.base_tools) # Start with base tools agent_steps = [] @@ -677,7 +678,7 @@ def process_email( try: logger.info(f"Loading MCP tools from {len(mcp_servers_config)} configured servers") - with load_mcp_tools_from_config(mcp_servers_config) as mcp_tools: + with load_mcp_tools_from_stdio_params(mcp_servers_config) as mcp_tools: all_tools.extend(mcp_tools) mcp_tools_count = len(mcp_tools) diff --git a/mxtoai/mcp_support/client.py b/mxtoai/mcp_support/client.py index 0a91990..e41e4d2 100644 --- a/mxtoai/mcp_support/client.py +++ b/mxtoai/mcp_support/client.py @@ -4,7 +4,7 @@ from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from typing import Any, Dict, List, Optional - +import os # Import from the official MCP library using specific paths to avoid local package conflict import mcp.types as mcp_types from mcp import StdioServerParameters @@ -225,6 +225,14 @@ def __init__(self, server_name: str, server_params: StdioServerParameters): async def _create_session(self) -> ClientSession: """Create a stdio-based MCP session using proper async context managers.""" try: + # Debug logging for environment variables + github_token = self.server_params.env.get('GITHUB_PERSONAL_ACCESS_TOKEN', 'NOT_FOUND') + if github_token != 'NOT_FOUND': + logger.info(f"GitHub token found in environment (starts with: {github_token[:20]}...)") + else: + logger.warning(f"GITHUB_PERSONAL_ACCESS_TOKEN not found in MCP server environment!") + logger.debug(f"Available environment keys: {list(self.server_params.env.keys())[:10]}...") + logger.info(f"Creating stdio client for {self.server_name} with command: {self.server_params.command} {' '.join(self.server_params.args)}") # Create and enter the stdio context manager @@ -350,7 +358,11 @@ def create_mcp_client(server_name: str, server_config: Dict[str, Any]): # Create StdioServerParameters command = server_config.get("command") args = server_config.get("args", []) - env = server_config.get("env", {}) + + # Start with current environment and merge in custom env vars + env = os.environ.copy() + custom_env = server_config.get("env", {}) + env.update(custom_env) if not command: raise ValueError(f"Stdio server {server_name} missing 'command'") @@ -361,14 +373,16 @@ def create_mcp_client(server_name: str, server_config: Dict[str, Any]): env=env ) client = StdioMCPClient(server_name, server_params) - - elif server_type == "sse": - url = server_config.get("url") - if not url: - raise ValueError(f"SSE server {server_name} missing 'url'") - - extra_params = server_config.get("extra_params", {}) - client = SSEMCPClient(server_name, url, **extra_params) + """ + elif server_type == "sse": + url = server_config.get("url") + if not url: + raise ValueError(f"SSE server {server_name} missing 'url'") + + extra_params = server_config.get("extra_params", {}) + client = SSEMCPClient(server_name, url, **extra_params) + """ + else: raise ValueError(f"Unknown server type: {server_type}") diff --git a/mxtoai/mcp_support/tool_collection.py b/mxtoai/mcp_support/tool_collection.py index 37fb132..2cdca80 100644 --- a/mxtoai/mcp_support/tool_collection.py +++ b/mxtoai/mcp_support/tool_collection.py @@ -41,7 +41,6 @@ def from_mcp_config(cls, mcp_servers_config: Dict[str, Dict[str, Any]]): Yields: CustomMCPToolCollection: Collection of adapted MCP tools """ - active_clients = [] all_tools = [] try: @@ -53,38 +52,25 @@ def from_mcp_config(cls, mcp_servers_config: Dict[str, Dict[str, Any]]): try: logger.info(f"Connecting to MCP server '{server_name}'") - # Create and connect to MCP client - client_ctx = create_mcp_client(server_name, server_config) - mcp_client = client_ctx.__enter__() - active_clients.append((mcp_client, client_ctx)) - - # Adapt tools from this client - adapted_tools = create_smolagents_tools_from_mcp_client(mcp_client) - all_tools.extend(adapted_tools) - - logger.info(f"Successfully loaded {len(adapted_tools)} tools from '{server_name}'") + # Use the same pattern as from_single_server + with create_mcp_client(server_name, server_config) as mcp_client: + adapted_tools = create_smolagents_tools_from_mcp_client(mcp_client) + all_tools.extend(adapted_tools) + logger.info(f"Successfully loaded {len(adapted_tools)} tools from '{server_name}'") except Exception as e: logger.error(f"Failed to connect to MCP server '{server_name}': {e}") continue - logger.info(f"Total MCP tools loaded: {len(all_tools)} from {len(active_clients)} servers") + logger.info(f"Total MCP tools loaded: {len(all_tools)} from {len(mcp_servers_config)} configured servers") # Yield the tool collection yield cls(all_tools) except Exception as e: logger.error(f"Error in MCP tool collection context manager: {e}") - raise - finally: - # Cleanup all active clients - for mcp_client, client_ctx in active_clients: - try: - client_ctx.__exit__(None, None, None) - logger.debug(f"Cleaned up MCP client: {mcp_client.server_name}") - except Exception as e: - logger.error(f"Error cleaning up MCP client {mcp_client.server_name}: {e}") - logger.debug("MCP tool collection context manager cleanup completed") + # Yield empty collection on failure + yield cls([]) @classmethod @contextmanager @@ -132,17 +118,19 @@ def load_mcp_tools_from_config(mcp_servers_config: Dict[str, Dict[str, Any]]) -> @contextmanager def load_mcp_tools_from_stdio_params(server_params: Union[StdioServerParameters, Dict[str, Any]], server_name: str = "mcp_server") -> List[Tool]: """ - Load MCP tools from StdioServerParameters or SSE config dict. + Load MCP tools from StdioServerParameters or config dict. This provides compatibility with the original smolagents interface. Args: - server_params: StdioServerParameters or SSE config dict - server_name: Name for the server (for logging) + server_params: StdioServerParameters, single server config dict, or multi-server config dict + server_name: Name for the server (for logging, used only for single server) Yields: List[Tool]: List of smolagents Tool instances """ - # Convert to our config format + print(f"Server params in load_mcp_tools_from_stdio_params: {server_params}") + + # Handle StdioServerParameters object if isinstance(server_params, StdioServerParameters): server_config = { "type": "stdio", @@ -151,15 +139,59 @@ def load_mcp_tools_from_stdio_params(server_params: Union[StdioServerParameters, "env": server_params.env, "enabled": True } - elif isinstance(server_params, dict): - # Assume it's SSE config + with CustomMCPToolCollection.from_single_server(server_name, server_config) as tool_collection: + yield tool_collection.tools + + # Handle multi-server config dictionary (like from mcp.toml) + # Check if this is a dict where values are server configs (have 'type', 'command', etc.) + elif isinstance(server_params, dict) and all( + isinstance(v, dict) and 'type' in v + for v in server_params.values() + if isinstance(v, dict) + ): + # This is a multi-server config, loop through each server individually + print(f"Detected multi-server config with {len(server_params)} servers") + all_tools = [] + + for individual_server_name, individual_server_config in server_params.items(): + if not individual_server_config.get("enabled", True): + print(f"Server '{individual_server_name}' is disabled, skipping") + continue + + try: + print(f"Processing server: {individual_server_name}") + with CustomMCPToolCollection.from_single_server(individual_server_name, individual_server_config) as tool_collection: + server_tools = list(tool_collection.tools) + all_tools.extend(server_tools) + print(f"Successfully loaded {len(server_tools)} tools from '{individual_server_name}'") + except Exception as e: + print(f"Failed to load tools from server '{individual_server_name}': {e}") + continue + + print(f"Total tools loaded from all servers: {len(all_tools)}") + yield all_tools + + # Handle single server config dictionary + elif isinstance(server_params, dict) and server_params.get("type") == "stdio": + server_config = { + "type": "stdio", + "command": server_params.get("command"), + "args": server_params.get("args", []), + "env": server_params.get("env"), + "enabled": True + } + with CustomMCPToolCollection.from_single_server(server_name, server_config) as tool_collection: + yield tool_collection.tools + + # Handle single SSE server config dictionary + elif isinstance(server_params, dict) and server_params.get("type") == "sse": server_config = { "type": "sse", "enabled": True, **server_params } + with CustomMCPToolCollection.from_single_server(server_name, server_config) as tool_collection: + yield tool_collection.tools + else: raise ValueError(f"Unsupported server_params type: {type(server_params)}") - - with CustomMCPToolCollection.from_single_server(server_name, server_config) as tool_collection: - yield tool_collection.tools diff --git a/mxtoai/test_custom_mcp_client.py b/mxtoai/test_custom_mcp_client.py index 29ca6cc..fe0d2a0 100644 --- a/mxtoai/test_custom_mcp_client.py +++ b/mxtoai/test_custom_mcp_client.py @@ -39,6 +39,7 @@ def test_custom_mcp_github_client(): logger.error("One or more Azure OpenAI environment variables are missing.") logger.error("Please ensure AZURE_OPENAI_MODEL, AZURE_OPENAI_ENDPOINT, and AZURE_OPENAI_API_KEY are set.") return + logger.info(f"Using Azure OpenAI Model: {azure_openai_model} at endpoint: {azure_openai_endpoint}") @@ -101,6 +102,100 @@ def test_custom_mcp_github_client(): logger.error(f"An error occurred: {e}", exc_info=True) +def test_sequential_thinking_mcp_client(): + """ + Test our custom MCP client with Sequential Thinking MCP server. + This demonstrates the sequential thinking tool for complex problem-solving. + """ + # Load environment variables + load_dotenv() + + # Azure OpenAI environment variables + azure_openai_model = os.getenv("AZURE_OPENAI_MODEL", "o3-mini-deep-research") + azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") + azure_openai_api_key = os.getenv("AZURE_OPENAI_API_KEY") + openai_api_version = os.getenv("OPENAI_API_VERSION", "2024-12-01-preview") + + if not all([azure_openai_model, azure_openai_endpoint, azure_openai_api_key]): + logger.error("One or more Azure OpenAI environment variables are missing.") + logger.error("Please ensure AZURE_OPENAI_MODEL, AZURE_OPENAI_ENDPOINT, and AZURE_OPENAI_API_KEY are set.") + return + + logger.info(f"Using Azure OpenAI Model: {azure_openai_model} at endpoint: {azure_openai_endpoint}") + + # Configure MCP StdioServerParameters for the Sequential Thinking server + mcp_server_params = StdioServerParameters( + command="npx", + args=[ + "-y", + "@modelcontextprotocol/server-sequential-thinking" + ], + env=os.environ.copy() + ) + logger.info("Configured Sequential Thinking MCP server with NPX command") + + # Complex query that would benefit from sequential thinking + query = """ + I need to design a scalable microservices architecture for an e-commerce platform that handles: + - 1 million users + - 10,000 concurrent transactions + - Real-time inventory management + - Payment processing + - Order fulfillment + - Customer notifications + + Break this down step by step, considering trade-offs, potential issues, and revisions as needed. + Use the sequential thinking approach to work through this complex problem. + """ + logger.info(f"Agent Query for Sequential Thinking: {query}") + + try: + # Use our custom MCP implementation with Sequential Thinking server + logger.info("Testing Sequential Thinking MCP client implementation...") + + with load_mcp_tools_from_stdio_params(mcp_server_params, "sequential_thinking_server") as mcp_tools: + if not mcp_tools: + logger.error("No tools loaded from Sequential Thinking MCP server. Check server configuration.") + return + + logger.info(f"Successfully loaded {len(mcp_tools)} tools from Sequential Thinking MCP server.") + + # Log the available tools + for tool in mcp_tools: + logger.info(f"Available tool: {tool.name} - {tool.description}") + logger.info(f"Tool inputs: {tool.inputs}") + + # Initialize the AzureOpenAIServerModel + model = AzureOpenAIServerModel( + model_id=azure_openai_model, + azure_endpoint=azure_openai_endpoint, + api_key=azure_openai_api_key, + api_version=openai_api_version + ) + logger.info("AzureOpenAIServerModel initialized.") + + # Initialize CodeAgent with Sequential Thinking MCP tools + agent = CodeAgent( + tools=mcp_tools, + model=model, + max_steps=15, # More steps for complex thinking process + verbosity_level=2 # Higher verbosity to see the thinking process + ) + logger.info("CodeAgent initialized with Sequential Thinking MCP tools.") + + # Run the agent + logger.info("Running agent with Sequential Thinking MCP client...") + result = agent.run(query) + + logger.info("Agent execution finished.") + print("\n--- Sequential Thinking MCP Client Result ---") + print(result) + print("--- End of Sequential Thinking MCP Client Result ---") + + except Exception as e: + logger.error(f"An error occurred with Sequential Thinking MCP: {e}", exc_info=True) + + def test_custom_mcp_config_loading(): """ Test loading MCP tools from configuration file. @@ -129,14 +224,20 @@ def test_custom_mcp_config_loading(): print("Testing Custom MCP Client Implementation") print("=" * 50) - # Test 1: Direct MCP client usage + # Test 1: Direct MCP client usage with GitHub print("\n1. Testing direct MCP client with GitHub server...") test_custom_mcp_github_client() print("\n" + "=" * 50) - # Test 2: Configuration loading - print("\n2. Testing MCP configuration loading...") + # Test 2: Sequential Thinking MCP server + print("\n2. Testing Sequential Thinking MCP server...") + test_sequential_thinking_mcp_client() + + print("\n" + "=" * 50) + + # Test 3: Configuration loading + print("\n3. Testing MCP configuration loading...") test_custom_mcp_config_loading() print("\nCustom MCP client tests completed!")