From 336b1b050ce7bb1b7bf2efa3c109ca94c9d38c5e Mon Sep 17 00:00:00 2001 From: Eric Firth Date: Tue, 30 Jun 2026 09:09:08 -0400 Subject: [PATCH] Use routing key as DSM topic for RabbitMQ default-exchange publishes When a producer publishes to the default exchange (exchange == ""), the routing key is the destination queue name. The DSM checkpoint previously recorded an empty exchange and no topic, so the producer had no destination and showed up disconnected in the Data Streams Monitoring map. Record the routing key as the topic in that case (matching the consumer checkpoint and the JS/.NET tracers). Named-exchange publishes are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../amqp/RabbitChannelInstrumentation.java | 15 ++++++++++++--- .../src/test/groovy/RabbitMQTest.groovy | 4 ++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index c8beb8c4863..98a194fbeca 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -8,6 +8,7 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.DataStreamsTags.createWithExchange; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; @@ -219,9 +220,17 @@ public static void onEnter( if (TIME_IN_QUEUE_ENABLED) { RabbitDecorator.injectTimeInQueueStart(headers); } - DataStreamsTags tags = - createWithExchange( - "rabbitmq", OUTBOUND, exchange, routingKey != null && !routingKey.isEmpty()); + final boolean hasRoutingKey = routingKey != null && !routingKey.isEmpty(); + DataStreamsTags tags; + if ((exchange == null || exchange.isEmpty()) && hasRoutingKey) { + // Publishing to the default exchange: the routing key is the destination queue + // name, so record it as the topic (matching the consumer checkpoint). Without + // this the producer has neither a topic nor an exchange and shows up disconnected + // in the data streams map. + tags = create("rabbitmq", OUTBOUND, routingKey); + } else { + tags = createWithExchange("rabbitmq", OUTBOUND, exchange, hasRoutingKey); + } DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags); defaultPropagator().inject(span.with(dsmContext), headers, SETTER); props = diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 407195ed08c..53a9545af6c 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -225,7 +225,7 @@ abstract class RabbitMQTestBase extends VersionedNamingTestBase { if (isDataStreamsEnabled()) { StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } verifyAll(first) { - tags.hasAllTags("direction:out", "exchange:", "has_routing_key:true", "type:rabbitmq") + tags.hasAllTags("direction:out", "topic:" + queueName, "type:rabbitmq") } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -493,7 +493,7 @@ abstract class RabbitMQTestBase extends VersionedNamingTestBase { if (isDataStreamsEnabled()) { StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } verifyAll(first) { - tags.hasAllTags("direction:out", "exchange:", "has_routing_key:true", "type:rabbitmq") + tags.hasAllTags("direction:out", "topic:some-routing-queue", "type:rabbitmq") } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }