diff --git a/examples/MCP/business_data.db b/examples/MCP/business_data.db new file mode 100644 index 0000000..bae95e1 Binary files /dev/null and b/examples/MCP/business_data.db differ diff --git a/examples/MCP/echo_server_stdio.log b/examples/MCP/echo_server_stdio.log new file mode 100644 index 0000000..e69de29 diff --git a/examples/MCP/echo_server_stdio.py b/examples/MCP/echo_server_stdio.py new file mode 100644 index 0000000..b5a25ca --- /dev/null +++ b/examples/MCP/echo_server_stdio.py @@ -0,0 +1,140 @@ +# examples/MCP/echo_server_stdio.py +import asyncio +import logging +import os +import sys # For sys.stdout in logging handlers +from mcp.server import Server, NotificationOptions +from mcp.server.models import InitializationOptions +from mcp.types import Tool, TextContent, Resource +import mcp.server.stdio # For stdio_server context manager + +# --- Logging Setup --- +# Ensure the 'logs' directory exists in the same directory as this script, +# or adjust the log_file_path. +LOG_DIR = os.path.join(os.path.dirname(__file__), "logs") +if not os.path.exists(LOG_DIR): + os.makedirs(LOG_DIR, exist_ok=True) +LOG_FILE_PATH = os.path.join(LOG_DIR, 'echo_server_stdio.log') + +# Configure logging to write to a file and also to stdout (for direct runs) +logging.basicConfig( + level=logging.DEBUG, # Set to DEBUG for verbose output + format="%(asctime)s - ECHO_SRV_STDIO - %(process)d - %(levelname)s - %(filename)s:%(lineno)d - %(message)s", + handlers=[ + logging.FileHandler(LOG_FILE_PATH, mode='w'), # Overwrite log file on each start + logging.StreamHandler(sys.stdout) # Log to console as well + ] +) +logger = logging.getLogger("echo_mcp_stdio_srv") + +# --- Server Configuration --- +ECHO_PREFIX = os.getenv("ECHO_PREFIX", "[EchoStdioDefault]") + +# Create the MCP Server instance +# The server name here is for identification within this process, +# the actual name presented to clients is in InitializationOptions. +server = Server("EchoStdioServerExampleInstance") +logger.debug(f"MCP Server object '{server.name}' created.") + +# --- Tool Definitions --- +@server.list_tools() +async def list_tools_impl() -> list[Tool]: + logger.debug("Handler: list_tools_impl called.") + return [ + Tool( + name="echo", + description="Echoes the input message with a prefix.", + inputSchema={ + "type": "object", + "properties": { + "message": {"type": "string", "description": "The message to echo."} + }, + "required": ["message"], + } + ) + ] + +@server.call_tool() +async def call_tool_impl(name: str, args: dict | None) -> list[TextContent]: + logger.debug(f"Handler: call_tool_impl called with tool_name='{name}', args={args!r}") + if name == "echo": + message_to_echo = args.get("message", "No message provided.") if args else "No message provided." + response_text = f"{ECHO_PREFIX} {message_to_echo}" + logger.info(f"Tool 'echo' responding with: '{response_text}'") + return [TextContent(type="text", text=response_text)] + logger.warning(f"Handler: call_tool_impl received unknown tool name: '{name}'") + # According to MCP spec, should ideally raise an error or return an error in CallToolResult. + # For simplicity, returning empty list for unknown tools. 
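+    # A stricter variant (sketch only, not required by this example) could
+    # surface the failure to the client as text content instead, e.g.:
+    #   return [TextContent(type="text", text=f"Error: unknown tool '{name}'")]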
+ return [] + +# --- Resource Definitions --- +@server.list_resources() +async def list_resources_impl() -> list[Resource]: + logger.debug("Handler: list_resources_impl called.") + return [ + Resource( + uri="echo://status", + name="Server Status", + description="Provides the current status and prefix of the echo server.", + mimeType="text/plain" + ) + ] + +@server.read_resource() +async def read_resource_impl(uri: str) -> str: + logger.debug(f"Handler: read_resource_impl called with uri='{uri}'") + if str(uri) == "echo://status": + status_text = f"Echo server is operational. Current prefix: {ECHO_PREFIX}" + logger.info(f"Resource 'echo://status' responding with: '{status_text}'") + return status_text + logger.warning(f"Handler: read_resource_impl received unknown resource URI: '{uri}'") + raise ValueError(f"Resource not found: {uri}") # MCP server should handle this and convert to error response + +# --- Main Server Logic --- +async def main_echo(): + """Initializes and runs the MCP stdio server.""" + logger.info(f"Echo MCP stdio Server starting up (prefix: {ECHO_PREFIX}). PID: {os.getpid()}") + try: + # mcp.server.stdio.stdio_server() provides the binary read/write streams for MCP communication + async with mcp.server.stdio.stdio_server() as (binary_reader, binary_writer): + logger.debug("stdio_server context manager entered. Binary reader/writer obtained.") + + # Define initialization options for the client + init_options = InitializationOptions( + server_name="EchoStdioSrvFriendlyName", # Name presented to clients + server_version="0.2.0", # Version of this server + capabilities=server.get_capabilities( + notification_options=NotificationOptions(receive=False, send=False), # Example: no notifications + experimental_capabilities={} # **FIX APPLIED HERE** + ) + ) + logger.debug(f"Server capabilities defined for client: {init_options.capabilities!r}") + + # Start the MCP server loop, listening on the provided streams + await server.run(binary_reader, binary_writer, init_options) + + # This line is typically only reached if server.run() exits gracefully (e.g., client disconnects cleanly) + logger.info("MCP server.run() completed.") + + except asyncio.CancelledError: + logger.info("main_echo task was cancelled (e.g., during shutdown).") + except Exception as e: + # Catch any unexpected errors during server setup or run + logger.critical(f"CRITICAL ERROR in main_echo: {e!r}", exc_info=True) + # Re-raise to ensure the process exits with an error, signaling failure to the parent. + raise + finally: + logger.info("main_echo function finished or exited.") + +if __name__ == "__main__": + # This block executes when the script is run directly (e.g., `python echo_server_stdio.py`) + logger.info(f"Executing echo_server_stdio.py directly. 
Logging to: {LOG_FILE_PATH}") + try: + asyncio.run(main_echo()) + except KeyboardInterrupt: + logger.info("echo_server_stdio.py terminated by user (KeyboardInterrupt).") + except Exception as e_main: + # Catch errors from asyncio.run(main_echo()) itself or unhandled exceptions from main_echo + logger.critical(f"Unhandled exception in echo_server_stdio.py __main__ block: {e_main!r}", exc_info=True) + finally: + logger.info("echo_server_stdio.py process exiting.") \ No newline at end of file diff --git a/examples/MCP/main.py b/examples/MCP/main.py new file mode 100644 index 0000000..2dbe264 --- /dev/null +++ b/examples/MCP/main.py @@ -0,0 +1,150 @@ +# mcp_chatbot_main.py +import asyncio +import logging +import os +import json +from dotenv import load_dotenv + +from tframex import TFrameXApp, OpenAIChatLLM, Message # TFrameXRuntimeContext is used internally by app.run_context() +# from tframex.util.llms import BaseLLMWrapper # Not directly needed for this script + +load_dotenv() + +from tframex.util.logging import setup_logging +setup_logging(level=logging.INFO) +logging.getLogger("tframex.app").setLevel(logging.INFO) +logging.getLogger("tframex.mcp").setLevel(logging.INFO) +logging.getLogger("tframex.mcp.server_connector").setLevel(logging.INFO) +logging.getLogger("tframex.agents.llm_agent").setLevel(logging.INFO) # Set to DEBUG for tool call details +logging.getLogger("tframex.engine").setLevel(logging.INFO) +logging.getLogger("mcp.client").setLevel(logging.WARNING) + +logger = logging.getLogger("TFrameX_MCP_Chatbot") + +async def main(): + logger.info("--- TFrameX with MCP - Interactive Chatbot Example ---") + + # 1. Configure LLM + llm_api_key = os.getenv("OPENAI_API_KEY") + llm_api_base = os.getenv("OPENAI_API_BASE") + llm_model_name = os.getenv("OPENAI_MODEL_NAME") + + if not all([llm_api_key, llm_api_base, llm_model_name]): + logger.error("LLM configuration missing in .env. Exiting.") + return + + default_llm = OpenAIChatLLM( + model_name=llm_model_name, + api_base_url=llm_api_base, + api_key=llm_api_key + ) + logger.info(f"Using LLM: {llm_model_name} at {llm_api_base}") + + # 2. Initialize TFrameXApp with MCP configuration + app = TFrameXApp( + default_llm=default_llm, + mcp_config_file="servers_config.json" + ) + + # MCP servers will be initialized when app.run_context() is entered, + # or can be done explicitly here if needed before agent registration (not typical). + # await app.initialize_mcp_servers() # Optional explicit call + + # 3. Define a native TFrameX tool + @app.tool(description="Gets the current date and time.") + async def get_current_datetime() -> str: + from datetime import datetime + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"NATIVE TOOL: get_current_datetime executed, returning: {now_str}") + return now_str + + # 4. Define the UniversalAssistant agent + @app.agent( + name="UniversalAssistant", + system_prompt=( + "/no_think You are UniversalAssistant, a helpful AI. You have access to tools for time, MCP server introspection, and specific MCP server functionalities.\n" + "MCP server tools are prefixed (e.g., 'math_http_service__add', 'echo_stdio_service__echo').\n" + "Use 'tframex_read_mcp_resource' to read MCP resources if you know the server_alias and resource_uri.\n" + "Carefully choose tools and provide arguments. 
Available tools: {available_tools_descriptions}" + ), + tools=[ # Native TFrameX tools + MCP meta-tools + "get_current_datetime", + "tframex_list_mcp_servers", + "tframex_list_mcp_resources", + "tframex_read_mcp_resource", + "tframex_list_mcp_prompts", + "tframex_use_mcp_prompt", + ], + mcp_tools_from_servers="ALL" # Agent can use tools from ALL connected MCP servers + ) + async def universal_assistant_placeholder(): + pass # Logic is handled by LLMAgent base class + + # 5. Run the built-in interactive chat with the UniversalAssistant + # TFrameXRuntimeContext is created and managed by app.run_context() + async with app.run_context() as rt: # MCP servers will be initialized here. + logger.info("Starting interactive chat with 'UniversalAssistant'.") + logger.info("MCP Servers should connect now if not already.") + logger.info("Try asking about time, math (e.g., '10 + 5'), or echoing (e.g., 'echo hello from stdio').") + logger.info("You can also ask it to 'list mcp servers' or 'list resources from echo_stdio_service'.") + + await rt.interactive_chat(default_agent_name="UniversalAssistant") + # The interactive_chat method in TFrameXRuntimeContext will handle the + # user input loop and calling the specified agent. + + # Crucial for stdio MCP servers to terminate properly after chat ends + logger.info("Interactive chat finished. Shutting down MCP servers...") + await app.shutdown_mcp_servers() + logger.info("--- TFrameX MCP Chatbot Example Finished ---") + +if __name__ == "__main__": + # Setup dummy/example config files if they don't exist + # Ensure 'echo_server_stdio.py' is in the same directory or adjust paths in 'servers_config.json' + if not os.path.exists("echo_server_stdio.py"): + logger.error("echo_server_stdio.py not found. Please create it or update servers_config.json.") + # For the example to run without manual setup, you might choose to exit or simplify servers_config + + if not os.path.exists("servers_config.json"): + logger.warning("servers_config.json not found. Creating a dummy config with only math_http_service.") + dummy_config = { + "mcpServers": { + "math_http_service": {"type": "streamable-http", "url": "http://localhost:8000/mcp/"} + # Add echo_stdio_service here if echo_server_stdio.py exists + # "echo_stdio_service": { + # "type": "stdio", + # "command": "python", + # "args": ["./echo_server_stdio.py"], # Assuming it's in the same dir + # "env": {"ECHO_PREFIX": "[EchoStdioFromChat]"} + # } + } + } + # Check again if echo_server_stdio.py exists before adding to dummy config + if os.path.exists("echo_server_stdio.py") and "echo_stdio_service" not in dummy_config["mcpServers"] : + dummy_config["mcpServers"]["echo_stdio_service"] = { + "type": "stdio", + "command": "python", # Assuming python is in PATH + "args": ["./echo_server_stdio.py"], # Path relative to where this script is run + "env": {"ECHO_PREFIX": "[EchoStdioExample]"} + } + else: + logger.warning("echo_server_stdio.py not found, dummy config will not include it.") + + with open("servers_config.json", "w") as f: + json.dump(dummy_config, f, indent=2) + logger.info(f"Created/updated dummy servers_config.json: {dummy_config}") + + if not os.path.exists(".env"): + logger.warning(".env file not found. Creating a dummy .env. 
PLEASE UPDATE IT with your actual LLM details.") + with open(".env", "w") as f: + f.write('OPENAI_API_KEY="your_llm_api_key_here"\n') + f.write('OPENAI_API_BASE="your_llm_api_base_here_e.g.http://localhost:8080/v1"\n') + f.write('OPENAI_MODEL_NAME="your_llm_model_name_here"\n') + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("Application terminated by user (KeyboardInterrupt).") + except Exception as e: + logger.critical("Unhandled exception in main asyncio run.", exc_info=True) + finally: + logger.info("Application exiting process.") \ No newline at end of file diff --git a/examples/MCP/servers_config.json b/examples/MCP/servers_config.json new file mode 100644 index 0000000..31bc4d9 --- /dev/null +++ b/examples/MCP/servers_config.json @@ -0,0 +1,26 @@ +{ + "mcpServers": { + "math_server": { + "type": "streamable-http", + "url": "http://localhost:8000/mcp/" + }, + "sqlite_bi_tool": { + "type": "stdio", + "command": "uvx", + "args": ["mcp-server-sqlite", "--db-path", "./business_data.db"], + "env": {} + }, + "blender_service": { + "type": "stdio", + "command": "cmd", + "args": [ + "/c", + "uvx", + "blender-mcp" + ], + "env": {}, + "init_step_timeout": 60.0, + "tool_call_timeout": 120.0 + } + } +} \ No newline at end of file diff --git a/examples/blender-mcp/main.py b/examples/blender-mcp/main.py new file mode 100644 index 0000000..61660ac --- /dev/null +++ b/examples/blender-mcp/main.py @@ -0,0 +1,148 @@ +# blender_interactive_control.py +import asyncio +import logging +import os +import json +from dotenv import load_dotenv + +from tframex import TFrameXApp, OpenAIChatLLM, Message +from tframex.util.logging import setup_logging + +# --- Environment and Logging Setup --- +load_dotenv() +setup_logging(level=logging.INFO) # Set TFrameX root logger level + +# Configure specific logger levels +logging.getLogger("tframex.app").setLevel(logging.INFO) +logging.getLogger("tframex.mcp").setLevel(logging.INFO) +logging.getLogger("tframex.mcp.server_connector").setLevel(logging.INFO) # DEBUG for connection details +logging.getLogger("tframex.agents.llm_agent").setLevel(logging.DEBUG) # DEBUG for tool call details and LLM thoughts +logging.getLogger("tframex.engine").setLevel(logging.INFO) +logging.getLogger("mcp.client").setLevel(logging.WARNING) # Reduce noise from underlying MCP library + +logger = logging.getLogger("TFrameX_Blender_Control") + +async def main_blender_chat(): + logger.info("--- TFrameX Blender Interactive Control ---") + + # 1. Configure LLM + llm_api_key = os.getenv("OPENAI_API_KEY") + llm_api_base = os.getenv("OPENAI_API_BASE") + llm_model_name = os.getenv("OPENAI_MODEL_NAME") + + if not all([llm_api_base, llm_model_name]): # API key might be optional for local LLMs + logger.error("LLM API base or model name missing in .env. Please configure them. Exiting.") + return + + default_llm = OpenAIChatLLM( + model_name=llm_model_name, + api_base_url=llm_api_base, + api_key=llm_api_key # Will be None if not set, which is fine for some local LLMs + ) + logger.info(f"Using LLM: {llm_model_name} at {llm_api_base}") + + # 2. Initialize TFrameXApp with MCP configuration + # Ensure 'servers_config.json' is in the same directory or provide the correct path. + app = TFrameXApp( + default_llm=default_llm, + mcp_config_file="servers_config.json" + ) + + # 3. Define the BlenderAssistant agent + # The system prompt guides the LLM on how to interact with Blender via MCP tools. 
+ # It's crucial for the LLM to understand it should use tools prefixed with 'blender_service__' + # and to break down complex tasks. + blender_system_prompt = ( + "/no_think You are a Blender AI assistant. You can control a Blender instance using a set of tools. " + "All Blender-specific tools are prefixed with 'blender_service__'.\n" + "When asked to perform complex tasks (e.g., 'create a scene with a house, a tree, and a car'), " + "break them down into a sequence of individual tool calls. " + "For example, first create the house, then the tree, then the car, then position them. " + "Confirm intermediate steps if helpful.\n" + "Always use the provided tools to interact with Blender. Do not try to write Blender Python code directly " + "unless you are explicitly asked to use a tool like 'blender_service__execute_blender_code' " + "and you are confident in the code's safety and correctness.\n" + "Available tools (prefixed with 'blender_service__'): {available_tools_descriptions}\n" + "REMEMBER: If a tool involves generating assets (like from Hyper3D), it might be a multi-step process: " + "1. Initiate generation (e.g., `blender_service__generate_hyper3d_model_via_text`). This returns a job/task ID." + "2. Poll for completion (e.g., `blender_service__poll_rodin_job_status`) using the ID until it's 'Done' or 'COMPLETED'." + "3. Import the asset (e.g., `blender_service__import_generated_asset`) using the ID." + "Be patient and guide the user through these steps if necessary or perform them sequentially." + ) + + @app.agent( + name="BlenderAssistant", + system_prompt=blender_system_prompt, + # This agent will only have access to tools from the MCP server aliased as "blender_service" + # (as defined in your servers_config.json) + max_tool_iterations=20, + mcp_tools_from_servers=["blender_service"], + strip_think_tags=True # Set to False if you want to see tags from the LLM + ) + async def blender_assistant_placeholder(): + # The logic for an LLMAgent is handled by the TFrameX framework. + # It will use the system_prompt, configured LLM, and available tools. + pass + + # 4. Run the interactive chat with the BlenderAssistant + async with app.run_context() as rt: # MCP servers (Blender) will be initialized here. + logger.info("Starting interactive chat with 'BlenderAssistant'.") + logger.info("Ensure Blender is running and the BlenderMCP addon is active and 'Connected'.") + logger.info("The 'blender_service' MCP server (uvx blender-mcp) will be started by TFrameX.") + logger.info("Try commands like: 'create a cube', 'get scene info', 'generate a 3D model of a cat using Hyper3D and name it fluffy'.") + + await rt.interactive_chat(default_agent_name="BlenderAssistant") + + # 5. Crucial for stdio MCP servers (like blender-mcp) to terminate properly after chat ends + logger.info("Interactive chat finished. Shutting down MCP servers...") + await app.shutdown_mcp_servers() + logger.info("--- TFrameX Blender Interactive Control Finished ---") + +if __name__ == "__main__": + # Ensure servers_config.json exists + if not os.path.exists("servers_config.json"): + logger.warning("'servers_config.json' not found. Creating a default for Blender.") + default_blender_config = { + "mcpServers": { + "blender_service": { # Make sure this alias matches agent config + "type": "stdio", + "command": "uvx", # For Mac/Linux. 
For Windows, might need "cmd" with "/c", "uvx" + "args": ["blender-mcp"], + "env": {}, + "init_step_timeout": 60.0, + "tool_call_timeout": 180.0, + "resource_read_timeout": 60.0 + } + } + } + # Check if on Windows to apply the cmd /c pattern as a default + if os.name == 'nt': + logger.info("Windows detected, adjusting default Blender MCP command in servers_config.json to use 'cmd /c uvx'.") + default_blender_config["mcpServers"]["blender_service"]["command"] = "cmd" + default_blender_config["mcpServers"]["blender_service"]["args"] = ["/c", "uvx", "blender-mcp"] + + with open("servers_config.json", "w") as f: + json.dump(default_blender_config, f, indent=2) + logger.info(f"Created default servers_config.json for Blender: {json.dumps(default_blender_config)}") + + # Ensure .env file exists for LLM configuration + if not os.path.exists(".env"): + logger.warning(".env file not found. Creating a dummy .env. PLEASE UPDATE IT with your actual LLM details.") + with open(".env", "w") as f: + f.write('# Example for a local Ollama setup (most common for Blender experiments)\n') + f.write('OPENAI_API_BASE="http://localhost:11434/v1"\n') + f.write('OPENAI_API_KEY="ollama" # Ollama doesn\'t strictly require a key\n') + f.write('OPENAI_MODEL_NAME="llama3" # Or any model you have pulled in Ollama, e.g., mistral, codellama\n\n') + f.write('# Example for OpenAI API (if you prefer)\n') + f.write('# OPENAI_API_KEY="your_actual_openai_api_key"\n') + f.write('# OPENAI_API_BASE="https://api.openai.com/v1"\n') + f.write('# OPENAI_MODEL_NAME="gpt-4-turbo-preview"\n') # Or "gpt-3.5-turbo" + + try: + asyncio.run(main_blender_chat()) + except KeyboardInterrupt: + logger.info("Application terminated by user (KeyboardInterrupt).") + except Exception as e: + logger.critical(f"Unhandled exception in main asyncio run: {e}", exc_info=True) + finally: + logger.info("Application exiting process.") \ No newline at end of file diff --git a/examples/blender-mcp/servers_config.json b/examples/blender-mcp/servers_config.json new file mode 100644 index 0000000..13d9dec --- /dev/null +++ b/examples/blender-mcp/servers_config.json @@ -0,0 +1,13 @@ +{ + "mcpServers": { + "blender_service": { + "type": "stdio", + "command": "cmd", + "args": ["/c", "uvx", "blender-mcp"], + "env": {}, + "init_step_timeout": 60.0, + "tool_call_timeout": 180.0, + "resource_read_timeout": 60.0 + } + } +} \ No newline at end of file diff --git a/examples/website_designer/about.html b/examples/website_designer/about.html new file mode 100644 index 0000000..0e35906 --- /dev/null +++ b/examples/website_designer/about.html @@ -0,0 +1,139 @@ + + + + + + + About Us + + +
+    <header>
+        <div class="logo">Brew Haven</div>
+        <nav>
+            <a href="index.html">Home</a>
+            <a href="menu.html">Menu</a>
+            <a href="about.html">About</a>
+            <a href="contact.html">Contact</a>
+        </nav>
+    </header>
+    <main>
+        <section>
+            <h1>Our Story</h1>
+            <ol class="timeline">
+                <li>
+                    <h2>2010</h2>
+                    <p>Brew Haven was founded in a small corner café, driven by a passion for quality coffee and community.</p>
+                </li>
+                <li>
+                    <h2>2015</h2>
+                    <p>Expanded to two locations, introducing our signature cold brew and seasonal specials.</p>
+                </li>
+                <li>
+                    <h2>2020</h2>
+                    <p>Launched our sustainability initiative, focusing on eco-friendly packaging and ethical sourcing.</p>
+                </li>
+                <li>
+                    <h2>2023</h2>
+                    <p>Reached our 10th location, continuing to serve great coffee and great people.</p>
+                </li>
+            </ol>
+        </section>
+        <section>
+            <h1>Our Team</h1>
+            <article>
+                <img src="" alt="Manager">
+                <h2>Maria Lopez</h2>
+                <h3>Head Barista</h3>
+                <p>With over 10 years of experience, Maria ensures every cup meets our high standards.</p>
+            </article>
+            <article>
+                <img src="" alt="Barista">
+                <h2>James Carter</h2>
+                <h3>Sustainability Officer</h3>
+                <p>James leads our eco-friendly initiatives, from composting to renewable energy use.</p>
+            </article>
+            <article>
+                <img src="" alt="Designer">
+                <h2>Priya Mehta</h2>
+                <h3>Marketing Lead</h3>
+                <p>Priya brings creativity and energy to our community engagement and events.</p>
+            </article>
+        </section>
+        <section>
+            <h1>Sustainability at Brew Haven</h1>
+            <article>
+                <h2>Eco-Friendly Packaging</h2>
+                <p>Our compostable cups and biodegradable straws reduce waste and protect the environment.</p>
+            </article>
+            <article>
+                <h2>Ethical Sourcing</h2>
+                <p>We partner with local farmers to ensure fair trade practices and high-quality beans.</p>
+            </article>
+            <article>
+                <h2>Energy Efficiency</h2>
+                <p>We use renewable energy sources and energy-efficient equipment to minimize our carbon footprint.</p>
+            </article>
+        </section>
+    </main>
+</body>
+</html>
\ No newline at end of file
diff --git a/examples/website_designer/contact.html b/examples/website_designer/contact.html
new file mode 100644
index 0000000..6dca101
--- /dev/null
+++ b/examples/website_designer/contact.html
@@ -0,0 +1,95 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Contact Us</title>
+</head>
+<body>
+    <header>
+        <div class="logo">Brew Haven</div>
+        <nav>
+            <a href="index.html">Home</a>
+            <a href="menu.html">Menu</a>
+            <a href="about.html">About</a>
+            <a href="contact.html">Contact</a>
+        </nav>
+    </header>
+    <main>
+        <section>
+            <h1>Contact Us</h1>
+            <form>
+                <label for="name">Name</label>
+                <input type="text" id="name" name="name" required>
+                <label for="email">Email</label>
+                <input type="email" id="email" name="email" required>
+                <label for="message">Message</label>
+                <textarea id="message" name="message" required></textarea>
+                <button type="submit">Send Message</button>
+            </form>
+        </section>
+        <section>
+            <h1>Find Us</h1>
+            <article>
+                <h2>Brew Haven - Main Location</h2>
+                <p>123 Coffee Street, Bean Town, BT 12345</p>
+                <p>Phone: (123) 456-7890</p>
+                <p>Open: Mon-Fri 7am-7pm | Sat-Sun 8am-6pm</p>
+            </article>
+            <article>
+                <h2>Brew Haven - Downtown Branch</h2>
+                <p>456 Espresso Avenue, Cup City, CC 67890</p>
+                <p>Phone: (987) 654-3210</p>
+                <p>Open: Mon-Fri 6am-8pm | Sat-Sun 9am-5pm</p>
+            </article>
+        </section>
+    </main>
+</body>
+</html>
\ No newline at end of file
diff --git a/examples/website_designer/index.html b/examples/website_designer/index.html
new file mode 100644
index 0000000..f31fcc6
--- /dev/null
+++ b/examples/website_designer/index.html
@@ -0,0 +1,102 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Homepage</title>
+</head>
+<body>
+    <header>
+        <div class="logo">Brew Haven</div>
+        <nav>
+            <a href="index.html">Home</a>
+            <a href="menu.html">Menu</a>
+            <a href="about.html">About</a>
+            <a href="contact.html">Contact</a>
+        </nav>
+    </header>
+    <main>
+        <section class="hero">
+            <h1>Welcome to Brew Haven</h1>
+            <p>Sip, savor, and stay awhile</p>
+            <a href="menu.html" class="button">Explore Menu</a>
+        </section>
+        <section>
+            <h1>Featured Coffee Items</h1>
+            <article>
+                <img src="" alt="Espresso">
+                <h2>Espresso</h2>
+                <p>A concentrated coffee made by forcing hot water through finely ground coffee beans.</p>
+            </article>
+            <article>
+                <img src="" alt="Latte">
+                <h2>Latte</h2>
+                <p>A creamy coffee drink with steamed milk and a small layer of foam.</p>
+            </article>
+            <article>
+                <img src="" alt="Cappuccino">
+                <h2>Cappuccino</h2>
+                <p>A coffee drink with equal parts espresso, steamed milk, and foam.</p>
+            </article>
+        </section>
+        <section>
+            <h1>What Our Customers Say</h1>
+            <blockquote>
+                <p>"Brew Haven is my favorite spot to relax and enjoy a great cup of coffee. The atmosphere is warm and welcoming."</p>
+                <footer>- Sarah M.</footer>
+            </blockquote>
+            <blockquote>
+                <p>"The menu has something for everyone. I love their seasonal specials and the friendly staff."</p>
+                <footer>- James T.</footer>
+            </blockquote>
+            <blockquote>
+                <p>"From the moment you walk in, you feel at home. The coffee is always fresh and the service is top-notch."</p>
+                <footer>- Emily R.</footer>
+            </blockquote>
+        </section>
+    </main>
+</body>
+</html>
\ No newline at end of file
diff --git a/examples/website_designer/menu.html b/examples/website_designer/menu.html
new file mode 100644
index 0000000..4a7544f
--- /dev/null
+++ b/examples/website_designer/menu.html
@@ -0,0 +1,110 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Menu</title>
+</head>
+<body>
+    <header>
+        <div class="logo">Brew Haven</div>
+        <nav>
+            <a href="index.html">Home</a>
+            <a href="menu.html">Menu</a>
+            <a href="about.html">About</a>
+            <a href="contact.html">Contact</a>
+        </nav>
+    </header>
+    <main>
+        <section>
+            <h1>Our Menu</h1>
+            <article>
+                <h2>Espresso</h2>
+                <p>A concentrated coffee made by forcing hot water through finely ground coffee beans.</p>
+                <p class="price">$2.50</p>
+            </article>
+            <article>
+                <h2>Latte</h2>
+                <p>A creamy coffee drink with steamed milk and a small layer of foam.</p>
+                <p class="price">$4.00</p>
+            </article>
+            <article>
+                <h2>Cappuccino</h2>
+                <p>A coffee drink with equal parts espresso, steamed milk, and foam.</p>
+                <p class="price">$4.50</p>
+            </article>
+            <article>
+                <h2>Cold Brew</h2>
+                <p>A smooth, chilled coffee drink made by steeping coffee grounds in cold water.</p>
+                <p class="price">$3.50</p>
+            </article>
+            <article>
+                <h2>Mocha</h2>
+                <p>A rich coffee drink with chocolate and steamed milk.</p>
+                <p class="price">$5.00</p>
+            </article>
+            <article>
+                <h2>Americano</h2>
+                <p>A coffee drink made with espresso and hot water.</p>
+                <p class="price">$3.00</p>
+            </article>
+        </section>
+        <section>
+            <h1>Pastries & Sweets</h1>
+            <article>
+                <img src="" alt="Croissant">
+                <h2>Croissant</h2>
+                <p>Flaky and buttery, perfect with your morning coffee.</p>
+                <p class="price">$2.00</p>
+            </article>
+            <article>
+                <img src="" alt="Brownie">
+                <h2>Brownie</h2>
+                <p>Fudgy and rich, made with premium chocolate.</p>
+                <p class="price">$2.50</p>
+            </article>
+            <article>
+                <img src="" alt="Muffin">
+                <h2>Muffin</h2>
+                <p>Freshly baked with a variety of flavors.</p>
+                <p class="price">$1.50</p>
+            </article>
+        </section>
+    </main>
+ + + \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e0ba24f..c00db0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ # If you want all examples to work out-of-the-box, include it here. # Otherwise, consider making it an optional dependency. "aiohttp>=3.0.0", + "mcp", ] [project.optional-dependencies] diff --git a/tframex/__init__.py b/tframex/__init__.py index a55ea52..e32b8dd 100644 --- a/tframex/__init__.py +++ b/tframex/__init__.py @@ -1,69 +1,56 @@ -# tframex/__init__.py (NEW VERSION) +# tframex/__init__.py import os -from dotenv import load_dotenv - -# It's generally better for applications to handle dotenv loading. +# from dotenv import load_dotenv # Application should handle this # load_dotenv() -# Import from subpackages from .agents import BaseAgent, LLMAgent, ToolAgent -from .app import TFrameXApp, TFrameXRuntimeContext # TFrameXRuntimeContext is now defined in app.py +from .app import TFrameXApp, TFrameXRuntimeContext from .flows import FlowContext, Flow -from .models.primitives import ( # Note the .models path - FunctionCall, - Message, - MessageChunk, - ToolCall, - ToolDefinition, - ToolParameterProperty, - ToolParameters, +from .models.primitives import ( + FunctionCall, Message, MessageChunk, ToolCall, + ToolDefinition, ToolParameterProperty, ToolParameters, ) -from .patterns import ( # Note the .patterns path - BasePattern, - DiscussionPattern, - ParallelPattern, - RouterPattern, - SequentialPattern, +from .patterns import ( + BasePattern, DiscussionPattern, ParallelPattern, + RouterPattern, SequentialPattern, ) -from .util.engine import Engine # Engine is now directly under util +from .util.engine import Engine from .util.llms import BaseLLMWrapper, OpenAIChatLLM from .util.memory import BaseMemoryStore, InMemoryMemoryStore from .util.tools import Tool -# setup_logging might be called by TFrameXApp itself, not typically part of public API to re-export -# from .util.logging import setup_logging +from .util.logging import setup_logging # Make setup_logging available if users want to call it +# --- MCP Integration Exports --- +from .mcp import ( + MCPManager, + MCPConnectedServer, + MCPConfigError, + load_mcp_server_configs, + # Meta tools are usually not called directly by library users, but by agents. + # No harm in exporting if they might be useful for direct use in advanced scenarios. 
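+    # Note: when used as agent tools, these are registered automatically by
+    # TFrameXApp._register_mcp_meta_tools(); agents reference them by name.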
+ tframex_list_mcp_servers, + tframex_list_mcp_resources, + tframex_read_mcp_resource, + tframex_list_mcp_prompts, + tframex_use_mcp_prompt, +) __all__ = [ - # Agents - "BaseAgent", - "LLMAgent", - "ToolAgent", - # App & Runtime - "TFrameXApp", - "TFrameXRuntimeContext", # This was TFrameXRuntimeContext in the old __init__ - "Engine", # New public component - # Flows - "FlowContext", - "Flow", - # Models (Primitives) - "FunctionCall", - "Message", - "MessageChunk", - "ToolCall", - "ToolDefinition", - "ToolParameterProperty", - "ToolParameters", - # Patterns - "BasePattern", - "DiscussionPattern", - "ParallelPattern", - "RouterPattern", - "SequentialPattern", - # Utilities - "BaseLLMWrapper", - "OpenAIChatLLM", - "BaseMemoryStore", - "InMemoryMemoryStore", + "BaseAgent", "LLMAgent", "ToolAgent", + "TFrameXApp", "TFrameXRuntimeContext", + "Engine", + "FlowContext", "Flow", + "FunctionCall", "Message", "MessageChunk", "ToolCall", + "ToolDefinition", "ToolParameterProperty", "ToolParameters", + "BasePattern", "DiscussionPattern", "ParallelPattern", + "RouterPattern", "SequentialPattern", + "BaseLLMWrapper", "OpenAIChatLLM", + "BaseMemoryStore", "InMemoryMemoryStore", "Tool", - # "setup_logging", # Decide if this should be public + "setup_logging", # Export logging setup + + # MCP Integration + "MCPManager", "MCPConnectedServer", "MCPConfigError", "load_mcp_server_configs", + "tframex_list_mcp_servers", "tframex_list_mcp_resources", + "tframex_read_mcp_resource", "tframex_list_mcp_prompts", "tframex_use_mcp_prompt", ] \ No newline at end of file diff --git a/tframex/agents/llm_agent.py b/tframex/agents/llm_agent.py index deedcce..7875387 100644 --- a/tframex/agents/llm_agent.py +++ b/tframex/agents/llm_agent.py @@ -1,225 +1,263 @@ +# tframex/agents/llm_agent.py import json import logging from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union -from tframex.models.primitives import FunctionCall, Message, ToolCall +from tframex.models.primitives import Message, ToolCall, ToolDefinition from tframex.util.llms import BaseLLMWrapper from tframex.util.memory import BaseMemoryStore -from tframex.util.tools import Tool, ToolDefinition +from tframex.util.tools import Tool # Assuming ToolDefinition doesn't need FunctionCall directly here from .base import BaseAgent if TYPE_CHECKING: from tframex.util.engine import Engine -logger = logging.getLogger(__name__) - +logger = logging.getLogger("tframex.agents.llm_agent") class LLMAgent(BaseAgent): - """ - An agent that uses an LLM to decide actions, potentially using tools, callable sub-agents, and memory. - """ - def __init__( self, agent_id: str, - llm: BaseLLMWrapper, # This is the actual resolved LLM the agent will use + llm: BaseLLMWrapper, engine: "Engine", description: Optional[str] = None, - tools: Optional[List[Tool]] = None, + tools: Optional[List[Tool]] = None, # Native TFrameX tools for this agent memory: Optional[BaseMemoryStore] = None, system_prompt_template: Optional[str] = "You are a helpful assistant.", callable_agent_definitions: Optional[List[ToolDefinition]] = None, - strip_think_tags: bool = False, # NEW: Passed to BaseAgent + strip_think_tags: bool = False, max_tool_iterations: int = 5, - **config: Any, + mcp_tools_from_servers_config: Optional[Union[List[str], str]] = None, + **config: Any, # Catches other configurations, including those intended for BaseAgent ): + """ + Initializes an LLMAgent. + + Args: + agent_id: Unique identifier for the agent instance. + llm: The LLM wrapper instance this agent will use. 
+ engine: The TFrameX Engine instance for utility functions like tool execution. + description: An optional description of the agent's purpose. + tools: A list of native TFrameX Tool objects available to this agent. + memory: An optional memory store instance for conversation history. + system_prompt_template: The template for the system prompt. + callable_agent_definitions: Tool definitions for other agents this agent can call. + strip_think_tags: Whether to remove ... tags from the final LLM response. + max_tool_iterations: Maximum number of tool execution loops before forcing a final response. + mcp_tools_from_servers_config: Configuration for which MCP server tools this agent can use + ('ALL' or a list of server aliases). + **config: Additional configuration parameters passed to the BaseAgent. + """ + + # Prepare arguments for the BaseAgent constructor. + # `config` collects any arguments passed to LLMAgent's __init__ that are not explicitly named, + # plus any additional key-value pairs from the agent's registration in TFrameXApp. + # BaseAgent's __init__ expects specific named arguments like 'description', 'system_prompt_template', etc. + # If these are in `config`, Python's **kwargs will map them. If they are also LLMAgent's + # named parameters, those values will be used when constructing `base_agent_super_kwargs`. + + base_agent_super_kwargs = config.copy() # Start with other configs + # Ensure explicit parameters for BaseAgent are correctly passed or overridden + base_agent_super_kwargs['description'] = description + base_agent_super_kwargs['system_prompt_template'] = system_prompt_template + base_agent_super_kwargs['strip_think_tags'] = strip_think_tags + # Note: 'llm', 'tools', 'memory', 'callable_agent_definitions' are passed as named args to super() + super().__init__( - agent_id, - description=description, - llm=llm, # Pass the resolved LLM - tools=tools, - memory=memory, - system_prompt_template=system_prompt_template, - callable_agent_definitions=callable_agent_definitions, - strip_think_tags=strip_think_tags, # NEW: Pass to BaseAgent - **config, # Pass other configs from decorator + agent_id=agent_id, + llm=llm, # LLM for this agent, also passed to BaseAgent + tools=tools, # Native TFrameX tools, resolved by Engine and passed directly + memory=memory, # Resolved by Engine and passed directly + callable_agent_definitions=callable_agent_definitions, # Resolved by Engine + **base_agent_super_kwargs # Passes description, system_prompt_template, etc., to BaseAgent ) + + # LLMAgent specific checks and assignments + if not self.llm: # self.llm is set by BaseAgent from the 'llm' param passed to super() + raise ValueError(f"LLMAgent '{self.agent_id}' requires an LLM instance.") + if not engine: # This 'engine' is the one passed to LLMAgent's __init__ + raise ValueError(f"LLMAgent '{self.agent_id}' requires an Engine instance.") + self.engine = engine + # Use the direct parameter values for these LLMAgent-specific configurations self.max_tool_iterations = max_tool_iterations - if not self.llm: # self.llm is inherited from BaseAgent and set by super() - raise ValueError(f"LLMAgent '{self.agent_id}' requires an LLM instance.") + self.mcp_tools_from_servers_config = mcp_tools_from_servers_config + + + def _get_all_available_tool_definitions_for_llm(self) -> List[ToolDefinition]: + """ + Aggregates all tool definitions available to this agent for the LLM. + This includes native tools, callable agents, and MCP tools. + """ + all_defs: List[ToolDefinition] = [] + + # 1. 
Native TFrameX tools assigned to this agent (from self.tools, set by BaseAgent) + if self.tools: + for tool_obj in self.tools.values(): + all_defs.append(tool_obj.get_openai_tool_definition()) + + # 2. Callable sub-agents (from self.callable_agent_definitions, set by BaseAgent) + if self.callable_agent_definitions: + all_defs.extend(self.callable_agent_definitions) + + # 3. MCP tools from specified servers (via MCPManager in the engine's runtime context) + mcp_manager = self.engine._runtime_context.mcp_manager + if mcp_manager: + logger.debug(f"Agent '{self.agent_id}': MCP Manager present: True. " + f"Configured servers: {list(mcp_manager.servers.keys())}") + mcp_server_tools: List[ToolDefinition] = [] + + if self.mcp_tools_from_servers_config == "ALL": + mcp_server_tools = mcp_manager.get_all_mcp_tools_for_llm() + if mcp_server_tools: + logger.debug(f"Agent '{self.agent_id}': Processing tools from ALL MCP servers " + f"via mcp_manager.get_all_mcp_tools_for_llm(). Tool count: {len(mcp_server_tools)}") + + elif isinstance(self.mcp_tools_from_servers_config, list): + for server_alias in self.mcp_tools_from_servers_config: + server = mcp_manager.get_server(server_alias) + if server and server.is_initialized and server.tools: + logger.debug(f"Agent '{self.agent_id}': Processing tools from INITIALIZED MCP server " + f"'{server_alias}'. Tool count: {len(server.tools)}") + for mcp_tool_info in server.tools: # mcp_tool_info is ActualMCPTool + parameters = mcp_tool_info.inputSchema if mcp_tool_info.inputSchema else {"type": "object", "properties": {}} + prefixed_name = f"{server_alias}__{mcp_tool_info.name}" + mcp_server_tools.append(ToolDefinition(type="function", function={ + "name": prefixed_name, + "description": mcp_tool_info.description or f"Tool '{mcp_tool_info.name}' from MCP server '{server_alias}'.", + "parameters": parameters + })) + logger.debug(f"Agent '{self.agent_id}': Added MCP tool '{prefixed_name}' from server '{server_alias}'") + elif server: + logger.debug(f"Agent '{self.agent_id}': MCP Server '{server_alias}' found but not suitable for tool extraction. " + f"Initialized: {server.is_initialized}, Tools defined: {bool(server.tools)}") + else: # server is None + logger.debug(f"Agent '{self.agent_id}': MCP server alias '{server_alias}' not found in manager's active server list.") + all_defs.extend(mcp_server_tools) + + # 4. TFrameX MCP Meta-tools: If configured in @app.agent(tools=[...]), they are included in step 1. + + # Ensure uniqueness of tool definitions by name + final_defs_dict: Dict[str, ToolDefinition] = {td.function["name"]: td for td in all_defs} + unique_defs = list(final_defs_dict.values()) + + logger.debug(f"Agent '{self.agent_id}' resolved {len(unique_defs)} unique tool definitions for LLM: " + f"{[d.function['name'] for d in unique_defs]}") + return unique_defs + async def run(self, input_message: Union[str, Message], **kwargs: Any) -> Message: + """ + Main execution logic for the LLMAgent. + Handles interaction with the LLM, tool execution, and memory management. + """ if isinstance(input_message, str): current_user_message = Message(role="user", content=input_message) - else: + elif isinstance(input_message, Message): current_user_message = input_message + else: + logger.error(f"LLMAgent '{self.agent_id}' received invalid input_message type: {type(input_message)}. 
" + "Expected str or Message.") + return Message(role="assistant", content="Error: Invalid input type provided to agent.") await self.memory.add_message(current_user_message) - template_vars_for_prompt = kwargs.get("template_vars", {}) for iteration_count in range(self.max_tool_iterations + 1): - history = await self.memory.get_history( - limit=self.config.get("history_limit", 10) - ) + history = await self.memory.get_history(limit=self.config.get("history_limit", 10)) messages_for_llm: List[Message] = [] - system_message = self._render_system_prompt(**template_vars_for_prompt) - if system_message: - messages_for_llm.append(system_message) - + system_message_rendered = self._render_system_prompt(**template_vars_for_prompt) # From BaseAgent + if system_message_rendered: + messages_for_llm.append(system_message_rendered) messages_for_llm.extend(history) - llm_call_kwargs = {k: v for k, v in kwargs.items() if k != "template_vars"} - - all_tool_definitions_for_llm: List[Dict[str, Any]] = [] - if self.tools: - all_tool_definitions_for_llm.extend( - [ - tool.get_openai_tool_definition().model_dump() - for tool in self.tools.values() - ] - ) - - if self.callable_agent_definitions: - all_tool_definitions_for_llm.extend( - [cad.model_dump() for cad in self.callable_agent_definitions] - ) + llm_call_kwargs_from_run = {k: v for k, v in kwargs.items() if k != "template_vars"} + all_tool_definitions_for_llm = self._get_all_available_tool_definitions_for_llm() + llm_api_params: Dict[str, Any] = {"stream": False, **llm_call_kwargs_from_run} if all_tool_definitions_for_llm: - llm_call_kwargs["tools"] = all_tool_definitions_for_llm - llm_call_kwargs["tool_choice"] = self.config.get("tool_choice", "auto") - - logger.debug( - f"Agent '{self.agent_id}' (LLM: {self.llm.model_id}) calling LLM " # UPDATED LOG - f"(Iter {iteration_count+1}/{self.max_tool_iterations+1}). " - f"History depth: {len(history)}. " - f"Regular Tools defined: {len(self.tools)}. " - f"Callable Agents as Tools defined: {len(self.callable_agent_definitions)}." + llm_api_params["tools"] = [td.model_dump(exclude_none=True) for td in all_tool_definitions_for_llm] + llm_api_params["tool_choice"] = self.config.get("tool_choice", "auto") + + logger.info( + f"Agent '{self.agent_id}' (LLM: {self.llm.model_id}) calling LLM. " + f"Iteration: {iteration_count+1}/{self.max_tool_iterations + 1}. " + f"Tool definitions for LLM: {len(all_tool_definitions_for_llm)}." ) + # For very detailed debugging, one might enable these: + # logger.debug(f"Messages for LLM call: {[msg.model_dump(exclude_none=True) for msg in messages_for_llm]}") + # logger.debug(f"LLM API parameters (excluding messages): {llm_api_params}") assistant_response_message = await self.llm.chat_completion( - messages_for_llm, stream=False, **llm_call_kwargs + messages_for_llm, **llm_api_params ) await self.memory.add_message(assistant_response_message) - if ( - not assistant_response_message.tool_calls - or iteration_count >= self.max_tool_iterations - ): - logger.info( - f"Agent '{self.agent_id}' concluding with textual response. Iter: {iteration_count+1}." - ) - # NEW: Post-process before returning - return self._post_process_llm_response(assistant_response_message) + if not assistant_response_message.tool_calls or iteration_count >= self.max_tool_iterations: + if iteration_count >= self.max_tool_iterations and assistant_response_message.tool_calls: + logger.warning(f"Agent '{self.agent_id}' reached max_tool_iterations ({self.max_tool_iterations}) " + "but LLM still requested tool calls. 
Ignoring further tool calls.") + logger.info(f"Agent '{self.agent_id}' concluding processing. Iteration: {iteration_count+1}.") + return self._post_process_llm_response(assistant_response_message) # From BaseAgent - logger.info( - f"Agent '{self.agent_id}' LLM requested tool_calls: {len(assistant_response_message.tool_calls)}" - ) + logger.info(f"Agent '{self.agent_id}': LLM requested {len(assistant_response_message.tool_calls)} tool_calls.") tool_response_messages: List[Message] = [] - for tool_call in assistant_response_message.tool_calls: - tool_name = tool_call.function.name - tool_call_id = tool_call.id - tool_args_json_str = tool_call.function.arguments - - is_sub_agent_call = any( - cad.function["name"] == tool_name - for cad in self.callable_agent_definitions + for tool_call_obj in assistant_response_message.tool_calls: # tool_call_obj is models.primitives.ToolCall + tool_name_for_llm = tool_call_obj.function.name + tool_call_id = tool_call_obj.id + tool_args_json_str = tool_call_obj.function.arguments + + logger.info(f"Agent '{self.agent_id}': Dispatching tool call for '{tool_name_for_llm}' (ID: {tool_call_id}) via Engine.") + logger.debug(f"Agent '{self.agent_id}': Tool arguments for '{tool_name_for_llm}': {tool_args_json_str}") + + tool_result_content_or_error_dict = await self.engine.execute_tool_by_llm_definition( + tool_name_for_llm, tool_args_json_str ) - if tool_name in self.tools: - logger.info( - f"Agent '{self.agent_id}' executing regular tool '{tool_name}'." - ) - tool_to_execute = self.tools[tool_name] - tool_result_content = str( - await tool_to_execute.execute(tool_args_json_str) - ) - elif is_sub_agent_call: - logger.info( - f"Agent '{self.agent_id}' calling sub-agent '{tool_name}' as a tool." - ) + tool_result_content_str = "" + if isinstance(tool_result_content_or_error_dict, dict) and "error" in tool_result_content_or_error_dict: + tool_result_content_str = str(tool_result_content_or_error_dict["error"]) # Ensure it's a string + logger.warning(f"Agent '{self.agent_id}': Tool '{tool_name_for_llm}' execution by engine resulted in error: {tool_result_content_str}") + elif isinstance(tool_result_content_or_error_dict, (str, int, float, bool)): + tool_result_content_str = str(tool_result_content_or_error_dict) + elif tool_result_content_or_error_dict is None: + tool_result_content_str = "[Tool executed successfully but returned no specific content]" + else: # Attempt to serialize other types try: - sub_agent_args = json.loads(tool_args_json_str) - sub_agent_input_content = sub_agent_args.get( - "input_message", "" - ) - if not sub_agent_input_content and isinstance( - sub_agent_args, str - ): - sub_agent_input_content = sub_agent_args - elif not sub_agent_input_content and tool_args_json_str: - sub_agent_input_content = tool_args_json_str - - sub_agent_input_msg = Message( - role="user", content=str(sub_agent_input_content) - ) - sub_agent_call_kwargs = { - "template_vars": template_vars_for_prompt - } - - sub_agent_response = await self.engine.call_agent( - agent_name=tool_name, - input_message=sub_agent_input_msg, - **sub_agent_call_kwargs, - ) - # Sub-agent's response is already post-processed by its own .run() method if it's an LLMAgent - tool_result_content = ( - sub_agent_response.content - or "[Sub-agent produced no content]" - ) - if sub_agent_response.tool_calls: - tc_summary = json.dumps( - [ - tc.model_dump(exclude_none=True) - for tc in sub_agent_response.tool_calls - ] - ) - tool_result_content += f"\n[Sub-agent '{tool_name}' also made tool calls: 
{tc_summary}]" - logger.debug( - f"Agent '{self.agent_id}': Sub-agent '{tool_name}' response: {tool_result_content[:200]}" - ) - - except json.JSONDecodeError as e: - logger.error( - f"Agent '{self.agent_id}': Invalid JSON arguments for sub-agent '{tool_name}': {tool_args_json_str}. Error: {e}" - ) - tool_result_content = f"Error: Invalid JSON arguments for sub-agent '{tool_name}'." - except Exception as e: - logger.error( - f"Agent '{self.agent_id}': Error calling sub-agent '{tool_name}': {e}", - exc_info=True, - ) - tool_result_content = f"Error: Failed to execute sub-agent '{tool_name}': {str(e)}" - else: - logger.warning( - f"Agent '{self.agent_id}': LLM requested unknown tool/agent '{tool_name}'. Available tools: {list(self.tools.keys())}, Callable agents: {[cad.function['name'] for cad in self.callable_agent_definitions]}" - ) - tool_result_content = ( - f"Error: Tool or agent '{tool_name}' is not available to me." - ) + tool_result_content_str = json.dumps(tool_result_content_or_error_dict) + except TypeError: + tool_result_content_str = (f"[Tool '{tool_name_for_llm}' returned complex non-JSON-serializable data " + f"of type: {type(tool_result_content_or_error_dict)}]") + logger.warning(f"Agent '{self.agent_id}': {tool_result_content_str}") + + logger.debug(f"Agent '{self.agent_id}': Received result for tool '{tool_name_for_llm}' (ID: {tool_call_id}): " + f"'{tool_result_content_str[:200]}{'...' if len(tool_result_content_str) > 200 else ''}'") tool_response_messages.append( Message( role="tool", tool_call_id=tool_call_id, - name=tool_name, - content=tool_result_content, + name=tool_name_for_llm, + content=tool_result_content_str, ) ) for tr_msg in tool_response_messages: await self.memory.add_message(tr_msg) - logger.error( - f"Agent '{self.agent_id}' exceeded max_tool_iterations ({self.max_tool_iterations}). Returning error message." - ) - # NEW: Post-process error message too - error_message = Message( - role="assistant", - content=f"Error: Agent {self.agent_id} exceeded maximum tool processing iterations.", - ) - return self._post_process_llm_response(error_message) + # This part is reached if loop completes due to max_tool_iterations without a break + logger.error(f"Agent '{self.agent_id}' exceeded maximum tool iterations ({self.max_tool_iterations}). " + "Returning last assistant message or error.") + # The last assistant_response_message might still contain tool_calls. + # We should ideally return a message indicating the iteration limit was hit. + error_msg_content = (f"Error: Agent '{self.agent_id}' exceeded maximum tool processing iterations " + f"({self.max_tool_iterations}). 
The last planned action involved tools: " + f"{[tc.function.name for tc in assistant_response_message.tool_calls if tc.function]}" + if assistant_response_message.tool_calls else + f"Error: Agent '{self.agent_id}' exceeded maximum tool processing iterations.") + + final_error_message = Message(role="assistant", content=error_msg_content) + return self._post_process_llm_response(final_error_message) \ No newline at end of file diff --git a/tframex/app.py b/tframex/app.py index 60e20ab..71b7b54 100644 --- a/tframex/app.py +++ b/tframex/app.py @@ -1,89 +1,151 @@ +# tframex/app.py import asyncio import inspect import logging -import os # Already present, used in __init__ +import os from typing import Any, Callable, Coroutine, Dict, List, Optional, Type, Union from .agents.base import BaseAgent from .agents.llm_agent import LLMAgent -from .agents.tool_agent import ToolAgent from .flows.flow_context import FlowContext from .flows.flows import Flow -from .models.primitives import Message, MessageChunk # MessageChunk already present -from .patterns.patterns import BasePattern +from .models.primitives import Message, MessageChunk, ToolDefinition, ToolParameters, ToolParameterProperty from .util.engine import Engine from .util.llms import BaseLLMWrapper -from .util.logging.logging_config import setup_logging +from .util.logging.logging_config import setup_logging # Assuming this is your setup from .util.memory import BaseMemoryStore, InMemoryMemoryStore -from .util.tools import ( # ToolDefinition already present - Tool, - ToolDefinition, - ToolParameterProperty, - ToolParameters, +from .util.tools import Tool + +# --- MCP Integration Imports --- +from .mcp.manager import MCPManager +from .mcp.meta_tools import ( + tframex_list_mcp_servers, + tframex_list_mcp_resources, + tframex_read_mcp_resource, + tframex_list_mcp_prompts, + tframex_use_mcp_prompt ) -# Setup colored logging -setup_logging(level=logging.INFO) - +# Call setup_logging here or ensure it's called by the application using the library +# setup_logging(level=logging.INFO) # Example: if you want TFrameX to set a default logger = logging.getLogger("tframex.app") - class TFrameXApp: def __init__( self, default_llm: Optional[BaseLLMWrapper] = None, - default_memory_store_factory: Callable[ - [], BaseMemoryStore - ] = InMemoryMemoryStore, + default_memory_store_factory: Callable[[], BaseMemoryStore] = InMemoryMemoryStore, + mcp_config_file: Optional[str] = "servers_config.json", ): - self._tools: Dict[str, Tool] = {} - self._agents: Dict[str, Dict[str, Any]] = ( - {} - ) # Stores registration info: ref, config + self._agents: Dict[str, Dict[str, Any]] = {} self._flows: Dict[str, Flow] = {} self.default_llm = default_llm self.default_memory_store_factory = default_memory_store_factory - if not default_llm and not os.getenv("TFRAMEX_ALLOW_NO_DEFAULT_LLM"): + self._mcp_manager: Optional[MCPManager] = None + if mcp_config_file: + try: + self._mcp_manager = MCPManager(mcp_config_file_path=mcp_config_file) + logger.info(f"MCPManager initialized with config file: {mcp_config_file}") + except Exception as e: + logger.error(f"Failed to initialize MCPManager with {mcp_config_file}: {e}", exc_info=True) + else: + logger.info("No MCP config file provided, MCP integration will be limited or disabled.") + + if not default_llm and not os.getenv("TFRAMEX_ALLOW_NO_DEFAULT_LLM"): # Example env var check logger.warning( - "TFrameXApp initialized without a default LLM. LLM must be provided to run_context or agent if they don't have an override." 
+ "TFrameXApp initialized without a default LLM. LLM must be provided to run_context or agent for LLM-based agents to function." ) + + self._register_mcp_meta_tools() + + def _register_mcp_meta_tools(self): + """Registers client-side functions for interacting with MCP as native TFrameX tools.""" + if not self._mcp_manager: # Only register if MCP manager is set up + logger.debug("MCP meta-tools registration skipped: MCPManager not available.") + return + + logger.debug("Registering MCP meta-tools...") + + self.tool(name="tframex_list_mcp_servers", description="Lists all configured and initialized MCP servers.")(tframex_list_mcp_servers) + + list_resources_params = ToolParameters( + properties={ + "server_alias": ToolParameterProperty(type="string", description="Optional. The alias of a specific MCP server. If omitted, lists from all connected servers.") + }, required=[] ) + self.tool(name="tframex_list_mcp_resources", description="Lists available resources from MCP server(s).", parameters_schema=list_resources_params)(tframex_list_mcp_resources) + + read_resource_params = ToolParameters( + properties={ + "server_alias": ToolParameterProperty(type="string", description="The alias of the MCP server."), + "resource_uri": ToolParameterProperty(type="string", description="The URI of the MCP resource to read.") + }, required=["server_alias", "resource_uri"]) + self.tool(name="tframex_read_mcp_resource", description="Reads content from a specific MCP resource.", parameters_schema=read_resource_params)(tframex_read_mcp_resource) + + list_prompts_params = ToolParameters( + properties={ "server_alias": ToolParameterProperty(type="string", description="Optional. Alias of a specific MCP server.") }, required=[]) + self.tool(name="tframex_list_mcp_prompts", description="Lists available prompts from MCP server(s).", parameters_schema=list_prompts_params)(tframex_list_mcp_prompts) + + use_prompt_params = ToolParameters( + properties={ + "server_alias": ToolParameterProperty(type="string", description="Alias of the MCP server."), + "prompt_name": ToolParameterProperty(type="string", description="Name of the MCP prompt."), + "arguments": ToolParameterProperty(type="object", description="Key-value arguments for the prompt.") + }, required=["server_alias", "prompt_name", "arguments"]) + self.tool(name="tframex_use_mcp_prompt", description="Uses a server-defined MCP prompt, returning its messages for the LLM.", parameters_schema=use_prompt_params)(tframex_use_mcp_prompt) + logger.info("MCP meta-tools registered.") + + async def initialize_mcp_servers(self): + if self._mcp_manager: + logger.info("TFrameXApp: Explicitly initializing MCP servers...") + await self._mcp_manager.initialize_servers() + else: + logger.info("TFrameXApp: No MCP manager to initialize servers from.") + + async def shutdown_mcp_servers(self): + if self._mcp_manager: + logger.info("TFrameXApp: Initiating shutdown of MCP servers...") + await self._mcp_manager.shutdown_all_servers() + else: + logger.info("TFrameXApp: No MCP manager to shutdown.") def tool( self, name: Optional[str] = None, description: Optional[str] = None, - parameters_schema: Optional[Dict[str, Dict[str, Any]]] = None, + parameters_schema: Optional[ToolParameters] = None, ) -> Callable: def decorator(func: Callable[..., Any]) -> Callable: tool_name = name or func.__name__ if tool_name in self._tools: - raise ValueError(f"Tool '{tool_name}' already registered.") - - parsed_params_schema = None - if parameters_schema: + if tool_name.startswith("tframex_"): # Allow 
re-registration for meta-tools + logger.debug(f"Re-registering MCP meta-tool: '{tool_name}'") + else: + raise ValueError(f"Tool '{tool_name}' already registered.") + + parsed_params_obj = None + if isinstance(parameters_schema, ToolParameters): + parsed_params_obj = parameters_schema + elif isinstance(parameters_schema, dict): props = { p_name: ToolParameterProperty(**p_def) for p_name, p_def in parameters_schema.get("properties", {}).items() } required_list = parameters_schema.get("required") - if not isinstance(required_list, list): - required_list = None - parsed_params_schema = ToolParameters( - properties=props, required=required_list + parsed_params_obj = ToolParameters( + properties=props, required=required_list if isinstance(required_list, list) else [] ) - + self._tools[tool_name] = Tool( name=tool_name, func=func, description=description, - parameters_schema=parsed_params_schema, + parameters_schema=parsed_params_obj, ) logger.debug(f"Registered tool: '{tool_name}'") return func - return decorator def agent( @@ -92,11 +154,12 @@ def agent( description: Optional[str] = None, callable_agents: Optional[List[str]] = None, system_prompt: Optional[str] = None, - tools: Optional[List[str]] = None, - llm: Optional[BaseLLMWrapper] = None, # This is the agent-specific LLM override + tools: Optional[List[str]] = None, + mcp_tools_from_servers: Optional[Union[List[str], str]] = None, + llm: Optional[BaseLLMWrapper] = None, memory_store: Optional[BaseMemoryStore] = None, agent_class: type[BaseAgent] = LLMAgent, - strip_think_tags: bool = True, # NEW: Agent-specific setting + strip_think_tags: bool = True, **agent_config: Any, ) -> Callable: def decorator(target: Union[Callable, type]) -> Union[Callable, type]: @@ -108,39 +171,29 @@ def decorator(target: Union[Callable, type]) -> Union[Callable, type]: "description": description, "callable_agent_names": callable_agents or [], "system_prompt_template": system_prompt, - "tool_names": tools or [], - "llm_instance_override": llm, # CHANGED key from llm_override + "native_tool_names": tools or [], + "mcp_tools_from_servers_config": mcp_tools_from_servers, # Store the user's preference + "llm_instance_override": llm, "memory_override": memory_store, "agent_class_ref": agent_class, - "strip_think_tags": strip_think_tags, # NEW: Storing the setting + "strip_think_tags": strip_think_tags, **agent_config, } - - is_class_based_agent = inspect.isclass(target) and issubclass( - target, BaseAgent - ) - agent_class_to_log = ( - target.__name__ if is_class_based_agent else agent_class.__name__ - ) + + is_class_based_agent = inspect.isclass(target) and issubclass(target, BaseAgent) + agent_class_to_log = target.__name__ if is_class_based_agent else agent_class.__name__ self._agents[agent_name] = { - "type": ( - "custom_class_agent" - if is_class_based_agent - else "framework_managed_agent" - ), - "ref": target, + "type": "custom_class_agent" if is_class_based_agent else "framework_managed_agent", + "ref": target, # For class agents, this is the class itself "config": final_config, } - logger.debug( - f"Registered agent: '{agent_name}' (Description: '{description}', " - f"Class: {agent_class_to_log}, " - f"LLM Override: {llm.model_id if llm else 'None'}, " - f"Callable Agents: {callable_agents or []}, " - f"Strip Think Tags: {strip_think_tags})" + logger.info( + f"Registered agent: '{agent_name}' (Class: {agent_class_to_log}). " + f"Native Tools: {final_config['native_tool_names']}. " + f"MCP Tools from config: {mcp_tools_from_servers or 'None'}." 
) return target - return decorator def get_tool(self, name: str) -> Optional[Tool]: @@ -150,13 +203,9 @@ def register_flow(self, flow_instance: Flow) -> None: if not isinstance(flow_instance, Flow): raise TypeError("Can only register an instance of the Flow class.") if flow_instance.flow_name in self._flows: - raise ValueError( - f"Flow with name '{flow_instance.flow_name}' already registered." - ) + raise ValueError(f"Flow with name '{flow_instance.flow_name}' already registered.") self._flows[flow_instance.flow_name] = flow_instance - logger.debug( - f"Registered flow: '{flow_instance.flow_name}' with {len(flow_instance.steps)} steps." - ) + logger.debug(f"Registered flow: '{flow_instance.flow_name}' with {len(flow_instance.steps)} steps.") def get_flow(self, name: str) -> Optional[Flow]: return self._flows.get(name) @@ -164,55 +213,45 @@ def get_flow(self, name: str) -> Optional[Flow]: def run_context( self, llm_override: Optional[BaseLLMWrapper] = None, - context_memory_override: Optional[BaseMemoryStore] = None, + # context_memory_override: Optional[BaseMemoryStore] = None, # Not used by context currently ) -> "TFrameXRuntimeContext": ctx_llm = llm_override or self.default_llm - ctx_memory = context_memory_override - return TFrameXRuntimeContext(self, llm=ctx_llm, context_memory=ctx_memory) - + ctx_mcp_manager = self._mcp_manager + return TFrameXRuntimeContext(self, llm=ctx_llm, mcp_manager=ctx_mcp_manager) class TFrameXRuntimeContext: def __init__( self, app: TFrameXApp, - llm: Optional[BaseLLMWrapper], - context_memory: Optional[BaseMemoryStore] = None, + llm: Optional[BaseLLMWrapper], # LLM for this context + mcp_manager: Optional[MCPManager] = None, ): self._app = app - self.llm = llm # Context-level LLM - self.context_memory = context_memory - self.engine = Engine(app, self) + self.llm = llm + self.mcp_manager = mcp_manager + self.engine = Engine(app, self) # Engine gets this context instance async def __aenter__(self) -> "TFrameXRuntimeContext": - llm_id = self.llm.model_id if self.llm else "None" - ctx_mem_type = ( - type(self.context_memory).__name__ if self.context_memory else "None" - ) - logger.info( - f"TFrameXRuntimeContext entered. LLM: {llm_id}. Context Memory: {ctx_mem_type}" - ) + llm_id = self.llm.model_id if self.llm else "None (App Default/Agent Specific)" + logger.info(f"TFrameXRuntimeContext entered. Context LLM: {llm_id}.") + if self.mcp_manager: + if not self.mcp_manager.servers or \ + any(not s.is_initialized for s in self.mcp_manager.servers.values()): + logger.info("RuntimeContext: Initializing MCP servers on enter (first time or some not ready)...") + await self.mcp_manager.initialize_servers() + else: + logger.debug("RuntimeContext: All configured MCP servers already initialized by app or previous context.") return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - # Close own LLM if it has a close method (some LLM wrappers might need this) - if ( - self.llm - and hasattr(self.llm, "close") - and inspect.iscoroutinefunction(self.llm.close) - ): - await self.llm.close() - logger.info( - f"TFrameXRuntimeContext exited. Context LLM client closed for {self.llm.model_id}." - ) - elif self.llm: - logger.info( - f"TFrameXRuntimeContext exited. Context LLM {self.llm.model_id} did not require async close." - ) - else: - logger.info("TFrameXRuntimeContext exited. No context LLM client to close.") - # Note: Agent-specific LLMs are not closed here; they are managed by the agent or assumed to share lifetime with context/app LLM. 
-        # If agent LLMs need explicit closing, agent's __del__ or a specific cleanup phase would handle it.
-
+        if (self.llm and hasattr(self.llm, "close") and inspect.iscoroutinefunction(self.llm.close)):
+            try:
+                await self.llm.close()
+                logger.info(f"Context LLM client for {self.llm.model_id} closed.")
+            except Exception as e:
+                logger.error(f"Error closing context LLM client for {self.llm.model_id}: {e}")
+        logger.info("TFrameXRuntimeContext exited.")
+
    async def run_flow(
        self,
        flow_ref: Union[str, Flow],
@@ -231,117 +270,57 @@ async def run_flow(
            raise TypeError("flow_ref must be a flow name (str) or a Flow instance.")
        return await flow_to_run.execute(
            initial_input,
-            self.engine,
+            self.engine,
            initial_shared_data=initial_shared_data,
            flow_template_vars=flow_template_vars,
        )

-    async def interactive_chat(self, default_flow_name: Optional[str] = None) -> None:
-        print("\n--- TFrameX Interactive Flow Chat ---")
+    async def call_agent(
+        self, agent_name: str, input_message: Union[str, Message], **kwargs: Any
+    ) -> Message:
+        return await self.engine.call_agent(agent_name, input_message, **kwargs)

-        flow_to_use: Optional[Flow] = None
-        if default_flow_name:
-            flow_to_use = self._app.get_flow(default_flow_name)
-            if flow_to_use:
-                print(f"Default flow: '{default_flow_name}'")
-            else:
-                print(f"Warning: Default flow '{default_flow_name}' not found.")
+    async def interactive_chat(self, default_agent_name: Optional[str] = None) -> None:
+        print("\n--- TFrameX Interactive Agent Chat (with MCP) ---")

-        if not flow_to_use:
-            if not self._app._flows:
-                print(
-                    "No flows registered in the application. Exiting interactive chat."
-                )
+        agent_to_chat_with = default_agent_name
+        if not agent_to_chat_with:
+            if not self._app._agents:
+                print("No agents registered. Exiting interactive chat.")
                return
-
-            print("Available flows:")
-            flow_names_list = list(self._app._flows.keys())
-            for i, name in enumerate(flow_names_list):
-                print(f"  {i + 1}. {name}")
-
+            print("Available agents:")
+            agent_names_list = list(self._app._agents.keys())
+            for i, name in enumerate(agent_names_list):
+                print(f"  {i + 1}. {name}")
            while True:
                try:
-                    choice_str = await asyncio.to_thread(
-                        input,
-                        "Select a flow to chat with (number or name, or 'exit'): ",
-                    )
-                    if choice_str.lower() == "exit":
-                        return
-
-                    selected_flow_name: Optional[str] = None
-                    if choice_str.isdigit():
-                        choice_idx = int(choice_str) - 1
-                        if 0 <= choice_idx < len(flow_names_list):
-                            selected_flow_name = flow_names_list[choice_idx]
-                    else:
-                        if choice_str in self._app._flows:
-                            selected_flow_name = choice_str
-
-                    if selected_flow_name:
-                        flow_to_use = self._app.get_flow(selected_flow_name)
+                    choice = await asyncio.to_thread(input, "Select an agent (number or name): ")
+                    if choice.isdigit() and 0 <= int(choice) - 1 < len(agent_names_list):
+                        agent_to_chat_with = agent_names_list[int(choice) - 1]
                        break
-                    else:
-                        print("Invalid selection. Please try again.")
-                except ValueError:
-                    print("Invalid input. Please enter a number or flow name.")
-                except KeyboardInterrupt:
-                    print("\nExiting.")
-                    return
-
-        if not flow_to_use:
-            print("No flow selected. Exiting.")
-            return
-
-        print(f"\n--- Chatting with Flow: '{flow_to_use.flow_name}' ---")
-        print(f"Description: {flow_to_use.description or 'No description'}")
-        print("Type 'exit' or 'quit' to end this chat session.")
-
+                    elif choice in agent_names_list:
+                        agent_to_chat_with = choice
+                        break
+                    else:
+                        print("Invalid selection.")
+                except ValueError:
+                    print("Invalid input.")
+                except KeyboardInterrupt:
+                    print("\nExiting.")
+                    return
+
+        if not agent_to_chat_with:
+            print("No agent selected. Exiting.")
+            return
+
+        print(f"\nChatting with Agent: '{agent_to_chat_with}'. Type 'exit' or 'quit'.")
+        # For a persistent chat, you'd use the agent's memory.
+        # This loop makes independent calls for simplicity.
        while True:
            try:
                user_input_str = await asyncio.to_thread(input, "\nYou: ")
-                if user_input_str.lower() in ["exit", "quit"]:
-                    break
-                if not user_input_str.strip():
-                    continue
-
-                initial_message = Message(role="user", content=user_input_str)
-
-                logger.info(
-                    f"CLI: Running flow '{flow_to_use.flow_name}' with input: '{user_input_str}'"
-                )
-                final_flow_context: FlowContext = await self.run_flow(
-                    flow_to_use, initial_message
-                )
-
-                final_output_message = final_flow_context.current_message
-
-                print(f"\nFlow Output ({final_output_message.role}):")
-                if final_output_message.content:
-                    print(f"  Content: {final_output_message.content}")
-
-                if final_output_message.tool_calls:
-                    print(
-                        f"  Final Message Tool Calls (Unprocessed by Flow): {final_output_message.tool_calls}"
-                    )
-
-                if final_flow_context.shared_data:
-                    print("  Flow Shared Data (at end of execution):")
-                    for key, value in final_flow_context.shared_data.items():
-                        value_str = str(value)
-                        print(
-                            f"    {key}: {value_str[:200]}{'...' if len(value_str) > 200 else ''}"
-                        )
-
-            except KeyboardInterrupt:
-                print("\nExiting chat session.")
-                break
-            except Exception as e:
-                print(
-                    f"Error during interactive chat with flow '{flow_to_use.flow_name}': {e}"
-                )
-                logger.error(
-                    f"Error in interactive_chat with flow '{flow_to_use.flow_name}'",
-                    exc_info=True,
-                )
-
-        print(f"--- Ended chat with Flow: '{flow_to_use.flow_name}' ---")
+                if user_input_str.lower() in ["exit", "quit"]:
+                    break
+                if not user_input_str.strip():
+                    continue
+
+                print("Assistant: Thinking...")
+                response_message = await self.call_agent(agent_to_chat_with, Message(role="user", content=user_input_str))
+
+                print(f"\nAssistant ({response_message.role}):")
+                if response_message.content:
+                    print(f"  Content: {response_message.content}")
+                if response_message.tool_calls:
+                    print(f"  Final Tool Calls (should be processed by agent): {response_message.tool_calls}")
+            except KeyboardInterrupt:
+                break
+            except Exception as e:
+                print(f"Error: {e}")
+                logger.error("Chat Error", exc_info=True)
+        print(f"--- Ended chat with Agent: '{agent_to_chat_with}' ---")
\ No newline at end of file
diff --git a/tframex/mcp/__init__.py b/tframex/mcp/__init__.py
new file mode 100644
index 0000000..816298f
--- /dev/null
+++ b/tframex/mcp/__init__.py
@@ -0,0 +1,29 @@
+# tframex/mcp/__init__.py
+import logging
+
+from .config import MCPConfigError, load_mcp_server_configs
+from .manager import MCPManager
+from .server_connector import MCPConnectedServer
+from .meta_tools import (
+    tframex_list_mcp_servers,
+    tframex_list_mcp_resources,
+    tframex_read_mcp_resource,
+    tframex_list_mcp_prompts,
+    tframex_use_mcp_prompt
+)
+
+logger = logging.getLogger("tframex.mcp")
+
+__all__ = [
+    "MCPManager",
+    "MCPConnectedServer",
+    "MCPConfigError",
+    "load_mcp_server_configs",
+    "tframex_list_mcp_servers",
+ 
"tframex_list_mcp_resources", + "tframex_read_mcp_resource", + "tframex_list_mcp_prompts", + "tframex_use_mcp_prompt", +] + +logger.debug("TFrameX MCP Integration package initialized.") \ No newline at end of file diff --git a/tframex/mcp/config.py b/tframex/mcp/config.py new file mode 100644 index 0000000..80e403a --- /dev/null +++ b/tframex/mcp/config.py @@ -0,0 +1,61 @@ +# tframex/mcp/config.py +import json +import logging +from typing import Dict, Any, Optional + +logger = logging.getLogger("tframex.mcp.config") + +class MCPConfigError(Exception): + pass + +def load_mcp_server_configs(file_path: str = "servers_config.json") -> Dict[str, Dict[str, Any]]: + """ + Loads MCP server configurations from a JSON file. + Returns a dictionary where keys are server aliases and values are their configs. + """ + logger.info(f"Loading MCP server configurations from '{file_path}'...") + try: + with open(file_path, "r") as f: + data = json.load(f) + except FileNotFoundError: + logger.warning(f"MCP server configuration file '{file_path}' not found. No MCP servers will be loaded.") + return {} + except json.JSONDecodeError as e: + logger.error(f"Error decoding JSON from MCP server config '{file_path}': {e}") + raise MCPConfigError(f"Invalid JSON in {file_path}: {e}") from e + + if not isinstance(data, dict) or "mcpServers" not in data: + raise MCPConfigError(f"'mcpServers' key not found or not a dictionary in '{file_path}'.") + + server_configs = data["mcpServers"] + if not isinstance(server_configs, dict): + raise MCPConfigError(f"'mcpServers' value must be a dictionary in '{file_path}'.") + + validated_configs: Dict[str, Dict[str, Any]] = {} + for server_alias, config in server_configs.items(): + if not isinstance(config, dict): + logger.warning(f"Skipping invalid config for MCP server '{server_alias}': not a dictionary.") + continue + if "type" not in config: + logger.warning(f"Skipping MCP server '{server_alias}': 'type' (stdio/streamable-http) missing.") + continue + + config_type = str(config["type"]).lower() + if config_type == "stdio": + if "command" not in config or not config["command"]: + logger.warning(f"Skipping stdio MCP server '{server_alias}': 'command' is missing or empty.") + continue + elif config_type == "streamable-http": + if "url" not in config or not config["url"]: + logger.warning(f"Skipping streamable-http MCP server '{server_alias}': 'url' is missing or empty.") + continue + else: + logger.warning(f"Skipping MCP server '{server_alias}': unknown type '{config['type']}'.") + continue + + validated_configs[server_alias] = config + logger.info(f"Validated MCP server config for '{server_alias}' (type: {config_type}).") + + if not validated_configs: + logger.info(f"No valid MCP server configurations found in '{file_path}'.") + return validated_configs \ No newline at end of file diff --git a/tframex/mcp/manager.py b/tframex/mcp/manager.py new file mode 100644 index 0000000..94b3ab8 --- /dev/null +++ b/tframex/mcp/manager.py @@ -0,0 +1,177 @@ +# tframex/mcp/manager.py +import asyncio +import logging +from typing import Dict, Any, List, Optional, Tuple + +from .config import load_mcp_server_configs, MCPConfigError +from .server_connector import MCPConnectedServer +from tframex.models.primitives import ToolDefinition # For LLM tool formatting +from mcp.types import ( # Corrected imports + Tool as ActualMCPTool, + Resource as ActualMCPResource, + Prompt as ActualMCPPrompt, + TextContent, ImageContent, EmbeddedResource # For result parsing +) + +logger = 
logging.getLogger("tframex.mcp.manager") + +class MCPManager: + def __init__(self, mcp_config_file_path: Optional[str] = "servers_config.json"): + self.config_file_path = mcp_config_file_path + self.servers: Dict[str, MCPConnectedServer] = {} + self._is_shutting_down = False # To prevent multiple shutdown attempts + + async def initialize_servers(self): + if self._is_shutting_down: + logger.warning("MCPManager is shutting down, cannot initialize servers.") + return + if not self.config_file_path: + logger.info("No MCP config file path provided. Skipping MCP server initialization.") + return + + try: + server_configs = load_mcp_server_configs(self.config_file_path) + except MCPConfigError as e: + logger.error(f"Failed to load MCP server configurations: {e}") + return + except FileNotFoundError: + return # Already logged by load_mcp_server_configs + + if not server_configs: + logger.info("No MCP servers defined in configuration.") + return + + # Filter out already existing server aliases to avoid re-creating + new_server_configs = { + alias: config for alias, config in server_configs.items() if alias not in self.servers + } + + for alias, config in new_server_configs.items(): + self.servers[alias] = MCPConnectedServer(alias, config) + + init_tasks_map = { # Only create tasks for servers not yet marked as initialized + alias: server.initialize() for alias, server in self.servers.items() if not server.is_initialized and alias in new_server_configs + } + + if not init_tasks_map: + logger.info("All configured MCP servers are already initialized or no new servers to initialize from current config.") + return + + results = await asyncio.gather(*init_tasks_map.values(), return_exceptions=True) + + successful_count = 0 + aliases_to_remove = [] + for i, alias in enumerate(init_tasks_map.keys()): + init_success_flag_or_exception = results[i] + if isinstance(init_success_flag_or_exception, Exception): + logger.error(f"Exception during initialization of MCP server '{alias}': {init_success_flag_or_exception}", exc_info=init_success_flag_or_exception) + aliases_to_remove.append(alias) + elif init_success_flag_or_exception is False: # Explicit check for False return + logger.error(f"Initialization task returned False for MCP server '{alias}', indicating setup failure.") + aliases_to_remove.append(alias) + else: # Assuming True means success + successful_count += 1 + + for alias in aliases_to_remove: + if alias in self.servers: + # The server.initialize() method should call its own cleanup on failure. + # Here, we just remove it from the manager's active list. 
+ logger.info(f"Removing failed server '{alias}' from active MCP manager list.") + del self.servers[alias] + + logger.info(f"MCPManager: {successful_count}/{len(init_tasks_map)} new MCP servers initialized successfully.") + + + def get_server(self, server_alias: str) -> Optional[MCPConnectedServer]: + server = self.servers.get(server_alias) + if server and server.is_initialized: + return server + logger.warning(f"MCP Server '{server_alias}' not found or not initialized.") + return None + + def get_all_mcp_tools_for_llm(self) -> List[ToolDefinition]: + llm_tool_defs = [] + for server_alias, server in self.servers.items(): + if server.is_initialized and server.tools: + for mcp_tool_info in server.tools: # mcp_tool_info is ActualMCPTool + parameters = mcp_tool_info.inputSchema if mcp_tool_info.inputSchema else {"type": "object", "properties": {}} + prefixed_name = f"{server_alias}__{mcp_tool_info.name}" + llm_tool_defs.append( + ToolDefinition( # This is tframex.models.primitives.ToolDefinition + type="function", + function={ + "name": prefixed_name, + "description": mcp_tool_info.description or f"Tool '{mcp_tool_info.name}' from MCP server '{server_alias}'.", + "parameters": parameters, + } + ) + ) + logger.debug(f"MCPManager provides {len(llm_tool_defs)} MCP tools for LLM.") + return llm_tool_defs + + def get_all_mcp_resource_infos(self) -> Dict[str, List[ActualMCPResource]]: + all_resources = {} + for server_alias, server in self.servers.items(): + if server.is_initialized and server.resources: + all_resources[server_alias] = server.resources + return all_resources + + def get_all_mcp_prompt_infos(self) -> Dict[str, List[ActualMCPPrompt]]: + all_prompts = {} + for server_alias, server in self.servers.items(): + if server.is_initialized and server.prompts: + all_prompts[server_alias] = server.prompts + return all_prompts + + async def call_mcp_tool_by_prefixed_name(self, prefixed_tool_name: str, arguments: Dict[str, Any]) -> Any: # Returns MCP CallToolResult + if self._is_shutting_down: + logger.warning(f"MCPManager is shutting down. Call to '{prefixed_tool_name}' aborted.") + return {"error": "MCP Manager is shutting down."} # Mimic tool error + + if "__" not in prefixed_tool_name: + raise ValueError(f"MCP tool name '{prefixed_tool_name}' is not correctly prefixed with 'server_alias__'.") + + server_alias, actual_tool_name = prefixed_tool_name.split("__", 1) + server = self.get_server(server_alias) # This checks is_initialized + if not server: + return {"error": f"MCP Server '{server_alias}' for tool '{actual_tool_name}' not available."} + + try: + # This returns the raw mcp.types.CallToolResult + return await server.call_mcp_tool(actual_tool_name, arguments) + except Exception as e: + logger.error(f"Error calling MCP tool '{actual_tool_name}' on server '{server_alias}': {e}", exc_info=True) + # Construct a CallToolResult-like error structure for consistency if possible, + # or a simple error dict that the engine can parse. + # For now, simple error dict to match other error paths in engine. 
+ return {"error": f"Failed to call MCP tool '{actual_tool_name}' on '{server_alias}': {str(e)}"} + + async def shutdown_all_servers(self): + if self._is_shutting_down: + return + self._is_shutting_down = True # Set flag immediately + logger.info("MCPManager: Initiating shutdown for all connected MCP servers...") + + servers_to_cleanup = list(self.servers.values()) # Iterate over a copy + if not servers_to_cleanup: + logger.info("MCPManager: No servers to shutdown.") + self._is_shutting_down = False # Reset if nothing to do + return + + cleanup_tasks = [server.cleanup() for server in servers_to_cleanup] + results = await asyncio.gather(*cleanup_tasks, return_exceptions=True) + + # Log results of cleanup + original_aliases = list(self.servers.keys()) # Get aliases before clearing + for i, alias in enumerate(original_aliases): + if i < len(results): # Check bounds for safety + if isinstance(results[i], Exception): + logger.error(f"Exception during shutdown of MCP server '{alias}': {results[i]}", exc_info=results[i]) + else: + logger.info(f"MCP server '{alias}' shutdown process completed/invoked.") + else: # Should not happen if gather returns for all tasks + logger.warning(f"Missing cleanup result for MCP server '{alias}'.") + + self.servers.clear() + logger.info("MCPManager: All server shutdown procedures completed and list cleared.") + self._is_shutting_down = False # Reset flag after completion \ No newline at end of file diff --git a/tframex/mcp/meta_tools.py b/tframex/mcp/meta_tools.py new file mode 100644 index 0000000..50dddaf --- /dev/null +++ b/tframex/mcp/meta_tools.py @@ -0,0 +1,200 @@ +# tframex/mcp/meta_tools.py +import logging +from typing import Dict, Any, Optional, List, Union, TYPE_CHECKING # Import TYPE_CHECKING + +# from tframex.app import TFrameXRuntimeContext # REMOVE THIS DIRECT IMPORT +from .manager import MCPManager # To access the manager +from mcp.types import TextContent, ImageContent, EmbeddedResource + +# Conditional import for type hinting to avoid circular import +if TYPE_CHECKING: + from tframex.app import TFrameXRuntimeContext + +logger = logging.getLogger("tframex.mcp.meta_tools") + +# These functions will be wrapped by @app.tool() in TFrameXApp setup. +# They need access to the MCPManager, which can be passed via TFrameXRuntimeContext. + +# Use string literal for the type hint if TYPE_CHECKING is False at runtime +async def tframex_list_mcp_servers(rt_ctx: 'TFrameXRuntimeContext') -> str: + """Lists all configured and initialized MCP servers.""" + # Ensure rt_ctx has mcp_manager (runtime check) + if not hasattr(rt_ctx, 'mcp_manager') or not rt_ctx.mcp_manager: + return "MCP integration is not initialized in the application's runtime context." + + manager: MCPManager = rt_ctx.mcp_manager # Type cast if needed, or rely on duck typing + output = "Connected MCP Servers:\n" + if not manager.servers: + return "No MCP servers are currently configured or connected." 
+ + for alias, server_obj in manager.servers.items(): + status = "Initialized" if server_obj.is_initialized else "Failed/Not Initialized" + s_info = server_obj.server_info + name_version = f"{s_info.name} v{s_info.version}" if s_info and hasattr(s_info, 'name') and hasattr(s_info, 'version') else "N/A" + output += f" - Alias: {alias} (Name: {name_version}, Status: {status})\n" + if server_obj.is_initialized and server_obj.capabilities: + caps = server_obj.capabilities + output += f" Capabilities: Tools={hasattr(caps, 'tools') and bool(caps.tools)}, Resources={hasattr(caps, 'resources') and bool(caps.resources)}, Prompts={hasattr(caps, 'prompts') and bool(caps.prompts)}\n" + return output + +async def tframex_list_mcp_resources(rt_ctx: 'TFrameXRuntimeContext', server_alias: Optional[str] = None) -> Union[str, Dict[str, List[Dict[str, Any]]]]: + """ + Lists available resources from a specific MCP server or all initialized MCP servers. + Args: + rt_ctx: The TFrameX runtime context. + server_alias: The alias of the MCP server. If None, lists from all. + Returns: + A string listing resources or a dictionary if server_alias is None. + """ + if not hasattr(rt_ctx, 'mcp_manager') or not rt_ctx.mcp_manager: + return "MCP integration is not initialized." + manager: MCPManager = rt_ctx.mcp_manager + infos_by_server: Dict[str, List[Dict[str, Any]]] = {} + servers_to_query = [] + if server_alias: + server = manager.get_server(server_alias) # get_server checks for initialization + if server: + servers_to_query.append(server) + else: + return f"Error: MCP Server '{server_alias}' not found or not initialized." + else: + servers_to_query = [s for s in manager.servers.values() if s.is_initialized] + + if not servers_to_query: + return "No MCP servers available to list resources from." + + for server_obj in servers_to_query: + # Ensure capabilities and resources attribute exist before accessing + if server_obj.capabilities and hasattr(server_obj.capabilities, 'resources') and server_obj.capabilities.resources and server_obj.resources: + infos_by_server[server_obj.server_alias] = [ + {"uri": str(r.uri), "name": r.name, "description": r.description, "mimeType": r.mimeType} + for r in server_obj.resources # server_obj.resources should be List[ActualMCPResource] + ] + else: + infos_by_server[server_obj.server_alias] = [] + + if server_alias: + res_list = infos_by_server.get(server_alias, []) + if not res_list: return f"No resources found or resource capability missing on server '{server_alias}'." + output = f"Resources from MCP server '{server_alias}':\n" + for r_info in res_list: + output += f" - URI: {r_info['uri']}\n Name: {r_info['name']}\n Desc: {r_info['description']}\n MIME: {r_info['mimeType']}\n" + return output + else: + return infos_by_server + + +async def tframex_read_mcp_resource(rt_ctx: 'TFrameXRuntimeContext', server_alias: str, resource_uri: str) -> str: + """ + Reads content from a specific resource on a specific MCP server. + Args: + rt_ctx: The TFrameX runtime context. + server_alias: The alias of the MCP server. + resource_uri: The URI of the resource to read. + Returns: + The resource content as a string, or an error message. + """ + if not hasattr(rt_ctx, 'mcp_manager') or not rt_ctx.mcp_manager: + return "MCP integration is not initialized." + manager: MCPManager = rt_ctx.mcp_manager + server = manager.get_server(server_alias) + if not server: + return f"Error: MCP Server '{server_alias}' not found or not initialized." 
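+
+    # How the LLM typically invokes this meta-tool (a sketch; the alias and URI
+    # are hypothetical examples, and rt_ctx is supplied by the framework rather
+    # than by the model):
+    #
+    #   tframex_read_mcp_resource(rt_ctx, server_alias="docs", resource_uri="docs://readme")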
+ if not (server.capabilities and hasattr(server.capabilities, 'resources') and server.capabilities.resources): + return f"Error: Server '{server_alias}' does not support resources." + + try: + content_result = await server.read_mcp_resource(resource_uri) + + if isinstance(content_result, str): return content_result + if isinstance(content_result, TextContent) and content_result.text is not None: return content_result.text + if isinstance(content_result, ImageContent): return f"[Image content from MCP resource '{resource_uri}', mime: {content_result.mimeType}]" + if isinstance(content_result, EmbeddedResource): return f"[Embedded resource from MCP resource '{resource_uri}', uri: {content_result.resource.uri}]" + + if hasattr(content_result, 'text') and content_result.text is not None: return content_result.text + + return f"[Non-textual or complex content received from resource '{resource_uri}'. Type: {type(content_result)}]" + except Exception as e: + logger.error(f"Error reading MCP resource '{resource_uri}' from '{server_alias}': {e}", exc_info=True) + return f"Error reading resource '{resource_uri}' from '{server_alias}': {str(e)}" + +async def tframex_list_mcp_prompts(rt_ctx: 'TFrameXRuntimeContext', server_alias: Optional[str] = None) -> Union[str, Dict[str, List[Dict[str, Any]]]]: + """Lists available prompts from a specific or all MCP servers.""" + if not hasattr(rt_ctx, 'mcp_manager') or not rt_ctx.mcp_manager: + return "MCP integration is not initialized." + manager: MCPManager = rt_ctx.mcp_manager + infos_by_server: Dict[str, List[Dict[str, Any]]] = {} + servers_to_query = [] + + if server_alias: + server = manager.get_server(server_alias) + if server: servers_to_query.append(server) + else: return f"Error: MCP Server '{server_alias}' not found or not initialized." + else: + servers_to_query = [s for s in manager.servers.values() if s.is_initialized] + + if not servers_to_query: return "No MCP servers to list prompts from." + + for server_obj in servers_to_query: + if server_obj.capabilities and hasattr(server_obj.capabilities, 'prompts') and server_obj.capabilities.prompts and server_obj.prompts: + infos_by_server[server_obj.server_alias] = [ + { + "name": p.name, + "description": p.description, + "arguments": [{"name": arg.name, "description": arg.description, "required": arg.required} for arg in p.arguments] if p.arguments else [] + } + for p in server_obj.prompts # server_obj.prompts should be List[ActualMCPPrompt] + ] + else: infos_by_server[server_obj.server_alias] = [] + + if server_alias: + p_list = infos_by_server.get(server_alias, []) + if not p_list: return f"No prompts found or prompt capability missing on server '{server_alias}'." + output = f"Prompts from MCP server '{server_alias}':\n" + for p_info in p_list: + args_str = ", ".join([f"{a['name']}{' (req)' if a['required'] else ''}" for a in p_info['arguments']]) + output += f" - Name: {p_info['name']}\n Desc: {p_info['description']}\n Args: {args_str}\n" + return output + else: + return infos_by_server + + +async def tframex_use_mcp_prompt(rt_ctx: 'TFrameXRuntimeContext', server_alias: str, prompt_name: str, arguments: Dict[str, Any]) -> List[Dict[str, str]]: + """ + Gets messages from a server-defined MCP prompt. The LLMAgent should then use these. + Args: + rt_ctx: The TFrameX runtime context. + server_alias: The alias of the MCP server. + prompt_name: The name of the prompt. + arguments: Arguments for the prompt. 
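+            (A hypothetical example: {"topic": "quarterly sales"}; the expected
+            keys are defined by the server-side prompt, not by TFrameX.)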
+    Returns:
+        A list of message dictionaries (e.g., [{"role": "user", "content": "..."}]), or an error message list.
+    """
+    if not hasattr(rt_ctx, 'mcp_manager') or not rt_ctx.mcp_manager:
+        return [{"role": "system", "content": "MCP integration is not initialized."}]
+    manager: MCPManager = rt_ctx.mcp_manager
+    server = manager.get_server(server_alias)
+    if not server:
+        return [{"role": "system", "content": f"Error: MCP Server '{server_alias}' not found or not initialized."}]
+    if not (server.capabilities and hasattr(server.capabilities, 'prompts') and server.capabilities.prompts):
+        return [{"role": "system", "content": f"Error: Server '{server_alias}' does not support prompts."}]
+
+    try:
+        prompt_result = await server.get_mcp_prompt(prompt_name, arguments)
+        messages = []
+        if prompt_result and hasattr(prompt_result, 'messages'):
+            for mcp_msg in prompt_result.messages:
+                content_text = ""
+                if isinstance(mcp_msg.content, TextContent):
+                    content_text = mcp_msg.content.text or ""
+                elif isinstance(mcp_msg.content, ImageContent):
+                    content_text = f"[Image from prompt: {mcp_msg.content.mimeType}]"
+                elif isinstance(mcp_msg.content, EmbeddedResource):
+                    content_text = f"[Resource from prompt: {mcp_msg.content.resource.uri}]"
+                else:
+                    content_text = str(mcp_msg.content) if mcp_msg.content else ""
+                messages.append({"role": mcp_msg.role, "content": content_text})
+            return messages
+        else:
+            logger.error(f"MCP prompt '{prompt_name}' from '{server_alias}' returned invalid result: {prompt_result}")
+            return [{"role": "system", "content": f"Error: Prompt '{prompt_name}' from '{server_alias}' returned an empty or invalid result."}]
+    except Exception as e:
+        logger.error(f"Error using MCP prompt '{prompt_name}' from '{server_alias}': {e}", exc_info=True)
+        return [{"role": "system", "content": f"Error getting prompt '{prompt_name}' from '{server_alias}': {str(e)}"}]
\ No newline at end of file
diff --git a/tframex/mcp/server_connector.py b/tframex/mcp/server_connector.py
new file mode 100644
index 0000000..24d1ce5
--- /dev/null
+++ b/tframex/mcp/server_connector.py
@@ -0,0 +1,392 @@
+# tframex/mcp/server_connector.py
+import asyncio
+import logging
+import os
+import shutil
+from contextlib import AsyncExitStack
+from typing import List, Dict, Any, Optional
+
+from mcp import ClientSession, StdioServerParameters, InitializeResult
+from mcp.client.stdio import stdio_client
+from mcp.client.streamable_http import streamablehttp_client
+from mcp.types import (
+    Tool as ActualMCPTool, Resource as ActualMCPResource, Prompt as ActualMCPPrompt,
+    # TextContent, ImageContent, EmbeddedResource are consumed in meta_tools, not here
+)
+
+logger = logging.getLogger("tframex.mcp.server_connector")
+
+# Default timeout for critical MCP server initialization steps (e.g., transport, session handshake)
+DEFAULT_MCP_SERVER_INIT_STEP_TIMEOUT = 30.0  # seconds
+
+class MCPConnectedServer:
+    """
+    Manages the connection and interaction with a single MCP server.
+    Handles initialization, fetching primitives (tools, resources, prompts),
+    calling tools, reading resources, getting prompts, and cleanup.
+    """
+    def __init__(self, server_alias: str, config: Dict[str, Any]):
+        """
+        Initializes an MCPConnectedServer instance.
+
+        Args:
+            server_alias: A unique alias for this server connection.
+            config: The configuration dictionary for this server, typically
+                one entry from the "mcpServers" mapping in servers_config.json.
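+
+                A minimal sketch of the two accepted shapes (all values here
+                are illustrative, not defaults; the keys shown are the ones
+                read by tframex/mcp/config.py and by this class):
+
+                    {"type": "stdio", "command": "python",
+                     "args": ["my_mcp_server.py"], "env": {"MY_VAR": "..."},
+                     "init_step_timeout": 30.0}
+
+                    {"type": "streamable-http",
+                     "url": "http://localhost:8000/mcp",
+                     "tool_call_timeout": 60.0}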
+ """ + self.server_alias: str = server_alias + self.config: Dict[str, Any] = config + self.session: Optional[ClientSession] = None + self.capabilities: Optional[Any] = None # Stores mcp.types.Capabilities + self.server_info: Optional[Any] = None # Stores mcp.types.ServerInformation + + self.tools: List[ActualMCPTool] = [] + self.resources: List[ActualMCPResource] = [] + self.prompts: List[ActualMCPPrompt] = [] + + self._exit_stack: AsyncExitStack = AsyncExitStack() + self._lock = asyncio.Lock() # Ensures thread-safe operations on initialization/cleanup + self.is_initialized = False # True if the server has successfully initialized and fetched primitives + self._notification_listener_task: Optional[asyncio.Task] = None + self._read_stream_for_listener: Optional[Any] = None + + # Timeout for individual critical steps during server initialization + self.init_step_timeout: float = self.config.get( + "init_step_timeout", DEFAULT_MCP_SERVER_INIT_STEP_TIMEOUT + ) + + async def initialize(self) -> bool: + """ + Initializes the connection to the MCP server. + This involves: + 1. Establishing the transport (stdio or streamable-http). + 2. Creating an MCP ClientSession. + 3. Performing the MCP handshake (session.initialize()). + 4. Fetching server primitives (tools, resources, prompts). + 5. Setting up a (placeholder) notification listener. + + Returns: + True if initialization was successful, False otherwise. + """ + async with self._lock: + if self.is_initialized: + logger.debug(f"MCP server '{self.server_alias}' already initialized.") + return True + + logger.info(f"Attempting initialization for MCP server '{self.server_alias}' (type: {self.config.get('type', 'unknown').lower()}). Timeout per step: {self.init_step_timeout}s.") + server_type = self.config.get("type", "stdio").lower() + self._read_stream_for_listener = None + write_stream_for_session = None + initialization_successful = False + + try: + logger.debug(f"[{self.server_alias}] DEBUG_POINT_0: Start of initialize try block.") + + # --- 1. 
Establish Transport --- + if server_type == "stdio": + command_path_config = self.config.get("command") + command_path = None + if command_path_config: # Check if config string is not None or empty + # Special handling for 'npx' for cross-platform Node.js scripts + if command_path_config.lower() == "npx": + resolved_npx = shutil.which("npx") or shutil.which("npx.cmd") + command_path = resolved_npx + else: + command_path = shutil.which(command_path_config) + + if not command_path: + err_cmd_str = command_path_config or "configured command" + raise FileNotFoundError(f"Command '{err_cmd_str}' for stdio server '{self.server_alias}' not found in PATH or as absolute path.") + + logger.debug(f"[{self.server_alias}] Resolved stdio command: {command_path}") + env_config = self.config.get("env") + # Merge with current process environment, allowing config to override + full_env = {**os.environ, **env_config} if env_config else {**os.environ} + server_params = StdioServerParameters(command=command_path, args=self.config.get("args", []), env=full_env) + logger.debug(f"[{self.server_alias}] StdioServerParameters: {server_params!r}") # Use !r for more detail + + transport_context = stdio_client(server_params) + # Stdio client creation itself is usually fast; timeout more critical for handshake + self._read_stream_for_listener, write_stream_for_session = await self._exit_stack.enter_async_context(transport_context) + logger.info(f"[{self.server_alias}] Stdio transport established.") + + elif server_type == "streamable-http": + url = self.config.get("url") + if not url: raise ValueError(f"URL missing for streamable-http server '{self.server_alias}'.") + logger.info(f"[{self.server_alias}] Attempting HTTP connection to: {url}") + + transport_context = streamablehttp_client(url) + logger.debug(f"[{self.server_alias}] streamablehttp_client created for {url}.") + # The streamablehttp_client's __aenter__ makes the initial HTTP request. + # A timeout here protects against unresponsive HTTP servers during initial connection. + self._read_stream_for_listener, write_stream_for_session, http_response = \ + await asyncio.wait_for( + self._exit_stack.enter_async_context(transport_context), + timeout=self.init_step_timeout + ) + status_code = getattr(http_response, 'status_code', None) or getattr(http_response, 'status', None) + logger.info(f"[{self.server_alias}] HTTP transport context entered. Initial HTTP status: {status_code}. Streams obtained.") + else: + raise ValueError(f"Unsupported server type '{server_type}' for '{self.server_alias}'.") + + if not self._read_stream_for_listener or not write_stream_for_session: + raise ConnectionError(f"Failed to establish read/write streams for '{self.server_alias}'. This should not happen if transport was successful.") + logger.debug(f"[{self.server_alias}] DEBUG_POINT_1: Transport streams successfully established.") + + # --- 2. Create ClientSession and Perform MCP Handshake --- + self.session = await self._exit_stack.enter_async_context( + ClientSession(self._read_stream_for_listener, write_stream_for_session) + ) + logger.debug(f"[{self.server_alias}] DEBUG_POINT_2: ClientSession created. 
Attempting session.initialize() with timeout {self.init_step_timeout}s.") + + init_result: InitializeResult = await asyncio.wait_for( + self.session.initialize(), # MCP Handshake + timeout=self.init_step_timeout + ) + self.capabilities = init_result.capabilities + self.server_info = init_result.serverInfo + s_name = getattr(self.server_info, 'name', 'N/A_NAME') + s_version = getattr(self.server_info, 'version', 'N/A_VER') + logger.info(f"MCP server '{self.server_alias}' session initialized: {s_name} v{s_version}") + logger.debug(f"[{self.server_alias}] DEBUG_POINT_3: MCP session.initialize() complete. Capabilities: {self.capabilities!r}") + + # --- 3. Fetch Server Primitives (Tools, Resources, Prompts) --- + await self._fetch_server_primitives() # This method has its own internal timeouts/error handling + logger.debug(f"[{self.server_alias}] DEBUG_POINT_4: Server primitives fetched. Tools: {len(self.tools)}, Resources: {len(self.resources)}, Prompts: {len(self.prompts)}") + + # --- 4. Setup Notification Listener (Placeholder) --- + # Note: For a robust notification listener, it should handle stream reading carefully. + # The current one is a placeholder. + if self._read_stream_for_listener: + self._notification_listener_task = asyncio.create_task( + self._listen_for_notifications(self._read_stream_for_listener) + ) + await asyncio.sleep(0.01) # Give the task a moment to start or fail fast + if self._notification_listener_task.done(): + listener_exc = self._notification_listener_task.exception() + if listener_exc: + logger.error(f"[{self.server_alias}] Notification listener task failed on startup!", exc_info=listener_exc) + else: + logger.info(f"[{self.server_alias}] Notification listener task completed very quickly (or was cancelled). Task: {self._notification_listener_task!r}") + else: + logger.info(f"Placeholder notification listener task created for '{self.server_alias}'. Task: {self._notification_listener_task!r}") + else: + # This case should ideally not be reached if transport setup was successful. + logger.warning(f"No raw read stream available for placeholder notification listener for '{self.server_alias}'. This might indicate an issue in transport setup.") + logger.debug(f"[{self.server_alias}] DEBUG_POINT_5: Notification listener setup attempted.") + + initialization_successful = True + logger.debug(f"[{self.server_alias}] DEBUG_POINT_6: Reached end of successful try block.") + + except asyncio.TimeoutError as e_timeout: + logger.error(f"TIMEOUT ({self.init_step_timeout}s) during critical initialization step for MCP server '{self.server_alias}': {e_timeout}", exc_info=False) # exc_info=False to reduce noise for common timeouts + initialization_successful = False + except FileNotFoundError as e_fnf: + logger.error(f"COMMAND NOT FOUND for stdio server '{self.server_alias}': {e_fnf}", exc_info=False) + initialization_successful = False + except ConnectionRefusedError as e_conn_refused: + url_for_error = self.config.get('url', 'configured URL') + logger.error(f"CONNECTION REFUSED for http server '{self.server_alias}' at {url_for_error}: {e_conn_refused}. 
Is the server running?", exc_info=False)
+                initialization_successful = False
+            except Exception as e:  # Catch-all for other init errors
+                logger.error(f"CRITICAL UNHANDLED ERROR during initialization of MCP server '{self.server_alias}': {e}", exc_info=True)
+                initialization_successful = False
+
+            finally:
+                if initialization_successful:
+                    self.is_initialized = True
+                    logger.info(f"MCP Server '{self.server_alias}' FULLY INITIALIZED successfully and marked as ready.")
+                else:
+                    self.is_initialized = False
+                    logger.error(f"MCP Server '{self.server_alias}' FAILED to initialize fully. is_initialized remains False.")
+                    # Perform immediate cleanup if initialization failed, to release resources.
+                    # The MCPManager will also attempt cleanup if this server is removed;
+                    # doing it here ensures resources are released even if the manager never gets to it.
+                    logger.info(f"Performing immediate cleanup for failed server '{self.server_alias}' due to initialization failure.")
+                    await self.cleanup(initiated_by_failure=True)  # Pass a flag to indicate context
+
+            return self.is_initialized
+
+    async def _listen_for_notifications(self, stream_to_listen_on: Any):
+        """
+        Placeholder for listening to server-pushed notifications.
+        This task operates on the raw stream passed to it.
+        In a production system, this would involve parsing MCP notification messages.
+        """
+        logger.debug(f"[{self.server_alias}] Placeholder notification listener active on stream: {type(stream_to_listen_on)}.")
+        try:
+            while self.is_initialized and stream_to_listen_on:
+                # A real implementation would await messages from the stream and
+                # dispatch any MCP notifications it finds. This placeholder just
+                # sleeps to keep the task alive while the server is initialized.
+                # The getattr default is False: streams that do not expose
+                # at_eof() are assumed to still be open.
+                if getattr(stream_to_listen_on, 'at_eof', lambda: False)():
+                    logger.info(f"[{self.server_alias}] Notification listener: Stream at EOF. Exiting loop.")
+                    break
+                await asyncio.sleep(5)  # Check periodically
+        except asyncio.CancelledError:
+            logger.info(f"Notification listener for '{self.server_alias}' was cancelled.")
+        except Exception as e:
+            # Only log errors if the server was meant to be initialized and running.
+            # If cleanup has already set is_initialized to False, this may be an expected closure.
+            if self.is_initialized:
+                logger.error(f"Error in placeholder notification listener for '{self.server_alias}': {e}", exc_info=True)
+        finally:
+            logger.info(f"Placeholder notification listener for '{self.server_alias}' stopped.")
+
+
+    async def _fetch_server_primitives(self):
+        """Fetches tools, resources, and prompts from the initialized MCP server session."""
+        if not self.session:  # self.is_initialized is checked by callers
+            logger.warning(f"Cannot fetch primitives for '{self.server_alias}'; session is not available.")
+            return
+        if not self.capabilities:  # Should be set if session.initialize() was successful
+            logger.warning(f"Cannot fetch primitives for '{self.server_alias}'; capabilities not populated.")
+            return
+
+        cap = self.capabilities  # capabilities object from the InitializeResult
+        # Check that each capability attribute exists AND is truthy
+        can_list_tools = hasattr(cap, 'tools') and bool(cap.tools)
+        can_list_resources = hasattr(cap, 'resources') and bool(cap.resources)
+        can_list_prompts = hasattr(cap, 'prompts') and bool(cap.prompts)
+
+        # Use a short timeout for these list calls, as they should be quick. 
+ PRIMITIVE_FETCH_TIMEOUT = 15.0 + + async def fetch_with_timeout(coro, primitive_name): + try: + return await asyncio.wait_for(coro, timeout=PRIMITIVE_FETCH_TIMEOUT) + except asyncio.TimeoutError: + logger.warning(f"Timeout fetching {primitive_name} for '{self.server_alias}'.") + except Exception as e: + logger.warning(f"Could not fetch {primitive_name} for '{self.server_alias}': {e}", exc_info=False) # exc_info=False to reduce noise + return None + + if can_list_tools: + resp = await fetch_with_timeout(self.session.list_tools(), "tools") + self.tools = resp.tools if resp and hasattr(resp, 'tools') else [] + + if can_list_resources: + resp = await fetch_with_timeout(self.session.list_resources(), "resources") + self.resources = resp.resources if resp and hasattr(resp, 'resources') else [] + + if can_list_prompts: + resp = await fetch_with_timeout(self.session.list_prompts(), "prompts") + self.prompts = resp.prompts if resp and hasattr(resp, 'prompts') else [] + + logger.debug(f"[{self.server_alias}] Primitives fetched: {len(self.tools)} tools, {len(self.resources)} resources, {len(self.prompts)} prompts.") + + async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any: + """Calls a tool on the MCP server.""" + if not self.session or not self.is_initialized: + # This state should ideally be prevented by checks in MCPManager or Engine. + logger.error(f"Attempted to call tool '{tool_name}' on uninitialized/unavailable server '{self.server_alias}'.") + raise RuntimeError(f"MCP server '{self.server_alias}' not initialized for tool call.") + logger.info(f"Calling MCP tool '{tool_name}' on server '{self.server_alias}' with args: {arguments}") + # Add timeout for the tool call itself, can be configured per server or globally. + tool_call_timeout = self.config.get("tool_call_timeout", 60.0) # Default 60s + try: + return await asyncio.wait_for( + self.session.call_tool(tool_name, arguments), + timeout=tool_call_timeout + ) + except asyncio.TimeoutError: + logger.error(f"Timeout calling MCP tool '{tool_name}' on '{self.server_alias}' after {tool_call_timeout}s.") + # Return an MCP-like error structure if possible, or raise specific exception. + # For now, let it propagate as TimeoutError or wrap in a custom one. + raise # Re-raise for the engine to handle and convert to a tool error message. 
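+
+    # Illustrative usage (a sketch; the tool name and argument shape are
+    # hypothetical and would come from the connected server's list_tools()):
+    #
+    #   result = await server.call_mcp_tool("query", {"sql": "SELECT 1"})
+    #
+    # The raw mcp.types.CallToolResult is returned unmodified; callers such as
+    # MCPManager.call_mcp_tool_by_prefixed_name pass it through so the engine
+    # can unpack its content (e.g., TextContent items).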
+ + async def read_mcp_resource(self, uri: str) -> Any: + """Reads a resource from the MCP server.""" + if not self.session or not self.is_initialized: + logger.error(f"Attempted to read resource '{uri}' on uninitialized/unavailable server '{self.server_alias}'.") + raise RuntimeError(f"MCP server '{self.server_alias}' not initialized for resource read.") + logger.info(f"Reading MCP resource '{uri}' from server '{self.server_alias}'") + resource_read_timeout = self.config.get("resource_read_timeout", 30.0) + try: + return await asyncio.wait_for( + self.session.read_resource(uri), + timeout=resource_read_timeout + ) + except asyncio.TimeoutError: + logger.error(f"Timeout reading MCP resource '{uri}' from '{self.server_alias}' after {resource_read_timeout}s.") + raise + + async def get_mcp_prompt(self, prompt_name: str, arguments: Dict[str, Any]) -> Any: + """Gets a prompt (its messages) from the MCP server.""" + if not self.session or not self.is_initialized: + logger.error(f"Attempted to get prompt '{prompt_name}' on uninitialized/unavailable server '{self.server_alias}'.") + raise RuntimeError(f"MCP server '{self.server_alias}' not initialized for get prompt.") + logger.info(f"Getting MCP prompt '{prompt_name}' from server '{self.server_alias}' with args: {arguments}") + prompt_get_timeout = self.config.get("prompt_get_timeout", 30.0) + try: + return await asyncio.wait_for( + self.session.get_prompt(prompt_name, arguments), + timeout=prompt_get_timeout + ) + except asyncio.TimeoutError: + logger.error(f"Timeout getting MCP prompt '{prompt_name}' from '{self.server_alias}' after {prompt_get_timeout}s.") + raise + + async def cleanup(self, initiated_by_failure: bool = False): + """ + Cleans up resources associated with this server connection. + This includes cancelling the notification listener and closing the AsyncExitStack + (which handles closing the MCP session and transport). + + Args: + initiated_by_failure: If True, indicates cleanup is due to initialization failure. + """ + async with self._lock: # Ensure cleanup is atomic for this instance + # Check if substantial resources were allocated that need cleaning. + # If it was never initialized and exit stack is empty, minimal cleanup needed. + if not self.is_initialized and not initiated_by_failure and not self._exit_stack._exit_callbacks and not self._notification_listener_task and not self.session: + logger.debug(f"Cleanup for '{self.server_alias}' skipped (already clean or not substantially initialized).") + return + + # Log differently based on whether it was a successful run or a failed init + if initiated_by_failure: + logger.warning(f"Starting cleanup for MCP server '{self.server_alias}' due to initialization failure...") + else: + logger.info(f"Starting cleanup for MCP server '{self.server_alias}'...") + + was_initialized_before_cleanup = self.is_initialized + self.is_initialized = False # Mark as not initialized immediately to stop dependent operations + + # 1. Cancel and await notification listener task + if self._notification_listener_task and not self._notification_listener_task.done(): + logger.debug(f"Cancelling notification listener for '{self.server_alias}'...") + self._notification_listener_task.cancel() + try: + # Wait for a short period; don't let cleanup hang indefinitely on this task. 
+ await asyncio.wait_for(self._notification_listener_task, timeout=5.0) + except asyncio.CancelledError: + logger.debug(f"Notification listener task for '{self.server_alias}' successfully cancelled.") + except asyncio.TimeoutError: + logger.warning(f"Timeout (5s) waiting for notification listener of '{self.server_alias}' to complete cancellation.") + except Exception as e_task_cancel: # Catch any other errors during task await + logger.error(f"Error awaiting notification listener cancellation for '{self.server_alias}': {e_task_cancel}", exc_info=True) + self._notification_listener_task = None + + # 2. Close the AsyncExitStack (this handles ClientSession.aclose() and transport context aclose()) + try: + # _exit_stack.aclose() calls __aexit__ on ClientSession and then on the transport context (stdio_client/streamablehttp_client) + await self._exit_stack.aclose() + logger.info(f"AsyncExitStack for '{self.server_alias}' closed successfully (session and transport cleaned).") + except Exception as e_stack_close: + logger.error(f"Error during AsyncExitStack.aclose() for '{self.server_alias}': {e_stack_close}", exc_info=True) + + # 3. Reset internal state + self.session = None # Session is closed by AsyncExitStack + self.capabilities = None + self.server_info = None + self.tools, self.resources, self.prompts = [], [], [] + self._read_stream_for_listener = None + + # Re-initialize the exit stack for potential future re-initialization (though not typical for a single instance) + self._exit_stack = AsyncExitStack() + + if initiated_by_failure: + logger.warning(f"MCP server '{self.server_alias}' cleanup finished (triggered by initialization failure).") + elif was_initialized_before_cleanup: + logger.info(f"MCP server '{self.server_alias}' successfully cleaned up (was previously initialized).") + else: # Cleanup called on an instance that wasn't fully initialized but not due to explicit failure handling + logger.info(f"MCP server '{self.server_alias}' cleanup finished (was not fully initialized or already partially cleaned).") \ No newline at end of file diff --git a/tframex/util/engine.py b/tframex/util/engine.py index 828c2c1..d6e1845 100644 --- a/tframex/util/engine.py +++ b/tframex/util/engine.py @@ -1,305 +1,179 @@ -# tframex/engine.py - -""" -Core execution engine for TFrameX agents and tools within a specific runtime context. - -This module defines the `Engine` class, responsible for managing the lifecycle -and execution of agents registered within a TFrameXApp instance. It handles -agent instantiation, configuration resolution (LLM, memory, tools), and -delegates calls to the appropriate agent or tool methods. -""" - +# tframex/util/engine.py +import asyncio import inspect +import json import logging from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, Union -# Import primitives and utilities - generally safe from circular dependencies -from ..models.primitives import Message -from ..util.tools import Tool, ToolDefinition, ToolParameterProperty, ToolParameters +from ..models.primitives import Message, ToolDefinition, ToolParameters, ToolParameterProperty +from ..util.tools import Tool +# MCP specific types for parsing results if needed, though results are simplified to string/error dict +from mcp.types import TextContent, ImageContent, EmbeddedResource + -# Use TYPE_CHECKING block for imports needed only for static analysis -# This avoids runtime circular imports. 
if TYPE_CHECKING: from ..agents.base import BaseAgent - from ..agents.llm_agent import LLMAgent - from ..app import TFrameXApp # Assuming app type for better hinting - from ..runtime import RuntimeContext # Assuming context type for better hinting - + from ..agents.llm_agent import LLMAgent # For issubclass check + from ..app import TFrameXApp, TFrameXRuntimeContext logger = logging.getLogger("tframex.engine") - class Engine: - """ - Manages agent instantiation and execution within a TFrameX runtime context. - - An Engine instance is typically created per request or session (via RuntimeContext) - and provides the necessary environment for agents to run, resolving dependencies - like LLMs, memory stores, and tools based on application defaults, context - overrides, and agent-specific configurations. - """ - - def __init__(self, app: 'TFrameXApp', runtime_context: 'RuntimeContext'): - """ - Initializes the Engine. - - Args: - app: The main TFrameXApp instance containing agent/tool registrations. - runtime_context: The specific runtime context for this engine instance, - potentially holding session-specific state or overrides (e.g., LLM). - """ + def __init__(self, app: 'TFrameXApp', runtime_context: 'TFrameXRuntimeContext'): self._app = app - self._runtime_context = runtime_context - # Use string literal for type hint to avoid import at class definition time - # Stores agent instances, keyed by agent name. Instantiated lazily. + self._runtime_context = runtime_context self._agent_instances: Dict[str, 'BaseAgent'] = {} def _get_agent_instance(self, agent_name: str) -> 'BaseAgent': - """ - Retrieves or lazily instantiates an agent based on its registered configuration. - - This method handles the core logic of agent creation: - 1. Checks if an instance for the given `agent_name` already exists for this engine. - 2. If not, retrieves the agent's registration info from the `TFrameXApp`. - 3. Resolves the LLM instance (Agent config > Context > App default). - 4. Resolves the MemoryStore instance (Agent config > App default factory). - 5. Resolves the Tools list based on registered `tool_names`. - 6. Gathers other configuration: description, `strip_think_tags`, callable agents. - 7. Creates ToolDefinitions for any specified `callable_agent_names`. - 8. Determines the correct agent class to instantiate. - 9. Filters registration config to pass only valid constructor arguments. - 10. Validates required dependencies (e.g., LLM for LLMAgents). - 11. Instantiates the agent class with the resolved configuration. - 12. Stores and returns the new agent instance. - - Args: - agent_name: The registered name of the agent to get or create. - - Returns: - The agent instance corresponding to the `agent_name`. - - Raises: - ValueError: If the `agent_name` is not registered in the app. - ValueError: If an LLMAgent is required but no LLM is available. - """ - # Import agent classes here, INSIDE the method, only when needed for instantiation - # This prevents module-level circular dependencies. - from ..agents.base import BaseAgent - from ..agents.llm_agent import LLMAgent + from ..agents.base import BaseAgent + from ..agents.llm_agent import LLMAgent # For issubclass check if agent_name not in self._agent_instances: - # --- Agent Registration Lookup --- if agent_name not in self._app._agents: - raise ValueError( - f"Agent '{agent_name}' not registered with the TFrameXApp." 
- ) + raise ValueError(f"Agent '{agent_name}' not registered.") + reg_info = self._app._agents[agent_name] - agent_config = reg_info["config"] # Use a shorter alias + agent_config_from_app = reg_info["config"] # This is the agent's specific config - # --- Dependency Resolution --- - # Resolve LLM: Agent-specific config > Context > App default agent_llm = ( - agent_config.get("llm_instance_override") - or self._runtime_context.llm + agent_config_from_app.get("llm_instance_override") + or self._runtime_context.llm or self._app.default_llm ) - - # Resolve Memory: Agent-specific config > App default factory agent_memory = ( - agent_config.get("memory_override") - or self._app.default_memory_store_factory() # Ensure factory provides a new instance + agent_config_from_app.get("memory_override") + or self._app.default_memory_store_factory() ) - # Resolve Tools: Look up tools by name from app registry - agent_tools_resolved: List[Tool] = [] - tool_names = agent_config.get("tool_names", []) - if tool_names: - for tool_name_ref in tool_names: - tool_obj = self._app.get_tool(tool_name_ref) - if tool_obj: - agent_tools_resolved.append(tool_obj) - else: - logger.warning( - f"Tool '{tool_name_ref}' specified for agent '{agent_name}' " - f"not found in the app registry. Skipping." - ) - - # --- Agent Configuration --- - agent_description = agent_config.get("description") - strip_think_tags_for_agent = agent_config.get( - "strip_think_tags", False # Default to False if not specified in config - ) - - # --- Callable Agent Definitions --- - # Define other agents this agent can call as tools - callable_agent_definitions: List[ToolDefinition] = [] - callable_agent_names = agent_config.get("callable_agent_names", []) - for sub_agent_name in callable_agent_names: - if sub_agent_name not in self._app._agents: - logger.warning( - f"Agent '{agent_name}' configured to call non-existent agent " - f"'{sub_agent_name}'. Skipping definition." - ) - continue - - # Fetch sub-agent info to create a tool-like definition - sub_agent_reg_info = self._app._agents[sub_agent_name] - sub_agent_description = ( - sub_agent_reg_info["config"].get("description") - or f"Invoke the '{sub_agent_name}' agent. Provide the specific input message for it." - ) - - # Define standard parameters for calling another agent - agent_tool_params = ToolParameters( - properties={ - "input_message": ToolParameterProperty( - type="string", - description=f"The specific query, task, or input content to pass to the '{sub_agent_name}' agent.", - ), - }, - required=["input_message"], - ) + resolved_native_tools: List[Tool] = [] + native_tool_names = agent_config_from_app.get("native_tool_names", []) + for tool_name_ref in native_tool_names: + tool_obj = self._app.get_tool(tool_name_ref) + if tool_obj: + resolved_native_tools.append(tool_obj) + else: + logger.warning(f"Native tool '{tool_name_ref}' for agent '{agent_name}' not found in app registry.") + + callable_agent_defs: List[ToolDefinition] = [] + callable_agent_names_cfg = agent_config_from_app.get("callable_agent_names", []) + for sub_agent_name_cfg in callable_agent_names_cfg: + if sub_agent_name_cfg in self._app._agents: + sub_agent_reg_info = self._app._agents[sub_agent_name_cfg] + sub_agent_desc = sub_agent_reg_info["config"].get("description") or f"Invoke agent '{sub_agent_name_cfg}'." 
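# --- Editor's note (illustrative sketch, not part of the patch) ---
# The LLM/memory resolution above is an `or` chain implementing
# "agent override > runtime context > app default". One caveat worth knowing:
# `or` skips *any* falsy value, not just None, so an intentionally falsy
# override would be silently ignored. A sketch with explicit None checks;
# all values here are stand-ins.
from typing import Optional, TypeVar

T = TypeVar("T")

def resolve(*candidates: Optional[T]) -> Optional[T]:
    # Return the first candidate that is not None (falsy-but-set values survive).
    for candidate in candidates:
        if candidate is not None:
            return candidate
    return None

agent_override = None        # e.g. agent_config.get("llm_instance_override")
context_llm = "ctx-llm"      # e.g. self._runtime_context.llm
app_default = "default-llm"  # e.g. self._app.default_llm

assert resolve(agent_override, context_llm, app_default) == "ctx-llm"
# --- end editor's note; the patch continues below ---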
+ params = ToolParameters(properties={"input_message": ToolParameterProperty(type="string", description="Input for the agent.")}, required=["input_message"]) + callable_agent_defs.append(ToolDefinition(type="function", function={ + "name": sub_agent_name_cfg, "description": sub_agent_desc, "parameters": params.model_dump(exclude_none=True) + })) + else: + logger.warning(f"Callable agent '{sub_agent_name_cfg}' for agent '{agent_name}' not found.") - callable_agent_definitions.append( - ToolDefinition( - type="function", - function={ - "name": sub_agent_name, # The name the primary agent uses to call - "description": sub_agent_description, - "parameters": agent_tool_params.model_dump(exclude_none=True), - }, - ) - ) - # --- Agent Instantiation --- instance_id = f"{agent_name}_ctx{id(self._runtime_context)}" - AgentClassToInstantiate: Type[BaseAgent] = agent_config["agent_class_ref"] - - # Identify keys used internally for setup vs. those passed to the constructor - internal_config_keys = { - "llm_instance_override", "memory_override", "tool_names", - "system_prompt_template", "agent_class_ref", "description", - "callable_agent_names", "strip_think_tags" - } - additional_constructor_args = { - k: v - for k, v in agent_config.items() - if k not in internal_config_keys + AgentClassToInstantiate: Type[BaseAgent] = agent_config_from_app["agent_class_ref"] + + # Prepare constructor arguments, excluding those managed internally by engine/app + constructor_args_from_config = { + k: v for k, v in agent_config_from_app.items() + if k not in [ + "llm_instance_override", "memory_override", "native_tool_names", + "callable_agent_names", "agent_class_ref", + # "mcp_tools_from_servers_config" is handled by LLMAgent itself using the engine + ] } - - # Runtime check: LLMAgent requires an LLM + + # Ensure LLMAgent gets an LLM if issubclass(AgentClassToInstantiate, LLMAgent) and not agent_llm: - raise ValueError( - f"Agent '{agent_name}' (type: {AgentClassToInstantiate.__name__}) " - f"requires an LLM, but none could be resolved (check agent config, " - f"runtime context, and app defaults)." 
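# --- Editor's note (illustrative sketch, not part of the patch) ---
# Roughly what the ToolDefinition built above serializes to: an OpenAI-style
# function spec with a single required "input_message" string. Plain dicts
# stand in for the pydantic models the real code uses.
import json

def agent_as_tool_spec(agent_name: str, description: str) -> dict:
    return {
        "type": "function",
        "function": {
            "name": agent_name,  # the name the calling agent uses in its tool_calls
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    "input_message": {
                        "type": "string",
                        "description": f"Input to pass to the '{agent_name}' agent.",
                    }
                },
                "required": ["input_message"],
            },
        },
    }

print(json.dumps(agent_as_tool_spec("summarizer", "Summarizes text."), indent=2))
# --- end editor's note; the patch continues below ---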
- ) + raise ValueError(f"Agent '{agent_name}' (type: LLMAgent) requires an LLM, but none resolved.") - # Prepare arguments for the agent's constructor agent_init_kwargs = { "agent_id": instance_id, - "description": agent_description, "llm": agent_llm, - "tools": agent_tools_resolved, + "tools": resolved_native_tools, # Native TFrameX tools "memory": agent_memory, - "system_prompt_template": agent_config.get("system_prompt_template"), - "callable_agent_definitions": callable_agent_definitions, - "strip_think_tags": strip_think_tags_for_agent, - **additional_constructor_args, # Include any other config values + "engine": self, # Pass the engine instance + "callable_agent_definitions": callable_agent_defs, + # Pass the agent's specific desire for MCP tools to its constructor + "mcp_tools_from_servers_config": agent_config_from_app.get("mcp_tools_from_servers_config"), + **constructor_args_from_config, # Other configs like system_prompt_template, description, strip_think_tags } - - # Inject engine dependency specifically for LLMAgents (if needed by their impl) - # Check inheritance dynamically using the imported LLMAgent class - if issubclass(AgentClassToInstantiate, LLMAgent): - agent_init_kwargs["engine"] = self # Pass self (the engine) - - # Create the agent instance + self._agent_instances[agent_name] = AgentClassToInstantiate(**agent_init_kwargs) - - logger.debug( - f"Instantiated agent '{instance_id}' " - f"(Name: '{agent_name}', Type: {AgentClassToInstantiate.__name__}, " - f"LLM: {agent_llm.model_id if agent_llm else 'None'}, " - f"Memory: {type(agent_memory).__name__}, " - f"Tools: {[t.name for t in agent_tools_resolved]}, " - f"Callable Agents: {callable_agent_names}, " - f"Strip Tags: {strip_think_tags_for_agent})" - ) - - # Return the existing or newly created instance + logger.debug(f"Instantiated agent '{instance_id}' (Type: {AgentClassToInstantiate.__name__}) for context {id(self._runtime_context)}.") return self._agent_instances[agent_name] async def call_agent( self, agent_name: str, input_message: Union[str, Message], **kwargs: Any ) -> Message: - """ - Executes a registered agent with the given input. - - This method retrieves (or instantiates) the specified agent and calls its - `run` method. - - Args: - agent_name: The registered name of the agent to call. - input_message: The input message for the agent, either as a string - (which will be wrapped in a 'user' Message) or a Message object. - **kwargs: Additional keyword arguments to be passed directly to the - agent's `run` method. - - Returns: - The response Message object from the agent's execution. - - Raises: - ValueError: If the agent is not registered. 
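# --- Editor's note (illustrative sketch, not part of the patch) ---
# The agent_init_kwargs assembly above relies on dict-display semantics: unlike
# a call such as f(x=1, **{"x": 2}) (a TypeError), a dict literal tolerates
# duplicate keys and the later one wins. Filtering internal keys out of the
# config first, as the hunk does, is what prevents accidental overrides of
# engine-managed values. All values below are stand-ins.
engine_managed = {"agent_id": "a_ctx1", "engine": "<engine>"}
config = {"description": "demo", "agent_id": "from-config"}

merged = {**engine_managed, **config}
assert merged["agent_id"] == "from-config"  # config silently wins

internal_keys = {"agent_id"}
merged = {**engine_managed,
          **{k: v for k, v in config.items() if k not in internal_keys}}
assert merged["agent_id"] == "a_ctx1"  # engine-managed value preserved
# --- end editor's note; the patch continues below ---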
- (Potentially others depending on the agent's `run` method)
- """
- # Ensure input is a Message object
- if isinstance(input_message, str):
- input_msg_obj = Message(role="user", content=input_message)
- elif isinstance(input_message, Message):
- input_msg_obj = input_message
- else:
- # Add type checking for clarity, though Union hint covers it
- raise TypeError(f"input_message must be str or Message, not {type(input_message).__name__}")
-
- # Get the agent instance (will create if first time for this engine)
+ if isinstance(input_message, str):
+ input_msg_obj = Message(role="user", content=input_message)
+ elif isinstance(input_message, Message):
+ input_msg_obj = input_message
+ else:
+ raise TypeError(f"input_message must be str or Message, not {type(input_message).__name__}")
 agent_instance = self._get_agent_instance(agent_name)
-
- # Execute the agent's primary run logic
 return await agent_instance.run(input_msg_obj, **kwargs)

- async def call_tool(self, tool_name: str, arguments_json_str: str) -> Any:
- """
- Executes a registered tool with the provided arguments.
-
- This method looks up the tool in the application's registry and calls
- its `execute` method. This is typically used internally by agents that
- decide to use a tool.
-
- Args:
- tool_name: The registered name of the tool to execute.
- arguments_json_str: A JSON string containing the arguments for the tool,
- as expected by the tool's definition.
-
- Returns:
- The result returned by the tool's `execute` method. This can be of Any type.
- Returns an error dictionary if the tool is not found.
- """
- tool = self._app.get_tool(tool_name)
- if not tool:
- logger.error(
- f"Engine requested to call tool '{tool_name}', but it was not "
- f"found in the app registry."
- )
- # Return a consistent error format that agents might handle
- return {"error": f"Tool '{tool_name}' not found."}
-
- logger.debug(f"Engine executing tool '{tool_name}' with args: {arguments_json_str}")
- # Execute the tool
- try:
- result = await tool.execute(arguments_json_str)
- logger.debug(f"Tool '{tool_name}' executed successfully.")
- return result
- except Exception as e:
- logger.error(f"Error executing tool '{tool_name}': {e}", exc_info=True)
- # Propagate error in a structured way if possible
- return {"error": f"Error executing tool '{tool_name}': {str(e)}"}
\ No newline at end of file
+ async def execute_tool_by_llm_definition(
+ self,
+ tool_definition_name: str,
+ arguments_json_str: str
+ ) -> Any: # Returns a string for the LLM, or an error dict
+ logger.info(f"Engine executing tool by LLM definition name: '{tool_definition_name}' with args: {arguments_json_str[:100]}...")
+
+ # 1. Handle TFrameX MCP meta-tools (registered as native tools),
+ # e.g. 'tframex_list_mcp_resources'
+ if tool_definition_name.startswith("tframex_"):
+ native_tool = self._app.get_tool(tool_definition_name)
+ if native_tool:
+ logger.debug(f"Executing TFrameX MCP meta-tool: {tool_definition_name}")
+ try:
+ parsed_args = json.loads(arguments_json_str)
+ # Meta-tools are defined as `async def func(rt_ctx: TFrameXRuntimeContext, ...)`.
+ # The Tool class does not auto-inject rt_ctx, so the engine calls the
+ # underlying function directly and supplies rt_ctx itself.
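# --- Editor's note (illustrative sketch, not part of the patch) ---
# A runnable sketch of the signature check performed just below: inject a
# context argument only when the callee declares it. The `rt_ctx` parameter
# name matches the hunk; the tools themselves are hypothetical.
import asyncio
import inspect

async def meta_tool(rt_ctx: object, query: str) -> str:
    return f"[ctx={rt_ctx}] {query}"

async def plain_tool(query: str) -> str:
    return query

async def dispatch(func, ctx: object, **args) -> str:
    # Inspect the declared parameters before deciding how to call the function.
    if "rt_ctx" in inspect.signature(func).parameters:
        return await func(rt_ctx=ctx, **args)
    return await func(**args)

async def _demo() -> None:
    print(await dispatch(meta_tool, "CTX", query="list resources"))
    print(await dispatch(plain_tool, "CTX", query="list resources"))

asyncio.run(_demo())
# --- end editor's note; the patch continues below ---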
+ if asyncio.iscoroutinefunction(native_tool.func):
+ # Check whether 'rt_ctx' is an expected parameter
+ sig = inspect.signature(native_tool.func)
+ if 'rt_ctx' in sig.parameters:
+ return await native_tool.func(rt_ctx=self._runtime_context, **parsed_args)
+ else: # Should not happen for well-defined meta-tools
+ return await native_tool.func(**parsed_args)
+ else:
+ # Sync fallback (meta-tools are expected to be async): run in a worker
+ # thread so the event loop is not blocked, injecting rt_ctx the same way.
+ sig = inspect.signature(native_tool.func)
+ if 'rt_ctx' in sig.parameters:
+ return await asyncio.to_thread(native_tool.func, rt_ctx=self._runtime_context, **parsed_args)
+ return await asyncio.to_thread(native_tool.func, **parsed_args)
+ except Exception as e:
+ logger.error(f"Error executing meta-tool '{tool_definition_name}': {e}", exc_info=True)
+ return {"error": f"Error in meta-tool '{tool_definition_name}': {str(e)}"}
+ # Not registered as a native tool: fall through, since an MCP server alias
+ # may legitimately start with 'tframex_'.
+
+ # 2. Handle MCP tools (prefixed with 'server_alias__')
+ if "__" in tool_definition_name and self._runtime_context.mcp_manager:
+ mcp_manager = self._runtime_context.mcp_manager
+ logger.debug(f"Attempting to execute as MCP tool: {tool_definition_name}")
+ try:
+ mcp_call_tool_result = await mcp_manager.call_mcp_tool_by_prefixed_name(
+ tool_definition_name, json.loads(arguments_json_str)
+ )
+ # Convert the MCP CallToolResult into a string or error dict for the LLMAgent
+ if hasattr(mcp_call_tool_result, 'isError') and mcp_call_tool_result.isError:
+ error_content = "Unknown MCP tool error"
+ if mcp_call_tool_result.content and isinstance(mcp_call_tool_result.content[0], TextContent) and mcp_call_tool_result.content[0].text:
+ error_content = mcp_call_tool_result.content[0].text
+ return {"error": f"MCP Tool '{tool_definition_name}' error: {error_content}"}
+ elif mcp_call_tool_result.content and isinstance(mcp_call_tool_result.content[0], TextContent) and mcp_call_tool_result.content[0].text is not None:
+ return mcp_call_tool_result.content[0].text
+ elif mcp_call_tool_result.content and isinstance(mcp_call_tool_result.content[0], ImageContent):
+ return f"[Image from MCP tool '{tool_definition_name}', mime: {mcp_call_tool_result.content[0].mimeType}]"
+ elif mcp_call_tool_result.content and isinstance(mcp_call_tool_result.content[0], EmbeddedResource):
+ res = mcp_call_tool_result.content[0].resource
+ return f"[Resource from MCP tool '{tool_definition_name}', uri: {res.uri}, mime: {res.mimeType}]"
+ else:
+ return f"MCP Tool '{tool_definition_name}' executed, but its result could not be converted to text for the LLM."
+ except Exception as e:
+ logger.error(f"Error dispatching/executing MCP tool '{tool_definition_name}': {e}", exc_info=True)
+ return {"error": f"Client-side error executing MCP tool '{tool_definition_name}': {str(e)}"}
+
+ # 3. Handle native TFrameX tools (neither meta-tools nor MCP-prefixed)
+ native_tool = self._app.get_tool(tool_definition_name)
+ if native_tool:
+ logger.debug(f"Executing native TFrameX tool: {tool_definition_name}")
+ return await native_tool.execute(arguments_json_str) # Tool.execute expects a JSON string
+
+ logger.error(f"Engine: tool/function '{tool_definition_name}' not found, or no MCP manager available for MCP-prefixed tools.")
+ return {"error": f"Tool or function '{tool_definition_name}' could not be resolved or executed."}
\ No newline at end of file
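# --- Editor's note (illustrative sketch, not part of the patch) ---
# The content-flattening logic from step 2 above, isolated and runnable. The
# dataclasses are stand-ins for mcp.types.TextContent / ImageContent so the
# sketch runs without the MCP SDK; the real types carry more fields.
from dataclasses import dataclass
from typing import Union

@dataclass
class TextPart:
    text: str

@dataclass
class ImagePart:
    mimeType: str

def flatten_for_llm(tool_name: str, part: Union[TextPart, ImagePart, None]) -> str:
    # Only the first content item is inspected, mirroring the hunk above.
    if isinstance(part, TextPart) and part.text is not None:
        return part.text
    if isinstance(part, ImagePart):
        return f"[Image from MCP tool '{tool_name}', mime: {part.mimeType}]"
    return f"MCP tool '{tool_name}' executed, but its result could not be converted to text."

assert flatten_for_llm("echo", TextPart("hello")) == "hello"
assert "mime: image/png" in flatten_for_llm("chart", ImagePart("image/png"))
# --- end editor's note ---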