Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
93e7aba
feat(otel): add genai and mcp telemetry primitives
tdabasinskas May 3, 2026
53bb706
feat(otel): wire SDK init, W3C propagation, and HTTP client gating
tdabasinskas May 3, 2026
18591df
feat(otel): instrument provider chat, embed, rerank with semconv span…
tdabasinskas May 3, 2026
6dafda3
feat(otel): instrument runtime sessions, streams, fallback, delegatio…
tdabasinskas May 3, 2026
9a1341d
feat(otel): annotate hook executor with span verdict and subprocess t…
tdabasinskas May 3, 2026
e6f56f0
feat(otel): instrument MCP client, server, and OAuth flows
tdabasinskas May 3, 2026
d125606
feat(otel): instrument A2A server with otelhttp and gen_ai.invoke_age…
tdabasinskas May 3, 2026
86996c7
feat(otel): wrap remaining HTTP servers, wire runtime tracer entry po…
tdabasinskas May 3, 2026
3e55ce4
feat(otel): instrument memory, RAG, sessiontitle, and evaluation
tdabasinskas May 3, 2026
b0d39e7
feat(otel): annotate built-in tool internals
tdabasinskas May 3, 2026
4356a83
fix(otel): correct tool_call_response schema and cap filesystem paths…
tdabasinskas May 4, 2026
b6a181b
fix(otel): gate codemode script body on capture, sanitize fetch URL a…
tdabasinskas May 4, 2026
653dcc9
feat(mcp-client): span remote mcp requests with w3c traceparent injec…
tdabasinskas May 5, 2026
d70e6d7
feat(mcp-client): stamp server.address on all client spans
tdabasinskas May 5, 2026
60b5ebd
fix(mcp-client): strip credentials from server.address before stamping
tdabasinskas May 5, 2026
75ccd04
fix(otel): unwrap toolset wrapper in toolset.start kind attribute
tdabasinskas May 6, 2026
8a791f7
chore(otel): add tool count attributes to session and mcp spans
tdabasinskas May 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/root/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

tea "charm.land/bubbletea/v2"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"

"github.com/docker/docker-agent/pkg/app"
"github.com/docker/docker-agent/pkg/config"
Expand Down Expand Up @@ -63,7 +64,9 @@ func (f *newFlags) runNewCommand(cmd *cobra.Command, args []string) (commandErr
}
defer stopToolSets(t)

rt, err := runtime.New(t)
rt, err := runtime.New(t,
runtime.WithTracer(otel.Tracer(AppName)),
)
if err != nil {
return err
}
Expand Down
214 changes: 170 additions & 44 deletions cmd/root/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,26 @@ import (
"fmt"
"net"
"os"
"runtime"
"strings"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"

"github.com/docker/docker-agent/pkg/httpclient"
"github.com/docker/docker-agent/pkg/version"
)

const AppName = "cagent"
Expand All @@ -25,73 +36,188 @@ func initOTelSDK(ctx context.Context) (err error) {
return fmt.Errorf("failed to create resource: %w", err)
}

var traceExporter trace.SpanExporter
endpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")

// Only initialize if endpoint is configured
if endpoint != "" {
var opts []otlptracehttp.Option
// An endpoint with an http:// or https:// scheme goes through
// WithEndpointURL so the SDK picks the transport from the scheme
// (per the OTLP/HTTP spec). Bare host:port still flows through
// WithEndpoint with the loopback-insecure shortcut preserved.
if strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://") {
opts = []otlptracehttp.Option{otlptracehttp.WithEndpointURL(endpoint)}
} else {
opts = []otlptracehttp.Option{otlptracehttp.WithEndpoint(endpoint)}
if isLocalhostEndpoint(endpoint) {
opts = append(opts, otlptracehttp.WithInsecure())
}
}
traceExporter, err = otlptracehttp.New(ctx, opts...)
if err != nil {
return fmt.Errorf("failed to create trace exporter: %w", err)
}
tp, err := newTracerProvider(ctx, res, endpoint)
if err != nil {
return fmt.Errorf("failed to create tracer provider: %w", err)
}
otel.SetTracerProvider(tp)

