Skip to content

Commit 62735ee

Browse files
author
Pradeep Kunchala
committed
AMQ-9855: Defensive copy in VMTransport to align behavior with remote transports + unit test
1 parent 7d5291e commit 62735ee

2 files changed

Lines changed: 192 additions & 9 deletions

File tree

activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ public Message copy() {
5454
return copy;
5555
}
5656

57-
private void copy(ActiveMQTextMessage copy) {
58-
super.copy(copy);
59-
copy.text = text;
57+
protected void copy(ActiveMQTextMessage copy) {
58+
synchronized(this) {
59+
super.copy(copy);
60+
copy.text = this.text;
61+
}
6062
}
6163

6264
@Override
@@ -70,14 +72,14 @@ public String getJMSXMimeType() {
7072
}
7173

7274
@Override
73-
public void setText(String text) throws MessageNotWriteableException {
75+
public synchronized void setText(String text) throws MessageNotWriteableException {
7476
checkReadOnlyBody();
7577
this.text = text;
7678
setContent(null);
7779
}
7880

7981
@Override
80-
public String getText() throws JMSException {
82+
public synchronized String getText() throws JMSException {
8183
ByteSequence content = getContent();
8284

8385
if (text == null && content != null) {
@@ -116,19 +118,19 @@ private String decodeContent(ByteSequence bodyAsBytes) throws JMSException {
116118
}
117119

118120
@Override
119-
public void beforeMarshall(WireFormat wireFormat) throws IOException {
121+
public synchronized void beforeMarshall(WireFormat wireFormat) throws IOException {
120122
super.beforeMarshall(wireFormat);
121123
storeContentAndClear();
122124
}
123125

124126
@Override
125-
public void storeContentAndClear() {
127+
public synchronized void storeContentAndClear() {
126128
storeContent();
127129
text=null;
128130
}
129131

130132
@Override
131-
public void storeContent() {
133+
public synchronized void storeContent() {
132134
try {
133135
ByteSequence content = getContent();
134136
String text = this.text;
@@ -153,7 +155,12 @@ public void storeContent() {
153155
// see https://issues.apache.org/activemq/browse/AMQ-2103
154156
// and https://issues.apache.org/activemq/browse/AMQ-2966
155157
@Override
156-
public void clearUnMarshalledState() throws JMSException {
158+
public synchronized void clearUnMarshalledState() throws JMSException {
159+
// Crucial: Store the content before we wipe the text
160+
// This ensures we don't end up with BOTH being null
161+
if (this.text != null && getContent() == null) {
162+
storeContent();
163+
}
157164
super.clearUnMarshalledState();
158165
this.text = null;
159166
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.command;
18+
19+
import jakarta.jms.Connection;
20+
import jakarta.jms.MessageConsumer;
21+
import jakarta.jms.Session;
22+
import jakarta.jms.Topic;
23+
import jakarta.jms.MessageProducer;
24+
import jakarta.jms.TextMessage;
25+
import jakarta.jms.JMSException;
26+
27+
import org.apache.activemq.ActiveMQConnectionFactory;
28+
import org.apache.activemq.broker.BrokerService;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
import java.util.List;
34+
import java.util.ArrayList;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static org.junit.Assert.assertNotNull;
42+
import static org.junit.Assert.assertNotSame;
43+
import static org.junit.Assert.assertEquals;
44+
45+
public class ActiveMQTextMessageStressTest {
46+
47+
private BrokerService broker;
48+
private Connection connection;
49+
50+
@Before
51+
public void setUp() throws Exception {
52+
broker = new BrokerService();
53+
broker.setPersistent(false);
54+
broker.addConnector("vm://localhost");
55+
broker.start();
56+
57+
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
58+
connection = cf.createConnection();
59+
connection.setClientID("HIGH_CONC_TEST"); // needed for durable subscribers
60+
connection.start();
61+
}
62+
63+
@After
64+
public void tearDown() throws Exception {
65+
if (connection != null) connection.close();
66+
if (broker != null) broker.stop();
67+
}
68+
69+
@Test
70+
public void testConcurrentProducersAndConsumers() throws Exception {
71+
final int MESSAGE_COUNT = 100;
72+
final int PRODUCERS = 5;
73+
final int DURABLE_CONSUMERS = 0;
74+
final int NON_DURABLE_CONSUMERS = 5;
75+
76+
// Topic
77+
Session tmpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
78+
Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
79+
80+
// Consumers
81+
List<MessageConsumer> consumers = new ArrayList<>();
82+
List<Session> consumerSessions = new ArrayList<>();
83+
for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
84+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
85+
consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
86+
consumerSessions.add(s);
87+
}
88+
for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
89+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
90+
consumers.add(s.createConsumer(topic));
91+
consumerSessions.add(s);
92+
}
93+
94+
ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + consumers.size());
95+
96+
// Produce messages concurrently
97+
CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
98+
for (int p = 1; p <= PRODUCERS; p++) {
99+
final int producerId = p;
100+
executor.submit(() -> {
101+
try {
102+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
103+
MessageProducer producer = s.createProducer(topic);
104+
for (int m = 1; m <= MESSAGE_COUNT; m++) {
105+
TextMessage msg = s.createTextMessage("P" + producerId + "-M" + m);
106+
producer.send(msg);
107+
}
108+
} catch (JMSException e) {
109+
e.printStackTrace();
110+
} finally {
111+
producerLatch.countDown();
112+
}
113+
});
114+
}
115+
116+
// Consume messages concurrently
117+
List<Future<List<TextMessage>>> consumerFutures = new ArrayList<>();
118+
for (MessageConsumer consumer : consumers) {
119+
consumerFutures.add(executor.submit(() -> {
120+
List<TextMessage> received = new ArrayList<>();
121+
for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) {
122+
TextMessage msg = (TextMessage) consumer.receive(5000);
123+
assertNotNull("Consumer should receive a message", msg);
124+
125+
// loop to increase the chance of hitting the race condition
126+
// while other consumers are doing the same to their copies.
127+
for (int j = 0; j < 50; j++) {
128+
String txt = msg.getText();
129+
assertNotNull("Text should never be null during stress", txt);
130+
131+
// It clears the 'text' field and forces the next getText()
132+
((org.apache.activemq.command.ActiveMQTextMessage)msg).clearUnMarshalledState();
133+
}
134+
135+
received.add(msg);
136+
}
137+
return received;
138+
}));
139+
}
140+
141+
// Wait for producers and consumers
142+
producerLatch.await();
143+
List<List<TextMessage>> allConsumed = new ArrayList<>();
144+
for (Future<List<TextMessage>> f : consumerFutures) {
145+
allConsumed.add(f.get(30, TimeUnit.SECONDS));
146+
}
147+
148+
// VALIDATION LOGIC
149+
for (int i = 0; i < allConsumed.size(); i++) {
150+
List<TextMessage> consumerMsgs = allConsumed.get(i);
151+
for (int j = i + 1; j < allConsumed.size(); j++) {
152+
List<TextMessage> otherMsgs = allConsumed.get(j);
153+
154+
for (int k = 0; k < consumerMsgs.size(); k++) {
155+
TextMessage m1 = consumerMsgs.get(k);
156+
TextMessage m2 = otherMsgs.get(k);
157+
158+
// 1. This SHOULD pass: They are different message objects
159+
assertNotSame("Message wrappers MUST be different instances", m1, m2);
160+
161+
// 2. This SHOULD pass: Strings are immutable, so sharing the reference is fine
162+
assertEquals("Sharing string references is actually okay and efficient", m1.getText(), m2.getText());
163+
164+
// 3. The "Real" check: Ensure the content didn't disappear
165+
assertNotNull("Content should be preserved in m1", m1.getText());
166+
assertNotNull("Content should be preserved in m2", m2.getText());
167+
assertEquals("Content must match", m1.getText(), m2.getText());
168+
}
169+
}
170+
}
171+
172+
executor.shutdownNow();
173+
for (Session s : consumerSessions) s.close();
174+
}
175+
176+
}

0 commit comments

Comments
 (0)