From 08d2b7491315984002adecd5502692efd589bbe3 Mon Sep 17 00:00:00 2001 From: "railway-app[bot]" <68434857+railway-app[bot]@users.noreply.github.com> Date: Tue, 28 Apr 2026 04:03:00 +0000 Subject: [PATCH] feat: add /api/intelligence/status endpoint with per-adapter status --- .../app/intelligence/routes/intelligence.py | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) diff --git a/backend/app/intelligence/routes/intelligence.py b/backend/app/intelligence/routes/intelligence.py index 7ea9740..a858791 100644 --- a/backend/app/intelligence/routes/intelligence.py +++ b/backend/app/intelligence/routes/intelligence.py @@ -9,10 +9,13 @@ * ``GET /api/intelligence/search`` — text + filter search * ``GET /api/intelligence/country/{code}`` — aggregated country summary * ``GET /api/intelligence/health`` — adapter + ingest health +* ``GET /api/intelligence/status`` — per-adapter status with human-readable messages """ from __future__ import annotations +import logging +from datetime import datetime, timezone from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, Query, Request, status @@ -33,6 +36,7 @@ router = APIRouter(prefix="/api/intelligence", tags=["intelligence"]) +logger = logging.getLogger(__name__) def get_runtime(request: Request) -> IntelligenceRuntime: @@ -83,6 +87,7 @@ class AdapterHealthModel(BaseModel): hasApiKey: bool = False baseUrl: str | None = None configured: bool = True + message: str = "Unknown" class PersistenceHealthModel(BaseModel): @@ -101,6 +106,26 @@ class HealthResponse(BaseModel): persistence: PersistenceHealthModel +class AdapterStatusModel(BaseModel): + name: str # e.g. "news", "conflict", "flights", "health", "stocks", "commodities", "mood" + enabled: bool + configured: bool # Has API key or is free (e.g. GDELT) + status: str # "active", "missing_key", "rate_limited", "error", "unavailable" + lastError: str | None + lastSuccessAt: str | None + eventCount: int # Events produced by this adapter in the last poll cycle + provider: str | None + message: str # Human-readable: "Live", "Missing API key", "Rate limited", etc. + + +class StatusResponse(BaseModel): + backend_status: str # "ok", "degraded", "error" + postgres_connected: bool + redis_connected: bool + adapters: list[AdapterStatusModel] + timestamp: str + + @router.get("/events/latest", response_model=EventsResponse) async def latest_events( runtime: Annotated[IntelligenceRuntime, Depends(get_runtime)], @@ -194,6 +219,34 @@ async def country_detail( return CountryDetailResponse(summary=summary, events=events) +def _adapter_status_message( + *, + enabled: bool, + configured: bool, + last_error: str | None, + last_success_at: str | None, +) -> tuple[str, str]: + """Derive (status_str, human_message) from adapter health fields. + + Returns a machine-readable status token and a user-facing message + suitable for display in the frontend status panel. + """ + if not enabled: + return "unavailable", "Disabled" + if not configured: + return "missing_key", "Missing API key" + if last_error: + err_lower = last_error.lower() + if "rate" in err_lower or "429" in err_lower or "too many" in err_lower: + return "rate_limited", "Rate limited — try again later" + if "key" in err_lower or "auth" in err_lower or "401" in err_lower or "403" in err_lower: + return "missing_key", "Missing API key" + return "error", f"Error: {last_error[:50]}" + if last_success_at: + return "active", "Live" + return "unavailable", "No active signals" + + @router.get("/health", response_model=HealthResponse) async def intelligence_health( runtime: Annotated[IntelligenceRuntime, Depends(get_runtime)], @@ -219,6 +272,14 @@ async def intelligence_health( ) if health_payload.get("stale"): any_stale = True + # Derive a human-readable message for the health payload. + _, msg = _adapter_status_message( + enabled=health_payload.get("enabled", True), + configured=health_payload.get("configured", True), + last_error=health_payload.get("lastError"), + last_success_at=health_payload.get("lastSuccessAt"), + ) + health_payload["message"] = msg adapter_models.append(AdapterHealthModel(**health_payload)) persistence = PersistenceHealthModel( investigations=type(runtime.investigation_repository).__name__, @@ -248,6 +309,102 @@ async def intelligence_health( ) +@router.get("/status", response_model=StatusResponse) +async def intelligence_status( + runtime: Annotated[IntelligenceRuntime, Depends(get_runtime)], +) -> StatusResponse: + """Return detailed per-adapter status for each intelligence rail. + + For each adapter this endpoint reports: + * Whether it is enabled and configured (has API key or is a free source) + * Its current operational status: "active", "missing_key", "rate_limited", + "error", or "unavailable" + * The last error message and last successful poll timestamp + * The number of events produced in the most recent poll cycle + * A human-readable message suitable for display in the frontend + + Connectivity to Postgres and Redis is also probed so the caller can + distinguish backend infrastructure failures from adapter-level issues. + """ + adapters_status: list[AdapterStatusModel] = [] + for adapter in runtime.adapters: + health = adapter.health + config = adapter.provider_config + + enabled = adapter.enabled + # An adapter is "configured" when it either has an API key or is a + # free/keyless source (e.g. GDELT news, open-meteo weather). + configured = adapter.is_configured + + last_error = health.last_error + last_success_at = ( + health.last_success_at.isoformat() if health.last_success_at else None + ) + + status_str, message = _adapter_status_message( + enabled=enabled, + configured=configured, + last_error=last_error, + last_success_at=last_success_at, + ) + + provider_name: str | None = None + if config is not None: + provider_name = config.provider or None + + adapters_status.append( + AdapterStatusModel( + name=getattr(adapter, "domain", None) or adapter.adapter_id, + enabled=enabled, + configured=configured, + status=status_str, + lastError=last_error, + lastSuccessAt=last_success_at, + eventCount=health.last_item_count, + provider=provider_name, + message=message, + ) + ) + + # Probe Postgres connectivity via a lightweight repository call. + postgres_ok = True + try: + await runtime.repository.latest(limit=1) + except Exception as exc: # pragma: no cover - defensive + logger.warning("intelligence.status: postgres probe failed: %s", exc) + postgres_ok = False + + # Probe Redis connectivity when a Redis-backed alert repository is in use. + redis_ok = True + settings = runtime._settings + if settings is not None and (settings.redis_url or "").strip(): + try: + from app.cache import build_redis_client, ping_redis + + client = build_redis_client(settings.redis_url) # type: ignore[arg-type] + try: + pong = await ping_redis(client) + if not pong: + raise RuntimeError("PING returned False") + finally: + await client.aclose() + except Exception as exc: # pragma: no cover - defensive + logger.warning("intelligence.status: redis probe failed: %s", exc) + redis_ok = False + + backend_status = "ok" + if not postgres_ok or not redis_ok: + backend_status = "degraded" + + return StatusResponse( + backend_status=backend_status, + postgres_connected=postgres_ok, + redis_connected=redis_ok, + adapters=adapters_status, + timestamp=datetime.now(timezone.utc).isoformat(), + ) + + class AgentQueryRequest(BaseModel): query: str portfolio_id: str | None = None