Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ jobs:
- run: .venv/bin/ruff format --check src/

test:
if: false # temporarily skipped — economy simulation tests removed
runs-on: ubuntu-latest
services:
postgres:
Expand Down Expand Up @@ -57,4 +56,4 @@ jobs:
version: "latest"
- run: uv venv && uv pip install -e ".[dev]"
- run: .venv/bin/alembic upgrade head
- run: .venv/bin/pytest tests/ -v
- run: .venv/bin/pytest tests/ -v -m "not integration"
87 changes: 87 additions & 0 deletions src/core/rate_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Redis-backed fixed-window rate limiting middleware.

Uses a simple fixed-window counter per client (by IP or authenticated user).
Gracefully degrades to allowing all requests when Redis is unavailable.
"""

import logging
import time

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

from src.core.settings import settings

logger = logging.getLogger(__name__)


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Per-client rate limiter using Redis fixed-window counters.

    Each client (see ``_get_client_id``) gets one Redis counter per 60-second
    window. Requests over ``settings.RATE_LIMIT_REQUESTS_PER_MINUTE`` within a
    window receive a 429 response; all other responses are annotated with
    ``X-RateLimit-*`` headers. When rate limiting is disabled, Redis is not
    attached to ``app.state``, or the Redis call fails, requests pass through
    unthrottled.
    """

    async def dispatch(self, request: Request, call_next):
        """Apply the rate limit to ``request`` and forward it if allowed.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request to the next handler
                and returns its response.

        Returns:
            The downstream response (with rate-limit headers added when the
            limit check ran), or a 429 ``JSONResponse`` when the client is
            over its limit for the current window.
        """
        if not settings.RATE_LIMIT_ENABLED:
            return await call_next(request)

        # Skip rate limiting for health checks
        if request.url.path in ("/health", "/health/"):
            return await call_next(request)

        redis = getattr(request.app.state, "redis", None)
        if redis is None:
            # Redis not available — allow request (graceful degradation)
            return await call_next(request)

        # Identify client: authenticated user or IP
        client_id = _get_client_id(request)
        window_seconds = 60
        now = int(time.time())
        window_key = f"rl:{client_id}:{now // window_seconds}"

        # Only the Redis round-trip is guarded. The downstream call is kept
        # outside the try block on purpose: otherwise an exception raised by
        # the application would be mistaken for a Redis failure and the
        # request would be dispatched a second time.
        try:
            pipe = redis.pipeline(transaction=True)
            pipe.incr(window_key)
            # TTL slightly longer than the window so the key outlives it.
            pipe.expire(window_key, window_seconds + 1)
            results = await pipe.execute()
            count = results[0]
        except Exception:
            # Redis error — allow request (graceful degradation)
            logger.debug("Rate limit check failed, allowing request", exc_info=True)
            return await call_next(request)

        limit = settings.RATE_LIMIT_REQUESTS_PER_MINUTE
        remaining = max(0, limit - count)
        # Seconds left until the current fixed window rolls over.
        retry_after = window_seconds - (now % window_seconds)

        rate_headers = {
            "X-RateLimit-Limit": str(limit),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(now + retry_after),
        }

        if count > limit:
            # `remaining` is already 0 here because count exceeds the limit.
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many requests"},
                headers={"Retry-After": str(retry_after), **rate_headers},
            )

        response: Response = await call_next(request)
        for header_name, header_value in rate_headers.items():
            response.headers[header_name] = header_value
        return response


def _get_client_id(request: Request) -> str:
    """Extract a client identifier from the request.

    Resolution order:
      1. ``request.state.user_id`` when present and truthy -> ``"user:<id>"``.
      2. The first non-empty entry of the ``X-Forwarded-For`` header
         -> ``"ip:<addr>"``.
      3. The socket peer address (``request.client.host``) -> ``"ip:<addr>"``.
      4. ``"ip:unknown"`` when none of the above is available.

    Args:
        request: The incoming request.

    Returns:
        A string key identifying the client for rate-limit bucketing.
    """
    # Check for authenticated user (set by auth middleware)
    user = getattr(request.state, "user_id", None)
    if user:
        return f"user:{user}"

    # Fall back to IP.
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # proxy overwrites it — confirm the deployment sits behind one, otherwise
    # callers can pick their own rate-limit bucket.
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        # A blank first entry (e.g. " , 1.2.3.4") would otherwise lump every
        # such request into the single bucket "ip:"; use the peer address
        # instead.
        if first_hop:
            return f"ip:{first_hop}"

    if request.client:
        return f"ip:{request.client.host}"
    return "ip:unknown"
5 changes: 5 additions & 0 deletions src/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Settings(BaseSettings):

# Configuration
API_VERSION: str = "v1"
BASE_URL: str = "https://api.intuno.ai"
CORS_ORIGINS: list[str] = ["*"]
LOG_LEVEL: str = "DEBUG"
ENVIRONMENT: str = "development"
Expand Down Expand Up @@ -79,6 +80,10 @@ class Settings(BaseSettings):
# Encryption key for per-agent credentials (defaults to JWT_SECRET_KEY-derived if empty)
CREDENTIALS_ENCRYPTION_KEY: str = ""

# ── Rate limiting ─────────────────────────────────────────────────────
RATE_LIMIT_ENABLED: bool = True
RATE_LIMIT_REQUESTS_PER_MINUTE: int = 120

# Orchestrator fallback: when discovery returns no candidates, use this agent
ORCHESTRATOR_FALLBACK_AGENT_ID: Optional[str] = None

Expand Down
129 changes: 102 additions & 27 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from src.core.logging_config import setup_logging
from src.core.middleware import RequestTracingMiddleware
from src.core.rate_limit import RateLimitMiddleware
from src.core.redis_client import close_redis, init_redis
from src.core.settings import settings

