Skip to content

Commit 355abb2

Browse files
committed
Add Spring RabbitMQ plugin
1 parent f750006 commit 355abb2

File tree

31 files changed

+1296
-1
lines changed

31 files changed

+1296
-1
lines changed

.github/workflows/plugins-jdk17-test.0.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ jobs:
8080
- jetty-11.x-scenario
8181
- jetty-10.x-scenario
8282
- spring-ai-1.x-scenario
83+
- spring-rabbitmq-3.x-4.x-scenario
8384
steps:
8485
- uses: actions/checkout@v2
8586
with:

.github/workflows/plugins-test.3.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ jobs:
9696
- spring-kafka-1.3.x-scenario
9797
- spring-kafka-2.2.x-scenario
9898
- spring-kafka-2.3.x-scenario
99+
- spring-rabbitmq-2.x-scenario
99100
- spring-scheduled-3.x-5.x-scenario
100101
- elasticjob-2.x-scenario
101102
- quartz-scheduler-2.x-scenario

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Release Notes.
77

88
* Added support for Lettuce reactive Redis commands.
99
* Add Spring AI 1.x plugin and GenAI layer.
10+
* Add Spring RabbitMQ plugin.
1011

1112
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)
1213

apm-sniffer/apm-sdk-plugin/rabbitmq-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ public class RabbitMQConsumerInterceptor implements InstanceMethodsAroundInterce
2929
@Override
3030
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
3131
MethodInterceptResult result) throws Throwable {
32+
if (Thread.currentThread().getName().toLowerCase().contains("springframework")) {
33+
return;
34+
}
3235
Consumer consumer = (Consumer) allArguments[6];
3336
allArguments[6] = new TracerConsumer(consumer, (String) objInst.getSkyWalkingDynamicField());
3437
}

apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@
4646
<module>spring-webflux-6.x-webclient-plugin</module>
4747
<module>resttemplate-commons</module>
4848
<module>spring-ai-1.x-plugin</module>
49+
<module>spring-rabbitmq-plugin</module>
4950
</modules>
5051
<packaging>pom</packaging>
5152

52-
<name>apm-sdk-plugin</name>
53+
<name>spring-plugins</name>
5354
<url>http://maven.apache.org</url>
5455

