|
10 | 10 | from agenticlayer.agent_to_a2a import to_a2a |
11 | 11 | from agenticlayer.config import InteractionType, McpTool, SubAgent |
12 | 12 | from asgi_lifespan import LifespanManager |
13 | | -from fastmcp import FastMCP |
| 13 | +from fastmcp import Context, FastMCP |
14 | 14 | from httpx_retries import Retry |
15 | 15 | from pydantic import AnyHttpUrl |
16 | 16 | from starlette.testclient import TestClient |
@@ -342,3 +342,104 @@ def add(a: int, b: int) -> int: |
342 | 342 |
|
343 | 343 | assert history[4]["role"] == "agent" |
344 | 344 | assert history[4]["parts"] == [{"kind": "text", "text": "The calculation result is correct!"}] |
| 345 | + |
| 346 | + @pytest.mark.asyncio |
| 347 | + async def test_external_token_passed_to_mcp_tools( |
| 348 | + self, |
| 349 | + app_factory: Any, |
| 350 | + agent_factory: Any, |
| 351 | + llm_controller: LLMMockController, |
| 352 | + respx_mock: respx.MockRouter, |
| 353 | + ) -> None: |
| 354 | + """Test that X-External-Token header is passed from A2A request to MCP tool calls. |
| 355 | +
|
| 356 | + Verifies end-to-end token passing through the agent to MCP servers. |
| 357 | + """ |
| 358 | + |
| 359 | + # Given: Mock LLM to call 'echo' tool |
| 360 | + llm_controller.respond_with_tool_call( |
| 361 | + pattern="", # Match any message |
| 362 | + tool_name="echo", |
| 363 | + tool_args={"message": "test"}, |
| 364 | + final_message="Echo completed!", |
| 365 | + ) |
| 366 | + |
| 367 | + # Given: MCP server with 'echo' tool that can access headers via Context |
| 368 | + mcp = FastMCP("TokenVerifier") |
| 369 | + received_headers: list[dict[str, str]] = [] |
| 370 | + received_tokens_in_tool: list[str | None] = [] |
| 371 | + |
| 372 | + @mcp.tool() |
| 373 | + def echo(message: str, ctx: Context) -> str: |
| 374 | + """Echo a message and verify token is accessible in tool context.""" |
| 375 | + # Access headers from the MCP request context |
| 376 | + # The Context object provides access to the request_context which includes HTTP headers |
| 377 | + if ctx.request_context and hasattr(ctx.request_context, "request"): |
| 378 | + # Try to get the token from request headers if available |
| 379 | + request = ctx.request_context.request |
| 380 | + if request and hasattr(request, "headers"): |
| 381 | + token = request.headers.get("x-external-token") or request.headers.get("X-External-Token") |
| 382 | + received_tokens_in_tool.append(token) |
| 383 | + return f"Echoed: {message}" |
| 384 | + |
| 385 | + mcp_server_url = "http://test-mcp-token.local" |
| 386 | + mcp_app = mcp.http_app(path="/mcp") |
| 387 | + |
| 388 | + async with LifespanManager(mcp_app) as mcp_manager: |
| 389 | + # Create a custom handler that captures headers |
| 390 | + async def handler_with_header_capture(request: httpx.Request) -> httpx.Response: |
| 391 | + # Capture the headers from the request |
| 392 | + received_headers.append(dict(request.headers)) |
| 393 | + |
| 394 | + # Forward to the MCP app |
| 395 | + transport = httpx.ASGITransport(app=mcp_manager.app) |
| 396 | + async with httpx.AsyncClient(transport=transport, base_url=mcp_server_url) as client: |
| 397 | + return await client.request( |
| 398 | + method=request.method, |
| 399 | + url=str(request.url), |
| 400 | + headers=request.headers, |
| 401 | + content=request.content, |
| 402 | + ) |
| 403 | + |
| 404 | + # Route MCP requests through our custom handler |
| 405 | + respx_mock.route(host="test-mcp-token.local").mock(side_effect=handler_with_header_capture) |
| 406 | + |
| 407 | + # When: Create agent with MCP tool and send request with X-External-Token header |
| 408 | + test_agent = agent_factory("test_agent") |
| 409 | + tools = [McpTool(name="verifier", url=AnyHttpUrl(f"{mcp_server_url}/mcp"), timeout=30)] |
| 410 | + external_token = "secret-api-token-12345" # nosec B105 |
| 411 | + |
| 412 | + async with app_factory(test_agent, tools=tools) as app: |
| 413 | + client = TestClient(app) |
| 414 | + user_message = "Echo test message" |
| 415 | + response = client.post( |
| 416 | + "", |
| 417 | + json=create_send_message_request(user_message), |
| 418 | + headers={"X-External-Token": external_token}, |
| 419 | + ) |
| 420 | + |
| 421 | + # Then: Verify response is successful |
| 422 | + assert response.status_code == 200 |
| 423 | + result = verify_jsonrpc_response(response.json()) |
| 424 | + assert result["status"]["state"] == "completed", "Task should complete successfully" |
| 425 | + |
| 426 | + # Then: Verify X-External-Token header was passed to MCP server |
| 427 | + assert len(received_headers) > 0, "MCP server should have received requests" |
| 428 | + |
| 429 | + # Find the tool call request (not the initialization requests) |
| 430 | + # Header keys might be lowercase |
| 431 | + tool_call_headers = [h for h in received_headers if "x-external-token" in h or "X-External-Token" in h] |
| 432 | + assert len(tool_call_headers) > 0, ( |
| 433 | + f"At least one request should have X-External-Token header. " |
| 434 | + f"Received {len(received_headers)} requests total." |
| 435 | + ) |
| 436 | + |
| 437 | + # Verify the token value |
| 438 | + for headers in tool_call_headers: |
| 439 | + # Header might be lowercase in the dict |
| 440 | + token_value = headers.get("X-External-Token") or headers.get("x-external-token") |
| 441 | + assert token_value == external_token, ( |
| 442 | + f"Expected token '{external_token}', got '{token_value}'" |
| 443 | + ) |
| 444 | + |
| 445 | + |
0 commit comments