Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 131 additions & 99 deletions mellea/telemetry/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,69 +106,63 @@
# Provide dummy types for type hints
metrics = None # type: ignore

# Configuration from environment variables
_METRICS_ENABLED = _OTEL_AVAILABLE and os.getenv(
"MELLEA_METRICS_ENABLED", "false"
).lower() in ("true", "1", "yes")
_METRICS_CONSOLE = os.getenv("MELLEA_METRICS_CONSOLE", "false").lower() in (
"true",
"1",
"yes",
)
_METRICS_OTLP = os.getenv("MELLEA_METRICS_OTLP", "false").lower() in (
"true",
"1",
"yes",
)
# Metrics-specific endpoint takes precedence over general OTLP endpoint
_OTLP_METRICS_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or os.getenv(
"OTEL_EXPORTER_OTLP_ENDPOINT"
)
_METRICS_PROMETHEUS = os.getenv("MELLEA_METRICS_PROMETHEUS", "false").lower() in (
"true",
"1",
"yes",
)
_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME", "mellea")

# Parse export interval (default 60000 milliseconds = 60 seconds)
try:
_EXPORT_INTERVAL_MILLIS = int(os.getenv("OTEL_METRIC_EXPORT_INTERVAL", "60000"))
if _EXPORT_INTERVAL_MILLIS <= 0:

def _env_true(name: str) -> bool:
    """Return True if the environment variable `name` holds a truthy value.

    Truthy values are ``true``, ``1`` and ``yes``, compared case-insensitively.
    Surrounding whitespace is ignored so that a value such as ``"true "``
    (easy to produce from an env file or shell export) is still recognised.

    Args:
        name: Name of the environment variable to read.

    Returns:
        True when the variable is set to a truthy value; False when it is
        unset or holds anything else.
    """
    truthy = ("true", "1", "yes")
    return os.getenv(name, "false").strip().lower() in truthy


def _parse_export_interval() -> int:
    """Read OTEL_METRIC_EXPORT_INTERVAL with warn-and-default fallback.

    A value that cannot be parsed as an integer, or that is zero or negative,
    emits a `UserWarning` and the default interval is returned instead.

    Returns:
        Export interval in milliseconds (default 60000 = 60 seconds).
    """
    default_millis = 60000
    raw = os.getenv("OTEL_METRIC_EXPORT_INTERVAL", "60000")

    # Keep the try body down to the one call that can raise.
    try:
        value = int(raw)
    except ValueError:
        warnings.warn(
            f"Invalid OTEL_METRIC_EXPORT_INTERVAL value: {raw}. "
            "Must be an integer. Using default of 60000 milliseconds.",
            UserWarning,
            stacklevel=2,
        )
        return default_millis

    if value <= 0:
        warnings.warn(
            f"Invalid OTEL_METRIC_EXPORT_INTERVAL value: {value}. "
            "Must be positive. Using default of 60000 milliseconds.",
            UserWarning,
            stacklevel=2,
        )
        return default_millis

    return value


def _setup_meter_provider() -> Any:
"""Set up the MeterProvider with configured exporters.

Reads exporter, endpoint, and service-name env vars at call time so that
environment changes made after module import are respected without
requiring a module reload.

Returns:
MeterProvider instance or None if OpenTelemetry is not available
"""
if not _OTEL_AVAILABLE:
return None

resource = Resource.create({"service.name": _SERVICE_NAME}) # type: ignore
service_name = os.getenv("OTEL_SERVICE_NAME", "mellea")
export_interval_millis = _parse_export_interval()
resource = Resource.create({"service.name": service_name}) # type: ignore
readers = []

# Add Prometheus metric reader if enabled.
# This registers metrics with the prometheus_client default registry.
# The application is responsible for exposing the registry (e.g. via
# prometheus_client.start_http_server() or a framework integration).
if _METRICS_PROMETHEUS:
if _env_true("MELLEA_METRICS_PROMETHEUS"):
try:
from opentelemetry.exporter.prometheus import PrometheusMetricReader

Expand All @@ -191,15 +185,18 @@ def _setup_meter_provider() -> Any:
)

# Add OTLP exporter if explicitly enabled
if _METRICS_OTLP:
if _OTLP_METRICS_ENDPOINT:
if _env_true("MELLEA_METRICS_OTLP"):
otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or os.getenv(
"OTEL_EXPORTER_OTLP_ENDPOINT"
)
if otlp_endpoint:
try:
otlp_exporter = OTLPMetricExporter( # type: ignore
endpoint=_OTLP_METRICS_ENDPOINT
endpoint=otlp_endpoint
)
readers.append(
PeriodicExportingMetricReader( # type: ignore
otlp_exporter, export_interval_millis=_EXPORT_INTERVAL_MILLIS
otlp_exporter, export_interval_millis=export_interval_millis
)
)
except Exception as e:
Expand All @@ -218,12 +215,12 @@ def _setup_meter_provider() -> Any:
)