5556
<properties>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>org.apache.skywalking</groupId>
9+
<artifactId>spring-plugins</artifactId>
10+
<version>9.7.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<artifactId>apm-spring-rabbitmq-plugin</artifactId>
14+
<name>apm-spring-rabbitmq-plugin</name>
15+
<packaging>jar</packaging>
16+
17+
<properties>
18+
<spring-rabbit.version>2.2.1.RELEASE</spring-rabbit.version>
19+
</properties>
20+
21+
<dependencies>
22+
<dependency>
23+
<groupId>org.springframework.amqp</groupId>
24+
<artifactId>spring-rabbit</artifactId>
25+
<version>${spring-rabbit.version}</version>
26+
<scope>provided</scope>
27+
</dependency>
28+
</dependencies>
29+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.spring.rabbitmq;
20+
21+
import java.lang.reflect.Method;
22+
import java.util.Map;
23+
24+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
25+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
26+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
27+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
28+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
29+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
30+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
31+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
33+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
34+
import org.springframework.amqp.core.Message;
35+
import org.springframework.amqp.core.MessageProperties;
36+
37+
import com.rabbitmq.client.Channel;
38+
import com.rabbitmq.client.Connection;
39+
40+
public class SpringRabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
41+
public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
42+
public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
43+
44+
@Override
45+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
46+
MethodInterceptResult result) throws Throwable {
47+
Channel channel = (Channel) allArguments[0];
48+
Message message = (Message) allArguments[1];
49+
MessageProperties messageProperties = message.getMessageProperties();
50+
Map<String, Object> headers = messageProperties.getHeaders();
51+
ContextCarrier contextCarrier = new ContextCarrier();
52+
CarrierItem next = contextCarrier.items();
53+
while (next.hasNext()) {
54+
next = next.next();
55+
Object value = headers.get(next.getHeadKey());
56+
if (value != null) {
57+
next.setHeadValue(value.toString());
58+
}
59+
}
60+
String operationName = OPERATE_NAME_PREFIX + "Topic/" + messageProperties.getReceivedExchange()
61+
+ "Queue/" + messageProperties.getReceivedRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX;
62+
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
63+
Connection connection = channel.getConnection();
64+
String serverUrl = connection.getAddress().getHostAddress() + ":" + connection.getPort();
65+
Tags.MQ_BROKER.set(activeSpan, serverUrl);
66+
Tags.MQ_TOPIC.set(activeSpan, messageProperties.getReceivedExchange());
67+
Tags.MQ_QUEUE.set(activeSpan, messageProperties.getReceivedRoutingKey());
68+
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
69+
activeSpan.setPeer(serverUrl);
70+
SpanLayer.asMQ(activeSpan);
71+
}
72+
73+
@Override
74+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
75+
Object ret) throws Throwable {
76+
ContextManager.stopSpan();
77+
return ret;
78+
}
79+
80+
@Override
81+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
82+
Class<?>[] argumentsTypes, Throwable t) {
83+
ContextManager.activeSpan().log(t);
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.spring.rabbitmq.define;
20+
21+
import static net.bytebuddy.matcher.ElementMatchers.named;
22+
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.DeclaredInstanceMethodsInterceptPoint;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
27+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
28+
import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch;
29+
30+
import net.bytebuddy.description.method.MethodDescription;
31+
import net.bytebuddy.matcher.ElementMatcher;
32+
33+
public class SpringRabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
34+
public static final String ENHANCE_CLASS = "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer";
35+
public static final String ENHANCE_METHOD = "executeListener";
36+
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.spring.rabbitmq.SpringRabbitMQConsumerInterceptor";
37+
38+
@Override
39+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
40+
return null;
41+
}
42+
43+
@Override
44+
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
45+
return new InstanceMethodsInterceptPoint[] {
46+
new DeclaredInstanceMethodsInterceptPoint() {
47+
@Override
48+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
49+
return named(ENHANCE_METHOD);
50+
}
51+
52+
@Override
53+
public String getMethodsInterceptor() {
54+
return INTERCEPTOR_CLASS;
55+
}
56+
57+
@Override
58+
public boolean isOverrideArgs() {
59+
return false;
60+
}
61+
}
62+
};
63+
}
64+
65+
@Override
66+
protected ClassMatch enhanceClass() {
67+
return NameMatch.byName(ENHANCE_CLASS);
68+
}
69+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
spring-rabbitmq=org.apache.skywalking.apm.plugin.spring.rabbitmq.define.SpringRabbitMQConsumerInstrumentation
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.spring.rabbitmq;
20+
21+
import static org.hamcrest.CoreMatchers.is;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import java.net.InetAddress;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
31+
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
33+
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
34+
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
35+
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
36+
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
37+
import org.junit.Assert;
38+
import org.junit.Before;
39+
import org.junit.Rule;
40+
import org.junit.Test;
41+
import org.junit.runner.RunWith;
42+
import org.mockito.Mock;
43+
import org.springframework.amqp.core.Message;
44+
import org.springframework.amqp.core.MessageProperties;
45+
46+
import com.rabbitmq.client.Channel;
47+
import com.rabbitmq.client.Connection;
48+
49+
@RunWith(TracingSegmentRunner.class)
50+
public class RabbitMQSpringConsumerInterceptorTest {
51+
52+
@SegmentStoragePoint
53+
private SegmentStorage segmentStorage;
54+
55+
private SpringRabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;
56+
57+
@Mock
58+
private EnhancedInstance enhancedInstance;
59+
60+
@Rule
61+
public AgentServiceRule serviceRule = new AgentServiceRule();
62+
63+
@Before
64+
public void setUp() throws Exception {
65+
rabbitMQConsumerInterceptor = new SpringRabbitMQConsumerInterceptor();
66+
}
67+
68+
@Test
69+
public void testRabbitMQConsumerInterceptorWithNilHeaders() throws Throwable {
70+
Object[] args = prepareMockData(false);
71+
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, args, new Class[0], null);
72+
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, args, new Class[0], null);
73+
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
74+
Assert.assertThat(traceSegments.size(), is(1));
75+
}
76+
77+
@Test
78+
public void testRabbitMQConsumerInterceptor() throws Throwable {
79+
Object[] args = prepareMockData(true);
80+
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, args, new Class[0], null);
81+
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, args, new Class[0], null);
82+
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
83+
Assert.assertThat(traceSegments.size(), is(1));
84+
}
85+
86+
private Object[] prepareMockData(boolean withHeaders) throws Exception {
87+
Channel channel = mock(Channel.class);
88+
Connection connection = mock(Connection.class);
89+
InetAddress address = mock(InetAddress.class);
90+
Message message = mock(Message.class);
91+
MessageProperties messageProperties = mock(MessageProperties.class);
92+
93+
when(channel.getConnection()).thenReturn(connection);
94+
when(connection.getAddress()).thenReturn(address);
95+
when(address.getHostAddress()).thenReturn("127.0.0.1");
96+
when(connection.getPort()).thenReturn(5672);
97+
when(message.getMessageProperties()).thenReturn(messageProperties);
98+
when(messageProperties.getReceivedExchange()).thenReturn("test-exchange");
99+
when(messageProperties.getReceivedRoutingKey()).thenReturn("test-routing-key");
100+
101+
if (withHeaders) {
102+
Map<String, Object> headers = new HashMap<>();
103+
headers.put(SW8CarrierItem.HEADER_NAME,
104+
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
105+
when(messageProperties.getHeader(SW8CarrierItem.HEADER_NAME))
106+
.thenReturn(headers.get(SW8CarrierItem.HEADER_NAME));
107+
}
108+
109+
return new Object[] {channel, message};
110+
}
111+
}

0 commit comments

Comments
 (0)