Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/plugins-jdk17-test.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
- jetty-11.x-scenario
- jetty-10.x-scenario
- spring-ai-1.x-scenario
- spring-rabbitmq-scenario
steps:
- uses: actions/checkout@v2
with:
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Release Notes.
* Added support for Lettuce reactive Redis commands.
* Add Spring AI 1.x plugin and GenAI layer.
* Fix httpclient-5.x plugin injecting sw8 propagation headers into ClickHouse HTTP requests (port 8123), causing HTTP 400. Add `PROPAGATION_EXCLUDE_PORTS` config to skip tracing (including header injection) for specified ports in the classic client interceptor.
* Add Spring RabbitMQ 2.x - 4.x plugin.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@

public class RabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {

public static final String INTERNAL_CONSUMER_CLASS_NAME = "org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer";

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Consumer consumer = (Consumer) allArguments[6];
if (consumer != null && INTERNAL_CONSUMER_CLASS_NAME.equals(consumer.getClass().getName())) {
return;
}
allArguments[6] = new TracerConsumer(consumer, (String) objInst.getSkyWalkingDynamicField());
}

Expand Down
3 changes: 2 additions & 1 deletion apm-sniffer/apm-sdk-plugin/spring-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@
<module>spring-webflux-6.x-webclient-plugin</module>
<module>resttemplate-commons</module>
<module>spring-ai-1.x-plugin</module>
<module>spring-rabbitmq-plugin</module>
</modules>
<packaging>pom</packaging>

<name>apm-sdk-plugin</name>
<name>spring-plugins</name>
<url>http://maven.apache.org</url>

<properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>spring-plugins</artifactId>
<version>9.7.0-SNAPSHOT</version>
</parent>

<artifactId>apm-spring-rabbitmq-plugin</artifactId>
<name>apm-spring-rabbitmq-plugin</name>
<packaging>jar</packaging>

<properties>
<spring-rabbit.version>2.2.1.RELEASE</spring-rabbit.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.spring.rabbitmq;

import java.lang.reflect.Method;
import java.util.Map;

import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SpringRabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptorV2 {
public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInvocationContext context) throws Throwable {
Channel channel = (Channel) allArguments[0];
Message message = (Message) allArguments[1];
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In SpringRabbitMQConsumerInterceptor#beforeMethod, allArguments[1] is directly cast to
org.springframework.amqp.core.Message. However, in Spring AMQP, this argument (the data
parameter in executeListener) is an Object that can be a List when batch mode is enabled.

Refer to Spring AMQP's AbstractMessageListenerContainer.java

https://github.com/spring-projects/spring-amqp/blob/d892bfce5d002f051370dd6316538298a75a4eb6/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java#L1734-L1744

Performing a direct cast (Message) allArguments[1] will throw a ClassCastException in any Spring RabbitMQ application where batching is enabled.

MessageProperties messageProperties = message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
Object value = headers.get(next.getHeadKey());
if (value != null) {
next.setHeadValue(value.toString());
}
}
String operationName = OPERATE_NAME_PREFIX + "Topic/" + messageProperties.getReceivedExchange()
+ "Queue/" + messageProperties.getReceivedRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX;
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
Connection connection = channel.getConnection();
String serverUrl = connection.getAddress().getHostAddress() + ":" + connection.getPort();
Tags.MQ_BROKER.set(activeSpan, serverUrl);
Tags.MQ_TOPIC.set(activeSpan, messageProperties.getReceivedExchange());
Tags.MQ_QUEUE.set(activeSpan, messageProperties.getReceivedRoutingKey());
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
activeSpan.setPeer(serverUrl);
SpanLayer.asMQ(activeSpan);
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret, MethodInvocationContext context) throws Throwable {
ContextManager.stopSpan();
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
ContextManager.activeSpan().log(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.spring.rabbitmq.define;

import static net.bytebuddy.matcher.ElementMatchers.named;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.DeclaredInstanceMethodsInterceptV2Point;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch;

import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class SpringRabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefineV2 {
public static final String ENHANCE_CLASS = "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer";
public static final String ENHANCE_METHOD = "executeListener";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.spring.rabbitmq.SpringRabbitMQConsumerInterceptor";

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}

@Override
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
return new InstanceMethodsInterceptV2Point[] {
new DeclaredInstanceMethodsInterceptV2Point() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
}

@Override
public String getMethodsInterceptorV2() {
return INTERCEPTOR_CLASS;
}

@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

spring-rabbitmq=org.apache.skywalking.apm.plugin.spring.rabbitmq.define.SpringRabbitMQConsumerInstrumentation
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.spring.rabbitmq;

import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

@RunWith(TracingSegmentRunner.class)
public class RabbitMQSpringConsumerInterceptorTest {

@SegmentStoragePoint
private SegmentStorage segmentStorage;

private SpringRabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;

@Mock
private EnhancedInstance enhancedInstance;

@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();

@Before
public void setUp() throws Exception {
rabbitMQConsumerInterceptor = new SpringRabbitMQConsumerInterceptor();
}

@Test
public void testRabbitMQConsumerInterceptorWithNilHeaders() throws Throwable {
Object[] args = prepareMockData(false);
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, args, new Class[0], null);
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, args, new Class[0], null, null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegments.size(), is(1));
}

@Test
public void testRabbitMQConsumerInterceptor() throws Throwable {
Object[] args = prepareMockData(true);
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, args, new Class[0], null);
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, args, new Class[0], null, null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegments.size(), is(1));
}

private Object[] prepareMockData(boolean withHeaders) throws Exception {
Channel channel = mock(Channel.class);
Connection connection = mock(Connection.class);
InetAddress address = mock(InetAddress.class);
Message message = mock(Message.class);
MessageProperties messageProperties = mock(MessageProperties.class);

when(channel.getConnection()).thenReturn(connection);
when(connection.getAddress()).thenReturn(address);
when(address.getHostAddress()).thenReturn("127.0.0.1");
when(connection.getPort()).thenReturn(5672);
when(message.getMessageProperties()).thenReturn(messageProperties);
when(messageProperties.getReceivedExchange()).thenReturn("test-exchange");
when(messageProperties.getReceivedRoutingKey()).thenReturn("test-routing-key");

if (withHeaders) {
Map<String, Object> headers = new HashMap<>();
headers.put(SW8CarrierItem.HEADER_NAME,
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
when(messageProperties.getHeader(SW8CarrierItem.HEADER_NAME))
.thenReturn(headers.get(SW8CarrierItem.HEADER_NAME));
}

return new Object[] {channel, message};
}
}
1 change: 1 addition & 0 deletions docs/en/setup/service-agent/java-agent/Plugin-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
- spring-core-patch
- spring-kafka-1.x
- spring-kafka-2.x
- spring-rabbitmq
- spring-mvc-annotation
- spring-mvc-annotation-3.x
- spring-mvc-annotation-4.x
Expand Down
1 change: 1 addition & 0 deletions docs/en/setup/service-agent/java-agent/Supported-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ metrics based on the tracing data.
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x
* [Spring-RabbitMQ](https://github.com/spring-projects/spring-amqp) 2.x -> 4.x
* [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.9.x
* [NATS](https://github.com/nats-io/nats.java) 2.14.x -> 2.16.5
* [ActiveMQ-Artemis](https://github.com/apache/activemq) 2.30.0 -> 2.31.2
Expand Down
Loading
Loading