# Add console exporter for debugging if enabled
if _METRICS_CONSOLE:
if _env_true("MELLEA_METRICS_CONSOLE"):
try:
console_exporter = ConsoleMetricExporter() # type: ignore
readers.append(
PeriodicExportingMetricReader( # type: ignore
console_exporter, export_interval_millis=_EXPORT_INTERVAL_MILLIS
console_exporter, export_interval_millis=export_interval_millis
)
)
except Exception as e:
Expand Down Expand Up @@ -267,14 +264,84 @@ def _setup_meter_provider() -> Any:
return provider


# Initialize meter provider if metrics are enabled
_meter_provider = None
_meter = None
# Module-level metrics state; all of it starts out "off" and is populated by
# `_setup_metrics()`.
# Provider returned by `_setup_meter_provider()`; None until one is built.
_meter_provider: Any = None
# Meter obtained from `_meter_provider`; the `record_*` helpers return early
# while this is None.
_meter: Any = None
# Value reported by `is_metrics_enabled()`.
_metrics_enabled: bool = False
# Set by `_register_metrics_plugins()` once a registration pass has completed,
# so later calls become no-ops.
_plugins_registered: bool = False


def _register_metrics_plugins() -> None:
    """Attach the metrics plugin classes to the global plugin registry.

    The module-level `_plugins_registered` flag makes this a one-shot: after a
    registration pass has completed, further calls return immediately. The
    plugin registry is process-global, so plugins stay registered for the
    lifetime of the process even if the meter provider is rebuilt.

    When the plugin framework is not installed a `UserWarning` is emitted and
    nothing is registered; the flag is left unset in that case.
    """
    global _plugins_registered
    if _plugins_registered:
        return

    from mellea.plugins.registry import _HAS_PLUGIN_FRAMEWORK, register

    if _HAS_PLUGIN_FRAMEWORK:
        # Imported only once the framework is known to be present.
        from mellea.telemetry.metrics_plugins import _METRICS_PLUGIN_CLASSES

        for metrics_plugin in _METRICS_PLUGIN_CLASSES:
            try:
                register(metrics_plugin())
            except ValueError as err:
                # A duplicate registration is reported, not fatal.
                warnings.warn(
                    f"{metrics_plugin.__name__} already registered: {err}",
                    UserWarning,
                    stacklevel=2,
                )
        _plugins_registered = True
        return

    warnings.warn(
        "Metrics are enabled but the plugin framework is not installed. "
        "Token usage and latency metrics will not be recorded automatically. "
        "Install with: pip install mellea[telemetry]",
        UserWarning,
        stacklevel=2,
    )


def is_metrics_enabled() -> bool:
    """Report whether metrics collection is currently active.

    Returns:
        The module-level `_metrics_enabled` flag. `_setup_metrics()` sets it
        to True only when OpenTelemetry is available, `MELLEA_METRICS_ENABLED`
        is truthy, and a meter provider was actually built; otherwise it is
        False.
    """
    return _metrics_enabled


def _setup_metrics() -> None:
    """Build the MeterProvider and register metrics plugins.

    Returns early, leaving `_metrics_enabled` False, when OpenTelemetry is
    unavailable, `MELLEA_METRICS_ENABLED` is not truthy, or no provider could
    be built. Env vars are read at call time (here and inside
    `_setup_meter_provider()`) so changes made after module import are
    respected without requiring a module reload.
    """
    global _meter_provider, _meter, _metrics_enabled

    # Start from "disabled"; only a fully successful setup flips this back.
    _metrics_enabled = False
    if not (_OTEL_AVAILABLE and _env_true("MELLEA_METRICS_ENABLED")):
        # NOTE(review): this path leaves `_meter` / `_meter_provider` as they
        # were, so values from an earlier successful call remain in place.
        # Confirm the reset path clears them before calling this again.
        return

    _meter_provider = _setup_meter_provider()
    if _meter_provider is None:
        return

    # Bind directly off the provider we just built — OTel's
    # `set_meter_provider()` is one-shot per process, so `metrics.get_meter()`
    # would return a meter attached to the original (now shutdown) provider
    # after a reset.
    _meter = _meter_provider.get_meter("mellea.metrics", version("mellea"))
    _metrics_enabled = True
    _register_metrics_plugins()


_setup_metrics()


