-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfoundry_multi_agent_e2e_tracing.py
More file actions
210 lines (163 loc) · 9.02 KB
/
foundry_multi_agent_e2e_tracing.py
File metadata and controls
210 lines (163 loc) · 9.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
End-to-End Multi-Agent Tracing Configuration
=============================================
Drop-in replacement for ``tracing.init_tracing()`` that adds full
application-level identity and correlation to every span emitted by
your multi-agent system.
What this adds over the basic ``init_tracing()``:
1. **OTel Resource identity** — ``service.name``, ``service.namespace``,
``service.instance.id``, and ``service.version`` so Application Insights
groups all telemetry under one ``cloud_RoleName``.
2. **Custom AI resource attributes** — ``ai.application.name``,
``ai.orchestration.pattern``, and ``ai.agent.count`` for domain-specific
Kusto queries.
3. **OTel Baggage propagation** — attaches ``session.id`` as Baggage so
every span (including SDK-generated ones) carries the session context
without manual ``set_attribute`` calls.
4. **Enrichment SpanProcessor** — automatically stamps every span with
``app.name`` and the current ``session.id`` from Baggage, even SDK
auto-instrumented spans you don't control.
Usage::
from foundry_multi_agent_e2e_tracing import init_e2e_tracing, set_session_baggage
# At startup — before creating any AgentsClient
connection_string = init_e2e_tracing(
app_name="sports-multi-agent",
namespace="foundry-analytics",
version="1.0.0",
)
# At the start of each session/request
set_session_baggage(session_id)
# Then use the tracer normally — all spans automatically carry
# the app identity and session context.
Required environment variables:
APPLICATION_INSIGHTS_CONNECTION_STRING — enables Azure Monitor export
"""
import os
import uuid
from typing import Optional
from opentelemetry import context, trace, baggage
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider, ReadableSpan, Span
from opentelemetry.sdk.trace.export import SpanProcessor
from azure.core.settings import settings as azure_settings
# ─────────────────────────────────────────────────────────────────────────────
# Enrichment SpanProcessor
# ─────────────────────────────────────────────────────────────────────────────
# Stamps every span with application-level attributes so you can filter and
# group in Application Insights without modifying each span creation site.
class AppEnrichmentProcessor(SpanProcessor):
"""
SpanProcessor that enriches every span with application identity
and the current session ID from OTel Baggage.
This ensures that even SDK auto-instrumented spans (create_agent,
start_thread_run, etc.) carry your application context.
"""
def __init__(self, app_name: str):
self._app_name = app_name
def on_start(self, span: Span, parent_context: Optional[context.Context] = None) -> None:
"""Called when a span starts — inject app identity + session from baggage."""
span.set_attribute("app.name", self._app_name)
# Pull session.id from OTel Baggage (if set via set_session_baggage)
ctx = parent_context or context.get_current()
session_id = baggage.get_baggage("session.id", ctx)
if session_id:
span.set_attribute("session.id", session_id)
def on_end(self, span: ReadableSpan) -> None:
pass
def shutdown(self) -> None:
pass
def force_flush(self, timeout_millis: int = 30000) -> bool:
return True
# ─────────────────────────────────────────────────────────────────────────────
# Session Baggage Helper
# ─────────────────────────────────────────────────────────────────────────────
def set_session_baggage(session_id: str) -> context.Context:
"""
Attach a ``session.id`` to OTel Baggage on the current context.
Call this once at the start of each session/request. The
``AppEnrichmentProcessor`` reads it and stamps every subsequent
span with ``session.id``.
Returns the new context (also set as current).
"""
ctx = baggage.set_baggage("session.id", session_id)
context.attach(ctx)
return ctx
# ─────────────────────────────────────────────────────────────────────────────
# init_e2e_tracing
# ─────────────────────────────────────────────────────────────────────────────
def init_e2e_tracing(
*,
app_name: str = "multi-agent-app",
namespace: str = "foundry-analytics",
version: str = "0.1.0",
agent_count: int = 4,
orchestration_pattern: str = "router-worker",
) -> Optional[str]:
"""
Initialise OpenTelemetry tracing with full application-level identity.
This is a drop-in replacement for ``tracing.init_tracing()`` that
configures:
1. A ``Resource`` with ``service.name``, ``service.namespace``,
``service.instance.id``, ``service.version``, and custom AI
attributes.
2. An ``AppEnrichmentProcessor`` that stamps every span with
``app.name`` and ``session.id`` (from Baggage).
3. Azure Monitor export (if ``APPLICATION_INSIGHTS_CONNECTION_STRING``
is set).
4. ``AIAgentsInstrumentor`` to auto-instrument the Azure AI Agents SDK.
Args:
app_name: Maps to ``service.name`` / ``cloud_RoleName``.
namespace: Maps to ``service.namespace`` for org grouping.
version: Semantic version of your application.
agent_count: Number of agents in your orchestration.
orchestration_pattern: e.g. ``router-worker``, ``hierarchical``, ``flat``.
Returns:
The Application Insights connection string if configured, else ``None``.
"""
# ── Enable content recording for GenAI spans ──────────────────────────────
os.environ["AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED"] = "true"
# Tell azure-core to use OpenTelemetry
azure_settings.tracing_implementation = "opentelemetry"
# ── Build the OTel Resource ───────────────────────────────────────────────
instance_id = os.getenv("SERVICE_INSTANCE_ID", f"{app_name}-{uuid.uuid4().hex[:8]}")
resource = Resource.create({
# Standard OTel resource attributes
"service.name": app_name,
"service.namespace": namespace,
"service.instance.id": instance_id,
"service.version": version,
# AI-specific resource attributes (queryable in Kusto)
"ai.application.name": app_name,
"ai.orchestration.pattern": orchestration_pattern,
"ai.agent.count": agent_count,
})
connection_string = os.getenv("APPLICATION_INSIGHTS_CONNECTION_STRING")
if connection_string:
# Azure Monitor — pass the custom resource so cloud_RoleName is set
from azure.monitor.opentelemetry import configure_azure_monitor
configure_azure_monitor(
connection_string=connection_string,
resource=resource,
)
print(f"☁️ Azure Monitor tracing enabled — cloud_RoleName = {app_name}")
# Add the enrichment processor to the existing provider
provider = trace.get_tracer_provider()
provider.add_span_processor(AppEnrichmentProcessor(app_name))
else:
# Local-only: create a TracerProvider with the resource
provider = TracerProvider(resource=resource)
provider.add_span_processor(AppEnrichmentProcessor(app_name))
trace.set_tracer_provider(provider)
print(
f"🔍 Local-only tracing enabled — service.name = {app_name}\n"
f" (set APPLICATION_INSIGHTS_CONNECTION_STRING to export)"
)
# ── Instrument the Azure AI Agents SDK ────────────────────────────────────
from azure.ai.agents.telemetry import AIAgentsInstrumentor
AIAgentsInstrumentor().instrument()
print(f" service.namespace = {namespace}")
print(f" service.instance.id = {instance_id}")
print(f" service.version = {version}")
print(f" orchestration = {orchestration_pattern} ({agent_count} agents)")
print()
return connection_string