Skip to content

Commit 233effa

Browse files
authored
Add Spring RabbitMQ plugin (#796)
1 parent eb4eb48 commit 233effa

File tree

22 files changed

+1108
-1
lines changed

22 files changed

+1108
-1
lines changed

.github/workflows/plugins-jdk17-test.0.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ jobs:
8080
- jetty-11.x-scenario
8181
- jetty-10.x-scenario
8282
- spring-ai-1.x-scenario
83+
- spring-rabbitmq-scenario
8384
steps:
8485
- uses: actions/checkout@v2
8586
with:

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Release Notes.
88
* Added support for Lettuce reactive Redis commands.
99
* Add Spring AI 1.x plugin and GenAI layer.
1010
* Fix httpclient-5.x plugin injecting sw8 propagation headers into ClickHouse HTTP requests (port 8123), causing HTTP 400. Add `PROPAGATION_EXCLUDE_PORTS` config to skip tracing (including header injection) for specified ports in the classic client interceptor.
11+
* Add Spring RabbitMQ 2.x - 4.x plugin.
1112

1213
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)
1314

apm-sniffer/apm-sdk-plugin/rabbitmq-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,19 @@
2626

2727
public class RabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
2828

29+
public static final String SMLC_INTERNAL_CONSUMER = "org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer";
30+
public static final String DMLC_INTERNAL_CONSUMER = "org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer";
31+
2932
@Override
3033
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
3134
MethodInterceptResult result) throws Throwable {
3235
Consumer consumer = (Consumer) allArguments[6];
36+
if (consumer != null) {
37+
String className = consumer.getClass().getName();
38+
if (SMLC_INTERNAL_CONSUMER.equals(className) || DMLC_INTERNAL_CONSUMER.equals(className)) {
39+
return;
40+
}
41+
}
3342
allArguments[6] = new TracerConsumer(consumer, (String) objInst.getSkyWalkingDynamicField());
3443
}
3544

apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@
4646
<module>spring-webflux-6.x-webclient-plugin</module>
4747
<module>resttemplate-commons</module>
4848
<module>spring-ai-1.x-plugin</module>
49+
<module>spring-rabbitmq-plugin</module>
4950
</modules>
5051
<packaging>pom</packaging>
5152

52-
<name>apm-sdk-plugin</name>
53+
<name>spring-plugins</name>
5354
<url>http://maven.apache.org</url>
5455