# No-op instrument classes for when metrics are disabled
Expand Down Expand Up @@ -396,15 +463,6 @@ def create_up_down_counter(name: str, description: str = "", unit: str = "1") ->
return _meter.create_up_down_counter(name, description=description, unit=unit)


def is_metrics_enabled() -> bool:
"""Check if metrics collection is enabled.

Returns:
True if metrics are enabled, False otherwise
"""
return _METRICS_ENABLED


# Token usage counters following Gen-AI semantic conventions
# These are lazily initialized on first use and kept internal
_input_token_counter: Any = None
Expand Down Expand Up @@ -458,7 +516,7 @@ def record_token_usage_metrics(
)
"""
# Early return if metrics are disabled (zero overhead)
if not _METRICS_ENABLED:
if _meter is None:
return

# Get the token counters (lazily initialized)
Expand Down Expand Up @@ -528,7 +586,7 @@ def record_request_duration(
streaming=True,
)
"""
if not _METRICS_ENABLED:
if _meter is None:
return

duration_hist, _ = _get_latency_histograms()
Expand Down Expand Up @@ -558,7 +616,7 @@ def record_ttfb(ttfb_s: float, model: str, provider: str) -> None:
provider="ollama",
)
"""
if not _METRICS_ENABLED:
if _meter is None:
return

_, ttfb_hist = _get_latency_histograms()
Expand Down Expand Up @@ -685,7 +743,7 @@ def record_error(
exception_class="RateLimitError",
)
"""
if not _METRICS_ENABLED:
if _meter is None:
return

counter = _get_error_counter()
Expand Down Expand Up @@ -744,7 +802,7 @@ def record_cost(cost: float, model: str, provider: str) -> None:
provider="openai",
)
"""
if not _METRICS_ENABLED:
if _meter is None:
return

counter = _get_cost_counter()
Expand Down Expand Up @@ -803,7 +861,7 @@ def record_sampling_attempt(strategy: str) -> None:
Args:
strategy: Sampling strategy class name (e.g. `"RejectionSamplingStrategy"`).
"""
if not _METRICS_ENABLED:
if _meter is None:
return

_get_sampling_attempts_counter().add(1, {"strategy": strategy})
Expand All @@ -818,7 +876,7 @@ def record_sampling_outcome(strategy: str, success: bool) -> None:
strategy: Sampling strategy class name (e.g. `"RejectionSamplingStrategy"`).
success: `True` if at least one attempt passed all requirements.
"""
if not _METRICS_ENABLED:
if _meter is None:
return

if success:
Expand Down Expand Up @@ -865,7 +923,7 @@ def record_requirement_check(requirement: str) -> None:
Args:
requirement: Requirement class name (e.g. `"LLMaJRequirement"`).
"""
if not _METRICS_ENABLED:
if _meter is None:
return

_get_requirement_checks_counter().add(1, {"requirement": requirement})
Expand All @@ -880,7 +938,7 @@ def record_requirement_failure(requirement: str, reason: str) -> None:
requirement: Requirement class name (e.g. `"LLMaJRequirement"`).
reason: Human-readable failure reason from `ValidationResult.reason`.
"""
if not _METRICS_ENABLED:
if _meter is None:
return

_get_requirement_failures_counter().add(
Expand Down Expand Up @@ -913,39 +971,13 @@ def record_tool_call(tool: str, status: str) -> None:
tool: Name of the tool that was invoked.
status: `"success"` if the tool executed without error, `"failure"` otherwise.
"""
if not _METRICS_ENABLED:
if _meter is None:
return

counter = _get_tool_calls_counter()
counter.add(1, {"tool": tool, "status": status})


# Auto-register metrics plugins when metrics are enabled
if _OTEL_AVAILABLE and _METRICS_ENABLED:
from mellea.plugins.registry import _HAS_PLUGIN_FRAMEWORK, register
from mellea.telemetry.metrics_plugins import _METRICS_PLUGIN_CLASSES

if not _HAS_PLUGIN_FRAMEWORK:
warnings.warn(
"Metrics are enabled but the plugin framework is not installed. "
"Token usage and latency metrics will not be recorded automatically. "
"Install with: pip install mellea[telemetry]",
UserWarning,
stacklevel=2,
)
else:
for _plugin_cls in _METRICS_PLUGIN_CLASSES:
try:
register(_plugin_cls())
except ValueError as e:
# Already registered (expected during module reloads in tests)
warnings.warn(
f"{_plugin_cls.__name__} already registered: {e}",
UserWarning,
stacklevel=2,
)


__all__ = [
"classify_error",
"create_counter",
Expand Down
Loading
Loading