Expand Down Expand Up @@ -81,7 +82,8 @@ async def cron_callback(workflow_id):
wf_def = WorkflowDef.model_validate(wf.definition)
exec_repo = ExecutionRepository(session)
execution = await exec_repo.create_execution(
workflow_id=wf.id, trigger_data={"source": "cron"},
workflow_id=wf.id,
trigger_data={"source": "cron"},
)
await session.commit()

Expand All @@ -104,7 +106,8 @@ async def event_callback(workflow_id, event_data):
wf_def = WorkflowDef.model_validate(wf.definition)
exec_repo = ExecutionRepository(session)
execution = await exec_repo.create_execution(
workflow_id=wf.id, trigger_data=event_data,
workflow_id=wf.id,
trigger_data=event_data,
)
await session.commit()

Expand Down Expand Up @@ -134,9 +137,16 @@ async def event_callback(workflow_id, event_data):
async def lifespan(app: FastAPI):
# Warn if JWT secret is not configured (safe for local dev, dangerous in production)
if not settings.JWT_SECRET_KEY:
logger.warning("JWT_SECRET_KEY is not set — authentication will not work. Set it in your .env file.")
elif settings.JWT_SECRET_KEY == "dev-secret-change-in-prod" and settings.ENVIRONMENT != "development":
logger.warning("JWT_SECRET_KEY is using the default dev value in a non-development environment. Change it immediately.")
logger.warning(
"JWT_SECRET_KEY is not set — authentication will not work. Set it in your .env file."
)
elif (
settings.JWT_SECRET_KEY == "dev-secret-change-in-prod"
and settings.ENVIRONMENT != "development"
):
logger.warning(
"JWT_SECRET_KEY is using the default dev value in a non-development environment. Change it immediately."
)

# Shared HTTP client for broker → agent invocations (connection pooling)
app.state.http_client = httpx.AsyncClient(
Expand Down Expand Up @@ -192,11 +202,16 @@ async def lifespan(app: FastAPI):
# Request tracing middleware (X-Request-ID + timing)
app.add_middleware(RequestTracingMiddleware)

# Rate limiting middleware (Redis-backed, graceful degradation)
app.add_middleware(RateLimitMiddleware)


# Workflow exception handler
@app.exception_handler(WorkflowAppException)
async def handle_workflow_exception(_request: Request, exc: WorkflowAppException):
    """Render a ``WorkflowAppException`` as a JSON error response.

    The response carries the exception's ``status_code`` and a body of the
    form ``{"detail": <exc.detail>}``. The request itself is not inspected.
    """
    error_body = {"detail": exc.detail}
    return JSONResponse(content=error_body, status_code=exc.status_code)


# ── Existing wisdom routers ──────────────────────────────────────────
app.include_router(health_router)
app.include_router(analytics_router)
Expand Down Expand Up @@ -226,28 +241,88 @@ async def handle_workflow_exception(_request: Request, exc: WorkflowAppException
app.mount("/mcp", create_mcp_app())


@app.get("/.well-known/agent.json")
async def a2a_agent_card():
    """Serve the A2A AgentCard used for agent-to-agent discovery.

    Returns a static JSON document describing this service; only the ``url``
    and ``version`` fields are read from settings.
    """
    # (id, name, description) for each advertised skill, in response order.
    skill_rows = (
        (
            "discover",
            "Discover Agents",
            "Semantic search for AI agents by natural-language query",
        ),
        (
            "invoke",
            "Invoke Agent",
            "Execute an agent with input data through the broker",
        ),
        (
            "orchestrate",
            "Orchestrate Task",
            "Multi-step task orchestration across multiple agents",
        ),
    )
    skills = [
        {"id": skill_id, "name": title, "description": summary}
        for skill_id, title, summary in skill_rows
    ]

    card = {
        "name": "Intuno Agent Network",
        "description": "Registry, broker, and orchestrator for AI agents",
        "url": settings.BASE_URL,
        "version": settings.API_VERSION,
        "capabilities": {
            "streaming": True,
            "pushNotifications": False,
        },
        "skills": skills,
        "authentication": {
            "schemes": ["apiKey", "bearer"],
        },
    }
    return JSONResponse(card)


@app.get("/.well-known/mcp/server-card.json")
async def mcp_server_card():
    """Public metadata for MCP marketplace scanners (e.g. Smithery).

    Returns a static JSON document listing the server name/version, the
    authentication scheme, and the advertised tools, resources and prompts.
    Only ``version`` is read from settings.
    """
    # Single return: the previous body carried a second, unreachable copy of
    # this same payload after the first return statement.
    return JSONResponse(
        {
            "serverInfo": {
                "name": "Intuno Agent Network",
                "version": settings.API_VERSION,
            },
            "authentication": {
                "required": True,
                "schemes": ["apiKey"],
            },
            "tools": [
                {
                    "name": "discover_agents",
                    "description": "Search for AI agents by natural-language query",
                },
                {
                    "name": "get_agent_details",
                    "description": "Get full details of a specific agent including its input schema",
                },
                {
                    "name": "invoke_agent",
                    "description": "Invoke an agent with the provided input data",
                },
                {
                    "name": "create_task",
                    "description": "Create and run a multi-step task via the Intuno orchestrator",
                },
                {
                    "name": "get_task_status",
                    "description": "Check the current status and result of a previously created task",
                },
            ],
            "resources": [
                {
                    "uri": "intuno://agents/trending",
                    "description": "Trending agents by recent invocation count",
                },
                {
                    "uri": "intuno://agents/new",
                    "description": "Recently published agents (last 7 days)",
                },
            ],
            "prompts": [],
        }
    )
Loading
Loading