// Configure tracer provider
tracerProviderOpts := []trace.TracerProviderOption{
trace.WithResource(res),
mp, err := newMeterProvider(ctx, res, endpoint)
if err != nil {
_ = shutdownTracerProvider(tp)
return fmt.Errorf("failed to create meter provider: %w", err)
}
otel.SetMeterProvider(mp)

if traceExporter != nil {
tracerProviderOpts = append(tracerProviderOpts,
trace.WithBatcher(traceExporter,
trace.WithBatchTimeout(5*time.Second),
trace.WithMaxExportBatchSize(512),
),
)
lp, err := newLoggerProvider(ctx, res, endpoint)
if err != nil {
_ = mp.Shutdown(context.Background())
_ = shutdownTracerProvider(tp)
return fmt.Errorf("failed to create logger provider: %w", err)
}
global.SetLoggerProvider(lp)

tp := trace.NewTracerProvider(tracerProviderOpts...)
otel.SetTracerProvider(tp)

// Propagator must be set so otelhttp injects W3C traceparent on
// outbound requests and extracts it from incoming ones. Without this
// the SDK records spans locally but they never chain across services.
// Set the global text-map propagator unconditionally so otelhttp
// (and any other propagation-aware instrumentation) injects W3C
// `traceparent` / `tracestate` / `baggage` on outbound requests
// and extracts them on incoming ones. The propagator is a global
// no-op until set; without this the SDK records spans locally
// but they never chain across processes — `gen_ai.conversation.id`
// baggage and the MCP `_meta` / sandbox env-var injectors are
// dormant until this runs.
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))

// Single source of truth for "is OTel enabled?" — flip the
// httpclient gate now so outbound requests start emitting CLIENT
// spans and injecting traceparent. Previously the gate read
// OTEL_EXPORTER_OTLP_ENDPOINT directly, which diverged from the
// `--otel` CLI gate that controls this function: we'd either
// initialise providers without HTTP wrapping, or wrap HTTP without
// having a usable propagator.
httpclient.SetOTelEnabled(true)

go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = tp.Shutdown(shutdownCtx)
// Flush in dependency order: logs and metrics first (they may
// reference active spans), then traces. Each provider gets its
// own 5s budget so a slow exporter can't starve the others —
// sharing a single timeout meant a stuck logs endpoint silently
// dropped buffered metrics and spans.
shutdown := func(fn func(context.Context) error) {
c, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = fn(c)
}
shutdown(lp.Shutdown)
shutdown(mp.Shutdown)
shutdown(tp.Shutdown)
}()

return nil
}

// newTracerProvider returns an SDK tracer provider bound to res. With a
// non-empty endpoint, spans are handed to a batching OTLP/HTTP exporter;
// with an empty endpoint the provider carries the resource only and no
// exporter is created.
func newTracerProvider(ctx context.Context, res *resource.Resource, endpoint string) (*trace.TracerProvider, error) {
	providerOpts := []trace.TracerProviderOption{trace.WithResource(res)}

	if endpoint != "" {
		exporter, err := otlptracehttp.New(ctx, traceExporterOptions(endpoint)...)
		if err != nil {
			return nil, fmt.Errorf("failed to create trace exporter: %w", err)
		}
		// Batcher settings: 5s batch timeout, at most 512 spans per
		// export batch.
		batcher := trace.WithBatcher(exporter,
			trace.WithBatchTimeout(5*time.Second),
			trace.WithMaxExportBatchSize(512),
		)
		providerOpts = append(providerOpts, batcher)
	}

	return trace.NewTracerProvider(providerOpts...), nil
}

// newMeterProvider returns an SDK meter provider bound to res. With an
// empty endpoint the provider is built without any reader, so callers can
// still create meters; with an endpoint, a periodic reader exports over
// OTLP/HTTP on a 60-second interval.
func newMeterProvider(ctx context.Context, res *resource.Resource, endpoint string) (*metric.MeterProvider, error) {
	providerOpts := []metric.Option{metric.WithResource(res)}
	if endpoint == "" {
		return metric.NewMeterProvider(providerOpts...), nil
	}

	exporter, err := otlpmetrichttp.New(ctx, metricExporterOptions(endpoint)...)
	if err != nil {
		return nil, fmt.Errorf("failed to create metric exporter: %w", err)
	}

	reader := metric.NewPeriodicReader(exporter, metric.WithInterval(60*time.Second))
	providerOpts = append(providerOpts, metric.WithReader(reader))
	return metric.NewMeterProvider(providerOpts...), nil
}

// newLoggerProvider returns an SDK logger provider bound to res. It is
// needed for the gen_ai.client.operation.exception event (a log record
// per spec) and for any future log-bridge instrumentation. With an empty
// endpoint no processor is attached; otherwise records go through a batch
// processor backed by an OTLP/HTTP exporter.
func newLoggerProvider(ctx context.Context, res *resource.Resource, endpoint string) (*log.LoggerProvider, error) {
	providerOpts := []log.LoggerProviderOption{log.WithResource(res)}
	if endpoint == "" {
		return log.NewLoggerProvider(providerOpts...), nil
	}

	exporter, err := otlploghttp.New(ctx, logExporterOptions(endpoint)...)
	if err != nil {
		return nil, fmt.Errorf("failed to create log exporter: %w", err)
	}

	processor := log.NewBatchProcessor(exporter)
	providerOpts = append(providerOpts, log.WithProcessor(processor))
	return log.NewLoggerProvider(providerOpts...), nil
}

