Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
MessageEvent,
ToolCall,
)
from opentelemetry.instrumentation.langchain.langgraph_helper import (
is_langgraph_task,
get_graph_structure,
)
from opentelemetry.instrumentation.langchain.span_utils import (
SpanHolder,
_set_span_attribute,
Expand Down Expand Up @@ -431,6 +435,11 @@ def on_chain_start(
),
)

# Add graph structure
if is_langgraph_task(name):
graph_structure = get_graph_structure()
span.set_attribute("graph_structure", graph_structure)

# The start_time is now automatically set when creating the SpanHolder

@dont_throw
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import inspect
import json
from langgraph.graph.state import CompiledStateGraph


from typing import List, Dict, Union, Any
Comment on lines +1 to +6
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle optional langgraph dependency gracefully.

The unconditional import of CompiledStateGraph will raise ImportError if langgraph is not installed. Since this is an instrumentation library that may be used in environments where langgraph isn't present, consider using a lazy import pattern.

 import inspect
 import json
-from langgraph.graph.state import CompiledStateGraph
-
-
 from typing import List, Dict, Union, Any
+
+try:
+    from langgraph.graph.state import CompiledStateGraph
+    HAS_LANGGRAPH = True
+except ImportError:
+    CompiledStateGraph = None
+    HAS_LANGGRAPH = False

Then guard the functions accordingly:

def get_compiled_graph():
    if not HAS_LANGGRAPH:
        return None
    # ... rest of implementation
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/langgraph_helper.py
around lines 1-6, the direct import of CompiledStateGraph will raise ImportError
when langgraph is absent; change to a guarded lazy-import pattern by wrapping
the import in a try/except ImportError that sets a module-level HAS_LANGGRAPH
boolean (False on exception) and only import CompiledStateGraph when available
(or import it inside functions that need it). Update public helper functions
(e.g., get_compiled_graph) to check HAS_LANGGRAPH at the start and return None
(or no-op) when False, so callers won’t error in environments without langgraph.



def is_langgraph_task(name: str) -> bool:
return name == "LangGraph"


def get_compiled_graph():
""" Get the compiled graph from the call stack """
graph = None
invocation_methods = ["Pregel.invoke", "Pregel.ainvoke", "Pregel.stream", "Pregel.astream"]
frames = inspect.stack()
for frame_info in frames[1:]:
if frame_info.frame.f_code.co_qualname in invocation_methods:
local_vars = frame_info.frame.f_locals
graph = local_vars.get("self", None)
graph = graph if isinstance(graph, CompiledStateGraph) else None
break
return graph


def _normalize_endpoint_names(
names: Union[str, List[str], tuple[str, ...]]
) -> List[str]:
"""Normalize edge endpoints to a list of node names."""
if isinstance(names, str):
return [names]
if isinstance(names, (list, tuple)):
return list(names)
raise TypeError(f"Unsupported endpoint type: {type(names)!r}")


def build_node_graph(compiled_state_graph: CompiledStateGraph) -> Dict[str, Any]:
"""
Build a simple node/edge representation from CompiledStateGraph.

Returns a dict:
{
"nodes": [node_name, ...], # excluding "__start__", "__end__"
"edges": [
[[source1, ...], [dest1, dest2, ...]], # each edge has list of sources and list of destinations
...
]
}
"""
builder = compiled_state_graph.builder

# Track *all* node names (including __start__/__end__) for edges,
# but only expose non-special nodes in the "nodes" list.
all_nodes_ordered = list(compiled_state_graph.nodes.keys())
nodes: List[str] = [
name for name in all_nodes_ordered if name not in ("__start__", "__end__")
]

edges: List[List[List[str]]] = []

# Regular edges
for src, dst in builder.edges:
src_names = _normalize_endpoint_names(src)
dst_names = _normalize_endpoint_names(dst)
edges.append([src_names, dst_names])

# Branches
branches: Dict[str, Dict[str]] = builder.branches
for source, branch_map in branches.items():
for branch in branch_map.values():
# branch.ends is expected to be a mapping; we use its values as destinations
dest_names = list(branch.ends.values())
# Source is a single node here
edges.append([[source], dest_names])

# Waiting edges
for src, dst in builder.waiting_edges:
src_names = _normalize_endpoint_names(src)
dst_names = _normalize_endpoint_names(dst)
edges.append([src_names, dst_names])

return {
"nodes": nodes,
"edges": edges,
}


def get_graph_structure() -> str:
"""
Get graph structure as a JSON string.

Returns:
JSON string with structure:
{
"nodes": [...],
"edges": [[[...], [...]], ...]
}
"""
graph_structure: Dict[str, Any] = {}
graph = get_compiled_graph()
if graph:
graph_structure = build_node_graph(graph)
return json.dumps(graph_structure)