Skip to content

Commit 514b6fb

Browse files
committed
Update spring rabbitmq plugin Batch message consume
1 parent 32079d6 commit 514b6fb

File tree

7 files changed

+306
-22
lines changed

7 files changed

+306
-22
lines changed

apm-sniffer/apm-sdk-plugin/rabbitmq-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@
2626

2727
public class RabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
2828

29-
public static final String INTERNAL_CONSUMER_CLASS_NAME = "org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer";
29+
public static final String SMLC_INTERNAL_CONSUMER = "org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer";
30+
public static final String DMLC_INTERNAL_CONSUMER = "org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer";
3031

3132
@Override
3233
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
3334
MethodInterceptResult result) throws Throwable {
3435
Consumer consumer = (Consumer) allArguments[6];
35-
if (consumer != null && INTERNAL_CONSUMER_CLASS_NAME.equals(consumer.getClass().getName())) {
36-
return;
36+
if (consumer != null) {
37+
String className = consumer.getClass().getName();
38+
if (SMLC_INTERNAL_CONSUMER.equals(className) || DMLC_INTERNAL_CONSUMER.equals(className)) {
39+
return;
40+
}
3741
}
3842
allArguments[6] = new TracerConsumer(consumer, (String) objInst.getSkyWalkingDynamicField());
3943
}

apm-sniffer/apm-sdk-plugin/spring-plugins/spring-rabbitmq-plugin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
<packaging>jar</packaging>
3333

3434
<properties>
35-
<spring-rabbit.version>2.2.1.RELEASE</spring-rabbit.version>
35+
<spring-rabbit.version>3.0.0</spring-rabbit.version>
3636
</properties>
3737

3838
<dependencies>

apm-sniffer/apm-sdk-plugin/spring-plugins/spring-rabbitmq-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/rabbitmq/SpringRabbitMQConsumerInterceptor.java

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.skywalking.apm.plugin.spring.rabbitmq;
2020

2121
import java.lang.reflect.Method;
22+
import java.util.List;
2223
import java.util.Map;
2324

2425
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
@@ -45,9 +46,58 @@ public class SpringRabbitMQConsumerInterceptor implements InstanceMethodsAroundI
4546
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
4647
MethodInvocationContext context) throws Throwable {
4748
Channel channel = (Channel) allArguments[0];
48-
Message message = (Message) allArguments[1];
49-
MessageProperties messageProperties = message.getMessageProperties();
50-
Map<String, Object> headers = messageProperties.getHeaders();
49+
50+
if (allArguments[1] instanceof Message) {
51+
// Single message consume
52+
Message message = (Message) allArguments[1];
53+
MessageProperties messageProperties = message.getMessageProperties();
54+
Map<String, Object> headers = messageProperties.getHeaders();
55+
56+
ContextCarrier contextCarrier = buildContextCarrier(headers);
57+
String operationName = buildOperationName(messageProperties);
58+
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
59+
60+
setSpanAttributes(activeSpan, channel, messageProperties);
61+
} else if (allArguments[1] instanceof List) {
62+
// Batch message consume
63+
List<?> messages = (List<?>) allArguments[1];
64+
if (messages.isEmpty()) {
65+
return;
66+
}
67+
68+
// Use the first message to create EntrySpan
69+
Message firstMessage = (Message) messages.get(0);
70+
MessageProperties firstMessageProperties = firstMessage.getMessageProperties();
71+
Map<String, Object> firstMessageHeaders = firstMessageProperties.getHeaders();
72+
73+
ContextCarrier contextCarrier = buildContextCarrier(firstMessageHeaders);
74+
String operationName = buildOperationName(firstMessageProperties);
75+
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
76+
77+
setSpanAttributes(activeSpan, channel, firstMessageProperties);
78+
79+
// Extract trace context from remaining messages (skip first, already used for EntrySpan)
80+
// to correlate all producer traces with this consumer span
81+
for (int i = 1; i < messages.size(); i++) {
82+
Object msg = messages.get(i);
83+
if (msg instanceof Message) {
84+
Message message = (Message) msg;
85+
MessageProperties messageProperties = message.getMessageProperties();
86+
Map<String, Object> headers = messageProperties.getHeaders();
87+
88+
ContextCarrier carrier = buildContextCarrier(headers);
89+
if (carrier.isValid()) {
90+
ContextManager.extract(carrier);
91+
}
92+
}
93+
}
94+
}
95+
}
96+
97+
/**
98+
* Build ContextCarrier from message headers
99+
*/
100+
private ContextCarrier buildContextCarrier(Map<String, Object> headers) {
51101
ContextCarrier contextCarrier = new ContextCarrier();
52102
CarrierItem next = contextCarrier.items();
53103
while (next.hasNext()) {
@@ -57,29 +107,40 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
57107
next.setHeadValue(value.toString());
58108
}
59109
}
60-
String operationName = OPERATE_NAME_PREFIX + "Topic/" + messageProperties.getReceivedExchange()
61-
+ "Queue/" + messageProperties.getReceivedRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX;
62-
AbstractSpan activeSpan = ContextManager.createEntrySpan(operationName, contextCarrier);
110+
return contextCarrier;
111+
}
112+
113+
private String buildOperationName(MessageProperties messageProperties) {
114+
return OPERATE_NAME_PREFIX + "Topic/" + messageProperties.getReceivedExchange()
115+
+ "Queue/" + messageProperties.getReceivedRoutingKey()
116+
+ CONSUMER_OPERATE_NAME_SUFFIX;
117+
}
118+
119+
private void setSpanAttributes(AbstractSpan span, Channel channel, MessageProperties messageProperties) {
63120
Connection connection = channel.getConnection();
64121
String serverUrl = connection.getAddress().getHostAddress() + ":" + connection.getPort();
65-
Tags.MQ_BROKER.set(activeSpan, serverUrl);
66-
Tags.MQ_TOPIC.set(activeSpan, messageProperties.getReceivedExchange());
67-
Tags.MQ_QUEUE.set(activeSpan, messageProperties.getReceivedRoutingKey());
68-
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
69-
activeSpan.setPeer(serverUrl);
70-
SpanLayer.asMQ(activeSpan);
122+
Tags.MQ_BROKER.set(span, serverUrl);
123+
Tags.MQ_TOPIC.set(span, messageProperties.getReceivedExchange());
124+
Tags.MQ_QUEUE.set(span, messageProperties.getReceivedRoutingKey());
125+
span.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
126+
span.setPeer(serverUrl);
127+
SpanLayer.asMQ(span);
71128
}
72129

