From 9dce6b02ef48601275a427e770be3cef8dd809c2 Mon Sep 17 00:00:00 2001 From: Eric Firth Date: Tue, 30 Jun 2026 11:46:31 -0400 Subject: [PATCH 1/4] Tag consumer spans with the pathway hash on every data streams checkpoint The pathway.hash span tag was only set on the produce/inject path (DataStreamsPropagator.inject). Consumers that checkpoint without injecting (e.g. RabbitMQ) had no pathway.hash, unlike the JS and Python tracers which tag the span on every checkpoint. Set it centrally in DefaultDataStreamsMonitoring.setCheckpoint so all consume-side integrations get it. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../DefaultDataStreamsMonitoring.java | 8 +++ .../DefaultDataStreamsMonitoringTest.java | 65 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 4cbc02acd37..91a1a19cf6f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -1,6 +1,7 @@ package datadog.trace.core.datastreams; import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT; +import static datadog.trace.api.DDTags.PATHWAY_HASH; import static datadog.trace.api.datastreams.DataStreamsContext.fromTags; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; @@ -309,6 +310,13 @@ public void setCheckpoint(AgentSpan span, DataStreamsContext context) { PathwayContext pathwayContext = span.spanContext().getPathwayContext(); if (pathwayContext != null) { pathwayContext.setCheckpoint(context, this::add); + // Surface the pathway hash on the span so consume-side spans are correlatable too. The + // propagator only tags it on inject (produce); without this, consumers that checkpoint + // without injecting (e.g. RabbitMQ) would have no pathway.hash, unlike the JS/Python tracers + // which tag on every checkpoint. + if (pathwayContext.getHash() != 0) { + span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash())); + } } } diff --git a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java index c1c29026c49..176a8026d3a 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java @@ -10,8 +10,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_SMART_NULLS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,13 +21,18 @@ import datadog.metrics.api.Histograms; import datadog.metrics.impl.DDSketchHistograms; import datadog.trace.api.Config; +import datadog.trace.api.DDTags; import datadog.trace.api.TraceConfig; +import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; import datadog.trace.api.datastreams.KafkaConfigReport; +import datadog.trace.api.datastreams.PathwayContext; import datadog.trace.api.datastreams.SchemaRegistryUsage; import datadog.trace.api.datastreams.StatsPoint; import datadog.trace.api.experimental.DataStreamsContextCarrier; import datadog.trace.api.time.ControllableTimeSource; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.common.metrics.EventListener; import datadog.trace.common.metrics.Sink; import datadog.trace.core.DDCoreJavaSpecification; @@ -1559,6 +1566,64 @@ void close() { } } + private DefaultDataStreamsMonitoring newDataStreamsMonitoring() { + return new DefaultDataStreamsMonitoring( + mock(Sink.class), + stubFeatures(true), + new ControllableTimeSource(), + () -> stubTraceConfig(true), + mock(DatastreamsPayloadWriter.class), + DEFAULT_BUCKET_DURATION_NANOS); + } + + @Test + void setCheckpointTagsTheSpanWithThePathwayHash() { + DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring(); + + PathwayContext pathwayContext = mock(PathwayContext.class); + when(pathwayContext.getHash()).thenReturn(1234567890123456789L); + AgentSpanContext spanContext = mock(AgentSpanContext.class); + when(spanContext.getPathwayContext()).thenReturn(pathwayContext); + AgentSpan span = mock(AgentSpan.class); + when(span.spanContext()).thenReturn(spanContext); + DataStreamsContext context = mock(DataStreamsContext.class); + + dataStreams.setCheckpoint(span, context); + + verify(pathwayContext).setCheckpoint(eq(context), any()); + verify(span).setTag(DDTags.PATHWAY_HASH, Long.toUnsignedString(1234567890123456789L)); + } + + @Test + void setCheckpointDoesNotTagTheSpanWhenThePathwayHashIsZero() { + DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring(); + + PathwayContext pathwayContext = mock(PathwayContext.class); + when(pathwayContext.getHash()).thenReturn(0L); + AgentSpanContext spanContext = mock(AgentSpanContext.class); + when(spanContext.getPathwayContext()).thenReturn(pathwayContext); + AgentSpan span = mock(AgentSpan.class); + when(span.spanContext()).thenReturn(spanContext); + + dataStreams.setCheckpoint(span, mock(DataStreamsContext.class)); + + verify(span, never()).setTag(eq(DDTags.PATHWAY_HASH), any(String.class)); + } + + @Test + void setCheckpointDoesNotTagTheSpanWhenThereIsNoPathwayContext() { + DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring(); + + AgentSpanContext spanContext = mock(AgentSpanContext.class); + when(spanContext.getPathwayContext()).thenReturn(null); + AgentSpan span = mock(AgentSpan.class); + when(span.spanContext()).thenReturn(spanContext); + + dataStreams.setCheckpoint(span, mock(DataStreamsContext.class)); + + verify(span, never()).setTag(eq(DDTags.PATHWAY_HASH), any(String.class)); + } + static class CustomContextCarrier implements DataStreamsContextCarrier { private final Map data = new HashMap<>(); From 7be548a35f00dbdb39985f5416a11e7537b7a011 Mon Sep 17 00:00:00 2001 From: Eric Firth Date: Tue, 30 Jun 2026 11:48:58 -0400 Subject: [PATCH 2/4] Remove explanatory comment on pathway hash tagging Per review on #11808; the rationale lives in the PR description. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../trace/core/datastreams/DefaultDataStreamsMonitoring.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 91a1a19cf6f..ac3c29b1e89 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -310,10 +310,6 @@ public void setCheckpoint(AgentSpan span, DataStreamsContext context) { PathwayContext pathwayContext = span.spanContext().getPathwayContext(); if (pathwayContext != null) { pathwayContext.setCheckpoint(context, this::add); - // Surface the pathway hash on the span so consume-side spans are correlatable too. The - // propagator only tags it on inject (produce); without this, consumers that checkpoint - // without injecting (e.g. RabbitMQ) would have no pathway.hash, unlike the JS/Python tracers - // which tag on every checkpoint. if (pathwayContext.getHash() != 0) { span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash())); } From 537f094fa287590fdca5fb48ffd9b471292e9431 Mon Sep 17 00:00:00 2001 From: Eric Firth Date: Tue, 30 Jun 2026 12:08:34 -0400 Subject: [PATCH 3/4] Address review on #11808 - cache pathwayContext.getHash() in a local instead of calling it twice - use a negative hash in the test so it actually exercises Long.toUnsignedString Co-Authored-By: Claude Opus 4.8 (1M context) --- .../core/datastreams/DefaultDataStreamsMonitoring.java | 5 +++-- .../core/datastreams/DefaultDataStreamsMonitoringTest.java | 7 +++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index ac3c29b1e89..b6487c2c99d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -310,8 +310,9 @@ public void setCheckpoint(AgentSpan span, DataStreamsContext context) { PathwayContext pathwayContext = span.spanContext().getPathwayContext(); if (pathwayContext != null) { pathwayContext.setCheckpoint(context, this::add); - if (pathwayContext.getHash() != 0) { - span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash())); + long pathwayHash = pathwayContext.getHash(); + if (pathwayHash != 0) { + span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayHash)); } } } diff --git a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java index 176a8026d3a..6648e1a8c36 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java @@ -1580,8 +1580,11 @@ private DefaultDataStreamsMonitoring newDataStreamsMonitoring() { void setCheckpointTagsTheSpanWithThePathwayHash() { DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring(); + // Negative so the signed and unsigned string representations differ, proving the tag uses + // Long.toUnsignedString (pathway hashes are unsigned 64-bit values). + long hash = -1234567890123456789L; PathwayContext pathwayContext = mock(PathwayContext.class); - when(pathwayContext.getHash()).thenReturn(1234567890123456789L); + when(pathwayContext.getHash()).thenReturn(hash); AgentSpanContext spanContext = mock(AgentSpanContext.class); when(spanContext.getPathwayContext()).thenReturn(pathwayContext); AgentSpan span = mock(AgentSpan.class); @@ -1591,7 +1594,7 @@ void setCheckpointTagsTheSpanWithThePathwayHash() { dataStreams.setCheckpoint(span, context); verify(pathwayContext).setCheckpoint(eq(context), any()); - verify(span).setTag(DDTags.PATHWAY_HASH, Long.toUnsignedString(1234567890123456789L)); + verify(span).setTag(DDTags.PATHWAY_HASH, Long.toUnsignedString(hash)); } @Test From 2ed1305440e776a520c87209a900d94566f6d63a Mon Sep 17 00:00:00 2001 From: Eric Firth Date: Tue, 30 Jun 2026 13:28:42 -0400 Subject: [PATCH 4/4] Expect pathway.hash on the inferred-proxy server span when DSM is enabled Now that DefaultDataStreamsMonitoring.setCheckpoint tags the span on every checkpoint, the inferred-proxy (API Gateway) HTTP server span gets a pathway.hash like any other DSM-enabled server span. The shared HttpServerTest already asserts this conditionally; SpringBootBasedTest's hand-rolled inferred-proxy assertions were missing it. Mirror the same guard. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/test/groovy/test/boot/SpringBootBasedTest.groovy | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/test/groovy/test/boot/SpringBootBasedTest.groovy b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/test/groovy/test/boot/SpringBootBasedTest.groovy index 18badff3e97..89b0389c3da 100644 --- a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/test/groovy/test/boot/SpringBootBasedTest.groovy +++ b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/test/groovy/test/boot/SpringBootBasedTest.groovy @@ -570,6 +570,9 @@ class SpringBootBasedTest extends HttpServerTest "$Tags.HTTP_ROUTE" "/success" "servlet.context" "/boot-context" "servlet.path" "/success" + if ({ isDataStreamsEnabled() }) { + "$DDTags.PATHWAY_HASH" { String } + } defaultTags() } }