// normalizeOTLPEndpoint turns a possibly-bare `host:port` into a fully
// scheme-qualified URL so all three OTLP/HTTP exporters can be wired via
// `WithEndpointURL` consistently. We can't rely on the SDKs' default
// scheme inference: `otlptracehttp` (older API) treats a bare endpoint
// as TLS-by-default while `otlploghttp` (newer API) treats the same
// bare endpoint as insecure-by-default. With `OTEL_EXPORTER_OTLP_CERTIFICATE`
// set in the env, the log exporter then errors out with
// `insecure HTTP endpoint cannot use TLS client configuration`,
// `initOTelSDK` propagates the failure, and the entire telemetry
// pipeline (including traces) is torn down.
//
// Pinning the scheme up front removes that asymmetry: localhost gets
// `http://`, every other host gets `https://`, and any explicit scheme
// the caller already supplied is honoured verbatim.
//
// Scheme detection is case-insensitive: URI schemes compare
// case-insensitively, so `HTTPS://host:4318` is recognised as already
// qualified instead of being rewritten to `https://HTTPS://host:4318`.
// The endpoint itself is returned unchanged in that case.
func normalizeOTLPEndpoint(endpoint string) string {
	// Lower-case only the copy used for detection; the caller's spelling
	// is what gets returned.
	lowered := strings.ToLower(endpoint)
	if strings.HasPrefix(lowered, "http://") || strings.HasPrefix(lowered, "https://") {
		return endpoint
	}
	if isLocalhostEndpoint(endpoint) {
		return "http://" + endpoint
	}
	return "https://" + endpoint
}

// traceExporterOptions returns the OTLP/HTTP trace exporter options for
// endpoint: a single WithEndpointURL option carrying the endpoint after it
// has been passed through normalizeOTLPEndpoint.
func traceExporterOptions(endpoint string) []otlptracehttp.Option {
	endpointURL := normalizeOTLPEndpoint(endpoint)
	return []otlptracehttp.Option{
		otlptracehttp.WithEndpointURL(endpointURL),
	}
}

// metricExporterOptions returns the OTLP/HTTP metric exporter options for
// endpoint: a single WithEndpointURL option carrying the endpoint after it
// has been passed through normalizeOTLPEndpoint.
func metricExporterOptions(endpoint string) []otlpmetrichttp.Option {
	endpointURL := normalizeOTLPEndpoint(endpoint)
	return []otlpmetrichttp.Option{
		otlpmetrichttp.WithEndpointURL(endpointURL),
	}
}

// logExporterOptions returns the OTLP/HTTP log exporter options for
// endpoint: a single WithEndpointURL option carrying the endpoint after it
// has been passed through normalizeOTLPEndpoint.
func logExporterOptions(endpoint string) []otlploghttp.Option {
	endpointURL := normalizeOTLPEndpoint(endpoint)
	return []otlploghttp.Option{
		otlploghttp.WithEndpointURL(endpointURL),
	}
}

// shutdownTracerProvider shuts tp down under a fresh background context
// limited to five seconds and returns whatever error Shutdown reports.
func shutdownTracerProvider(tp *trace.TracerProvider) error {
	const budget = 5 * time.Second

	ctx, release := context.WithTimeout(context.Background(), budget)
	defer release()

	return tp.Shutdown(ctx)
}

// newOTelResource builds the resource attached to every provider. It
// merges resource.Default() with the process-specific attributes below
// and returns the merged resource or the error reported by
// resource.Merge.
//
// resource.Merge accepts exactly two resources, so the process
// attributes are passed as a single resource. The service version comes
// from version.Version only; no second resource pins it to a
// placeholder value.
func newOTelResource() (*resource.Resource, error) {
	// Standard OTel resource attributes; users can layer additional
	// labels via the spec-defined `OTEL_RESOURCE_ATTRIBUTES` env var,
	// which `resource.Default` merges in.
	attrs := []attribute.KeyValue{
		semconv.ServiceName(AppName),
		semconv.ServiceVersion(version.Version),
		// A fresh UUID per call identifies this service instance.
		semconv.ServiceInstanceID(uuid.NewString()),
		semconv.ProcessPID(os.Getpid()),
		semconv.ProcessRuntimeName("go"),
		semconv.OSTypeKey.String(runtime.GOOS),
		semconv.HostArchKey.String(runtime.GOARCH),
	}
	// The hostname is best-effort: it is omitted when the lookup fails
	// or returns an empty string.
	if hostname, err := os.Hostname(); err == nil && hostname != "" {
		attrs = append(attrs, semconv.HostName(hostname))
	}
	return resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(semconv.SchemaURL, attrs...),
	)
}

