Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ test: build


.PHONY: check
check: build test
uv run mypy .
check: build
uv run ruff check
uv run mypy .
uv run bandit -c pyproject.toml -r .
make test


.PHONY: check-fix
Expand Down
110 changes: 110 additions & 0 deletions adk/agenticlayer/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
Convert Sub Agents and Tools into RemoteA2aAgents, AgentTools and McpToolsets.
"""

import logging

import httpx
from a2a.client import A2ACardResolver
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from google.adk.agents import BaseAgent, LlmAgent
from google.adk.agents.llm_agent import ToolUnion
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools.mcp_tool import StreamableHTTPConnectionParams
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from httpx_retries import Retry, RetryTransport

from agenticlayer.config import InteractionType, McpTool, SubAgent

logger = logging.getLogger(__name__)


class AgentFactory:
def __init__(
self,
timeout: httpx.Timeout = httpx.Timeout(timeout=10),
retry: Retry = Retry(total=10, backoff_factor=0.5, max_backoff_wait=15),
) -> None:
self.timeout = timeout
self.transport = RetryTransport(retry=retry)

async def load_agent(self, agent: LlmAgent, sub_agents: list[SubAgent], tools: list[McpTool]) -> LlmAgent:
"""
Load Sub Agents and Tools into the given agent.

:param agent: The root agent to load sub agents and tools into
:param sub_agents: The sub agents to load
:param tools: The tools to load
:return: The agent with loaded sub agents and tools
"""

agents, agent_tools = await self.load_sub_agents(sub_agents)
mcp_tools = self.load_tools(tools)
all_tools: list[ToolUnion] = agent_tools + mcp_tools

# The ADK currently only adds the agent as a function with the agent name to the instructions.
# The description is not included. So we manually add the descriptions here.
if agent_tools:
agent_tool_instructions = "\n\nFollowing agents are available as tools:\n"
agent_tool_instructions += "\n".join(
[f"- '{agent_tool.name}': {agent_tool.description}" for agent_tool in agent_tools]
)
agent_tool_instructions += "\nYou can use them by calling the tool with the agent name.\n"
agent.instruction = f"{agent.instruction}{agent_tool_instructions}"

agent.sub_agents += agents
agent.tools += all_tools
return agent

async def load_sub_agents(self, sub_agents: list[SubAgent]) -> tuple[list[BaseAgent], list[AgentTool]]:
"""
Convert Sub Agents into RemoteA2aAgents and AgentTools.

:param sub_agents: The sub agents to load
:return: A tuple of:
- list of sub agents for transfer interaction type
- list of agent tools for tool_call interaction type
"""

agents: list[BaseAgent] = []
tools: list[AgentTool] = []
for sub_agent in sub_agents:
base_url = str(sub_agent.url).replace(AGENT_CARD_WELL_KNOWN_PATH, "")
async with httpx.AsyncClient(transport=self.transport, timeout=self.timeout) as client:
resolver = A2ACardResolver(
httpx_client=client,
base_url=base_url,
)
agent_card = await resolver.get_agent_card()
agent = RemoteA2aAgent(name=sub_agent.name, agent_card=agent_card)
# Set description from agent card, as this is currently done lazy on first RPC call to agent by ADK
agent.description = agent_card.description
if sub_agent.interaction_type == InteractionType.TOOL_CALL:
tools.append(AgentTool(agent=agent))
else:
agents.append(agent)

return agents, tools

def load_tools(self, mcp_tools: list[McpTool]) -> list[ToolUnion]:
"""
Convert Tools into McpToolsets.

:param mcp_tools: The tools to load
:return: A list of McpToolset tools
"""

tools: list[ToolUnion] = []
for tool in mcp_tools:
logger.info(f"Loading tool {tool.model_dump_json()}")
tools.append(
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=str(tool.url),
timeout=tool.timeout,
),
)
)

return tools
97 changes: 74 additions & 23 deletions adk/agenticlayer/agent_to_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
This is an adaption of google.adk.a2a.utils.agent_to_a2a.
"""

import contextlib
import logging
import os
from typing import AsyncIterator, Awaitable, Callable

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
from google.adk.agents import LlmAgent
from google.adk.agents.base_agent import BaseAgent
from google.adk.apps.app import App
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService
Expand All @@ -21,9 +24,11 @@
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from starlette.applications import Starlette

from .agent import AgentFactory
from .callback_tracer_plugin import CallbackTracerPlugin
from .config import McpTool, SubAgent

logger = logging.getLogger("agenticlayer")
logger = logging.getLogger(__name__)


class HealthCheckFilter(logging.Filter):
Expand All @@ -32,28 +37,16 @@ def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find(AGENT_CARD_WELL_KNOWN_PATH) == -1


def to_a2a(agent: BaseAgent, rpc_url: str) -> Starlette:
"""Convert an ADK agent to a A2A Starlette application.
This is an adaption of google.adk.a2a.utils.agent_to_a2a.
async def create_a2a_app(agent: BaseAgent, rpc_url: str) -> A2AStarletteApplication:
"""Create an A2A Starlette application from an ADK agent.

Args:
agent: The ADK agent to convert
rpc_url: The URL where the agent will be available for A2A communication

Returns:
A Starlette application that can be run with uvicorn

Example:
agent = MyAgent()
rpc_url = "http://localhost:8000/"
app = to_a2a(root_agent, rpc_url)
# Then run with: uvicorn module:app
An A2AStarletteApplication instance
"""

# Filter out health check logs from uvicorn access logger
uvicorn_access_logger = logging.getLogger("uvicorn.access")
uvicorn_access_logger.addFilter(HealthCheckFilter())

async def create_runner() -> Runner:
"""Create a runner for the agent."""
return Runner(
Expand Down Expand Up @@ -92,18 +85,76 @@ async def create_runner() -> Runner:
logger.info("Built agent card: %s", agent_card.model_dump_json())

# Create the A2A Starlette application
a2a_app = A2AStarletteApplication(
return A2AStarletteApplication(
agent_card=agent_card,
http_handler=request_handler,
)

# Create a Starlette app that will be configured during startup
starlette_app = Starlette()

# Add A2A routes to the main app
a2a_app.add_routes_to_app(
starlette_app,
)
def to_a2a(
agent: LlmAgent,
rpc_url: str,
sub_agents: list[SubAgent] | None = None,
tools: list[McpTool] | None = None,
agent_factory: AgentFactory | None = None,
) -> Starlette:
"""Convert an ADK agent to a Starlette application.
Resolves sub-agents and tools while starting the application.

Args:
:param agent: The ADK agent to convert
:param rpc_url: The URL where the agent will be available for A2A communication
:param sub_agents: The sub agents to add to the agent
:param tools: The tools to add to the agent
:param agent_factory: Agent factory to use for loading sub-agents and tools

Returns:
A Starlette application that can be run with uvicorn

Example:
agent = MyAgent()
rpc_url = "http://localhost:8000/"
app = to_a2a(root_agent, rpc_url)
# Then run with: uvicorn module:app
"""

agent_factory = agent_factory or AgentFactory()

async def a2a_app_creator() -> A2AStarletteApplication:
configured_agent = await agent_factory.load_agent(
agent=agent,
sub_agents=sub_agents or [],
tools=tools or [],
)
return await create_a2a_app(configured_agent, rpc_url)

return to_starlette(a2a_app_creator)


def to_starlette(a2a_starlette: Callable[[], Awaitable[A2AStarletteApplication]]) -> Starlette:
"""Convert an ADK agent to a A2A Starlette application.
This is inspired by google.adk.a2a.utils.agent_to_a2a.

Args:
:param a2a_starlette: A callable that creates an A2AStarletteApplication asynchronously during startup.

Returns:
A Starlette application that can be run with uvicorn
"""

# Filter out health check logs from uvicorn access logger
uvicorn_access_logger = logging.getLogger("uvicorn.access")
uvicorn_access_logger.addFilter(HealthCheckFilter())

@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
a2a_app = await a2a_starlette()
# Add A2A routes to the main app
a2a_app.add_routes_to_app(app)
yield

# Create a Starlette app that will be configured during startup
starlette_app = Starlette(lifespan=lifespan)

# Instrument the Starlette app with OpenTelemetry
# env needs to be set here since _excluded_urls is initialized at module import time
Expand Down
76 changes: 36 additions & 40 deletions adk/agenticlayer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,29 @@
"""

import json
import logging
from enum import Enum

from google.adk.agents import BaseAgent
from google.adk.agents.llm_agent import ToolUnion
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.tools.agent_tool import AgentTool
from google.adk.tools.mcp_tool import StreamableHTTPConnectionParams
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from pydantic import AnyHttpUrl, BaseModel


def parse_sub_agents(sub_agents_config: str) -> tuple[list[BaseAgent], list[ToolUnion]]:
class InteractionType(str, Enum):
TOOL_CALL = "tool_call"
TRANSFER = "transfer"


class SubAgent(BaseModel):
name: str
url: AnyHttpUrl
interaction_type: InteractionType = InteractionType.TOOL_CALL


class McpTool(BaseModel):
name: str
url: AnyHttpUrl
timeout: int = 30


def parse_sub_agents(sub_agents_config: str) -> list[SubAgent]:
"""
Get sub agents from JSON string.
Format: {"agent_name": {"url": "http://agent_url", "interaction_type", "transfer|tool_call"}, ...}
Expand All @@ -29,25 +41,17 @@ def parse_sub_agents(sub_agents_config: str) -> tuple[list[BaseAgent], list[Tool
except json.JSONDecodeError as e:
raise ValueError("Warning: Invalid JSON in SUB_AGENTS environment variable: " + sub_agents_config, e)

sub_agents: list[BaseAgent] = []
tools: list[ToolUnion] = []
for agent_name, config in agents_map.items():
if "url" not in config:
raise ValueError(f"Missing 'url' for agent '{agent_name}': " + str(config))

interaction_type = config.get("interaction_type", "tool_call")

logging.info("Adding sub-agent: %s (%s) with URL: %s", agent_name, interaction_type, config["url"])
agent = RemoteA2aAgent(name=agent_name, agent_card=config["url"])
if interaction_type == "tool_call":
tools.append(AgentTool(agent=agent))
else:
sub_agents.append(agent)

return sub_agents, tools
return [
SubAgent(
name=agent_name,
url=config["url"],
interaction_type=InteractionType(config.get("interaction_type")),
)
for agent_name, config in agents_map.items()
]


def parse_tools(tools_config: str) -> list[ToolUnion]:
def parse_tools(tools_config: str) -> list[McpTool]:
"""
Get tools from JSON string.
Format: {"tool_name": {"url": "http://tool_url", "timeout": 30}, ...}
Expand All @@ -60,19 +64,11 @@ def parse_tools(tools_config: str) -> list[ToolUnion]:
except json.JSONDecodeError as e:
raise ValueError("Warning: Invalid JSON in AGENT_TOOLS environment variable: " + tools_config, e)

tools: list[ToolUnion] = []
for name, config in tools_map.items():
if "url" not in config:
raise ValueError(f"Missing 'url' for tool '{name}': " + str(config))

logging.info("Adding tool: %s with URL: %s", name, config["url"])
tools.append(
McpToolset(
connection_params=StreamableHTTPConnectionParams(
url=config["url"],
timeout=config.get("timeout", 30),
),
)
return [
McpTool(
name=name,
url=config["url"],
timeout=config.get("timeout", 30),
)

return tools
for name, config in tools_map.items()
]
1 change: 1 addition & 0 deletions adk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies = [
"opentelemetry-instrumentation-starlette",
"openinference-instrumentation-google-adk",
"opentelemetry-instrumentation-httpx",
"httpx-retries>=0.4.5",
]

[build-system]
Expand Down
Loading