feat: Extensible ReAct Agent Framework with A2A Protocol Support #723

@manavgup

Summary

Design and implement an extensible, reusable ReAct agent framework that supports specialized agents (RAG, PowerPoint, Data Analysis) with inter-agent communication via the A2A (Agent-to-Agent) protocol.

Motivation

Current Limitations

The existing ReActAgent implementation in backend/rag_solution/agents/react_agent.py has several constraints:

  1. Tightly coupled to RAG: The agent hardcodes a search_service dependency and RAG-specific tools
  2. No agent specialization: Cannot create domain-specific agents (PPT, Data, Email) without duplicating code
  3. No inter-agent communication: Agents cannot delegate tasks to other specialized agents
  4. Hardcoded tools: Built-in tools (rag_search, finish) are defined at class level, not configurable
  5. Single-agent workflows: No orchestration layer for multi-agent task coordination

Use Case Example

A user asks: "Create a presentation about IBM's AI strategy based on our documents"

Current approach: One generic agent with ALL tools (RAG + PPT + 20+ MCP tools), which leads to tool confusion, hallucinated tool names, and poor accuracy.

Desired approach:

  1. Orchestrator plans the workflow
  2. RAG Agent searches documents, extracts key points
  3. PPT Agent creates presentation with the findings
  4. Agents communicate via A2A protocol

Proposed Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                         AGENT FRAMEWORK                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌───────────────────┐        ┌─────────────────────────────────────┐   │
│  │   AgentConfig     │        │          AgentFactory               │   │
│  │  - agent_type     │        │  - create_agent(config) → Agent     │   │
│  │  - tools          │───────►│  - register_agent_type(...)         │   │
│  │  - system_prompt  │        │  - get_available_types()            │   │
│  │  - mcp_servers    │        └─────────────────────────────────────┘   │
│  │  - max_iterations │                         │                        │
│  └───────────────────┘                         │                        │
│                                                ▼                        │
│  ┌───────────────────────────────────────────────────────────────────┐ │
│  │                      BaseReActAgent                                │ │
│  │  (Abstract base class with core ReAct loop)                        │ │
│  │                                                                    │ │
│  │  Concrete methods:                                                 │ │
│  │  - run(state) → AgentState                                         │ │
│  │  - _get_llm_response()                                             │ │
│  │  - _execute_tool()                                                 │ │
│  │  - _discover_tools()                                               │ │
│  │                                                                    │ │
│  │  Abstract methods (subclasses implement):                          │ │
│  │  + get_agent_card() → AgentCard                                    │ │
│  │  + get_builtin_tools() → list[BaseTool]                            │ │
│  │  + get_system_prompt() → str                                       │ │
│  │  + should_continue(state) → bool  [optional override]              │ │
│  └───────────────────────────────────────────────────────────────────┘ │
│                              │                                          │
│         ┌────────────────────┼────────────────────┐                    │
│         ▼                    ▼                    ▼                    │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐        │
│  │   RAGAgent      │  │   PPTAgent      │  │   DataAgent     │        │
│  │                 │  │                 │  │                 │        │
│  │ Tools:          │  │ Tools:          │  │ Tools:          │        │
│  │ - rag_search    │  │ - create_pres   │  │ - query_db      │        │
│  │ - summarize     │  │ - add_slide     │  │ - analyze       │        │
│  │ - cite_sources  │  │ - add_text      │  │ - visualize     │        │
│  │ - finish        │  │ - save_pres     │  │ - finish        │        │
│  │                 │  │ - finish        │  │                 │        │
│  │ MCP: None       │  │ MCP: :8085      │  │ MCP: :8090      │        │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘        │
│                                                                         │
├─────────────────────────────────────────────────────────────────────────┤
│                      ORCHESTRATION LAYER                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌───────────────────────────────────────────────────────────────────┐ │
│  │                    AgentOrchestrator                               │ │
│  │                                                                    │ │
│  │  - register_agent(agent_class)                                     │ │
│  │  - execute_workflow(task, workflow) → WorkflowResult               │ │
│  │  - plan_workflow(task_description) → Workflow  [LLM-powered]       │ │
│  │  - run_parallel(agents, tasks) → list[AgentState]                  │ │
│  │  - run_sequential(workflow) → WorkflowResult                       │ │
│  └───────────────────────────────────────────────────────────────────┘ │
│                                                                         │
│  ┌───────────────────────────────────────────────────────────────────┐ │
│  │                    A2A Protocol Adapter                            │ │
│  │                                                                    │ │
│  │  - expose_as_a2a_server(agent) → A2A Server                        │ │
│  │  - connect_to_a2a_agent(url) → A2A Client                          │ │
│  │  - send_task(client, task) → TaskResult                            │ │
│  │  - stream_updates(task_id) → AsyncIterator[TaskUpdate]             │ │
│  └───────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘

Protocol Integration

MCP vs A2A

| Protocol | Purpose | Transport | Use Case |
|----------|---------|-----------|----------|
| MCP | Agent ↔ Tools | JSON-RPC/SSE | RAG search, PPT manipulation, DB queries |
| A2A | Agent ↔ Agent | JSON-RPC/SSE | RAG Agent delegating to PPT Agent |

Both protocols complement each other:

  • MCP connects agents to external capabilities (tools, resources)
  • A2A enables agents to collaborate on complex tasks

A2A Core Concepts

  1. AgentCard: JSON document describing agent capabilities (similar to MCP tool schemas)
  2. Task: Unit of work with lifecycle (submitted → working → completed/failed)
  3. Message: Communication payload with typed "parts" (text, data, artifacts)
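
The task lifecycle in item 2 can be enforced with a small transition table. The sketch below encodes only the states and arrows listed above. The names TaskState and advance are hypothetical, and the A2A specification itself may define further states that are not modelled here.

```python
from enum import Enum


class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed transitions: submitted -> working -> completed/failed.
# Terminal states map to an empty set.
_TRANSITIONS: dict[TaskState, set[TaskState]] = {
    TaskState.SUBMITTED: {TaskState.WORKING},
    TaskState.WORKING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.COMPLETED: set(),
    TaskState.FAILED: set(),
}


def advance(current: TaskState, new: TaskState) -> TaskState:
    """Return the new state, or raise if the lifecycle does not allow the move."""
    if new not in _TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {new.value}")
    return new


state = advance(TaskState.SUBMITTED, TaskState.WORKING)
state = advance(state, TaskState.COMPLETED)
```

Rejecting illegal moves at this level keeps a misbehaving agent from reopening a task that another agent already treats as finished.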

Detailed Design

1. AgentCard (A2A Compatible)

from dataclasses import dataclass, field
from typing import Any

@dataclass
class AgentCard:
    """A2A-compatible agent capability advertisement."""
    
    name: str                           # Unique agent identifier
    description: str                    # Human-readable description
    version: str = "1.0.0"
    capabilities: list[str] = field(default_factory=list)  # e.g., ["search", "summarize"]
    supported_modalities: list[str] = field(default_factory=lambda: ["text"])
    mcp_servers: list[str] = field(default_factory=list)   # MCP server URLs
    metadata: dict[str, Any] = field(default_factory=dict)
    
    def to_a2a_json(self) -> dict:
        """Export as A2A-compliant Agent Card JSON."""
        return {
            "name": self.name,
            "description": self.description,
            "version": self.version,
            "capabilities": self.capabilities,
            "supportedModalities": self.supported_modalities,
            "url": "/.well-known/agent.json",  # A2A convention
        }

2. BaseReActAgent (Abstract)

from abc import ABC, abstractmethod

class BaseReActAgent(ABC):
    """Abstract base class for extensible ReAct agents."""
    
    def __init__(
        self,
        llm_service: BaseLLM,
        mcp_client: ResilientMCPGatewayClient | None = None,
        max_iterations: int = 10,
    ) -> None:
        self.llm_service = llm_service
        self.mcp_client = mcp_client
        self.max_iterations = max_iterations
        self.tool_registry = ToolRegistry(mcp_client)
        self.prompts = ReActPrompts(max_iterations=max_iterations)
        
        # Register built-in tools from subclass
        self._builtin_tools = {t.name: t for t in self.get_builtin_tools()}
    
    @abstractmethod
    def get_agent_card(self) -> AgentCard:
        """Return the agent's capability card."""
        pass
    
    @abstractmethod
    def get_builtin_tools(self) -> list[BaseTool]:
        """Return tools this agent provides natively."""
        pass
    
    @abstractmethod
    def get_system_prompt(self) -> str:
        """Return the agent's system prompt."""
        pass
    
    async def run(self, state: AgentState) -> AgentState:
        """Execute the ReAct loop (inherited from current implementation)."""
        # ... existing ReAct loop logic ...
        pass
    
    def should_continue(self, state: AgentState) -> bool:
        """Override to customize continuation logic."""
        return state.should_continue()

3. Specialized Agent Example (RAGAgent)

class RAGAgent(BaseReActAgent):
    """Agent specialized for document retrieval and synthesis."""
    
    def __init__(
        self,
        llm_service: BaseLLM,
        search_service: SearchService,
        mcp_client: ResilientMCPGatewayClient | None = None,
        max_iterations: int = 10,
    ) -> None:
        self.search_service = search_service
        super().__init__(llm_service, mcp_client, max_iterations)
    
    def get_agent_card(self) -> AgentCard:
        return AgentCard(
            name="rag-agent",
            description="Searches knowledge bases and synthesizes accurate answers with citations",
            capabilities=["document_search", "summarization", "citation", "question_answering"],
            supported_modalities=["text"],
        )
    
    def get_builtin_tools(self) -> list[BaseTool]:
        return [
            RAGSearchTool(self.search_service),
            FinishTool(),
        ]
    
    def get_system_prompt(self) -> str:
        return """You are a RAG (Retrieval-Augmented Generation) specialist.

Your job is to:
1. Search the knowledge base for relevant documents
2. Extract and synthesize key information
3. Provide accurate answers with proper citations

Always cite your sources with document names and page numbers when available.
Never make up information that isn't in the search results."""

4. PPTAgent Example

class PPTAgent(BaseReActAgent):
    """Agent specialized for PowerPoint presentation creation."""
    
    def __init__(
        self,
        llm_service: BaseLLM,
        mcp_client: ResilientMCPGatewayClient,  # Required - connects to PPT MCP server
        max_iterations: int = 15,
    ) -> None:
        super().__init__(llm_service, mcp_client, max_iterations)
    
    def get_agent_card(self) -> AgentCard:
        return AgentCard(
            name="ppt-agent",
            description="Creates professional PowerPoint presentations",
            capabilities=["presentation_creation", "slide_design", "content_formatting"],
            supported_modalities=["text"],
            mcp_servers=["http://localhost:8085/sse"],
        )
    
    def get_builtin_tools(self) -> list[BaseTool]:
        # Only finish tool - PPT tools come from MCP server
        return [FinishTool()]
    
    def get_system_prompt(self) -> str:
        return """You are a PowerPoint presentation specialist.

Your job is to:
1. Create professional, well-structured presentations
2. Use appropriate slide layouts for different content types
3. Ensure content is clear, concise, and visually organized

CRITICAL WORKFLOW:
1. ALWAYS call powerpoint_create_presentation FIRST to get a presentation_id
2. Pass the presentation_id to ALL subsequent tool calls
3. Call powerpoint_save_presentation at the end to save the file

Available tools: powerpoint_create_presentation, powerpoint_add_slide, 
powerpoint_add_text, powerpoint_populate_placeholder, powerpoint_save_presentation"""

5. AgentOrchestrator

import asyncio
from typing import Literal

class AgentOrchestrator:
    """Coordinates multiple specialized agents for complex tasks."""
    
    def __init__(self, llm_service: BaseLLM):
        self.llm_service = llm_service
        self._agent_registry: dict[str, type[BaseReActAgent]] = {}
        self._agent_instances: dict[str, BaseReActAgent] = {}
    
    def register_agent(self, agent_class: type[BaseReActAgent]) -> None:
        """Register an agent type for use in workflows."""
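        # get_agent_card() is an instance method, so the AgentCard itself is only
        # available once an agent has been constructed. This lookup returns the
        # function defined on the class (or None), not a card.
        card_getter = agent_class.__dict__.get("get_agent_card")
        # ... registration logic ...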
    
    async def execute_workflow(
        self,
        task: Task,
        workflow: list[WorkflowStep],
        mode: Literal["sequential", "parallel"] = "sequential",
    ) -> WorkflowResult:
        """Execute a multi-agent workflow."""
        context = SharedContext()
        results = []
        
        if mode == "sequential":
            for step in workflow:
                agent = self._get_or_create_agent(step.agent_name)
                state = AgentState.create(
                    user_id=task.user_id,
                    collection_id=task.collection_id,
                    query=step.instruction,
                )
                # Inject context from previous agents
                state.shared_context = context
                
                result = await agent.run(state)
                results.append(result)
                
                # Update shared context with results
                context.add_result(step.agent_name, result)
        
        elif mode == "parallel":
            # Run independent tasks concurrently; depends_on ordering is not enforced here
            tasks = [
                self._run_agent(step, context)
                for step in workflow
            ]
            results = await asyncio.gather(*tasks)
        
        return WorkflowResult(
            task_id=task.task_id,
            steps=results,
            final_output=self._synthesize_results(results),
        )
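
The parallel branch above gathers every step at once, so a step with depends_on set could start before its inputs exist. One possible way to honor dependencies is to run the steps in waves, as in the sketch below. Step, run_in_waves, and fake_run are hypothetical names, and the sketch assumes each agent_name appears at most once per workflow.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Step:
    agent_name: str
    instruction: str
    depends_on: list[str] = field(default_factory=list)


RunStep = Callable[[Step, dict[str, str]], Awaitable[str]]


async def run_in_waves(steps: list[Step], run_step: RunStep) -> dict[str, str]:
    """Run every step whose dependencies are finished concurrently, wave by wave."""
    pending = {s.agent_name: s for s in steps}
    done: dict[str, str] = {}
    while pending:
        ready = [s for s in pending.values() if all(d in done for d in s.depends_on)]
        if not ready:
            # A cycle or a dependency on a step that is not in the workflow.
            raise ValueError(f"unsatisfiable dependencies: {sorted(pending)}")
        # Each step receives a snapshot of upstream outputs, not the live dict.
        outputs = await asyncio.gather(*(run_step(s, dict(done)) for s in ready))
        for step, output in zip(ready, outputs):
            done[step.agent_name] = output
            del pending[step.agent_name]
    return done


async def _demo() -> tuple[list[str], dict[str, str]]:
    order: list[str] = []

    async def fake_run(step: Step, upstream: dict[str, str]) -> str:
        order.append(step.agent_name)
        return f"{step.agent_name} saw {sorted(upstream)}"

    steps = [
        Step("ppt-agent", "Build slides", depends_on=["rag-agent"]),
        Step("rag-agent", "Search documents"),
    ]
    return order, await run_in_waves(steps, fake_run)


order, results = asyncio.run(_demo())
```

The steps are deliberately listed out of order in the demo. The scheduler still runs rag-agent first because ppt-agent declares it as a dependency.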

6. Task and Workflow Models

from dataclasses import dataclass, field
from typing import Any
from uuid import UUID

@dataclass
class Task:
    """A2A-compatible task definition."""
    task_id: UUID
    description: str
    user_id: UUID
    collection_id: UUID | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    status: TaskStatus = TaskStatus.PENDING


@dataclass
class WorkflowStep:
    """Single step in a multi-agent workflow."""
    agent_name: str
    instruction: str
    depends_on: list[str] = field(default_factory=list)  # For parallel execution
    timeout_seconds: int = 300


@dataclass
class SharedContext:
    """Context shared between agents in a workflow."""
    results: dict[str, Any] = field(default_factory=dict)
    artifacts: list[dict] = field(default_factory=list)
    
    def add_result(self, agent_name: str, state: AgentState) -> None:
        self.results[agent_name] = {
            "final_answer": state.final_answer,
            "search_results": state.search_results,
            "artifacts": state.artifacts,
        }
        self.artifacts.extend(state.artifacts)
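
The proposal does not say how a downstream agent consumes SharedContext. One option is to render the collected results into a text block that is prepended to the next agent's instruction. The sketch below assumes the results dictionary has the shape produced by add_result above. render_context and the max_chars limit are hypothetical.

```python
from typing import Any


def render_context(results: dict[str, dict[str, Any]], max_chars: int = 500) -> str:
    """Turn per-agent results into a compact prompt section."""
    if not results:
        return ""
    lines = ["Context from previous agents:"]
    for agent_name, result in results.items():
        answer = (result.get("final_answer") or "").strip()
        if len(answer) > max_chars:
            # Keep prompts bounded; long answers are cut rather than dropped.
            answer = answer[:max_chars].rstrip() + " [truncated]"
        lines.append(f"- {agent_name}: {answer or '(no answer)'}")
    return "\n".join(lines)


prompt_section = render_context(
    {"rag-agent": {"final_answer": "Key points A, B", "search_results": [], "artifacts": []}}
)
```

Truncating each answer keeps the downstream prompt within a predictable size. A summarization step may be a better fit when upstream answers are long.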

File Structure

backend/rag_solution/agents/
├── __init__.py                    # Public exports
├── base/
│   ├── __init__.py
│   ├── agent.py                   # BaseReActAgent (abstract)
│   ├── agent_card.py              # AgentCard dataclass
│   ├── task.py                    # Task, WorkflowStep, SharedContext
│   └── exceptions.py              # Agent-specific exceptions
├── specialized/
│   ├── __init__.py
│   ├── rag_agent.py               # RAGAgent (refactored)
│   ├── ppt_agent.py               # PPTAgent
│   └── data_agent.py              # DataAnalysisAgent (future)
├── orchestration/
│   ├── __init__.py
│   ├── orchestrator.py            # AgentOrchestrator
│   ├── workflow.py                # Workflow execution logic
│   └── planner.py                 # LLM-powered workflow planning
├── a2a/
│   ├── __init__.py
│   ├── server.py                  # A2A server adapter
│   ├── client.py                  # A2A client for remote agents
│   └── messages.py                # A2A message types
├── tools/                         # Existing (unchanged)
│   ├── __init__.py
│   ├── base.py
│   ├── mcp_tool.py
│   └── rag_search.py
├── prompts.py                     # Enhanced prompts (add per-agent support)
├── state.py                       # Enhanced AgentState (add shared_context)
├── tool_registry.py               # Existing (unchanged)
└── react_agent.py                 # DEPRECATED - kept for backward compatibility

Implementation Plan

Phase 1: Core Abstractions (Priority: High)

  • Create base/agent.py with BaseReActAgent abstract class
  • Create base/agent_card.py with A2A-compatible AgentCard
  • Create base/task.py with Task, WorkflowStep, SharedContext
  • Enhance state.py to support shared_context
  • Add tests for base classes

Phase 2: Specialized Agents (Priority: High)

  • Refactor existing react_agent.py into specialized/rag_agent.py
  • Create specialized/ppt_agent.py for PowerPoint creation
  • Ensure backward compatibility with existing ReActAgent usage
  • Add integration tests for specialized agents
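
One common way to meet the backward-compatibility item is a thin shim that keeps the old class name importable and warns when it is used. The sketch below uses a stand-in RAGAgent so that it runs on its own. It assumes the legacy constructor arguments can be forwarded unchanged, which the proposal does not confirm.

```python
import warnings


class RAGAgent:
    """Stand-in for the refactored agent; the real class takes more arguments."""

    def __init__(self, llm_service, search_service, max_iterations: int = 10) -> None:
        self.llm_service = llm_service
        self.search_service = search_service
        self.max_iterations = max_iterations


class ReActAgent(RAGAgent):
    """Legacy name kept importable; emits a DeprecationWarning when constructed."""

    def __init__(self, *args, **kwargs) -> None:
        warnings.warn(
            "ReActAgent is deprecated; use RAGAgent instead",
            DeprecationWarning,
            stacklevel=2,  # Point the warning at the caller, not at this shim.
        )
        super().__init__(*args, **kwargs)
```

Existing call sites keep working, and a test suite run with warnings treated as errors will flag every remaining use of the old name.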

Phase 3: Orchestration Layer (Priority: Medium)

  • Create orchestration/orchestrator.py
  • Implement sequential workflow execution
  • Implement parallel workflow execution
  • Create orchestration/planner.py for LLM-powered workflow planning
  • Add end-to-end tests for orchestrated workflows
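
Because planner.py would rely on LLM output, the plan probably needs validation before any agent runs. The sketch below assumes the planner asks the model for a JSON list of steps and rejects unknown agent names and forward dependencies. PlannedStep, parse_plan, and the JSON shape are hypothetical.

```python
import json
from dataclasses import dataclass, field


@dataclass
class PlannedStep:
    agent_name: str
    instruction: str
    depends_on: list[str] = field(default_factory=list)


def parse_plan(raw: str, known_agents: set[str]) -> list[PlannedStep]:
    """Validate an LLM-produced JSON plan; raise ValueError on any problem."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"plan is not valid JSON: {exc}") from None
    if not isinstance(data, list):
        raise ValueError("plan must be a JSON list of steps")

    steps: list[PlannedStep] = []
    seen: set[str] = set()
    for i, item in enumerate(data):
        if not isinstance(item, dict):
            raise ValueError(f"step {i}: expected an object")
        name = item.get("agent_name")
        if name not in known_agents:
            # Guards against hallucinated agent names.
            raise ValueError(f"step {i}: unknown agent {name!r}")
        deps = list(item.get("depends_on", []))
        missing = [d for d in deps if d not in seen]
        if missing:
            raise ValueError(f"step {i}: depends on steps not planned earlier: {missing}")
        steps.append(PlannedStep(name, str(item.get("instruction", "")), deps))
        seen.add(name)
    return steps


plan = parse_plan(
    '[{"agent_name": "rag-agent", "instruction": "Search"},'
    ' {"agent_name": "ppt-agent", "instruction": "Build slides", "depends_on": ["rag-agent"]}]',
    known_agents={"rag-agent", "ppt-agent"},
)
```

On a ValueError the planner could re-prompt the model with the error message. That retry loop is not shown here.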

Phase 4: A2A Protocol Integration (Priority: Low)

  • Add a2a-python as an optional dependency
  • Create a2a/server.py to expose agents as A2A servers
  • Create a2a/client.py to connect to remote A2A agents
  • Add A2A integration tests

API Examples

Creating a Specialized Agent

from rag_solution.agents.specialized import RAGAgent, PPTAgent
from rag_solution.agents.base import AgentState

# Create RAG agent
rag_agent = RAGAgent(
    llm_service=llm_service,
    search_service=search_service,
)

# Run agent
state = AgentState.create(
    user_id=user_id,
    collection_id=collection_id,
    query="What is IBM's AI strategy?"
)
result = await rag_agent.run(state)
print(result.final_answer)

Orchestrating Multiple Agents

from uuid import uuid4

from rag_solution.agents.orchestration import AgentOrchestrator
from rag_solution.agents.base import Task, WorkflowStep
from rag_solution.agents.specialized import RAGAgent, PPTAgent

# Create orchestrator
orchestrator = AgentOrchestrator(llm_service)
orchestrator.register_agent(RAGAgent)
orchestrator.register_agent(PPTAgent)

# Define workflow
task = Task(
    task_id=uuid4(),
    description="Create presentation about IBM AI strategy",
    user_id=user_id,
    collection_id=collection_id,
)

workflow = [
    WorkflowStep(
        agent_name="rag-agent",
        instruction="Search for IBM AI strategy information and extract key points"
    ),
    WorkflowStep(
        agent_name="ppt-agent",
        instruction="Create a 5-slide presentation with the findings",
        depends_on=["rag-agent"]  # Uses RAG results as context
    ),
]

# Execute
result = await orchestrator.execute_workflow(task, workflow)
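# WorkflowResult exposes task_id, steps, and final_output; artifacts live on each step's AgentState
print(f"Presentation saved: {result.steps[-1].artifacts[0]['path']}")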

Success Criteria

  1. Backward Compatibility: Existing ReActAgent usage continues to work
  2. Type Safety: Full type hints with mypy compliance
  3. Test Coverage: >80% coverage for new code
  4. Documentation: Docstrings and usage examples
  5. Performance: No regression in single-agent execution time

/cc @manavgup
