Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions dimos/agents/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dimos.core.module import Module, ModuleConfig
from dimos.core.rpc_client import RPCClient
from dimos.core.stream import In, Out
from dimos.telemetry import session_attributes, span as trace_span
from dimos.utils.logging_config import setup_logger
from dimos.utils.sequential_ids import SequentialIds

Expand Down Expand Up @@ -64,6 +65,7 @@ class McpClient(Module):
_http_client: httpx.Client
_seq_ids: SequentialIds
_tool_stream_cleanup: Callable[[], None] | None
_session_id: str

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
Expand All @@ -81,6 +83,9 @@ def __init__(self, **kwargs: Any) -> None:
self._http_client = httpx.Client(timeout=120.0)
self._seq_ids = SequentialIds()
self._tool_stream_cleanup = None
# Stable per-instance id; every agent.turn span is tagged with it so
# the observability backend groups all turns into one session.
self._session_id = str(uuid.uuid4())

def __reduce__(self) -> Any:
return (self.__class__, (), {})
Expand Down Expand Up @@ -323,12 +328,13 @@ def _process_message(
pretty_print_langchain_message(message)
self.agent.publish(message)

for update in state_graph.stream({"messages": self._history}, stream_mode="updates"):
for node_output in update.values():
for msg in node_output.get("messages", []):
self._history.append(msg)
pretty_print_langchain_message(msg)
self.agent.publish(msg)
with trace_span("agent.turn", **session_attributes(self._session_id)):
for update in state_graph.stream({"messages": self._history}, stream_mode="updates"):
for node_output in update.values():
for msg in node_output.get("messages", []):
self._history.append(msg)
pretty_print_langchain_message(msg)
self.agent.publish(msg)

if self._message_queue.empty():
self.agent_idle.publish(True)
Expand Down
152 changes: 152 additions & 0 deletions dimos/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Opt-in OpenTelemetry tracing for DimOS.

Importing this package has no side effects and triggers no
opentelemetry imports, even when the `dimos[otel]` extra is installed.
The public `span` context manager is a silent no-op until tracing is
wired up.

Three ways to turn tracing on:

1. Env-driven setup. Install the extra and set OTEL_EXPORTER_OTLP_ENDPOINT
(plus optional OTEL_EXPORTER_OTLP_HEADERS and OTEL_SERVICE_NAME).
`enable()` runs automatically on first import of this package. The
OTLP HTTP exporter is configured, and LangChain auto-instrumentation
is applied when `openinference-instrumentation-langchain` is present.

2. Caller-owned provider. When the host app has its own TracerProvider:

dimos.telemetry.configure_tracing(my_provider)

3. Standard OTEL `BaseInstrumentor`:

DimosInstrumentor().instrument(tracer_provider=my_provider)

Any OTLP-compatible backend works (Langfuse, Arize Phoenix, LangSmith,
Opik, etc.). Vendor selection is by env var, not code.
"""

import os
from typing import Any

from dimos.telemetry._api import span
from dimos.telemetry._manager import _manager
from dimos.utils.logging_config import setup_logger

logger = setup_logger()

__all__ = [
"DimosInstrumentor",
"configure_tracing",
"enable",
"session_attributes",
"span",
]


def configure_tracing(tracer_provider: Any, tracer_name: str = "dimos") -> None:
"""Wire DimOS into a caller-owned TracerProvider.

Raises RuntimeError when the `dimos[otel]` extra isn't installed.
"""
try:
import opentelemetry # noqa: F401 (presence check)
except ImportError:
raise RuntimeError(
"dimos.telemetry: opentelemetry is not installed. "
"Install with `pip install dimos[otel]` (or `uv sync --extra otel`)."
) from None
_manager.configure(tracer_provider.get_tracer(tracer_name))


def enable() -> bool:
"""Auto-configure tracing from the standard OTEL env vars.

Returns True when tracing was wired up, False otherwise (no exporter
endpoint set, or the `dimos[otel]` extra isn't installed).
"""
if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
return False
try:
import atexit

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
except ImportError:
logger.warning(
"OTEL_EXPORTER_OTLP_ENDPOINT is set but `dimos[otel]` is not "
"installed; tracing disabled."
)
return False

provider = TracerProvider(
resource=Resource.create({"service.name": os.environ.get("OTEL_SERVICE_NAME", "dimos")})
)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
atexit.register(provider.shutdown)
configure_tracing(provider)

try:
from openinference.instrumentation.langchain import LangChainInstrumentor

LangChainInstrumentor().instrument(tracer_provider=provider)
except ImportError:
# Auto-instrumentation is best-effort and optional within the extra.
pass

return True


def session_attributes(session_id: str) -> dict[str, str]:
"""Span-attribute dict that groups traces under one session in
common LLM-observability backends.

Sets three keys to cover the four major OTLP-compatible backends:
session.id — OpenInference convention.
Used by Langfuse and Arize Phoenix / AX.
langsmith.trace.session_id — LangSmith's own attribute namespace.
thread_id — Opik's Threads attribute (added in
comet-ml/opik#3441).
"""
return {
"session.id": session_id,
"langsmith.trace.session_id": session_id,
"thread_id": session_id,
}


def __getattr__(name: str) -> Any:
# Resolve DimosInstrumentor lazily so importing this package never
# pulls in opentelemetry.instrumentation.
if name == "DimosInstrumentor":
from dimos.telemetry.instrumentor import DimosInstrumentor

globals()[name] = DimosInstrumentor
return DimosInstrumentor
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


# Boot-time auto-enable: treat OTEL_EXPORTER_OTLP_ENDPOINT being set as the
# user's explicit opt-in. When unset (the default), this is a single
# os.environ.get() check; no OTEL packages are imported.
if os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):
enable()
41 changes: 41 additions & 0 deletions dimos/telemetry/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Public tracing helpers for `dimos.telemetry`.

Safe to call whether or not the `dimos[otel]` extra is installed and
whether or not tracing has been wired up. Short-circuits to a no-op
when the manager is in its default off state.
"""

from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any

from dimos.telemetry._manager import _manager


@contextmanager
def span(name: str, **attributes: Any) -> Iterator[Any]:
"""Open a span around a block.

Yields the active OTEL Span when tracing is configured, or None
otherwise. In the no-op case the only cost is a single boolean
check on the manager.
"""
if not _manager._export_enabled:
yield None
return
with _manager.tracer.start_as_current_span(name, attributes=attributes or None) as s:
yield s
51 changes: 51 additions & 0 deletions dimos/telemetry/_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Internal tracer state for `dimos.telemetry`.

Owns a module-global `TracerManager` that holds the active OTEL tracer
and an explicit on/off flag. No opentelemetry packages are imported
here; that cost is paid only when a wiring entry point (`enable`,
`configure_tracing`, `DimosInstrumentor`) is called.
"""

from typing import Any

from dimos.utils.logging_config import setup_logger

logger = setup_logger()


class TracerManager:
"""Holds the active tracer plus an on/off flag.

Stays inert at import time: the tracer is None and export is off
until a caller wires us up.
"""

def __init__(self) -> None:
self.tracer: Any = None
self._export_enabled: bool = False

def configure(self, tracer: Any) -> None:
self.tracer = tracer
self._export_enabled = True
logger.info("dimos.telemetry: tracing configured.")

def reset(self) -> None:
self.tracer = None
self._export_enabled = False


_manager = TracerManager()
72 changes: 72 additions & 0 deletions dimos/telemetry/instrumentor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Standard OTEL `BaseInstrumentor` integration for DimOS.

Host applications wire DimOS into their telemetry stack the same way
they wire every other instrumented library:

from dimos.telemetry import DimosInstrumentor
DimosInstrumentor().instrument(tracer_provider=my_provider)

This module is imported lazily by `dimos.telemetry.__getattr__` only
when `DimosInstrumentor` is accessed — that's the gate that keeps OTEL
imports out of `import dimos.telemetry`. Inside this module the OTEL
imports run at the top level, behind a try/except that falls back to a
stub class when `dimos[otel]` isn't installed.
"""

from typing import Any

try:
from collections.abc import Collection

from opentelemetry import trace as otel_trace
from opentelemetry.instrumentation.instrumentor import ( # type: ignore[attr-defined]
BaseInstrumentor,
)

class DimosInstrumentor(BaseInstrumentor): # type: ignore[misc, valid-type]
"""OTEL instrumentor for DimOS.

Calling `instrument(tracer_provider=...)` wires the supplied
provider into `dimos.telemetry`. If `tracer_provider` is
omitted, the global provider is used.
"""

def instrumentation_dependencies(self) -> "Collection[str]":
return []

def _instrument(self, **kwargs: Any) -> None:
from dimos.telemetry import configure_tracing

tracer_provider = kwargs.get("tracer_provider") or otel_trace.get_tracer_provider()
tracer_name = kwargs.get("tracer_name", "dimos")
configure_tracing(tracer_provider, tracer_name)

def _uninstrument(self, **kwargs: Any) -> None:
from dimos.telemetry._manager import _manager

_manager.reset()

except ImportError:

class DimosInstrumentor: # type: ignore[no-redef]
"""Stub: install `dimos[otel]` to use."""

def __init__(self) -> None:
raise RuntimeError(
"DimosInstrumentor requires the `dimos[otel]` extra. "
"Install with `pip install dimos[otel]` (or `uv sync --extra otel`)."
)
Loading