5556
<properties>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
~
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0"
20+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
24+
<parent>
25+
<groupId>org.apache.skywalking</groupId>
26+
<artifactId>spring-plugins</artifactId>
27+
<version>9.7.0-SNAPSHOT</version>
28+
</parent>
29+
30+
<artifactId>apm-spring-rabbitmq-plugin</artifactId>
31+
<name>apm-spring-rabbitmq-plugin</name>
32+
<packaging>jar</packaging>
33+
34+
<properties>
35+
<spring-rabbit.version>3.0.0</spring-rabbit.version>
36+
</properties>
37+
38+
<dependencies>
39+
<dependency>
40+
<groupId>org.springframework.amqp</groupId>
41+
<artifactId>spring-rabbit</artifactId>
42+
<version>${spring-rabbit.version}</version>
43+
<scope>provided</scope>
44+
</dependency>
45+
</dependencies>
46+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.spring.rabbitmq;
20+
21+
import java.lang.reflect.Method;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
26+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
27+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
28+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
29+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
30+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
31+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
33+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
34+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
35+
import org.springframework.amqp.core.Message;
36+
import org.springframework.amqp.core.MessageProperties;
37+
38+
import com.rabbitmq.client.Channel;
39+
import com.rabbitmq.client.Connection;
40+
41+
public class SpringRabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptorV2 {
42+
public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
43+
public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
44+
45+
@Override
46+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
47+
MethodInvocationContext context) throws Throwable {
48+
Channel channel = (Channel) allArguments[0];
49+
50+
if (allArguments[1] instanceof Message) {
51+
// Single message consume
52+
Message message = (Message) allArguments[1];
53+
MessageProperties messageProperties = message.getMessageProperties();
54+
Map<String, Object> headers = messageProperties.getHeaders();
55+
56+
ContextCarrier contextCarrier = buildContextCarrier(headers);
57+
String operationName = buildOperationName(messageProperties);
58+
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
59+
60+
setSpanAttributes(activeSpan, channel, messageProperties);
61+
} else if (allArguments[1] instanceof List) {
62+
// Batch message consume
63+
List<?> messages = (List<?>) allArguments[1];
64+
if (messages.isEmpty()) {
65+
return;
66+
}
67+
68+
// Use the first message to create EntrySpan
69+
Message firstMessage = (Message) messages.get(0);
70+
MessageProperties firstMessageProperties = firstMessage.getMessageProperties();
71+
Map<String, Object> firstMessageHeaders = firstMessageProperties.getHeaders();
72+
73+
ContextCarrier contextCarrier = buildContextCarrier(firstMessageHeaders);
74+
String operationName = buildOperationName(firstMessageProperties);
75+
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
76+
77+
setSpanAttributes(activeSpan, channel, firstMessageProperties);
78+
79+
// Extract trace context from remaining messages (skip first, already used for EntrySpan)
80+
// to correlate all producer traces with this consumer span
81+
for (int i = 1; i < messages.size(); i++) {
82+
Object msg = messages.get(i);
83+
if (msg instanceof Message) {
84+
Message message = (Message) msg;
85+
MessageProperties messageProperties = message.getMessageProperties();
86+
Map<String, Object> headers = messageProperties.getHeaders();
87+
88+
ContextCarrier carrier = buildContextCarrier(headers);
89+
if (carrier.isValid()) {
90+
ContextManager.extract(carrier);
91+
}
92+
}
93+
}
94+
}
95+
}
96+
97+
/**
98+
* Build ContextCarrier from message headers
99+
*/
100+
private ContextCarrier buildContextCarrier(Map<String, Object> headers) {
101+
ContextCarrier contextCarrier = new ContextCarrier();
102+
CarrierItem next = contextCarrier.items();
103+
while (next.hasNext()) {
104+
next = next.next();
105+
Object value = headers.get(next.getHeadKey());
106+
if (value != null) {
107+
next.setHeadValue(value.toString());
108+
}
109+
}
110+
return contextCarrier;
111+
}
112+
113+
private String buildOperationName(MessageProperties messageProperties) {
114+
return OPERATE_NAME_PREFIX + "Topic/" + messageProperties.getReceivedExchange()
115+
+ "Queue/" + messageProperties.getReceivedRoutingKey()
116+
+ CONSUMER_OPERATE_NAME_SUFFIX;
117+
}
118+
119+
private void setSpanAttributes(AbstractSpan span, Channel channel, MessageProperties messageProperties) {
120+
Connection connection = channel.getConnection();
121+
String serverUrl = connection.getAddress().getHostAddress() + ":" + connection.getPort();
122+
Tags.MQ_BROKER.set(span, serverUrl);
123+
Tags.MQ_TOPIC.set(span, messageProperties.getReceivedExchange());
124+
Tags.MQ_QUEUE.set(span, messageProperties.getReceivedRoutingKey());
125+
span.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
126+
span.setPeer(serverUrl);
127+
SpanLayer.asMQ(span);
128+
}
129+
130+
@Override
131+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
132+
Object ret, MethodInvocationContext context) throws Throwable {
133+
if (ContextManager.isActive()) {
134+
ContextManager.stopSpan();
135+
}
136+
return ret;
137+
}
138+
139+
@Override
140+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
141+
Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
142+
if (ContextManager.isActive()) {
143+
ContextManager.activeSpan().log(t);
144+
}
145+
}
146+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.spring.rabbitmq.define;
20+
21+
import static net.bytebuddy.matcher.ElementMatchers.named;
22+
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.DeclaredInstanceMethodsInterceptV2Point;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
27+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
28+
import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch;
29+
30+
import net.bytebuddy.description.method.MethodDescription;
31+
import net.bytebuddy.matcher.ElementMatcher;
32+
33+
public class SpringRabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefineV2 {
34+
public static final String ENHANCE_CLASS = "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer";
35+
public static final String ENHANCE_METHOD = "executeListener";
36+
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.spring.rabbitmq.SpringRabbitMQConsumerInterceptor";
37+
38+
@Override
39+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
40+
return null;
41+
}
42+
43+
@Override
44+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
45+
return new InstanceMethodsInterceptV2Point[] {
46+
new DeclaredInstanceMethodsInterceptV2Point() {
47+
@Override
48+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
49+
return named(ENHANCE_METHOD);
50+
}
51+
52+
@Override
53+
public String getMethodsInterceptorV2() {
54+
return INTERCEPTOR_CLASS;
55+
}
56+
57+
@Override
58+
public boolean isOverrideArgs() {
59+
return false;
60+
}
61+
}
62+
};
63+
}
64+
65+
@Override
66+
protected ClassMatch enhanceClass() {
67+
return NameMatch.byName(ENHANCE_CLASS);
68+
}
69+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
spring-rabbitmq=org.apache.skywalking.apm.plugin.spring.rabbitmq.define.SpringRabbitMQConsumerInstrumentation

0 commit comments

Comments
 (0)