Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Add retrieval step timestamp index.

Revision ID: f9c0d1e2f3a4
Revises: f9b0c1d2e3f4
Create Date: 2026-06-12 08:35:00.000000
"""

from __future__ import annotations

from typing import Sequence, Union

from alembic import op


# revision identifiers, used by Alembic.
# `revision` is this migration's own ID; `down_revision` is the ID of the
# migration it revises (see "Revises" in the module docstring).
revision: str = "f9c0d1e2f3a4"
down_revision: Union[str, Sequence[str], None] = "f9b0c1d2e3f4"
# No branch labels or extra dependencies are declared for this revision.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create a non-unique index on ``retrieval_steps.created_at``."""
    index_name = "idx_retrieval_steps_created"
    indexed_columns = ["created_at"]
    op.create_index(index_name, "retrieval_steps", indexed_columns, unique=False)


def downgrade() -> None:
    """Drop the ``idx_retrieval_steps_created`` index from ``retrieval_steps``."""
    index_name = "idx_retrieval_steps_created"
    op.drop_index(
        index_name,
        table_name="retrieval_steps",
    )
37 changes: 37 additions & 0 deletions apps/api/app/core/middleware/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Anonymous aggregate telemetry middleware."""

from __future__ import annotations

import time

from starlette.types import ASGIApp, Message, Receive, Scope, Send

from shared.services.telemetry.api_metrics import ApiRequestTelemetryMetrics


class ApiTelemetryMiddleware:
    """ASGI middleware reporting status code and latency for each HTTP request.

    Only the response status code and the elapsed time in milliseconds are
    passed to the metrics collector; request payloads and raw paths are never
    read.
    """

    def __init__(self, app: ASGIApp, *, metrics: ApiRequestTelemetryMetrics) -> None:
        """Store the wrapped ASGI app and the metrics collector to report to."""
        self.app = app
        self.metrics = metrics

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Run the wrapped app and record one metrics sample per HTTP request.

        Non-HTTP scopes are forwarded untouched and produce no sample.
        """
        is_http = scope["type"] == "http"
        if not is_http:
            await self.app(scope, receive, send)
            return

        # Stays at 500 unless the app actually starts a response, so a request
        # that fails before responding is reported as a server error.
        observed = {"status": 500}
        clock_start = time.perf_counter()

        async def tracking_send(message: Message) -> None:
            # Capture the status from the response-start message, then forward
            # every message unchanged to the real send callable.
            if message["type"] == "http.response.start":
                observed["status"] = int(message.get("status", 500))
            await send(message)

        try:
            await self.app(scope, receive, tracking_send)
        finally:
            # Record even when the app raised; the exception still propagates.
            self.metrics.record(
                status_code=observed["status"],
                latency_ms=(time.perf_counter() - clock_start) * 1000,
            )
33 changes: 32 additions & 1 deletion apps/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
from contextlib import asynccontextmanager
from app.api.api_router import api_router
from app.core.middleware import setup_cors, LoggingMiddleware
from app.core.middleware.telemetry import ApiTelemetryMiddleware
from app.core.exception_handlers import setup_exception_handlers
from app.mcp import create_retrieval_mcp_server
from app.services.rate_limit.rule_loader import load_rules
from shared.services.telemetry.api_metrics import ApiRequestTelemetryMetrics


@asynccontextmanager
Expand Down Expand Up @@ -68,15 +70,35 @@ async def lifespan(app: FastAPI):
await load_rules(session)
logger.info("rate limit rules loaded at startup; restart the pod to apply changes")

from shared.services.telemetry.aggregates import (
start_self_hosted_aggregate_telemetry,
)
from shared.services.telemetry.runtime import start_self_hosted_telemetry

app.state.self_hosted_telemetry_client = await start_self_hosted_telemetry(
telemetry_runtime = await start_self_hosted_telemetry(
settings,
service_name="knowhere-api",
api_healthy=True,
postgres_healthy=True,
redis_healthy=True,
)
if telemetry_runtime is None:
app.state.self_hosted_telemetry_client = None
app.state.self_hosted_telemetry_config = None
app.state.self_hosted_aggregate_telemetry_runner = None
else:
telemetry_client, telemetry_config = telemetry_runtime
app.state.self_hosted_telemetry_client = telemetry_client
app.state.self_hosted_telemetry_config = telemetry_config
app.state.self_hosted_aggregate_telemetry_runner = (
await start_self_hosted_aggregate_telemetry(
settings,
telemetry_client=telemetry_client,
config=telemetry_config,
db_session_factory=get_db_context,
api_metrics=app.state.self_hosted_api_telemetry_metrics,
)
)

mcp_server = getattr(app.state, "retrieval_mcp_server", None)
mcp_session_manager = getattr(mcp_server, "session_manager", None)
Expand All @@ -89,8 +111,14 @@ async def lifespan(app: FastAPI):
yield

try:
from shared.services.telemetry.aggregates import (
stop_self_hosted_aggregate_telemetry,
)
from shared.services.telemetry.runtime import stop_self_hosted_telemetry

await stop_self_hosted_aggregate_telemetry(
getattr(app.state, "self_hosted_aggregate_telemetry_runner", None)
)
await stop_self_hosted_telemetry(
getattr(app.state, "self_hosted_telemetry_client", None)
)
Expand Down Expand Up @@ -150,6 +178,9 @@ def create_app() -> FastAPI:

# Setup middleware
setup_cors(app)
api_telemetry_metrics = ApiRequestTelemetryMetrics()
app.state.self_hosted_api_telemetry_metrics = api_telemetry_metrics
app.add_middleware(ApiTelemetryMiddleware, metrics=api_telemetry_metrics)
app.add_middleware(LoggingMiddleware)

@app.get("/", tags=["Root"])
Expand Down
Loading
Loading