Expand Down
57 changes: 57 additions & 0 deletions cmd/root/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,63 @@ func TestNewOTelResourceUsesCurrentSchemaURL(t *testing.T) {
assert.Equal(t, semconv.SchemaURL, res.SchemaURL())
}

// TestProvidersWithoutEndpoint checks that the tracer, meter and logger
// provider constructors all succeed when given an empty OTLP endpoint,
// and that each returned provider is non-nil and can hand out a
// tracer / meter / logger.
func TestProvidersWithoutEndpoint(t *testing.T) {
	t.Parallel()

	ctx := t.Context()
	res, err := newOTelResource()
	require.NoError(t, err)

	// Traces.
	tracerProvider, err := newTracerProvider(ctx, res, "")
	require.NoError(t, err)
	require.NotNil(t, tracerProvider)
	assert.NotNil(t, tracerProvider.Tracer("test"))

	// Metrics.
	meterProvider, err := newMeterProvider(ctx, res, "")
	require.NoError(t, err)
	require.NotNil(t, meterProvider)
	assert.NotNil(t, meterProvider.Meter("test"))

	// Logs.
	loggerProvider, err := newLoggerProvider(ctx, res, "")
	require.NoError(t, err)
	require.NotNil(t, loggerProvider)
	assert.NotNil(t, loggerProvider.Logger("test"))
}

// TestNormalizeOTLPEndpoint pins the bare-endpoint -> URL mapping the
// three OTLP/HTTP exporters share. Without this normalization the log
// exporter (insecure-by-default for bare hosts) conflicted with
// OTEL_EXPORTER_OTLP_CERTIFICATE and tore down the whole telemetry
// pipeline; the trace exporter (TLS-by-default for bare hosts) hid
// the inconsistency.
func TestNormalizeOTLPEndpoint(t *testing.T) {
t.Parallel()

tests := []struct {
name string
endpoint string
want string
}{
{"bare remote host:port -> https", "alloy.observability.svc.cluster.local:4318", "https://alloy.observability.svc.cluster.local:4318"},
{"bare remote host -> https", "example.com", "https://example.com"},
{"bare localhost host:port -> http", "localhost:4318", "http://localhost:4318"},
{"bare localhost -> http", "localhost", "http://localhost"},
{"bare ipv4 loopback -> http", "127.0.0.1:4318", "http://127.0.0.1:4318"},
{"bare ipv6 loopback -> http", "[::1]:4318", "http://[::1]:4318"},
{"explicit https preserved", "https://example.com:4318", "https://example.com:4318"},
{"explicit http preserved", "http://localhost:4318", "http://localhost:4318"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
assert.Equal(t, tt.want, normalizeOTLPEndpoint(tt.endpoint))
})
}
}

func TestIsLocalhostEndpoint(t *testing.T) {
t.Parallel()

Expand Down
16 changes: 16 additions & 0 deletions cmd/root/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/docker/docker-agent/pkg/environment"
"github.com/docker/docker-agent/pkg/paths"
"github.com/docker/docker-agent/pkg/sandbox"
"github.com/docker/docker-agent/pkg/telemetry/genai"
)

// runInSandbox delegates the current command to a Docker sandbox.
Expand Down Expand Up @@ -68,15 +69,30 @@ func runInSandbox(ctx context.Context, cmd *cobra.Command, args []string, runCon
envFlags = append(envFlags, "-e", envModelsGateway+"="+gateway)
}

// Wrap the sandbox exec in a span so the host side captures timing
// and exit code, and inject W3C trace context via env vars so the
// agent process spawned inside the sandbox container chains its
// own spans onto this parent.
ctx, sbxSpan := genai.StartSandboxExec(ctx, genai.SandboxOptions{
Runtime: "docker",
Container: name,
})
defer sbxSpan.End()
envFlags = append(envFlags, genai.InjectSandboxEnv(ctx)...)

dockerCmd := backend.BuildExecCmd(ctx, name, wd, dockerAgentArgs, envFlags, envVars)
slog.Debug("Executing in sandbox", "name", name, "args", dockerCmd.Args)

if err := dockerCmd.Run(); err != nil {
if exitErr, ok := errors.AsType[*exec.ExitError](err); ok {
sbxSpan.SetExitCode(exitErr.ExitCode())
sbxSpan.RecordError(err, "")
return cli.StatusError{StatusCode: exitErr.ExitCode()}
}
sbxSpan.RecordError(err, "")
return fmt.Errorf("docker sandbox exec failed: %w", err)
}
sbxSpan.SetExitCode(0)

return nil
}
Expand Down
Loading