diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index c8beb8c4863..98a194fbeca 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -8,6 +8,7 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND; +import static datadog.trace.api.datastreams.DataStreamsTags.create; import static datadog.trace.api.datastreams.DataStreamsTags.createWithExchange; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; @@ -219,9 +220,17 @@ public static void onEnter( if (TIME_IN_QUEUE_ENABLED) { RabbitDecorator.injectTimeInQueueStart(headers); } - DataStreamsTags tags = - createWithExchange( - "rabbitmq", OUTBOUND, exchange, routingKey != null && !routingKey.isEmpty()); + final boolean hasRoutingKey = routingKey != null && !routingKey.isEmpty(); + DataStreamsTags tags; + if ((exchange == null || exchange.isEmpty()) && hasRoutingKey) { + // Publishing to the default exchange: the routing key is the destination queue + // name, so record it as the topic (matching the consumer checkpoint). Without + // this the producer has neither a topic nor an exchange and shows up disconnected + // in the data streams map. + tags = create("rabbitmq", OUTBOUND, routingKey); + } else { + tags = createWithExchange("rabbitmq", OUTBOUND, exchange, hasRoutingKey); + } DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags); defaultPropagator().inject(span.with(dsmContext), headers, SETTER); props = diff --git a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy index 407195ed08c..53a9545af6c 100644 --- a/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/test/groovy/RabbitMQTest.groovy @@ -225,7 +225,7 @@ abstract class RabbitMQTestBase extends VersionedNamingTestBase { if (isDataStreamsEnabled()) { StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } verifyAll(first) { - tags.hasAllTags("direction:out", "exchange:", "has_routing_key:true", "type:rabbitmq") + tags.hasAllTags("direction:out", "topic:" + queueName, "type:rabbitmq") } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } @@ -493,7 +493,7 @@ abstract class RabbitMQTestBase extends VersionedNamingTestBase { if (isDataStreamsEnabled()) { StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } verifyAll(first) { - tags.hasAllTags("direction:out", "exchange:", "has_routing_key:true", "type:rabbitmq") + tags.hasAllTags("direction:out", "topic:some-routing-queue", "type:rabbitmq") } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }