Changes from all commits · 21 commits
b3006e6
✨ Feat: Update monitoring configuration to use OpenTelemetry OTLP pro…
hhhhsc701 Apr 27, 2026
4351d67
Refine OpenTelemetry monitoring and multi-platform config
Apr 29, 2026
b4f1d67
Merge branch 'develop' into dev/opentelemetry
Apr 29, 2026
b71d7af
Add local Phoenix and Langfuse monitoring deployment support
May 6, 2026
e4e37ec
Merge branch 'refs/heads/develop' into dev/opentelemetry
hhhhsc701 May 6, 2026
d6035af
✨ Feat: Enhance monitoring capabilities with FastAPI instrumentation …
May 6, 2026
6290b32
✨ Feat: Add Grafana and Tempo support for enhanced monitoring capabil…
May 8, 2026
6bc16e7
✨ Feat: Update monitoring configurations with specific versioning for…
May 8, 2026
b7d8d64
✨ Feat: Refactor monitoring configuration to use environment variable…
May 8, 2026
d734591
✨ Feat: Expand OpenTelemetry design documentation to include observab…
May 9, 2026
7a752e7
✨ Feat: Add support for Apache SkyWalking as a monitoring provider, i…
May 11, 2026
db73c07
Support viewing traces in Zipkin
hhhhsc701 May 11, 2026
6b08eb5
✨ Feat: Add support for LangSmith as a monitoring provider, including…
May 12, 2026
5e1c1b4
Merge remote-tracking branch 'origin/dev/opentelemetry' into dev/open…
May 12, 2026
6e4e531
clean code
May 12, 2026
8ffe8b6
Merge branch 'develop' into dev/opentelemetry
May 12, 2026
59b9e18
clean code
May 13, 2026
224c291
✨ Feat: Add OpenTelemetry monitoring stack with configurable provider…
May 13, 2026
d614401
Normalize monitoring env variables
May 13, 2026
2dda002
✨ Feat: Add OpenTelemetry monitoring stack with configurable provider…
May 13, 2026
95bfe0f
Merge branch 'refs/heads/develop' into dev/opentelemetry
hhhhsc701 May 14, 2026
44 changes: 39 additions & 5 deletions backend/apps/monitoring_app.py
@@ -7,11 +7,16 @@

import logging
from http import HTTPStatus
from typing import Annotated, Optional
from typing import Annotated, Any

from fastapi import APIRouter, Header, HTTPException, Query
from sqlalchemy import text

from consts.const import (
ENABLE_TELEMETRY,
MONITORING_DASHBOARD_URL,
MONITORING_PROVIDER,
)
from consts.model import ConversationResponse
from database.client import get_monitoring_db_session
from utils.auth_utils import get_current_user_id
@@ -21,19 +26,38 @@
router = APIRouter(prefix="/monitoring")


def _normalize_monitoring_provider(value: str | None) -> str:
return str(value or "otlp").strip().lower()


def get_monitoring_status() -> dict[str, Any]:
"""Return telemetry state and the monitoring UI entrypoint for frontend use."""
telemetry_enabled = ENABLE_TELEMETRY
provider = _normalize_monitoring_provider(MONITORING_PROVIDER)
dashboard_url = MONITORING_DASHBOARD_URL.strip() or None

return {
"telemetry_enabled": telemetry_enabled,
"provider": provider,
"dashboard_url": dashboard_url,
"dashboard_port": None,
"dashboard_path": None,
}


def _compute_time_range_filter(time_range: str) -> str:
"""Convert time_range parameter to SQL timestamp condition."""
hours = {"24h": 24, "7d": 168, "30d": 720}.get(time_range, 24)
return f"m.create_time >= NOW() - INTERVAL '{hours} hours'"


def _query_model_metrics_from_db(
time_range: str, tenant_id: Optional[str] = None
) -> list[dict]:
time_range: str, tenant_id: str | None = None
) -> list[dict[str, Any]]:
time_filter = _compute_time_range_filter(time_range)

tenant_filter = ""
params = {}
params: dict[str, str] = {}
if tenant_id:
tenant_filter = "AND m.tenant_id = :tenant_id"
params["tenant_id"] = tenant_id
@@ -96,7 +120,7 @@ async def list_models_endpoint(
page: Annotated[int, Query(ge=1, description="Page number")] = 1,
page_size: Annotated[int, Query(
ge=1, le=100, description="Items per page")] = 20,
authorization: Annotated[Optional[str], Header()] = None,
authorization: Annotated[str | None, Header()] = None,
):
"""List all models with aggregated monitoring metrics from database."""
try:
@@ -113,3 +137,13 @@ async def list_models_endpoint(
logger.error(f"Failed to list monitoring models: {str(e)}")
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))


@router.get("/status", response_model=ConversationResponse)
async def get_monitoring_status_endpoint():
"""Return whether monitoring UI should be shown in the frontend."""
return ConversationResponse(
code=0,
message="success",
data=get_monitoring_status(),
)
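
Reviewer note: a minimal sketch of exercising the new /monitoring/status route. Only the route path, the ConversationResponse envelope, and the data keys come from the diff above; the test-app wiring and import path are assumptions (running from the backend/ directory with its dependencies importable).

from fastapi import FastAPI
from fastapi.testclient import TestClient

from apps.monitoring_app import router  # the router extended in this diff

app = FastAPI()
app.include_router(router)
client = TestClient(app)

resp = client.get("/monitoring/status")
body = resp.json()
assert resp.status_code == 200
assert body["code"] == 0 and body["message"] == "success"
# body["data"] mirrors get_monitoring_status():
# {"telemetry_enabled": ..., "provider": ..., "dashboard_url": ...,
#  "dashboard_port": None, "dashboard_path": None}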
72 changes: 59 additions & 13 deletions backend/consts/const.py
@@ -336,19 +336,65 @@ class VectorDatabaseType(str, Enum):
THINK_END_PATTERN = "</think>"


# Telemetry and Monitoring Configuration
ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false").lower() == "true"
SERVICE_NAME = os.getenv("SERVICE_NAME", "nexent-backend")
JAEGER_ENDPOINT = os.getenv(
"JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
PROMETHEUS_PORT = int(os.getenv("PROMETHEUS_PORT", "8000"))
TELEMETRY_SAMPLE_RATE = float(os.getenv("TELEMETRY_SAMPLE_RATE", "1.0"))

# Performance monitoring thresholds
LLM_SLOW_REQUEST_THRESHOLD_SECONDS = float(
os.getenv("LLM_SLOW_REQUEST_THRESHOLD_SECONDS", "5.0"))
LLM_SLOW_TOKEN_RATE_THRESHOLD = float(
os.getenv("LLM_SLOW_TOKEN_RATE_THRESHOLD", "10.0")) # tokens per second
# Telemetry and Monitoring Configuration (OTLP Protocol)
MONITORING_PROVIDER = os.getenv("MONITORING_PROVIDER", "")
ENABLE_TELEMETRY_RAW = os.getenv("ENABLE_TELEMETRY")
ENABLE_TELEMETRY = (ENABLE_TELEMETRY_RAW or "false").lower() == "true"
OTEL_SERVICE_NAME_RAW = os.getenv("OTEL_SERVICE_NAME")
OTEL_SERVICE_NAME = OTEL_SERVICE_NAME_RAW or "nexent-backend"
OTEL_EXPORTER_OTLP_ENDPOINT_RAW = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
OTEL_EXPORTER_OTLP_ENDPOINT = OTEL_EXPORTER_OTLP_ENDPOINT_RAW or "http://localhost:4318"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "")
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "")
OTEL_EXPORTER_OTLP_PROTOCOL_RAW = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL")
OTEL_EXPORTER_OTLP_PROTOCOL = OTEL_EXPORTER_OTLP_PROTOCOL_RAW or "http"
OTEL_EXPORTER_OTLP_HEADERS_RAW = os.getenv("OTEL_EXPORTER_OTLP_HEADERS")
OTEL_EXPORTER_OTLP_HEADERS = OTEL_EXPORTER_OTLP_HEADERS_RAW or ""
OTEL_EXPORTER_OTLP_AUTHORIZATION = os.getenv("OTEL_EXPORTER_OTLP_AUTHORIZATION", "")
OTEL_EXPORTER_OTLP_X_API_KEY = os.getenv("OTEL_EXPORTER_OTLP_X_API_KEY", "")
OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION = os.getenv(
"OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION", "")
LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY", "")
LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT", "")
OTEL_EXPORTER_OTLP_METRICS_ENABLED_RAW = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENABLED")
OTEL_EXPORTER_OTLP_METRICS_ENABLED = (
OTEL_EXPORTER_OTLP_METRICS_ENABLED_RAW or "true").lower() == "true"
MONITORING_INSTRUMENT_FASTAPI_RAW = os.getenv("MONITORING_INSTRUMENT_FASTAPI")
MONITORING_INSTRUMENT_FASTAPI = (
MONITORING_INSTRUMENT_FASTAPI_RAW or "true").lower() == "true"
MONITORING_INSTRUMENT_REQUESTS_RAW = os.getenv("MONITORING_INSTRUMENT_REQUESTS")
MONITORING_INSTRUMENT_REQUESTS = (
MONITORING_INSTRUMENT_REQUESTS_RAW or "false").lower() == "true"
MONITORING_FASTAPI_EXCLUDED_URLS = os.getenv("MONITORING_FASTAPI_EXCLUDED_URLS", "")
MONITORING_FASTAPI_EXCLUDE_SPANS = os.getenv("MONITORING_FASTAPI_EXCLUDE_SPANS", "receive,send")
MONITORING_PROJECT_NAME = os.getenv("MONITORING_PROJECT_NAME", "")
MONITORING_DASHBOARD_URL = os.getenv("MONITORING_DASHBOARD_URL", "")
TELEMETRY_SAMPLE_RATE_RAW = os.getenv("TELEMETRY_SAMPLE_RATE")
TELEMETRY_SAMPLE_RATE = float(TELEMETRY_SAMPLE_RATE_RAW or "1.0")

# Parse OTLP headers into dict format
def _parse_otlp_headers(headers_str: str) -> dict:
"""Parse OTLP headers string into dict. Format: 'key1=value1,key2=value2'"""
if not headers_str:
return {}
headers = {}
for pair in headers_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
headers[key.strip()] = value.strip()
return headers

OTLP_HEADERS = _parse_otlp_headers(OTEL_EXPORTER_OTLP_HEADERS)
if OTEL_EXPORTER_OTLP_AUTHORIZATION:
OTLP_HEADERS["Authorization"] = OTEL_EXPORTER_OTLP_AUTHORIZATION
if OTEL_EXPORTER_OTLP_X_API_KEY:
OTLP_HEADERS["x-api-key"] = OTEL_EXPORTER_OTLP_X_API_KEY
elif LANGSMITH_API_KEY:
OTLP_HEADERS["x-api-key"] = LANGSMITH_API_KEY
if LANGSMITH_PROJECT:
OTLP_HEADERS["Langsmith-Project"] = LANGSMITH_PROJECT
if OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION:
OTLP_HEADERS["x-langfuse-ingestion-version"] = OTEL_EXPORTER_OTLP_LANGFUSE_INGESTION_VERSION


DEFAULT_ZH_TITLE = "新对话"
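Illustrative aside (not part of the diff): how the header-merging logic above composes, using placeholder values rather than real credentials. The precedence follows the if/elif chain: an explicit OTEL_EXPORTER_OTLP_X_API_KEY wins, and LANGSMITH_API_KEY is only the fallback for x-api-key.

# Given (placeholders):
#   OTEL_EXPORTER_OTLP_HEADERS="x-tenant=acme,x-trace-source=nexent"
#   OTEL_EXPORTER_OTLP_AUTHORIZATION="Basic ZGVtbzpkZW1v"
#   OTEL_EXPORTER_OTLP_X_API_KEY="explicit-demo-key"
#   LANGSMITH_API_KEY="ls-demo"   # ignored for x-api-key because of the elif branch
# importing consts.const would leave OTLP_HEADERS as:
expected_otlp_headers = {
    "x-tenant": "acme",
    "x-trace-source": "nexent",
    "Authorization": "Basic ZGVtbzpkZW1v",
    "x-api-key": "explicit-demo-key",
}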
30 changes: 29 additions & 1 deletion backend/services/agent_service.py
@@ -74,7 +74,7 @@
from utils.llm_utils import call_llm_for_system_prompt

# Monitoring utilities: expose monitoring context for downstream observers
from nexent.monitor import set_monitoring_context
from nexent.monitor import OPENINFERENCE_SPAN_KIND_CHAIN, set_monitoring_context

# Import monitoring utilities
from utils.monitoring import monitoring_manager
@@ -1815,7 +1815,7 @@


@monitoring_manager.monitor_endpoint("agent_service.run_agent_stream", exclude_params=["authorization"])
async def run_agent_stream(

[SonarCloud Code Analysis check failure on line 1818: refactor this function to reduce its Cognitive Complexity from 16 to the 15 allowed. See https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ4bbd53SehsUYuHnVqM&open=AZ4bbd53SehsUYuHnVqM&pullRequest=2969]
agent_request: AgentRequest,
http_request: Request,
authorization: str,
@@ -1875,6 +1875,20 @@
agent_id=agent_request.agent_id,
conversation_id=agent_request.conversation_id,
)
monitoring_manager.set_openinference_agent_context(
agent_id=agent_request.agent_id,
conversation_id=agent_request.conversation_id,
user_id=resolved_user_id,
tenant_id=resolved_tenant_id,
query=agent_request.query,
is_debug=agent_request.is_debug,
extra_metadata={
"language": language,
"history_count": len(agent_request.history) if agent_request.history else 0,
"minio_files_count": len(agent_request.minio_files) if agent_request.minio_files else 0,
},
span_kind=OPENINFERENCE_SPAN_KIND_CHAIN,
)

# Step 2: Save user message (if needed)
if not agent_request.is_debug and not skip_user_save:
@@ -1912,6 +1926,20 @@

memory_duration = time.time() - memory_start_time
memory_enabled = memory_ctx_preview.user_config.memory_switch
monitoring_manager.set_openinference_agent_context(
agent_id=agent_request.agent_id,
conversation_id=agent_request.conversation_id,
user_id=resolved_user_id,
tenant_id=resolved_tenant_id,
query=agent_request.query,
is_debug=agent_request.is_debug,
memory_enabled=memory_enabled,
extra_metadata={
"language": language,
"agent_share_option": getattr(memory_ctx_preview.user_config, "agent_share_option", "unknown"),
},
span_kind=OPENINFERENCE_SPAN_KIND_CHAIN,
)
monitoring_manager.add_span_event("memory_context_build.completed", {
"duration": memory_duration,
"memory_enabled": memory_enabled,
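For context, the call shape of the span tagging added above, condensed into a hypothetical helper; every keyword argument appears in the calls in this diff, but the helper itself and its surrounding names are placeholders, not code from the PR.

from nexent.monitor import OPENINFERENCE_SPAN_KIND_CHAIN
from utils.monitoring import monitoring_manager

def tag_agent_span(request, user_id: str, tenant_id: str) -> None:
    # Attach agent/conversation/user identity plus lightweight metadata
    # to the current OpenInference span, as run_agent_stream now does.
    monitoring_manager.set_openinference_agent_context(
        agent_id=request.agent_id,
        conversation_id=request.conversation_id,
        user_id=user_id,
        tenant_id=tenant_id,
        query=request.query,
        is_debug=request.is_debug,
        extra_metadata={"history_count": len(request.history or [])},
        span_kind=OPENINFERENCE_SPAN_KIND_CHAIN,
    )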
92 changes: 52 additions & 40 deletions backend/utils/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
Global Monitoring Manager for Backend

This module initializes and configures the global monitoring manager instance
with backend environment variables. All other backend modules should import
`monitoring_manager` directly from this module.
with backend environment variables using OTLP protocol. All other backend modules
should import `monitoring_manager` directly from this module.

Usage:
from utils.monitoring import monitoring_manager

@monitoring_manager.monitor_endpoint("my_service.my_function")
async def my_function():
return {"status": "ok"}
@@ -17,67 +17,79 @@ async def my_function():
MonitoringConfig,
get_monitoring_manager
)
# Import configuration from backend (support both relative and absolute imports)
try:
# Try relative import first (when running from backend directory)
from consts.const import (
ENABLE_TELEMETRY,
SERVICE_NAME,
JAEGER_ENDPOINT,
PROMETHEUS_PORT,
TELEMETRY_SAMPLE_RATE,
LLM_SLOW_REQUEST_THRESHOLD_SECONDS,
LLM_SLOW_TOKEN_RATE_THRESHOLD
MONITORING_PROVIDER,
MONITORING_PROJECT_NAME,
OTEL_SERVICE_NAME,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_METRICS_ENABLED,
MONITORING_INSTRUMENT_FASTAPI,
MONITORING_INSTRUMENT_REQUESTS,
MONITORING_FASTAPI_EXCLUDED_URLS,
MONITORING_FASTAPI_EXCLUDE_SPANS,
OTLP_HEADERS,
TELEMETRY_SAMPLE_RATE
)
except ImportError:
# Fallback to absolute import (when running from project root)
from backend.consts.const import (
ENABLE_TELEMETRY,
SERVICE_NAME,
JAEGER_ENDPOINT,
PROMETHEUS_PORT,
TELEMETRY_SAMPLE_RATE,
LLM_SLOW_REQUEST_THRESHOLD_SECONDS,
LLM_SLOW_TOKEN_RATE_THRESHOLD
MONITORING_PROVIDER,
MONITORING_PROJECT_NAME,
OTEL_SERVICE_NAME,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
OTEL_EXPORTER_OTLP_PROTOCOL,
OTEL_EXPORTER_OTLP_METRICS_ENABLED,
MONITORING_INSTRUMENT_FASTAPI,
MONITORING_INSTRUMENT_REQUESTS,
MONITORING_FASTAPI_EXCLUDED_URLS,
MONITORING_FASTAPI_EXCLUDE_SPANS,
OTLP_HEADERS,
TELEMETRY_SAMPLE_RATE
)

import logging

logger = logging.getLogger(__name__)

# ============================================================================
# Global Monitoring Manager Instance
# ============================================================================

# Get the global monitoring manager instance
monitoring_manager = get_monitoring_manager()

# Initialize monitoring configuration immediately when this module is imported


def _initialize_monitoring():
"""Initialize monitoring configuration with backend environment variables."""
"""Initialize monitoring configuration with OTLP settings."""
config = MonitoringConfig(
enable_telemetry=ENABLE_TELEMETRY,
service_name=SERVICE_NAME,
jaeger_endpoint=JAEGER_ENDPOINT,
prometheus_port=PROMETHEUS_PORT,
telemetry_sample_rate=TELEMETRY_SAMPLE_RATE,
llm_slow_request_threshold_seconds=LLM_SLOW_REQUEST_THRESHOLD_SECONDS,
llm_slow_token_rate_threshold=LLM_SLOW_TOKEN_RATE_THRESHOLD
service_name=OTEL_SERVICE_NAME,
provider=MONITORING_PROVIDER or "otlp",
otlp_endpoint=OTEL_EXPORTER_OTLP_ENDPOINT,
otlp_traces_endpoint=OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or None,
otlp_metrics_endpoint=OTEL_EXPORTER_OTLP_METRICS_ENDPOINT or None,
otlp_protocol=OTEL_EXPORTER_OTLP_PROTOCOL,
otlp_headers=OTLP_HEADERS,
export_metrics=OTEL_EXPORTER_OTLP_METRICS_ENABLED,
instrument_fastapi=MONITORING_INSTRUMENT_FASTAPI,
instrument_requests=MONITORING_INSTRUMENT_REQUESTS,
fastapi_excluded_urls=MONITORING_FASTAPI_EXCLUDED_URLS,
fastapi_exclude_spans=MONITORING_FASTAPI_EXCLUDE_SPANS,
project_name=MONITORING_PROJECT_NAME or None,
telemetry_sample_rate=TELEMETRY_SAMPLE_RATE
)

# Configure the SDK monitoring system using the singleton
monitoring_manager.configure(config)
logger.info(
f"Global monitoring initialized: service_name={SERVICE_NAME}, enable_telemetry={ENABLE_TELEMETRY}")
f"OTLP monitoring initialized: service_name={OTEL_SERVICE_NAME}, "
f"enable_telemetry={config.enable_telemetry}, provider={config.provider}, "
f"endpoint={config.otlp_endpoint}, trace_endpoint={config.get_trace_endpoint()}, "
f"protocol={OTEL_EXPORTER_OTLP_PROTOCOL}"
)


# Initialize monitoring when module is imported
_initialize_monitoring()


# Export the global monitoring manager instance
__all__ = [
'monitoring_manager'
]
__all__ = ['monitoring_manager']
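
A hedged configuration sketch for reviewers: the variable names are the ones read in consts/const.py above, but the endpoint value is a placeholder for whatever OTLP collector is actually deployed (Tempo, Phoenix, Langfuse, SkyWalking, etc.), and they must be set before utils.monitoring is first imported because configuration happens at import time.

import os

# Placeholder endpoint; point this at the deployed OTLP collector.
os.environ["ENABLE_TELEMETRY"] = "true"
os.environ["MONITORING_PROVIDER"] = "otlp"
os.environ["OTEL_SERVICE_NAME"] = "nexent-backend"
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"
os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http"
os.environ["MONITORING_INSTRUMENT_FASTAPI"] = "true"

from utils.monitoring import monitoring_manager  # triggers _initialize_monitoring()

@monitoring_manager.monitor_endpoint("example_service.ping")
async def ping():
    return {"status": "ok"}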
1 change: 1 addition & 0 deletions doc/docs/.vitepress/config.mts
@@ -385,6 +385,7 @@ export default defineConfig({
],
},
{ text: "性能监控", link: "/zh/sdk/monitoring" },
{ text: "OpenTelemetry 设计", link: "/zh/sdk/opentelemetry-design" },
{ text: "向量数据库", link: "/zh/sdk/vector-database" },
{ text: "数据处理", link: "/zh/sdk/data-process" },
],
2 changes: 1 addition & 1 deletion doc/docs/en/getting-started/software-architecture.md
@@ -274,7 +274,7 @@ Real-time Input → Streaming Endpoint → Async Processing
- **High Availability**: Multi-service redundancy, health checks, auto-restart
- **High Performance**: Async processing, Redis caching, vector search optimization
- **High Concurrency**: Distributed architecture, load balancing
- **Monitoring Friendly**: Prometheus metrics, Jaeger tracing, structured logging
- **Monitoring Friendly**: OpenTelemetry observability, Grafana Tempo tracing, structured logging

### 🔧 Developer Friendly
- **Modular Development**: Clean layered architecture (App → Service → Database)