A powerful, framework-agnostic multi-agent system that orchestrates AI agents across different frameworks with unified interfaces, memory management, and flexible workflow execution.
The system combines the Adapter, Strategy, and Factory patterns to achieve:
- Framework Agnostic: Support for Google ADK, LangGraph, CrewAI, and easy extension to new frameworks
- Dynamic Switching: Runtime framework switching with automatic fallback
- Multiple Workflows: Hierarchical, sequential, parallel, and custom workflow execution
- Unified Interface: Consistent API regardless of underlying framework
- Memory Management: Persistent conversation and context memory across sessions
- Session Management: Multi-user session handling with group chat support
graph TB
    %% User Layer
    User[User/Application] --> API[Multi-Agent API]

    %% Core Coordination Layer
    API --> Coordinator[MultiAgentCoordinator]
    Coordinator --> Registry[AdapterRegistry]
    Coordinator --> WorkflowEngine[WorkflowEngine]
    Coordinator --> SessionMgr[SessionManager]
    Coordinator --> MemoryMgr[MemoryManager]
    Coordinator --> MCPMgr[MCPToolManager]

    %% Framework Adapters Layer
    Registry --> GoogleADK[GoogleADKAdapter]
    Registry --> LangGraph[LangGraphAdapter]
    Registry --> CrewAI[CrewAIAdapter]

    %% AI Frameworks
    GoogleADK --> GoogleSDK[Google ADK Framework]
    LangGraph --> LangGraphSDK[LangGraph Framework]
    CrewAI --> CrewAISDK[CrewAI Framework]

    %% Workflow Types
    WorkflowEngine --> Single[Single Agent]
    WorkflowEngine --> Hierarchical[Hierarchical]
    WorkflowEngine --> Sequential[Sequential]
    WorkflowEngine --> Parallel[Parallel]
- Multi-Framework Support: Seamlessly switch between Google ADK, LangGraph, and CrewAI
- Unified API: Consistent interface regardless of underlying framework
- Smart Memory: Persistent conversation memory with importance scoring
- Session Management: Multi-user sessions with group chat support
- Flexible Workflows: Single, hierarchical, sequential, and parallel execution
- Real-time Streaming: Live updates during task execution
- Batch Processing: Efficient handling of multiple tasks
- Tool Integration: Built-in support for external tools and APIs
- MCP Tools: Model Context Protocol integration for external tool access
- Knowledge Bases: Query and integrate with knowledge repositories
- Capability Detection: Automatic adapter selection based on requirements
- Comprehensive Monitoring: Detailed metrics and health monitoring
- Error Handling: Robust error handling with automatic fallbacks
- Extensible Architecture: Easy to add new frameworks and capabilities
# Clone the repository
git clone <repository-url>
cd tgo-agent-coordinator

# Install dependencies
poetry install

# Run the example
python example.py

import asyncio
from tgo.agents import (
    MultiAgentCoordinator, AdapterRegistry, GoogleADKAdapter,
    InMemoryMemoryManager, InMemorySessionManager
)
from tgo.agents.core.models import (
    MultiAgentConfig, AgentConfig, Task, WorkflowConfig, Session
)
from tgo.agents.core.enums import (
    AgentType, WorkflowType, ExecutionStrategy, SessionType
)

async def main():
    # 1. Initialize system components
    memory_manager = InMemoryMemoryManager()
    session_manager = InMemorySessionManager()
    registry = AdapterRegistry()
    registry.register("google-adk", GoogleADKAdapter())

    coordinator = MultiAgentCoordinator(
        registry=registry,
        memory_manager=memory_manager,
        session_manager=session_manager
    )

    # 2. Create session
    session = await session_manager.create_session(
        session_id="session_001",
        user_id="user_123",
        session_type=SessionType.SINGLE_CHAT
    )

    # 3. Configure multi-agent team (Manager + Experts)
    config = MultiAgentConfig(
        framework="google-adk",
        agents=[
            # Manager Agent - coordinates the team
            AgentConfig(
                agent_id="project_manager",
                name="Project Manager",
                agent_type=AgentType.MANAGER,
                model="gemini-2.0-flash",
                instructions="You coordinate tasks between expert agents and synthesize their results."
            ),
            # Research Expert
            AgentConfig(
                agent_id="researcher",
                name="Research Specialist",
                agent_type=AgentType.EXPERT,
                model="gemini-2.0-flash",
                instructions="You are a research expert. Provide thorough market analysis and data insights."
            ),
            # Writing Expert
            AgentConfig(
                agent_id="writer",
                name="Content Writer",
                agent_type=AgentType.EXPERT,
                model="gemini-2.0-flash",
                instructions="You are a content writer. Create clear, engaging reports from research data."
            )
        ],
        workflow=WorkflowConfig(
            workflow_type=WorkflowType.HIERARCHICAL,  # Manager coordinates experts
            execution_strategy=ExecutionStrategy.FAIL_FAST,
            manager_agent_id="project_manager",
            expert_agent_ids=["researcher", "writer"]
        )
    )

    # 4. Create task for the team
    task = Task(
        title="AI Market Analysis Report",
        description="Create a comprehensive report on current AI market trends, including key players, growth projections, and emerging technologies."
    )

    # 5. Execute multi-agent workflow
    print("Starting multi-agent collaboration...")
    result = await coordinator.execute_task(config, task, session)

    if result.is_successful():
        print("Multi-agent task completed successfully!")
        print(f"Final Result: {result.result}")
        print(f"Agents involved: {', '.join(result.agents_used)}")
    else:
        print(f"Task failed: {result.error_message}")

if __name__ == "__main__":
    asyncio.run(main())

What happens in this multi-agent workflow:
1. Project Manager receives the task and breaks it down into subtasks
2. Research Specialist analyzes market data and trends
3. Content Writer creates the final report structure
4. Project Manager synthesizes all results into a comprehensive report
This demonstrates true multi-agent collaboration where different specialists work together under coordination.
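Framework specifics aside, the manager-expert flow above can be sketched in plain asyncio. All names here are illustrative stand-ins, not part of the tgo API:

```python
import asyncio

async def researcher(task: str) -> str:
    # Expert 1: stand-in for the market-research agent
    return f"research findings for: {task}"

async def writer(task: str) -> str:
    # Expert 2: stand-in for the report-writing agent
    return f"draft sections for: {task}"

async def manager(task: str) -> str:
    # Manager fans the task out to experts, then synthesizes their results
    findings, draft = await asyncio.gather(researcher(task), writer(task))
    return f"final report combining [{findings}] and [{draft}]"

result = asyncio.run(manager("AI market analysis"))
print(result)
```

The real coordinator adds model calls, memory, and error handling on top of this shape, but the fan-out/synthesize structure is the same.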
tgo/agents/
├── core/                              # Core abstractions
│   ├── interfaces.py                  # Core interfaces and protocols
│   ├── models.py                      # Data models and schemas
│   ├── enums.py                       # Enumerations
│   └── exceptions.py                  # Exception classes
├── registry/                          # Adapter registry
│   └── adapter_registry.py            # Framework adapter registry
├── adapters/                          # Framework adapters
│   ├── base_adapter.py                # Base adapter implementation
│   ├── google_adk_adapter.py          # Google ADK integration
│   ├── langgraph_adapter.py           # LangGraph integration
│   └── crewai_adapter.py              # CrewAI integration
├── coordinator/                       # Multi-agent coordination
│   ├── multi_agent_coordinator.py     # Main coordinator
│   ├── workflow_engine.py             # Workflow execution engine
│   ├── task_executor.py               # Task execution logic
│   └── result_aggregator.py           # Result aggregation
├── memory/                            # Memory management
│   ├── memory_manager.py              # Memory management implementation
│   └── session_manager.py             # Session management
├── tools/                             # MCP tools integration
│   ├── mcp_tool_manager.py            # MCP tool manager
│   ├── mcp_connector.py               # MCP protocol connector
│   ├── mcp_tool_proxy.py              # Framework tool adapter
│   └── mcp_security_manager.py        # Security controls
├── example.py                         # Complete usage example
└── basic_session_memory_example.py    # Memory & session example
Centralized management of AI framework adapters with dynamic discovery:
registry = AdapterRegistry()
registry.register("google-adk", GoogleADKAdapter(), is_default=True)
registry.register("langgraph", LangGraphAdapter())
registry.register("crewai", CrewAIAdapter())

# Get adapter by capability
adapter = registry.get_adapter_by_capability(FrameworkCapability.STREAMING)

Orchestrates multi-agent task execution with memory and session management:

# Initialize with memory and session managers in constructor
coordinator = MultiAgentCoordinator(
    registry=registry,
    memory_manager=memory_manager,
    session_manager=session_manager
)

# Execute with session context
result = await coordinator.execute_task(config, task, session)

Unified interface to different AI frameworks with capability detection:
- GoogleADKAdapter: Google Agent Development Kit integration
- LangGraphAdapter: LangGraph framework integration
- CrewAIAdapter: CrewAI framework integration

Flexible execution patterns with streaming and batch support:

- Single: Single agent execution
- Hierarchical: Manager-expert coordination
- Sequential: Pipeline-style execution
- Parallel: Concurrent execution
- Custom: User-defined workflows
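To make the sequential/parallel distinction concrete, here is a minimal, framework-free sketch (the `step` helper is illustrative, not part of the tgo API): a pipeline feeds each agent the previous agent's output, while fan-out gives every agent the same input and aggregates the results.

```python
import asyncio

async def step(name: str, payload: str) -> str:
    await asyncio.sleep(0)  # stand-in for real agent work
    return f"{payload}->{name}"

async def run_sequential(agents, payload):
    # Pipeline: each agent consumes the previous agent's output
    for name in agents:
        payload = await step(name, payload)
    return payload

async def run_parallel(agents, payload):
    # Fan-out: every agent sees the same input; results are aggregated
    return await asyncio.gather(*(step(a, payload) for a in agents))

seq = asyncio.run(run_sequential(["research", "write"], "task"))
par = asyncio.run(run_parallel(["research", "write"], "task"))
print(seq)  # task->research->write
print(par)  # ['task->research', 'task->write']
```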
Persistent context and conversation memory:
# Store conversation memory
await memory_manager.store_memory(
    session_id="session_123",
    content="User prefers detailed explanations",
    memory_type="preference",
    session_type=SessionType.SINGLE_CHAT
)

# Retrieve relevant memories
memories = await memory_manager.retrieve_memories(
    session_id="session_123",
    limit=5,
    min_importance=0.3
)

Unified tool configuration supporting both function tools and MCP tools in one array:
from tgo.agents.core.models import MCPTool

# Define function tools
def calculate_metrics(revenue: float, growth_rate: float) -> dict:
    """Calculate business metrics."""
    return {
        "projected_revenue": revenue * (1 + growth_rate),
        "growth_percentage": growth_rate * 100
    }

async def fetch_data(source: str) -> str:
    """Async function tool for data fetching."""
    # Simulate async data fetching
    await asyncio.sleep(0.1)
    return f"Data from {source}"

# Define MCP tools
web_search_tool = MCPTool(
    name="web_search",
    description="Search the web for information",
    input_schema={
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "max_results": {"type": "integer", "default": 5}
        },
        "required": ["query"]
    },
    server_id="web_api"
)

file_reader_tool = MCPTool(
    name="read_file",
    description="Read content from a file",
    input_schema={
        "type": "object",
        "properties": {
            "file_path": {"type": "string"}
        },
        "required": ["file_path"]
    },
    server_id="filesystem"
)

# Configure agent with mixed tools
agent_config = AgentConfig(
    agent_id="research_agent",
    name="Research Agent",
    agent_type=AgentType.EXPERT,
    model="gemini-2.0-flash",
    instructions="You have access to calculation functions, data fetching, web search, and file operations.",
    tools=[
        calculate_metrics,   # Function tool
        fetch_data,          # Async function tool
        web_search_tool,     # MCP tool
        file_reader_tool     # MCP tool
    ]  # All tools in one array
)

# Tool type detection
print(f"Function tools: {len(agent_config.get_function_tools())}")  # 2
print(f"MCP tools: {len(agent_config.get_mcp_tools())}")            # 2
print(f"Has MCP tools: {agent_config.has_mcp_tools()}")             # True

import asyncio
from tgo.agents import (
    MultiAgentCoordinator, AdapterRegistry, GoogleADKAdapter,
    InMemoryMemoryManager, InMemorySessionManager
)
from tgo.agents.core.models import MultiAgentConfig, AgentConfig, Task, WorkflowConfig
from tgo.agents.core.enums import AgentType, WorkflowType, ExecutionStrategy, SessionType
async def single_agent_example():
    # Setup with memory and session management
    memory_manager = InMemoryMemoryManager()
    session_manager = InMemorySessionManager()
    registry = AdapterRegistry()
    registry.register("google-adk", GoogleADKAdapter())

    coordinator = MultiAgentCoordinator(
        registry=registry,
        memory_manager=memory_manager,
        session_manager=session_manager
    )

    # Create session
    session = await session_manager.create_session(
        session_id="session_001",
        user_id="user_123",
        session_type=SessionType.SINGLE_CHAT
    )

    # Configure single agent
    config = MultiAgentConfig(
        framework="google-adk",
        agents=[
            AgentConfig(
                agent_id="analyst_001",
                name="Market Analyst",
                agent_type=AgentType.EXPERT,
                model="gemini-2.0-flash",
                instructions="You are a market analyst. Provide detailed insights."
            )
        ],
        workflow=WorkflowConfig(
            workflow_type=WorkflowType.SINGLE,
            execution_strategy=ExecutionStrategy.FAIL_FAST
        )
    )

    # Execute task with session
    task = Task(
        title="Analyze AI Market Trends",
        description="Provide comprehensive analysis of AI market trends"
    )

    result = await coordinator.execute_task(config, task, session)
    print(f"Result: {result.result}")

asyncio.run(single_agent_example())

async def hierarchical_example():
    # Configure hierarchical system with manager and experts
    # (reuses the coordinator set up in the previous example)
    config = MultiAgentConfig(
        framework="google-adk",
        agents=[
            AgentConfig(
                agent_id="manager_001",
                name="Project Manager",
                agent_type=AgentType.MANAGER,
                instructions="Coordinate tasks between expert agents"
            ),
            AgentConfig(
                agent_id="researcher_001",
                name="Research Expert",
                agent_type=AgentType.EXPERT,
                instructions="Provide thorough research and analysis"
            ),
            AgentConfig(
                agent_id="writer_001",
                name="Technical Writer",
                agent_type=AgentType.EXPERT,
                instructions="Create clear technical documentation"
            )
        ],
        workflow=WorkflowConfig(
            workflow_type=WorkflowType.HIERARCHICAL,
            manager_agent_id="manager_001",
            expert_agent_ids=["researcher_001", "writer_001"]
        )
    )

    # Complex task requiring coordination
    task = Task(
        title="Create AI Implementation Guide",
        description="Research and create comprehensive AI implementation guide"
    )

    # Execute with streaming updates
    async for update in coordinator.execute_task_stream(config, task):
        print(f"Update: {update}")

asyncio.run(hierarchical_example())

from tgo.agents import (
    MultiAgentCoordinator, AdapterRegistry, GoogleADKAdapter,
    InMemoryMemoryManager, InMemorySessionManager
)
from tgo.agents.core.models import Session
from tgo.agents.core.enums import SessionType

async def memory_example():
    # Setup with memory and session management
    memory_manager = InMemoryMemoryManager()
    session_manager = InMemorySessionManager()
    registry = AdapterRegistry()
    registry.register("google-adk", GoogleADKAdapter())

    coordinator = MultiAgentCoordinator(
        registry=registry,
        memory_manager=memory_manager,
        session_manager=session_manager
    )

    # Create session
    session = await session_manager.create_session(
        session_id="session_123",
        user_id="user_456",
        session_type=SessionType.SINGLE_CHAT
    )

    # Store context memory
    await memory_manager.store_memory(
        session_id="session_123",
        content="User prefers detailed technical explanations",
        memory_type="preference",
        session_type=SessionType.SINGLE_CHAT
    )

    # Execute task with memory context
    # (config and task defined as in the earlier examples)
    result = await coordinator.execute_task(config, task, session)

asyncio.run(memory_example())

from tgo.agents import (
    MultiAgentCoordinator, AdapterRegistry, GoogleADKAdapter,
    MCPToolManager, MCPServerConfig, InMemoryMemoryManager, InMemorySessionManager
)
from tgo.agents.core.models import MultiAgentConfig, AgentConfig, Task, WorkflowConfig, MCPTool
from tgo.agents.core.enums import AgentType, WorkflowType

async def mcp_tools_example():
    # Configure MCP servers
    mcp_config = {
        "mcpServers": {
            # A remote HTTP server
            "weather": {
                "url": "https://weather-api.example.com/mcp",
                "transport": "streamable-http"
            },
            # A local server running via stdio
            "math_server": {
                "command": "python",
                "args": ["./fastmcp_simple_server.py"],
                "env": {"DEBUG": "true"}
            }
        }
    }

    # Setup MCP tool manager
    mcp_manager = MCPToolManager(mcp_config)

    # Setup coordinator with MCP support
    registry = AdapterRegistry()
    registry.register("google-adk", GoogleADKAdapter())

    coordinator = MultiAgentCoordinator(
        registry=registry,
        memory_manager=InMemoryMemoryManager(),
        session_manager=InMemorySessionManager(),
        mcp_tool_manager=mcp_manager
    )

    # Configure agents with MCP tool access
    calculator_tool = MCPTool(
        name="calculate",
        description="Real MCP calculator via stdio transport",
        input_schema={
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Mathematical expression to evaluate"}
            },
            "required": ["expression"]
        },
        server_id="math_server",  # Must match the server key configured above
        requires_confirmation=False
    )

    config = MultiAgentConfig(
        framework="google-adk",
        agents=[
            AgentConfig(
                agent_id="data_processor",
                name="Data Processing Agent",
                agent_type=AgentType.EXPERT,
                model="gemini-2.0-flash",
                instructions="You can read files and query databases using MCP tools.",
                tools=[calculator_tool]
            )
        ],
        workflow=WorkflowConfig(
            workflow_type=WorkflowType.SINGLE
        )
    )

    # Create task that requires MCP tools
    task = Task(
        title="Data Analysis Report",
        description="Read data files and query database to create analysis report",
        input_data={
            "data_file": "/workspace/sales_data.csv",
            "query": "SELECT * FROM customers WHERE region = 'North'"
        }
    )

    # Execute task with MCP tools
    result = await coordinator.execute_task(config, task)

    if result.is_successful():
        print("MCP-enabled task completed!")
        print(f"Result: {result.result}")
    else:
        print(f"Task failed: {result.error_message}")

    # Cleanup
    await mcp_manager.shutdown()

asyncio.run(mcp_tools_example())

Automatic fallback to alternative frameworks when the primary framework fails:
config = MultiAgentConfig(
    framework="google-adk",
    fallback_frameworks=["langgraph", "crewai"],
    agents=[...],
    workflow=WorkflowConfig(...)
)

# System automatically tries fallback frameworks if the primary fails
result = await coordinator.execute_task(config, task)

Real-time updates during task execution:
async for update in coordinator.execute_task_stream(config, task):
    if update.get("type") == "agent_started":
        print(f"Agent {update.get('agent_id')} started")
    elif update.get("type") == "agent_completed":
        print(f"Agent {update.get('agent_id')} completed")
    elif update.get("type") == "workflow_completed":
        print("Workflow completed successfully")

Execute multiple tasks efficiently:
tasks = [
    Task(title="Task 1", description="First task"),
    Task(title="Task 2", description="Second task"),
    Task(title="Task 3", description="Third task")
]

results = await coordinator.execute_batch_tasks(config, tasks)
for i, result in enumerate(results):
    print(f"Task {i+1}: {'Success' if result.is_successful() else 'Failed'}")

Automatically select adapters based on required capabilities:
# Get adapter that supports streaming
streaming_adapter = registry.get_adapter_by_capability(
    FrameworkCapability.STREAMING
)

# Get adapters supporting multiple capabilities
multi_capable_adapters = registry.get_adapters_by_capabilities([
    FrameworkCapability.TOOL_CALLING,
    FrameworkCapability.KNOWLEDGE_BASE
])

Advanced security controls and management for MCP tools:
from tgo.agents.tools.mcp_security_manager import MCPSecurityManager, SecurityPolicy, SecurityLevel

# Configure security policies
security_manager = MCPSecurityManager()

# Restrictive policy for sensitive agents
restrictive_policy = SecurityPolicy(
    allowed_tools={"read_file", "safe_query"},
    denied_tools={"delete_file", "system_command"},
    max_calls_per_minute=10,
    security_level=SecurityLevel.HIGH,
    require_approval_for_untrusted=True
)

# Set policy for a specific agent
security_manager.set_policy("sensitive_agent", restrictive_policy)

# Create MCP manager with security
mcp_manager = MCPToolManager(security_manager=security_manager)

# Check tool permissions
permission = await mcp_manager.check_tool_permission(
    agent_id="sensitive_agent",
    tool_name="read_file",
    context=execution_context
)
print(f"Permission: {permission}")  # "allow", "deny", or "require_approval"

# Get security audit log
audit_log = mcp_manager.get_security_audit_log(limit=50)
for entry in audit_log:
    print(f"{entry['timestamp']}: {entry['event_type']} - {entry['message']}")

Agents can call tools and query knowledge bases:
# Tool calling through agents
tool_result = await adapter.call_tool(
    agent_id="expert_001",
    tool_id="search_tool",
    tool_name="web_search",
    parameters={"query": "latest AI trends"},
    context=execution_context
)

# MCP tool calling with security
mcp_result = await adapter.call_mcp_tool(
    agent_id="expert_001",
    tool_name="filesystem_read",
    arguments={"path": "/data/report.txt"},
    context=execution_context,
    user_approved=True
)

# Knowledge base queries
kb_result = await adapter.query_knowledge_base(
    agent_id="expert_001",
    kb_id="company_kb",
    kb_name="Company Knowledge Base",
    query="AI implementation best practices",
    context=execution_context
)

Adapter Pattern:
- Why: Provides a unified interface across different AI frameworks
- Benefit: Easy to add new frameworks without changing existing code

Adapter Registry:
- Why: Centralized management of framework adapters
- Benefit: Dynamic discovery and switching of frameworks

Strategy Pattern (workflows):
- Why: Different execution strategies for different use cases
- Benefit: Flexible workflow execution without tight coupling

Pydantic Models:
- Why: Type safety and validation
- Benefit: Catch errors early and provide clear interfaces

Async Execution:
- Why: Non-blocking execution for better performance
- Benefit: Handle multiple agents and tasks concurrently
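As an illustration of the strategy idea, workflow types can map to interchangeable execution functions, so a new workflow is a new entry rather than an edit to existing code. This is a toy sketch; the names are hypothetical and not the tgo API:

```python
from typing import Callable, Dict, List

def run_single(agents: List[str], task: str) -> str:
    # Strategy 1: hand the task to the first (only) agent
    return f"{agents[0]} handled '{task}'"

def run_sequential(agents: List[str], task: str) -> str:
    # Strategy 2: pipeline the task through every agent
    return " -> ".join(agents) + f" piped '{task}'"

# The engine picks an execution function by workflow type;
# registering a new workflow never touches existing strategies
STRATEGIES: Dict[str, Callable[[List[str], str], str]] = {
    "single": run_single,
    "sequential": run_sequential,
}

def execute(workflow_type: str, agents: List[str], task: str) -> str:
    try:
        return STRATEGIES[workflow_type](agents, task)
    except KeyError:
        raise ValueError(f"Unsupported workflow type: {workflow_type}")

print(execute("single", ["analyst"], "report"))  # analyst handled 'report'
```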
To add a new framework:
- Create a new adapter inheriting from BaseFrameworkAdapter
- Implement the required abstract methods
- Register it with the registry

To add a new workflow type:
- Add the new type to the WorkflowType enum
- Implement a handler in WorkflowEngine
- Update the coordinator to support the new type

To add a new capability:
- Add the capability to the FrameworkCapability enum
- Update adapters to declare support
- Use capability checks in coordination logic
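The new-framework steps boil down to the following shape. This is a deliberately reduced sketch: the real BaseFrameworkAdapter has more methods, and MyFrameworkAdapter is a hypothetical name.

```python
from abc import ABC, abstractmethod

class BaseFrameworkAdapter(ABC):
    # Minimal stand-in for the real base adapter interface
    @abstractmethod
    def execute(self, prompt: str) -> str: ...

class MyFrameworkAdapter(BaseFrameworkAdapter):
    def execute(self, prompt: str) -> str:
        # Delegate to the new framework's SDK here
        return f"myframework answered: {prompt}"

class AdapterRegistry:
    def __init__(self):
        self._adapters = {}

    def register(self, name: str, adapter: BaseFrameworkAdapter) -> None:
        self._adapters[name] = adapter

    def get(self, name: str) -> BaseFrameworkAdapter:
        return self._adapters[name]

registry = AdapterRegistry()
registry.register("myframework", MyFrameworkAdapter())
print(registry.get("myframework").execute("ping"))  # myframework answered: ping
```

Because callers only ever see the base interface, the coordinator never needs to change when a framework is added.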
The old architecture used a single GoogleADKAdapter with tight coupling. The new architecture:
- Abstracts frameworks behind unified interfaces
- Separates concerns with dedicated components
- Enables extensibility through adapter pattern
- Provides type safety with Pydantic models
- Supports multiple workflows beyond single-agent execution
The architecture provides comprehensive monitoring and performance features:
- Timing: Detailed execution time tracking
- Resource Usage: Memory and CPU monitoring
- Token Counting: LLM token usage tracking
- Success Rates: Task and agent success metrics
# Check adapter health
health_status = await registry.get_health_status()
for adapter_name, status in health_status.items():
    print(f"{adapter_name}: {status}")

# Monitor execution metrics
metrics = result.get_execution_metrics()
print(f"Execution time: {metrics.total_duration_ms}ms")
print(f"Token usage: {metrics.total_tokens}")

- Parallel task processing across multiple agents
- Asynchronous execution with proper resource management
- Configurable concurrency limits
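A configurable concurrency limit usually reduces to an asyncio.Semaphore guard around each task. This is a self-contained sketch, not the tgo implementation; `run_with_limit` and the `peak` counter are illustrative:

```python
import asyncio

async def run_with_limit(tasks, max_concurrency: int = 3):
    sem = asyncio.Semaphore(max_concurrency)
    active = 0
    peak = 0

    async def guarded(coro_fn, arg):
        nonlocal active, peak
        async with sem:  # at most max_concurrency tasks run at once
            active += 1
            peak = max(peak, active)
            try:
                return await coro_fn(arg)
            finally:
                active -= 1

    results = await asyncio.gather(*(guarded(fn, a) for fn, a in tasks))
    return results, peak

async def work(n: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for an agent call
    return n * 2

tasks = [(work, i) for i in range(10)]
results, peak = asyncio.run(run_with_limit(tasks, max_concurrency=3))
print(results, peak)  # peak never exceeds 3
```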
- Framework Not Available

  # Check if the adapter is registered
  if not registry.is_registered("google-adk"):
      registry.register("google-adk", GoogleADKAdapter())

- Configuration Errors

  # Validate configuration before execution
  try:
      config.model_validate(config_dict)
  except ValidationError as e:
      print(f"Configuration error: {e}")

- Memory Issues

  # Check memory manager status
  if not coordinator._memory_manager:
      await coordinator.set_memory_manager(InMemoryMemoryManager())
Enable comprehensive debugging:
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Enable a specific logger
logger = logging.getLogger('tgo.agents.coordinator')
logger.setLevel(logging.DEBUG)

- Multi-Framework Support: Google ADK, LangGraph, CrewAI adapters
- Memory Management: Persistent conversation and context memory
- Session Management: Multi-user session handling
- Workflow Engine: Multiple execution patterns
- Streaming Support: Real-time execution updates
- Caching Layer: Result caching for improved performance
- Security Layer: Authentication and authorization
- Configuration Management: Environment-based configuration
- Distributed Execution: Multi-node agent coordination
- Advanced Monitoring: Grafana/Prometheus integration
- Plugin System: Dynamic capability extension
[Add license information here]
We welcome contributions! Please see our contributing guidelines for details.
# Clone and set up the development environment
git clone <repository-url>
cd tgo-agent-coordinator
pip install -r requirements-dev.txt

# Run tests
python -m pytest tests/

# Run linting
python -m flake8 src/
python -m mypy src/

For questions or support:
- Create an issue on GitHub
- Check the documentation
- Contact the maintainer team
Special thanks to:
- Google ADK team for the excellent framework
- LangGraph and CrewAI communities
- Contributors and early adopters