Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions backend/app/intelligence/routes/intelligence.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
* ``GET /api/intelligence/search`` — text + filter search
* ``GET /api/intelligence/country/{code}`` — aggregated country summary
* ``GET /api/intelligence/health`` — adapter + ingest health
* ``GET /api/intelligence/status`` — per-adapter status with human-readable messages
"""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
Expand All @@ -33,6 +36,7 @@


router = APIRouter(prefix="/api/intelligence", tags=["intelligence"])
logger = logging.getLogger(__name__)


def get_runtime(request: Request) -> IntelligenceRuntime:
Expand Down Expand Up @@ -83,6 +87,7 @@ class AdapterHealthModel(BaseModel):
hasApiKey: bool = False
baseUrl: str | None = None
configured: bool = True
message: str = "Unknown"


class PersistenceHealthModel(BaseModel):
Expand All @@ -101,6 +106,26 @@ class HealthResponse(BaseModel):
persistence: PersistenceHealthModel


class AdapterStatusModel(BaseModel):
name: str # e.g. "news", "conflict", "flights", "health", "stocks", "commodities", "mood"
enabled: bool
configured: bool # Has API key or is free (e.g. GDELT)
status: str # "active", "missing_key", "rate_limited", "error", "unavailable"
lastError: str | None
lastSuccessAt: str | None
eventCount: int # Events produced by this adapter in the last poll cycle
provider: str | None
message: str # Human-readable: "Live", "Missing API key", "Rate limited", etc.


class StatusResponse(BaseModel):
backend_status: str # "ok", "degraded", "error"
postgres_connected: bool
redis_connected: bool
adapters: list[AdapterStatusModel]
timestamp: str


@router.get("/events/latest", response_model=EventsResponse)
async def latest_events(
runtime: Annotated[IntelligenceRuntime, Depends(get_runtime)],
Expand Down Expand Up @@ -194,6 +219,34 @@ async def country_detail(
return CountryDetailResponse(summary=summary, events=events)


def _adapter_status_message(
*,
enabled: bool,
configured: bool,
last_error: str | None,
last_success_at: str | None,
) -> tuple[str, str]:
"""Derive (status_str, human_message) from adapter health fields.

Returns a machine-readable status token and a user-facing message
suitable for display in the frontend status panel.
"""
if not enabled:
return "unavailable", "Disabled"
if not configured:
return "missing_key", "Missing API key"
if last_error:
err_lower = last_error.lower()
if "rate" in err_lower or "429" in err_lower or "too many" in err_lower:
return "rate_limited", "Rate limited — try again later"
if "key" in err_lower or "auth" in err_lower or "401" in err_lower or "403" in err_lower:
return "missing_key", "Missing API key"
return "error", f"Error: {last_error[:50]}"
if last_success_at:
return "active", "Live"
return "unavailable", "No active signals"


@router.get("/health", response_model=HealthResponse)
async def intelligence_health(
runtime: Annotated[IntelligenceRuntime, Depends(get_runtime)],
Expand All @@ -219,6 +272,14 @@ async def intelligence_health(
)
if health_payload.get("stale"):
any_stale = True
# Derive a human-readable message for the health payload.
_, msg = _adapter_status_message(
enabled=health_payload.get("enabled", True),
configured=health_payload.get("configured", True),
last_error=health_payload.get("lastError"),
last_success_at=health_payload.get("lastSuccessAt"),
)
health_payload["message"] = msg
adapter_models.append(AdapterHealthModel(**health_payload))
persistence = PersistenceHealthModel(
investigations=type(runtime.investigation_repository).__name__,
Expand Down Expand Up @@ -248,6 +309,102 @@ async def intelligence_health(
)


@router.get("/status", response_model=StatusResponse)
async def intelligence_status(
runtime: Annotated[IntelligenceRuntime, Depends(get_runtime)],
) -> StatusResponse:
"""Return detailed per-adapter status for each intelligence rail.

For each adapter this endpoint reports:
* Whether it is enabled and configured (has API key or is a free source)
* Its current operational status: "active", "missing_key", "rate_limited",
"error", or "unavailable"
* The last error message and last successful poll timestamp
* The number of events produced in the most recent poll cycle
* A human-readable message suitable for display in the frontend

Connectivity to Postgres and Redis is also probed so the caller can
distinguish backend infrastructure failures from adapter-level issues.
"""
adapters_status: list[AdapterStatusModel] = []
for adapter in runtime.adapters:
health = adapter.health
config = adapter.provider_config

enabled = adapter.enabled
# An adapter is "configured" when it either has an API key or is a
# free/keyless source (e.g. GDELT news, open-meteo weather).
configured = adapter.is_configured

last_error = health.last_error
last_success_at = (
health.last_success_at.isoformat() if health.last_success_at else None
)

status_str, message = _adapter_status_message(
enabled=enabled,
configured=configured,
last_error=last_error,
last_success_at=last_success_at,
)

provider_name: str | None = None
if config is not None:
provider_name = config.provider or None

adapters_status.append(
AdapterStatusModel(
name=getattr(adapter, "domain", None) or adapter.adapter_id,
enabled=enabled,
configured=configured,
status=status_str,
lastError=last_error,
lastSuccessAt=last_success_at,
eventCount=health.last_item_count,
provider=provider_name,
message=message,
)
)

# Probe Postgres connectivity via a lightweight repository call.
postgres_ok = True
try:
await runtime.repository.latest(limit=1)
except Exception as exc: # pragma: no cover - defensive
logger.warning("intelligence.status: postgres probe failed: %s", exc)
postgres_ok = False

# Probe Redis connectivity when a Redis-backed alert repository is in use.
redis_ok = True
settings = runtime._settings
if settings is not None and (settings.redis_url or "").strip():
try:
from app.cache import build_redis_client, ping_redis

client = build_redis_client(settings.redis_url) # type: ignore[arg-type]
try:
pong = await ping_redis(client)
if not pong:
raise RuntimeError("PING returned False")
finally:
await client.aclose()
except Exception as exc: # pragma: no cover - defensive
logger.warning("intelligence.status: redis probe failed: %s", exc)
redis_ok = False

backend_status = "ok"
if not postgres_ok or not redis_ok:
backend_status = "degraded"

return StatusResponse(
backend_status=backend_status,
postgres_connected=postgres_ok,
redis_connected=redis_ok,
adapters=adapters_status,
timestamp=datetime.now(timezone.utc).isoformat(),
)


class AgentQueryRequest(BaseModel):
query: str
portfolio_id: str | None = None
Expand Down