Skip to content

Commit 38400ee

Browse files
rootjeanouii
authored andcommitted
[test] Fix ConnectionFailureEvictsFromPoolTest: eliminate flaky async races
Two race conditions caused testEvictionXA to fail intermittently: 1. Exception event propagation: ActiveMQConnection.addTransportListener() callbacks fire via executeAsync(), which silently drops tasks when the pool's ExceptionListener closes the connection and shuts down the executor first. Fixed by intercepting at the MockTransport level where exception propagation is synchronous. 2. Pool eviction timing: The pool evicts broken connections asynchronously via ExceptionListener fired through executeAsync(). The test could request a new connection before eviction completed. Fixed by using Wait.waitFor() retry pattern (consistent with other pool tests).
1 parent 70e309a commit 38400ee

1 file changed

Lines changed: 27 additions & 13 deletions

File tree

activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.activemq.test.TestSupport;
4040
import org.apache.activemq.transport.TransportListener;
4141
import org.apache.activemq.transport.mock.MockTransport;
42+
import org.apache.activemq.util.Wait;
4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
4445

@@ -94,17 +95,27 @@ public void doTestEviction(ConnectionFactory pooledFactory) throws Exception {
9495
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
9596
try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) {
9697
final ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection();
97-
amqC.addTransportListener(new TransportListener() {
98+
// Intercept exception propagation at the MockTransport level where it fires
99+
// synchronously. ActiveMQConnection.addTransportListener() callbacks fire via
100+
// executeAsync(), which silently drops the task if the pool's ExceptionListener
101+
// closes the connection and shuts down the executor first (race condition that
102+
// affects the XA path).
103+
final MockTransport mockTransport = (MockTransport) amqC.getTransportChannel().narrow(MockTransport.class);
104+
final TransportListener originalListener = mockTransport.getTransportListener();
105+
mockTransport.setTransportListener(new TransportListener() {
98106
public void onCommand(Object command) {
107+
originalListener.onCommand(command);
99108
}
100109
public void onException(IOException error) {
101-
// we know connection is dead...
102-
// listeners are fired async
110+
// fires synchronously when MockTransport.onException() is called
103111
gotExceptionEvent.countDown();
112+
originalListener.onException(error);
104113
}
105114
public void transportInterupted() {
115+
originalListener.transportInterupted();
106116
}
107117
public void transportResumed() {
118+
originalListener.transportResumed();
108119
}
109120
});
110121

@@ -116,18 +127,21 @@ public void transportResumed() {
116127
TestCase.fail("Expected Error");
117128
} catch (JMSException e) {
118129
}
119-
// Wait for async exception event BEFORE the try-with-resources closes the connection.
120-
// ActiveMQConnection.onException() fires TransportListener callbacks via executeAsync(),
121-
// so the callback runs in a separate thread. If we wait after connection.close(), the
122-
// async executor may already be shut down and the callback never fires.
123-
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
130+
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
124131
}
125-
// If we get another connection now it should be a new connection that
126-
// works.
132+
// After the failure, a new connection from the pool should work.
133+
// The pool eviction is async (ExceptionListener fires via executeAsync),
134+
// so retry until the pool returns a working connection.
127135
LOG.info("expect new connection after failure");
128-
try (final Connection connection2 = pooledFactory.createConnection()) {
129-
sendMessage(connection2);
130-
}
136+
assertTrue("pool should provide working connection after eviction",
137+
Wait.waitFor(() -> {
138+
try (final Connection connection2 = pooledFactory.createConnection()) {
139+
sendMessage(connection2);
140+
return true;
141+
} catch (Exception e) {
142+
return false;
143+
}
144+
}, 5000, 100));
131145
}
132146

133147
private void createConnectionFailure(Connection connection) throws Exception {

0 commit comments

Comments
 (0)