73130
@Override
74131
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
75132
Object ret, MethodInvocationContext context) throws Throwable {
76-
ContextManager.stopSpan();
133+
if (ContextManager.isActive()) {
134+
ContextManager.stopSpan();
135+
}
77136
return ret;
78137
}
79138

80139
@Override
81140
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
82141
Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
83-
ContextManager.activeSpan().log(t);
142+
if (ContextManager.isActive()) {
143+
ContextManager.activeSpan().log(t);
144+
}
84145
}
85146
}

apm-sniffer/apm-sdk-plugin/spring-plugins/spring-rabbitmq-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/rabbitmq/RabbitMQSpringConsumerInterceptorTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import static org.mockito.Mockito.when;
2424

2525
import java.net.InetAddress;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
2628
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
@@ -83,6 +85,33 @@ public void testRabbitMQConsumerInterceptor() throws Throwable {
8385
Assert.assertThat(traceSegments.size(), is(1));
8486
}
8587

88+
@Test
89+
public void testBatchMessageConsumption() throws Throwable {
90+
Object[] args = prepareMockBatchData(true);
91+
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, args, new Class[0], null);
92+
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, args, new Class[0], null, null);
93+
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
94+
Assert.assertThat(traceSegments.size(), is(1));
95+
}
96+
97+
@Test
98+
public void testEmptyBatchMessageConsumption() throws Throwable {
99+
Channel channel = mock(Channel.class);
100+
Connection connection = mock(Connection.class);
101+
InetAddress address = mock(InetAddress.class);
102+
103+
when(channel.getConnection()).thenReturn(connection);
104+
when(connection.getAddress()).thenReturn(address);
105+
when(address.getHostAddress()).thenReturn("127.0.0.1");
106+
when(connection.getPort()).thenReturn(5672);
107+
108+
Object[] args = new Object[] {channel, new java.util.ArrayList<Message>()};
109+
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, args, new Class[0], null);
110+
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, args, new Class[0], null, null);
111+
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
112+
Assert.assertThat(traceSegments.size(), is(0));
113+
}
114+
86115
private Object[] prepareMockData(boolean withHeaders) throws Exception {
87116
Channel channel = mock(Channel.class);
88117
Connection connection = mock(Connection.class);
@@ -108,4 +137,56 @@ private Object[] prepareMockData(boolean withHeaders) throws Exception {
108137

109138
return new Object[] {channel, message};
110139
}
140+
141+
private Object[] prepareMockBatchData(boolean withHeaders) throws Exception {
142+
Channel channel = mock(Channel.class);
143+
Connection connection = mock(Connection.class);
144+
InetAddress address = mock(InetAddress.class);
145+
146+
when(channel.getConnection()).thenReturn(connection);
147+
when(connection.getAddress()).thenReturn(address);
148+
when(address.getHostAddress()).thenReturn("127.0.0.1");
149+
when(connection.getPort()).thenReturn(5672);
150+
151+
Message message1 = mock(Message.class);
152+
Message message2 = mock(Message.class);
153+
Message message3 = mock(Message.class);
154+
MessageProperties props1 = mock(MessageProperties.class);
155+
MessageProperties props2 = mock(MessageProperties.class);
156+
MessageProperties props3 = mock(MessageProperties.class);
157+
158+
when(message1.getMessageProperties()).thenReturn(props1);
159+
when(message2.getMessageProperties()).thenReturn(props2);
160+
when(message3.getMessageProperties()).thenReturn(props3);
161+
162+
when(props1.getReceivedExchange()).thenReturn("test-exchange");
163+
when(props1.getReceivedRoutingKey()).thenReturn("test-routing-key");
164+
when(props2.getReceivedExchange()).thenReturn("test-exchange");
165+
when(props2.getReceivedRoutingKey()).thenReturn("test-routing-key");
166+
when(props3.getReceivedExchange()).thenReturn("test-exchange");
167+
when(props3.getReceivedRoutingKey()).thenReturn("test-routing-key");
168+
169+
if (withHeaders) {
170+
Map<String, Object> headers1 = new HashMap<>();
171+
headers1.put(SW8CarrierItem.HEADER_NAME,
172+
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
173+
when(props1.getHeader(SW8CarrierItem.HEADER_NAME))
174+
.thenReturn(headers1.get(SW8CarrierItem.HEADER_NAME));
175+
176+
Map<String, Object> headers2 = new HashMap<>();
177+
headers2.put(SW8CarrierItem.HEADER_NAME,
178+
"1-NTY3Ljg=-OS4xMC4xMQ==-12-ZXJ2aWNlMg==-aW5zdGFuY2UyLU9hcHA=-MTI3LjAuMC4yOjgwODA=");
179+
when(props2.getHeader(SW8CarrierItem.HEADER_NAME))
180+
.thenReturn(headers2.get(SW8CarrierItem.HEADER_NAME));
181+
182+
Map<String, Object> headers3 = new HashMap<>();
183+
headers3.put(SW8CarrierItem.HEADER_NAME,
184+
"1-MTExLjIyMi4zMzM=-NDQ0LjU1NS42NjY=-MzQ1-ZXJ2aWNlMw==-aW5zdGFuY2UzLU9hcHA=-MTI3LjAuMC4zOjgwODA=");
185+
when(props3.getHeader(SW8CarrierItem.HEADER_NAME))
186+
.thenReturn(headers3.get(SW8CarrierItem.HEADER_NAME));
187+
}
188+
189+
List<Message> messages = new ArrayList<>(Arrays.asList(message1, message2, message3));
190+
return new Object[] {channel, messages};
191+
}
111192
}

test/plugin/scenarios/spring-rabbitmq-scenario/config/expectedData.yaml

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
# limitations under the License.
1616
segmentItems:
1717
- serviceName: spring-rabbitmq-scenario
18-
segmentSize: gt 2
18+
segmentSize: ge 4
1919
segments:
2020
- segmentId: not null
2121
spans:
22-
- operationName: GET:/spring-rabbitmq-scenario/case/rabbitmq
22+
- operationName: HEAD:/spring-rabbitmq-scenario/case/healthcheck
2323
parentSpanId: -1
2424
spanId: 0
2525
spanLayer: Http
@@ -30,10 +30,12 @@ segmentItems:
3030
spanType: Entry
3131
peer: ''
3232
tags:
33-
- {key: url, value: 'http://localhost:8080/spring-rabbitmq-scenario/case/rabbitmq'}
34-
- {key: http.method, value: GET}
33+
- {key: url, value: 'http://localhost:8080/spring-rabbitmq-scenario/case/healthcheck'}
34+
- {key: http.method, value: HEAD}
3535
- {key: http.status_code, value: '200'}
3636
skipAnalysis: 'false'
37+
- segmentId: not null
38+
spans:
3739
- operationName: RabbitMQ/Topic/Queue/test/Producer
3840
parentSpanId: 0
3941
spanId: 1
@@ -49,6 +51,96 @@ segmentItems:
4951
- {key: mq.queue, value: test}
5052
- {key: mq.topic, value: ''}
5153
skipAnalysis: 'false'
54+
- operationName: RabbitMQ/Topic/Queue/test-batch/Producer
55+
parentSpanId: 0
56+
spanId: 2
57+
spanLayer: MQ
58+
startTime: nq 0
59+
endTime: nq 0
60+
componentId: 52
61+
isError: false
62+
spanType: Exit
63+
peer: not blank
64+
tags:
65+
- {key: mq.broker, value: not blank}
66+
- {key: mq.queue, value: test-batch}
67+
- {key: mq.topic, value: ''}
68+
skipAnalysis: 'false'
69+
- operationName: RabbitMQ/Topic/Queue/test-batch/Producer
70+
parentSpanId: 0
71+
spanId: 3
72+
spanLayer: MQ
73+
startTime: nq 0
74+
endTime: nq 0
75+
componentId: 52
76+
isError: false
77+
spanType: Exit
78+
peer: not blank
79+
tags:
80+
- {key: mq.broker, value: not blank}
81+
- {key: mq.queue, value: test-batch}
82+
- {key: mq.topic, value: ''}
83+
skipAnalysis: 'false'
84+
- operationName: RabbitMQ/Topic/Queue/test-batch/Producer
85+
parentSpanId: 0
86+
spanId: 4
87+
spanLayer: MQ
88+
startTime: nq 0
89+
endTime: nq 0
90+
componentId: 52
91+
isError: false
92+
spanType: Exit
93+
peer: not blank
94+
tags:
95+
- {key: mq.broker, value: not blank}
96+
- {key: mq.queue, value: test-batch}
97+
- {key: mq.topic, value: ''}
98+
skipAnalysis: 'false'
99+
- operationName: GET:/spring-rabbitmq-scenario/case/rabbitmq
100+
parentSpanId: -1
101+
spanId: 0
102+
spanLayer: Http
103+
startTime: nq 0
104+
endTime: nq 0
105+
componentId: 1
106+
isError: false
107+
spanType: Entry
108+
peer: ''
109+
tags:
110+
- {key: url, value: 'http://localhost:8080/spring-rabbitmq-scenario/case/rabbitmq'}
111+
- {key: http.method, value: GET}
112+
- {key: http.status_code, value: '200'}
113+
skipAnalysis: 'false'
114+
- segmentId: not null
115+
spans:
116+
- operationName: RabbitMQ/Topic/Queue/test-batch/Consumer
117+
parentSpanId: -1
118+
spanId: 0
119+
spanLayer: MQ
120+
startTime: nq 0
121+
endTime: nq 0
122+
componentId: 53
123+
isError: false
124+
spanType: Entry
125+
peer: not blank
126+
tags:
127+
- {key: transmission.latency, value: ge 0}
128+
- {key: mq.broker, value: not blank}
129+
- {key: mq.topic, value: ''}
130+
- {key: mq.queue, value: test-batch}
131+
- {key: transmission.latency, value: ge 0}
132+
- {key: transmission.latency, value: ge 0}
133+
refs:
134+
- {parentEndpoint: GET:/spring-rabbitmq-scenario/case/rabbitmq, networkAddress: not null,
135+
refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
136+
null, parentService: spring-rabbitmq-scenario, traceId: not null}
137+
- {parentEndpoint: GET:/spring-rabbitmq-scenario/case/rabbitmq, networkAddress: not null,
138+
refType: CrossProcess, parentSpanId: 3, parentTraceSegmentId: not null, parentServiceInstance: not
139+
null, parentService: spring-rabbitmq-scenario, traceId: not null}
140+
- {parentEndpoint: GET:/spring-rabbitmq-scenario/case/rabbitmq, networkAddress: not null,
141+
refType: CrossProcess, parentSpanId: 4, parentTraceSegmentId: not null, parentServiceInstance: not
142+
null, parentService: spring-rabbitmq-scenario, traceId: not null}
143+
skipAnalysis: 'false'
52144
- segmentId: not null
53145
spans:
54146
- operationName: RabbitMQ/Topic/Queue/test/Consumer

0 commit comments

Comments
 (0)