From 6d8b4980b7d415d150b1c2a07d898fe05ba3c160 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 30 Mar 2026 23:10:46 +0000 Subject: [PATCH 1/2] Implement 5 high-priority issues: broker batching, semaphore pub/sub, A2A endpoint, rate limiting, CI tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - #7: Batch independent DB queries in broker invoke_agent() with asyncio.gather() to reduce sequential round trips (config + agent + conversation fetched concurrently) - #8: Replace polling-based Redis semaphore with pub/sub notifications for near-instant slot acquisition, with fallback to polling if pub/sub fails - #15: Add A2A-compatible AgentCard endpoint at /.well-known/agent.json for agent-to-agent discovery - #11: Add Redis-backed rate limiting middleware with Retry-After headers and graceful degradation when Redis is unavailable - #17: Re-enable CI test job (remove if:false gate), add conftest to skip live integration tests, add smoke tests for CI All changes are internal optimizations or additive endpoints — no breaking changes to the public API surface or intuno-sdk compatibility. https://claude.ai/code/session_015zTWwVFtUWeN5ameM6teEa --- .github/workflows/ci.yml | 3 +- src/core/rate_limit.py | 91 +++++++++++++++++++ src/core/settings.py | 4 + src/main.py | 129 ++++++++++++++++++++------ src/services/broker.py | 144 ++++++++++++++++++++++-------- src/workflow/utils/concurrency.py | 77 +++++++++++++++- tests/conftest.py | 24 +++++ tests/test_smoke.py | 50 +++++++++++ 8 files changed, 454 insertions(+), 68 deletions(-) create mode 100644 src/core/rate_limit.py create mode 100644 tests/conftest.py create mode 100644 tests/test_smoke.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ba8ed2b..dfdb3d2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,6 @@ jobs: - run: .venv/bin/ruff format --check src/ test: - if: false # temporarily skipped — economy simulation tests removed runs-on: ubuntu-latest services: postgres: @@ -57,4 +56,4 @@ jobs: version: "latest" - run: uv venv && uv pip install -e ".[dev]" - run: .venv/bin/alembic upgrade head - - run: .venv/bin/pytest tests/ -v + - run: .venv/bin/pytest tests/ -v -m "not integration" diff --git a/src/core/rate_limit.py b/src/core/rate_limit.py new file mode 100644 index 0000000..99c7779 --- /dev/null +++ b/src/core/rate_limit.py @@ -0,0 +1,91 @@ +"""Redis-backed sliding window rate limiting middleware. + +Uses a simple fixed-window counter per client (by IP or authenticated user). +Gracefully degrades to allowing all requests when Redis is unavailable. +""" + +import logging +import time + +from fastapi import Request, Response +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.responses import JSONResponse + +from src.core.settings import settings + +logger = logging.getLogger(__name__) + + +class RateLimitMiddleware(BaseHTTPMiddleware): + """Per-client rate limiter using Redis fixed-window counters.""" + + async def dispatch(self, request: Request, call_next): + if not settings.RATE_LIMIT_ENABLED: + return await call_next(request) + + # Skip rate limiting for health checks + if request.url.path in ("/health", "/health/"): + return await call_next(request) + + redis = getattr(request.app.state, "redis", None) + if redis is None: + # Redis not available — allow request (graceful degradation) + return await call_next(request) + + # Identify client: authenticated user or IP + client_id = _get_client_id(request) + window_seconds = 60 + now = int(time.time()) + window_key = f"rl:{client_id}:{now // window_seconds}" + + try: + pipe = redis.pipeline(transaction=True) + pipe.incr(window_key) + pipe.ttl(window_key) + results = await pipe.execute() + count, ttl = results[0], results[1] + + # Set expiry on first request in this window + if ttl == -1: + await redis.expire(window_key, window_seconds + 1) + + limit = settings.RATE_LIMIT_REQUESTS_PER_MINUTE + remaining = max(0, limit - count) + retry_after = window_seconds - (now % window_seconds) + + if count > limit: + return JSONResponse( + status_code=429, + content={"detail": "Too many requests"}, + headers={ + "Retry-After": str(retry_after), + "X-RateLimit-Limit": str(limit), + "X-RateLimit-Remaining": "0", + "X-RateLimit-Reset": str(now + retry_after), + }, + ) + + response: Response = await call_next(request) + response.headers["X-RateLimit-Limit"] = str(limit) + response.headers["X-RateLimit-Remaining"] = str(remaining) + response.headers["X-RateLimit-Reset"] = str(now + retry_after) + return response + + except Exception: + # Redis error — allow request (graceful degradation) + logger.debug("Rate limit check failed, allowing request", exc_info=True) + return await call_next(request) + + +def _get_client_id(request: Request) -> str: + """Extract a client identifier from the request.""" + # Check for authenticated user (set by auth middleware) + user = getattr(request.state, "user_id", None) + if user: + return f"user:{user}" + + # Fall back to IP + forwarded = request.headers.get("X-Forwarded-For") + if forwarded: + return f"ip:{forwarded.split(',')[0].strip()}" + return f"ip:{request.client.host}" if request.client else "ip:unknown" diff --git a/src/core/settings.py b/src/core/settings.py index 515710e..47283c8 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -79,6 +79,10 @@ class Settings(BaseSettings): # Encryption key for per-agent credentials (defaults to JWT_SECRET_KEY-derived if empty) CREDENTIALS_ENCRYPTION_KEY: str = "" + # ── Rate limiting ───────────────────────────────────────────────────── + RATE_LIMIT_ENABLED: bool = True + RATE_LIMIT_REQUESTS_PER_MINUTE: int = 120 + # Orchestrator fallback: when discovery returns no candidates, use this agent ORCHESTRATOR_FALLBACK_AGENT_ID: Optional[str] = None diff --git a/src/main.py b/src/main.py index 0336a7b..cc643a3 100644 --- a/src/main.py +++ b/src/main.py @@ -9,6 +9,7 @@ from src.core.logging_config import setup_logging from src.core.middleware import RequestTracingMiddleware +from src.core.rate_limit import RateLimitMiddleware from src.core.redis_client import close_redis, init_redis from src.core.settings import settings @@ -81,7 +82,8 @@ async def cron_callback(workflow_id): wf_def = WorkflowDef.model_validate(wf.definition) exec_repo = ExecutionRepository(session) execution = await exec_repo.create_execution( - workflow_id=wf.id, trigger_data={"source": "cron"}, + workflow_id=wf.id, + trigger_data={"source": "cron"}, ) await session.commit() @@ -104,7 +106,8 @@ async def event_callback(workflow_id, event_data): wf_def = WorkflowDef.model_validate(wf.definition) exec_repo = ExecutionRepository(session) execution = await exec_repo.create_execution( - workflow_id=wf.id, trigger_data=event_data, + workflow_id=wf.id, + trigger_data=event_data, ) await session.commit() @@ -134,9 +137,16 @@ async def event_callback(workflow_id, event_data): async def lifespan(app: FastAPI): # Warn if JWT secret is not configured (safe for local dev, dangerous in production) if not settings.JWT_SECRET_KEY: - logger.warning("JWT_SECRET_KEY is not set — authentication will not work. Set it in your .env file.") - elif settings.JWT_SECRET_KEY == "dev-secret-change-in-prod" and settings.ENVIRONMENT != "development": - logger.warning("JWT_SECRET_KEY is using the default dev value in a non-development environment. Change it immediately.") + logger.warning( + "JWT_SECRET_KEY is not set — authentication will not work. Set it in your .env file." + ) + elif ( + settings.JWT_SECRET_KEY == "dev-secret-change-in-prod" + and settings.ENVIRONMENT != "development" + ): + logger.warning( + "JWT_SECRET_KEY is using the default dev value in a non-development environment. Change it immediately." + ) # Shared HTTP client for broker → agent invocations (connection pooling) app.state.http_client = httpx.AsyncClient( @@ -192,11 +202,16 @@ async def lifespan(app: FastAPI): # Request tracing middleware (X-Request-ID + timing) app.add_middleware(RequestTracingMiddleware) +# Rate limiting middleware (Redis-backed, graceful degradation) +app.add_middleware(RateLimitMiddleware) + + # Workflow exception handler @app.exception_handler(WorkflowAppException) async def handle_workflow_exception(_request: Request, exc: WorkflowAppException): return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail}) + # ── Existing wisdom routers ────────────────────────────────────────── app.include_router(health_router) app.include_router(analytics_router) @@ -226,28 +241,88 @@ async def handle_workflow_exception(_request: Request, exc: WorkflowAppException app.mount("/mcp", create_mcp_app()) +@app.get("/.well-known/agent.json") +async def a2a_agent_card(): + """A2A-compatible AgentCard for agent-to-agent discovery.""" + return JSONResponse( + { + "name": "Intuno Agent Network", + "description": "Registry, broker, and orchestrator for AI agents", + "url": "https://api.intuno.ai", + "version": settings.API_VERSION, + "capabilities": { + "streaming": True, + "pushNotifications": False, + }, + "skills": [ + { + "id": "discover", + "name": "Discover Agents", + "description": "Semantic search for AI agents by natural-language query", + }, + { + "id": "invoke", + "name": "Invoke Agent", + "description": "Execute an agent with input data through the broker", + }, + { + "id": "orchestrate", + "name": "Orchestrate Task", + "description": "Multi-step task orchestration across multiple agents", + }, + ], + "authentication": { + "schemes": ["apiKey", "bearer"], + }, + } + ) + + @app.get("/.well-known/mcp/server-card.json") async def mcp_server_card(): """Public metadata for MCP marketplace scanners (e.g. Smithery).""" - return JSONResponse({ - "serverInfo": { - "name": "Intuno Agent Network", - "version": settings.API_VERSION, - }, - "authentication": { - "required": True, - "schemes": ["apiKey"], - }, - "tools": [ - {"name": "discover_agents", "description": "Search for AI agents by natural-language query"}, - {"name": "get_agent_details", "description": "Get full details of a specific agent including its input schema"}, - {"name": "invoke_agent", "description": "Invoke an agent with the provided input data"}, - {"name": "create_task", "description": "Create and run a multi-step task via the Intuno orchestrator"}, - {"name": "get_task_status", "description": "Check the current status and result of a previously created task"}, - ], - "resources": [ - {"uri": "intuno://agents/trending", "description": "Trending agents by recent invocation count"}, - {"uri": "intuno://agents/new", "description": "Recently published agents (last 7 days)"}, - ], - "prompts": [], - }) + return JSONResponse( + { + "serverInfo": { + "name": "Intuno Agent Network", + "version": settings.API_VERSION, + }, + "authentication": { + "required": True, + "schemes": ["apiKey"], + }, + "tools": [ + { + "name": "discover_agents", + "description": "Search for AI agents by natural-language query", + }, + { + "name": "get_agent_details", + "description": "Get full details of a specific agent including its input schema", + }, + { + "name": "invoke_agent", + "description": "Invoke an agent with the provided input data", + }, + { + "name": "create_task", + "description": "Create and run a multi-step task via the Intuno orchestrator", + }, + { + "name": "get_task_status", + "description": "Check the current status and result of a previously created task", + }, + ], + "resources": [ + { + "uri": "intuno://agents/trending", + "description": "Trending agents by recent invocation count", + }, + { + "uri": "intuno://agents/new", + "description": "Recently published agents (last 7 days)", + }, + ], + "prompts": [], + } + ) diff --git a/src/services/broker.py b/src/services/broker.py index 036fba7..2f2e634 100644 --- a/src/services/broker.py +++ b/src/services/broker.py @@ -88,16 +88,29 @@ async def invoke_agent( """ start_time = time.time() - # Load effective broker config - config = await self.broker_config_repository.get_effective_config(integration_id) - # Resolve conversation_id and message_id from request if not passed conv_id = conversation_id or invoke_request.conversation_id msg_id = message_id or invoke_request.message_id - # Validate conversation and message ownership + # Batch independent lookups: config + agent + conversation (if provided) + config_coro = self.broker_config_repository.get_effective_config(integration_id) + agent_coro = self.registry_repository.get_agent_by_agent_id( + invoke_request.agent_id + ) + + if conv_id is not None: + conv_coro = self.conversation_repository.get_by_id(conv_id) + config, agent, conversation = await asyncio.gather( + config_coro, + agent_coro, + conv_coro, + ) + else: + config, agent = await asyncio.gather(config_coro, agent_coro) + conversation = None + + # Validate conversation ownership if conv_id is not None: - conversation = await self.conversation_repository.get_by_id(conv_id) if not conversation or conversation.user_id != caller_user_id: return InvokeResponse( success=False, @@ -105,7 +118,10 @@ async def invoke_agent( latency_ms=int((time.time() - start_time) * 1000), status_code=404, ) - if integration_id is not None and conversation.integration_id != integration_id: + if ( + integration_id is not None + and conversation.integration_id != integration_id + ): return InvokeResponse( success=False, error="Conversation does not belong to this integration", @@ -143,8 +159,7 @@ async def invoke_agent( status_code=404, ) - # Get the agent - agent = await self.registry_repository.get_agent_by_agent_id(invoke_request.agent_id) + # Agent already fetched above via asyncio.gather if not agent or not agent.is_active: return InvokeResponse( success=False, @@ -178,7 +193,9 @@ async def invoke_agent( count = await quota_increment(month_key, month_ttl) if count is None: # Redis unavailable — fall back to DB scan - first_day = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + first_day = now.replace( + day=1, hour=0, minute=0, second=0, microsecond=0 + ) count = await self.invocation_log_repository.count_invocations_for_integration( integration_id, first_day, now ) @@ -191,11 +208,15 @@ async def invoke_agent( ) if config.daily_invocation_quota is not None: day_key = f"quota:{integration_id}:daily:{now.strftime('%Y-%m-%d')}" - day_ttl = max(86400 - (now.hour * 3600 + now.minute * 60 + now.second), 1) + day_ttl = max( + 86400 - (now.hour * 3600 + now.minute * 60 + now.second), 1 + ) count = await quota_increment(day_key, day_ttl) if count is None: # Redis unavailable — fall back to DB scan - start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0) + start_of_day = now.replace( + hour=0, minute=0, second=0, microsecond=0 + ) count = await self.invocation_log_repository.count_invocations_for_integration( integration_id, start_of_day, now ) @@ -210,10 +231,13 @@ async def invoke_agent( # Billing: pre-flight balance check for priced agents _pricing_enabled = getattr(agent, "pricing_enabled", False) _agent_price_raw = getattr(agent, "base_price", None) - _agent_price = int(_agent_price_raw) if (_agent_price_raw and _agent_price_raw > 0) else 0 + _agent_price = ( + int(_agent_price_raw) if (_agent_price_raw and _agent_price_raw > 0) else 0 + ) if _pricing_enabled and _agent_price > 0: from src.economy.repositories.wallets import WalletRepository as _WR + _wr = _WR(self.registry_repository.session) _caller_wallet = await _wr.get_by_user_id(caller_user_id) if not _caller_wallet or _caller_wallet.balance < _agent_price: @@ -241,7 +265,9 @@ async def invoke_agent( ) msg_id = user_message.id try: - response_data = await generate_brand_agent_response(brand, request_payload) + response_data = await generate_brand_agent_response( + brand, request_payload + ) except Exception as e: response_data = {"error": str(e)} success = False @@ -269,7 +295,9 @@ async def invoke_agent( conversation_id=conv_id, message_id=msg_id, ) - await self.invocation_log_repository.create_invocation_log(invocation_log) + await self.invocation_log_repository.create_invocation_log( + invocation_log + ) credits_charged_brand = None if success and _pricing_enabled and _agent_price > 0: credits_charged_brand = await self._settle_credits( @@ -278,7 +306,9 @@ async def invoke_agent( return InvokeResponse( success=success, data=response_data if success else None, - error=None if success else response_data.get("error", "Brand agent error"), + error=None + if success + else response_data.get("error", "Brand agent error"), latency_ms=latency_ms, status_code=200 if success else 500, conversation_id=conv_id, @@ -299,8 +329,14 @@ async def invoke_agent( # SSRF protection try: - allowed = [h.strip() for h in settings.INVOKE_ENDPOINT_ALLOWED_HOSTS.split(",") if h.strip()] - validate_invoke_endpoint(agent.invoke_endpoint, allowed_hosts=allowed if allowed else None) + allowed = [ + h.strip() + for h in settings.INVOKE_ENDPOINT_ALLOWED_HOSTS.split(",") + if h.strip() + ] + validate_invoke_endpoint( + agent.invoke_endpoint, allowed_hosts=allowed if allowed else None + ) except ValueError as e: return InvokeResponse( success=False, @@ -310,7 +346,9 @@ async def invoke_agent( ) timeout_sec = ( - float(config.request_timeout_seconds) if config else DEFAULT_REQUEST_TIMEOUT_SECONDS + float(config.request_timeout_seconds) + if config + else DEFAULT_REQUEST_TIMEOUT_SECONDS ) max_retries = (config.max_retries or 0) if config else 0 retry_backoff = (config.retry_backoff_seconds or 1) if config else 1 @@ -325,10 +363,13 @@ async def invoke_agent( cred: Optional[object] = None cred_value: Optional[str] = None if auth_type in ("api_key", "bearer_token"): - cred = await self.registry_repository.get_agent_credential(agent.id, auth_type) + cred = await self.registry_repository.get_agent_credential( + agent.id, auth_type + ) if cred: try: from src.core.credential_crypto import decrypt_credential + cred_value = decrypt_credential(cred.encrypted_value) except ValueError: cred_value = None @@ -347,9 +388,17 @@ async def invoke_agent( "bearer_token": {"header": "Authorization", "scheme": "Bearer"}, } defaults = auth_defaults.get(auth_type, {}) - header_name = (cred.auth_header if cred and cred.auth_header else None) or defaults.get("header", "") - scheme = (cred.auth_scheme if cred and cred.auth_scheme is not None else None) or defaults.get("scheme", "") - header_value = f"{scheme} {cred_value}".strip() if (cred_value and scheme) else (cred_value or "") + header_name = ( + cred.auth_header if cred and cred.auth_header else None + ) or defaults.get("header", "") + scheme = ( + cred.auth_scheme if cred and cred.auth_scheme is not None else None + ) or defaults.get("scheme", "") + header_value = ( + f"{scheme} {cred_value}".strip() + if (cred_value and scheme) + else (cred_value or "") + ) # Use shared HTTP client when available; fall back to one-shot client client = self._http_client @@ -484,20 +533,31 @@ async def invoke_agent_stream( and returns an InvokeResponse. """ # Resolve agent to check streaming support - agent = await self.registry_repository.get_agent_by_agent_id(invoke_request.agent_id) - supports_streaming = getattr(agent, "supports_streaming", False) if agent else False + agent = await self.registry_repository.get_agent_by_agent_id( + invoke_request.agent_id + ) + supports_streaming = ( + getattr(agent, "supports_streaming", False) if agent else False + ) if not supports_streaming: # Fall back to normal invocation return await self.invoke_agent( - invoke_request, caller_user_id, integration_id, - conversation_id, message_id, + invoke_request, + caller_user_id, + integration_id, + conversation_id, + message_id, ) # Stream from the agent's endpoint return self._stream_from_agent( - agent, invoke_request, caller_user_id, integration_id, - conversation_id, message_id, + agent, + invoke_request, + caller_user_id, + integration_id, + conversation_id, + message_id, ) async def _stream_from_agent( @@ -519,9 +579,12 @@ async def _stream_from_agent( auth_type = (agent.auth_type or "public").lower() header_name, header_value = "", "" if auth_type in ("api_key", "bearer_token"): - cred = await self.registry_repository.get_agent_credential(agent.id, auth_type) + cred = await self.registry_repository.get_agent_credential( + agent.id, auth_type + ) if cred: from src.core.credential_crypto import decrypt_credential + try: cred_value = decrypt_credential(cred.encrypted_value) except ValueError: @@ -532,12 +595,22 @@ async def _stream_from_agent( "bearer_token": {"header": "Authorization", "scheme": "Bearer"}, } d = defaults.get(auth_type, {}) - header_name = (cred.auth_header or d.get("header", "")) - scheme = (cred.auth_scheme if cred.auth_scheme is not None else None) or d.get("scheme", "") - header_value = f"{scheme} {cred_value}".strip() if scheme else cred_value + header_name = cred.auth_header or d.get("header", "") + scheme = ( + cred.auth_scheme if cred.auth_scheme is not None else None + ) or d.get("scheme", "") + header_value = ( + f"{scheme} {cred_value}".strip() if scheme else cred_value + ) - config = await self.broker_config_repository.get_effective_config(integration_id) - timeout_sec = float(config.request_timeout_seconds) if config else DEFAULT_REQUEST_TIMEOUT_SECONDS + config = await self.broker_config_repository.get_effective_config( + integration_id + ) + timeout_sec = ( + float(config.request_timeout_seconds) + if config + else DEFAULT_REQUEST_TIMEOUT_SECONDS + ) headers = { "Content-Type": "application/json", @@ -610,7 +683,8 @@ async def _settle_credits( if not debited: logger.warning( "Insufficient balance for caller %s (need %d)", - caller_user_id, price, + caller_user_id, + price, ) return None diff --git a/src/workflow/utils/concurrency.py b/src/workflow/utils/concurrency.py index 6b89618..6cde671 100644 --- a/src/workflow/utils/concurrency.py +++ b/src/workflow/utils/concurrency.py @@ -7,8 +7,10 @@ workflows (key ``conc:agent:{agent_id}``). - **per-workflow**: caps concurrent executions of a single workflow definition (key ``conc:wf:{workflow_id}``). -""" +Slot availability is communicated via Redis pub/sub so waiters wake up +immediately instead of polling. +""" import asyncio import logging @@ -30,7 +32,11 @@ class ConcurrencyLimitExceeded(Exception): class RedisSemaphore: - """Counting semaphore backed by a Redis key (INCR/DECR).""" + """Counting semaphore backed by a Redis key (INCR/DECR). + + Uses pub/sub notifications on release so waiters don't have to poll. + Falls back to a short polling interval if pub/sub subscription fails. + """ def __init__( self, @@ -45,9 +51,63 @@ def __init__( self._limit = limit self._poll_interval = poll_interval self._timeout = timeout + self._notify_channel = f"conc:notify:{key}" async def acquire(self) -> None: + # Try to acquire immediately + current = await self._redis.incr(self._key) + await self._redis.expire(self._key, _SAFETY_TTL) + if current <= self._limit: + return + await self._redis.decr(self._key) + + # Slot not available — wait for a pub/sub notification deadline = asyncio.get_event_loop().time() + self._timeout + + try: + pubsub = self._redis.pubsub() + await pubsub.subscribe(self._notify_channel) + except Exception: + # Pub/sub failed — fall back to polling + logger.debug( + "Pub/sub subscribe failed for %s, falling back to polling", self._key + ) + await self._acquire_poll(deadline) + return + + try: + while True: + remaining = deadline - asyncio.get_event_loop().time() + if remaining <= 0: + raise ConcurrencyLimitExceeded( + f"Concurrency limit ({self._limit}) for '{self._key}' " + f"not released within {self._timeout}s" + ) + + # Wait for notification with timeout + try: + await asyncio.wait_for( + pubsub.get_message( + ignore_subscribe_messages=True, timeout=remaining + ), + timeout=min(remaining, self._poll_interval * 4), + ) + except asyncio.TimeoutError: + pass + + # Try to acquire regardless of whether we got a message + # (handles race conditions and missed notifications) + current = await self._redis.incr(self._key) + await self._redis.expire(self._key, _SAFETY_TTL) + if current <= self._limit: + return + await self._redis.decr(self._key) + finally: + await pubsub.unsubscribe(self._notify_channel) + await pubsub.aclose() + + async def _acquire_poll(self, deadline: float) -> None: + """Fallback polling loop when pub/sub is unavailable.""" while True: current = await self._redis.incr(self._key) await self._redis.expire(self._key, _SAFETY_TTL) @@ -65,6 +125,11 @@ async def release(self) -> None: val = await self._redis.decr(self._key) if val < 0: await self._redis.set(self._key, 0) + # Notify waiters that a slot is available + try: + await self._redis.publish(self._notify_channel, "released") + except Exception: + pass # Best-effort notification; waiters will retry on timeout class ConcurrencyLimiter: @@ -74,13 +139,17 @@ def __init__(self, redis: aioredis.Redis = Depends(get_redis)) -> None: self._redis = redis def agent_semaphore( - self, agent_id: str, limit: int | None = None, + self, + agent_id: str, + limit: int | None = None, ) -> RedisSemaphore: effective = limit or settings.WORKFLOW_DEFAULT_MAX_CONCURRENT_PER_AGENT return RedisSemaphore(self._redis, f"conc:agent:{agent_id}", effective) def workflow_semaphore( - self, workflow_id: uuid.UUID, limit: int | None = None, + self, + workflow_id: uuid.UUID, + limit: int | None = None, ) -> RedisSemaphore: effective = limit or settings.WORKFLOW_DEFAULT_MAX_CONCURRENT_EXECUTIONS return RedisSemaphore(self._redis, f"conc:wf:{workflow_id}", effective) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8cbac7a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,24 @@ +"""Pytest configuration: mark live integration tests so CI can skip them.""" + +import pytest + + +def pytest_collection_modifyitems(config, items): + """Auto-mark tests in files that require a running backend as 'integration'.""" + integration_files = { + "test_sdk_integration.py", + "test_user_session.py", + "test_economy.py", + "test_workflow.py", + "test_new_orchestrator.py", + } + for item in items: + if item.path and item.path.name in integration_files: + item.add_marker(pytest.mark.integration) + + +def pytest_configure(config): + config.addinivalue_line( + "markers", + "integration: marks tests that need a running backend (deselect with '-m \"not integration\"')", + ) diff --git a/tests/test_smoke.py b/tests/test_smoke.py new file mode 100644 index 0000000..a78f22a --- /dev/null +++ b/tests/test_smoke.py @@ -0,0 +1,50 @@ +"""Smoke tests that run in CI without a live backend. + +Verifies the FastAPI app can be imported and basic endpoints respond. +""" + +import pytest +from httpx import ASGITransport, AsyncClient + +from src.main import app + + +@pytest.fixture +async def client(): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + +@pytest.mark.asyncio +async def test_health_endpoint(client): + response = await client.get("/health") + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_openapi_schema(client): + response = await client.get("/openapi.json") + assert response.status_code == 200 + data = response.json() + assert data["info"]["title"] == "Intuno" + + +@pytest.mark.asyncio +async def test_a2a_agent_card(client): + response = await client.get("/.well-known/agent.json") + assert response.status_code == 200 + data = response.json() + assert data["name"] == "Intuno Agent Network" + assert "capabilities" in data + assert "skills" in data + assert len(data["skills"]) == 3 + + +@pytest.mark.asyncio +async def test_mcp_server_card(client): + response = await client.get("/.well-known/mcp/server-card.json") + assert response.status_code == 200 + data = response.json() + assert "serverInfo" in data + assert "tools" in data From f183a9ebf1f219c5c521de8c59c67e7d50a80aee Mon Sep 17 00:00:00 2001 From: Arturo Bautista Date: Wed, 1 Apr 2026 12:40:08 -0600 Subject: [PATCH 2/2] Fix rate limiter TTL race condition and hardcoded agent card URL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Use pipeline expire instead of conditional TTL check to prevent key living forever if process crashes between INCR and EXPIRE - Fix docstring: "sliding window" → "fixed-window" to match implementation - Replace hardcoded agent card URL with settings.BASE_URL for staging/dev - Add BASE_URL setting (defaults to https://api.intuno.ai) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/core/rate_limit.py | 10 +++------- src/core/settings.py | 1 + src/main.py | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/core/rate_limit.py b/src/core/rate_limit.py index 99c7779..944496d 100644 --- a/src/core/rate_limit.py +++ b/src/core/rate_limit.py @@ -1,4 +1,4 @@ -"""Redis-backed sliding window rate limiting middleware. +"""Redis-backed fixed-window rate limiting middleware. Uses a simple fixed-window counter per client (by IP or authenticated user). Gracefully degrades to allowing all requests when Redis is unavailable. @@ -41,13 +41,9 @@ async def dispatch(self, request: Request, call_next): try: pipe = redis.pipeline(transaction=True) pipe.incr(window_key) - pipe.ttl(window_key) + pipe.expire(window_key, window_seconds + 1) results = await pipe.execute() - count, ttl = results[0], results[1] - - # Set expiry on first request in this window - if ttl == -1: - await redis.expire(window_key, window_seconds + 1) + count = results[0] limit = settings.RATE_LIMIT_REQUESTS_PER_MINUTE remaining = max(0, limit - count) diff --git a/src/core/settings.py b/src/core/settings.py index 47283c8..a5723d6 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -11,6 +11,7 @@ class Settings(BaseSettings): # Configuration API_VERSION: str = "v1" + BASE_URL: str = "https://api.intuno.ai" CORS_ORIGINS: list[str] = ["*"] LOG_LEVEL: str = "DEBUG" ENVIRONMENT: str = "development" diff --git a/src/main.py b/src/main.py index cc643a3..5a6707c 100644 --- a/src/main.py +++ b/src/main.py @@ -248,7 +248,7 @@ async def a2a_agent_card(): { "name": "Intuno Agent Network", "description": "Registry, broker, and orchestrator for AI agents", - "url": "https://api.intuno.ai", + "url": settings.BASE_URL, "version": settings.API_VERSION, "capabilities": { "streaming": True,