Skip to content
Merged
86 changes: 75 additions & 11 deletions adk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,36 @@ The JSON configuration for `AGENT_TOOLS` should follow this structure:
{
"tool_name": {
"url": "https://mcp-tool-endpoint:8000/mcp",
"timeout": 30 // Optional: connect timeout in seconds
"timeout": 30, // Optional: connect timeout in seconds (default: 30)
"propagate_headers": ["X-API-Key", "Authorization"] // Optional: list of headers to propagate
}
}
```

### Header Propagation

You can configure which HTTP headers are passed from the incoming A2A request to each MCP server using the `propagate_headers` field. This provides fine-grained control over which headers each MCP server receives.

**Key features:**
- **Per-server configuration**: Each MCP server can receive different headers
- **Security**: Headers are only sent to servers explicitly configured to receive them
- **Case-insensitive matching**: Header names are matched case-insensitively
- **Backward compatibility**: When `propagate_headers` is not specified, the legacy behavior is used (only `X-External-Token` is passed)

**Example configuration:**
```json5
{
"github_api": {
"url": "https://github-mcp.example.com/mcp",
"propagate_headers": ["Authorization", "X-GitHub-Token"]
},
"stripe_api": {
"url": "https://stripe-mcp.example.com/mcp",
"propagate_headers": ["X-Stripe-Key"]
},
"public_tool": {
"url": "https://public-mcp.example.com/mcp"
// No propagate_headers - only X-External-Token will be passed (legacy behavior)
}
}
```
Expand Down Expand Up @@ -102,42 +131,77 @@ Body logging behavior:
**Note**: Starlette body logging is more limited than HTTPX because it must avoid consuming request/response streams.
Bodies are only captured when already buffered in the ASGI scope.

## External API Token Passing
## HTTP Header Propagation to MCP Tools

The SDK supports passing external API tokens from A2A requests to MCP tools. This enables MCP servers to authenticate with external APIs on behalf of users.
The SDK supports passing HTTP headers from A2A requests to MCP tools. This enables MCP servers to authenticate with external APIs on behalf of users, and provides flexible header-based configuration.

### How It Works

1. **Token Capture**: When an A2A request includes the `X-External-Token` header, the SDK automatically captures and stores it in the ADK session state
2. **Secure Storage**: The token is stored in ADK's session state (not in memory state accessible to the LLM), ensuring the agent cannot directly access or leak it
3. **Automatic Injection**: When MCP tools are invoked, the SDK uses ADK's `header_provider` hook to retrieve the token from the session and inject it as the `X-External-Token` header in tool requests
1. **Header Capture**: When an A2A request is received, all HTTP headers are captured and stored in the ADK session state
2. **Secure Storage**: Headers are stored in ADK's session state (not in memory state accessible to the LLM), ensuring the agent cannot directly access or leak sensitive information
3. **Per-Server Filtering**: Each MCP server receives only the headers configured in its `propagate_headers` list
4. **Automatic Injection**: When MCP tools are invoked, the SDK uses ADK's `header_provider` hook to retrieve the configured headers from the session and inject them into tool requests

### Configuration

**Current Limitations**: The token is only passed to MCP servers. Propagation to sub-agents is not currently supported due to ADK limitations in passing custom HTTP headers in A2A requests.
Configure which headers to propagate using the `propagate_headers` field in your MCP tool configuration:

```json5
{
"weather_api": {
"url": "https://weather-mcp.example.com/mcp",
"propagate_headers": ["X-API-Key", "X-User-Location"]
},
"database_tool": {
"url": "https://db-mcp.example.com/mcp",
"propagate_headers": ["Authorization"]
}
}
```

### Usage Example

Simply include the `X-External-Token` header in your A2A requests:
Include the headers you want to propagate in your A2A requests:

```bash
curl -X POST http://localhost:8000/ \
-H "Content-Type: application/json" \
-H "X-External-Token: your-api-token-here" \
-H "X-API-Key: your-api-key" \
-H "Authorization: Bearer your-token" \
-H "X-User-Location: US-West" \
-d '{
"jsonrpc": "2.0",
"id": 1,
"method": "message/send",
"params": {
"message": {
"role": "user",
"parts": [{"kind": "text", "text": "Your message"}],
"parts": [{"kind": "text", "text": "What is the weather?"}],
"messageId": "msg-123",
"contextId": "ctx-123"
}
}
}'
```

The SDK will automatically pass `your-api-token-here` to all MCP tool calls and sub-agent requests made during that session.
Based on the configuration above:
- `weather_api` MCP server will receive `X-API-Key` and `X-User-Location` headers
- `database_tool` MCP server will receive only the `Authorization` header

### Backward Compatibility

For backward compatibility, if `propagate_headers` is not specified in the configuration, the SDK will use legacy behavior: only the `X-External-Token` header is passed to the MCP server.

```json5
{
"legacy_tool": {
"url": "https://legacy-mcp.example.com/mcp"
// No propagate_headers - only X-External-Token will be passed
}
}
```

**Limitations**: Header propagation is only supported for MCP servers. Propagation to sub-agents is not currently supported due to ADK limitations in passing custom HTTP headers in A2A requests.

### Security Considerations

Expand Down
71 changes: 68 additions & 3 deletions adk/agenticlayer/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import logging
from typing import Callable

import httpx
from a2a.client import A2ACardResolver
Expand All @@ -17,7 +18,7 @@
from httpx_retries import Retry, RetryTransport

from agenticlayer.config import InteractionType, McpTool, SubAgent
from agenticlayer.constants import EXTERNAL_TOKEN_SESSION_KEY
from agenticlayer.constants import EXTERNAL_TOKEN_SESSION_KEY, HTTP_HEADERS_SESSION_KEY

logger = logging.getLogger(__name__)

Expand All @@ -44,6 +45,61 @@ def _get_mcp_headers_from_session(readonly_context: ReadonlyContext) -> dict[str
return {}


def _create_header_provider(propagate_headers: list[str]) -> Callable[[ReadonlyContext], dict[str, str]]:
"""Create a header provider function for a specific MCP server.

This factory function creates a header provider that filters headers based on
the MCP server's configuration. Only headers listed in propagate_headers will
be included in requests to that server.

The matching is case-insensitive: if the configuration specifies 'Authorization'
and the incoming request has 'authorization', they will match. The output header
will use the case specified in the configuration.

Example:
>>> provider = _create_header_provider(['Authorization', 'X-API-Key'])
>>> # If session has: {'authorization': 'Bearer token', 'x-api-key': 'key123'}
>>> # Output will be: {'Authorization': 'Bearer token', 'X-API-Key': 'key123'}

Note: If multiple headers with different casing match a single configured header
(e.g., both 'authorization' and 'Authorization' in stored headers), only one
will be included. The last match found will be used.

Args:
propagate_headers: List of header names to propagate to this MCP server

Returns:
A header provider function that can be passed to McpToolset
"""

def header_provider(readonly_context: ReadonlyContext) -> dict[str, str]:
"""Header provider that filters headers based on server configuration."""
if not readonly_context or not readonly_context.state:
return {}

# Get all stored headers from session
all_headers = readonly_context.state.get(HTTP_HEADERS_SESSION_KEY, {})
if not all_headers:
return {}

# Create a lowercase lookup dictionary for O(n+m) complexity instead of O(n*m)
all_headers_lower = {k.lower(): (k, v) for k, v in all_headers.items()}

# Filter to only include configured headers (case-insensitive matching)
result_headers = {}
for header_name in propagate_headers:
# Try to find the header in the stored headers (case-insensitive)
header_lower = header_name.lower()
if header_lower in all_headers_lower:
original_key, value = all_headers_lower[header_lower]
# Use the original case from the configuration
result_headers[header_name] = value

return result_headers

return header_provider


class AgentFactory:
def __init__(
self,
Expand Down Expand Up @@ -128,14 +184,23 @@ def load_tools(self, mcp_tools: list[McpTool]) -> list[ToolUnion]:
tools: list[ToolUnion] = []
for tool in mcp_tools:
logger.info(f"Loading tool {tool.model_dump_json()}")

# Use configured header provider if propagate_headers is specified,
# otherwise fall back to legacy behavior (x-external-token only)
if tool.propagate_headers is not None:
header_provider = _create_header_provider(tool.propagate_headers)
else:
# Backward compatibility: use legacy provider that only sends x-external-token
Comment thread
g3force marked this conversation as resolved.
Outdated
header_provider = _get_mcp_headers_from_session

tools.append(
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=str(tool.url),
timeout=tool.timeout,
),
# Provide header provider to inject session-stored token into tool requests
header_provider=_get_mcp_headers_from_session,
# Provide header provider to inject session-stored headers into tool requests
header_provider=header_provider,
)
)

Expand Down
29 changes: 18 additions & 11 deletions adk/agenticlayer/agent_to_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from .agent import AgentFactory
from .callback_tracer_plugin import CallbackTracerPlugin
from .config import McpTool, SubAgent
from .constants import EXTERNAL_TOKEN_SESSION_KEY
from .constants import EXTERNAL_TOKEN_SESSION_KEY, HTTP_HEADERS_SESSION_KEY

logger = logging.getLogger(__name__)

Expand All @@ -51,10 +51,10 @@ async def _prepare_session(
run_request: AgentRunRequest,
runner: Runner,
) -> Session:
"""Prepare the session and store the external token if present.
"""Prepare the session and store HTTP headers if present.

This method extends the parent implementation to capture the X-External-Token
header from the request context and store it in the session state using ADK's
This method extends the parent implementation to capture HTTP headers
from the request context and store them in the session state using ADK's
recommended approach: creating an Event with state_delta and appending it to
the session.

Expand All @@ -64,25 +64,32 @@ async def _prepare_session(
runner: The ADK runner instance

Returns:
The prepared session with the external token stored in its state
The prepared session with HTTP headers stored in its state
"""
# Call parent to get or create the session
session: Session = await super()._prepare_session(context, run_request, runner)

# Extract the X-External-Token header from the request context
# Extract HTTP headers from the request context
# The call_context.state contains headers from the original HTTP request
if context.call_context and "headers" in context.call_context.state:
headers = context.call_context.state["headers"]
# Headers might be in different cases, check all variations

# Store all headers in session state for per-MCP-server filtering
# This allows each MCP server to receive only the headers it's configured to receive
if headers:
event = Event(
author="system", actions=EventActions(state_delta={HTTP_HEADERS_SESSION_KEY: dict(headers)})
)
await runner.session_service.append_event(session, event)
logger.debug("Stored HTTP headers in session %s via state_delta", session.id)

# Keep backward compatibility: also store x-external-token separately
Comment thread
g3force marked this conversation as resolved.
Outdated
# This maintains existing behavior for code that expects EXTERNAL_TOKEN_SESSION_KEY
external_token = (
headers.get("x-external-token") or headers.get("X-External-Token") or headers.get("X-EXTERNAL-TOKEN")
)

if external_token:
# Store the token in the session state using ADK's recommended method:
# Create an Event with a state_delta and append it to the session.
# This follows ADK's pattern for updating session state as documented at:
# https://google.github.io/adk-docs/sessions/state/#how-state-is-updated-recommended-methods
event = Event(
author="system", actions=EventActions(state_delta={EXTERNAL_TOKEN_SESSION_KEY: external_token})
)
Expand Down
1 change: 1 addition & 0 deletions adk/agenticlayer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class McpTool(BaseModel):
name: str
url: AnyHttpUrl
timeout: int = 30
propagate_headers: list[str] | None = None


def parse_sub_agents(sub_agents_config: str) -> list[SubAgent]:
Expand Down
3 changes: 3 additions & 0 deletions adk/agenticlayer/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@

# Key used to store the external token in the ADK session state
EXTERNAL_TOKEN_SESSION_KEY = "__external_token__" # nosec B105
Comment thread
g3force marked this conversation as resolved.
Outdated

# Key used to store all HTTP headers in the ADK session state
HTTP_HEADERS_SESSION_KEY = "__http_headers__" # nosec B105
Loading