From b794ba7d9a2a178f825590278f628e93ce8601c6 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Tue, 24 Feb 2026 10:09:13 +0100 Subject: [PATCH 1/4] test(unit-tests): improve reliability of message consumption in tests --- .../apache/activemq/store/StoreOrderTest.java | 2 ++ .../activemq/store/jdbc/XACompletionTest.java | 7 +----- ...eZeroPrefetchLazyDispatchPriorityTest.java | 22 +++++++++++++------ 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java index 58c853e1bad..8dfe3da99b6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java @@ -120,6 +120,7 @@ public void stopBroker() throws Exception { } if (broker != null) { broker.stop(); + broker.waitUntilStopped(); } } @@ -257,6 +258,7 @@ protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exce configureBroker(newBroker); newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup); newBroker.start(); + newBroker.waitUntilStarted(); return newBroker; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java index 5b7a3a417fe..26e5784087e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java @@ -275,12 +275,7 @@ public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception { dumpMessages(); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return proxy.getInFlightCount() == 0l; - } - }); + Wait.waitFor(() -> proxy.getInFlightCount() == 0L && proxy.cursorSize() == 0); assertEquals("prefetch", 0, proxy.getInFlightCount()); assertEquals("size", 10, proxy.getQueueSize()); assertEquals("cursor size", 0, proxy.cursorSize()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java index 4c5b1937413..7ebe48cc1d3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java @@ -199,15 +199,23 @@ public void testPriorityMessagesWithJmsBrowser() throws Exception { assertNotNull(message); assertEquals(5, message.getJMSPriority()); - // consume messages - final ArrayList consumeList = consumeMessages("TestQ"); + // Wait for remaining messages to be fully available after consumeOneMessage closes its connection. + // With lazyDispatch=true + optimizedDispatch=true, messages may briefly be in-flight + // during connection teardown and not yet re-queued for dispatch to a new consumer. + final int remaining = numToSend - 1; + assertTrue("Remaining messages available for dispatch", Wait.waitFor(() -> { + final Queue q = (Queue) broker.getDestination(destination); + return q != null + && q.getDestinationStatistics().getMessages().getCount() == remaining + && q.getDestinationStatistics().getInflight().getCount() == 0; + }, 5000, 100)); + + // consume messages (use timeout-based overload for reliable dispatch on slow CI) + final ArrayList consumeList = consumeMessages("TestQ", remaining, TimeUnit.SECONDS.toMillis(30)); LOG.info("Consumed list {}", consumeList.size()); - // compare lists - // assertEquals("Iteration: " + i - // +", message 1 should be priority high", 5, - // consumeList.get(0).getJMSPriority()); - for (int j = 1; j < (numToSend - 1); j++) { + assertEquals("Iteration: " + i + ", all remaining messages consumed", remaining, consumeList.size()); + for (int j = 0; j < consumeList.size(); j++) { assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority()); } } From 56a7815861f72df5c421bdf8a75e6fbcd7ba7e32 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 24 Feb 2026 14:34:20 +0000 Subject: [PATCH 2/4] [test] Fix ConnectionFailureEvictsFromPoolTest: eliminate flaky async races Two race conditions caused testEvictionXA to fail intermittently: 1. Exception event propagation: ActiveMQConnection.addTransportListener() callbacks fire via executeAsync(), which silently drops tasks when the pool's ExceptionListener closes the connection and shuts down the executor first. Fixed by intercepting at the MockTransport level where exception propagation is synchronous. 2. Pool eviction timing: The pool evicts broken connections asynchronously via ExceptionListener fired through executeAsync(). The test could request a new connection before eviction completed. Fixed by using Wait.waitFor() retry pattern (consistent with other pool tests). --- .../ConnectionFailureEvictsFromPoolTest.java | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java index 596eb00fe3b..3a0fc679437 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.test.TestSupport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.mock.MockTransport; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,17 +95,27 @@ public void doTestEviction(ConnectionFactory pooledFactory) throws Exception { final CountDownLatch gotExceptionEvent = new CountDownLatch(1); try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) { final ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection(); - amqC.addTransportListener(new TransportListener() { + // Intercept exception propagation at the MockTransport level where it fires + // synchronously. ActiveMQConnection.addTransportListener() callbacks fire via + // executeAsync(), which silently drops the task if the pool's ExceptionListener + // closes the connection and shuts down the executor first (race condition that + // affects the XA path). + final MockTransport mockTransport = (MockTransport) amqC.getTransportChannel().narrow(MockTransport.class); + final TransportListener originalListener = mockTransport.getTransportListener(); + mockTransport.setTransportListener(new TransportListener() { public void onCommand(Object command) { + originalListener.onCommand(command); } public void onException(IOException error) { - // we know connection is dead... - // listeners are fired async + // fires synchronously when MockTransport.onException() is called gotExceptionEvent.countDown(); + originalListener.onException(error); } public void transportInterupted() { + originalListener.transportInterupted(); } public void transportResumed() { + originalListener.transportResumed(); } }); @@ -116,18 +127,21 @@ public void transportResumed() { TestCase.fail("Expected Error"); } catch (JMSException e) { } - // Wait for async exception event BEFORE the try-with-resources closes the connection. - // ActiveMQConnection.onException() fires TransportListener callbacks via executeAsync(), - // so the callback runs in a separate thread. If we wait after connection.close(), the - // async executor may already be shut down and the callback never fires. - TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS)); + TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS)); } - // If we get another connection now it should be a new connection that - // works. + // After the failure, a new connection from the pool should work. + // The pool eviction is async (ExceptionListener fires via executeAsync), + // so retry until the pool returns a working connection. LOG.info("expect new connection after failure"); - try (final Connection connection2 = pooledFactory.createConnection()) { - sendMessage(connection2); - } + assertTrue("pool should provide working connection after eviction", + Wait.waitFor(() -> { + try (final Connection connection2 = pooledFactory.createConnection()) { + sendMessage(connection2); + return true; + } catch (Exception e) { + return false; + } + }, 5000, 100)); } private void createConnectionFailure(Connection connection) throws Exception { From a639a48bf5cc181610ca211a762f6788228fd209 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Tue, 24 Feb 2026 19:57:13 +0100 Subject: [PATCH 3/4] test(MaxFrameSizeEnabled): increase timeouts to improve test reliability --- .../transport/MaxFrameSizeEnabledTest.java | 17 +++++++++++------ ...strictedThreadPoolInactivityTimeoutTest.java | 2 ++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java index ed6635480d5..77aafacbab1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java @@ -55,6 +55,9 @@ public class MaxFrameSizeEnabledTest { private static final int CONNECTION_COUNT = 3; private static final int MESSAGE_ATTEMPTS = 3; private static final int BODY_SIZE = 20000; // large enough to trip 2k limit, compressible enough for 60k + private static final long BROKER_START_TIMEOUT_MS = 30_000; + private static final long BROKER_STOP_TIMEOUT_MS = 30_000; + private static final int TEST_TIMEOUT_MS = 120_000; private BrokerService broker; private final String transportType; @@ -158,30 +161,32 @@ public void after() throws Exception { } public BrokerService createBroker(String connectorName, String connectorString) throws Exception { - BrokerService broker = new BrokerService(); + final BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(false); - TransportConnector connector = broker.addConnector(connectorString); + final TransportConnector connector = broker.addConnector(connectorString); connector.setName(connectorName); broker.start(); - broker.waitUntilStarted(); + assertTrue("Broker should start within timeout", + Wait.waitFor(broker::isStarted, BROKER_START_TIMEOUT_MS, 100)); return broker; } public void stopBroker(BrokerService broker) throws Exception { if (broker != null) { broker.stop(); - broker.waitUntilStopped(); + assertTrue("Broker should stop within timeout", + Wait.waitFor(broker::isStopped, BROKER_STOP_TIMEOUT_MS, 100)); } } - @Test + @Test(timeout = TEST_TIMEOUT_MS) public void testMaxFrameSize() throws Exception { broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams()); testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false); } - @Test + @Test(timeout = TEST_TIMEOUT_MS) public void testMaxFrameSizeCompression() throws Exception { // Test message body length is 99841 bytes. Compresses to ~ 48000 broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java index 7eaefd23bde..bf3ab74a9ae 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java @@ -35,6 +35,7 @@ public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport { private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class); + private static final int TEST_TIMEOUT_MS = 120_000; public String brokerTransportScheme = "tcp"; public Boolean rejectWork = Boolean.FALSE; @@ -86,6 +87,7 @@ public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() { addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, Boolean.FALSE}); } + @org.junit.Test(timeout = TEST_TIMEOUT_MS) public void testThreadsInvolvedInXInactivityTimeouts() throws Exception { URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); From 838abb498992ea51467e936c638247d8278853a8 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Tue, 24 Feb 2026 21:32:41 +0100 Subject: [PATCH 4/4] test(AMQ2149): enhance prefetch policy for transactional connections --- .../test/java/org/apache/activemq/bugs/AMQ2149Test.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index 37b6cdeb260..4ec8c592318 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -41,6 +41,7 @@ import org.junit.Test; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationStatistics; @@ -171,6 +172,12 @@ public Receiver(jakarta.jms.Destination dest, boolean transactional) throws JMSE this.transactional = transactional; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); connectionFactory.setWatchTopicAdvisories(false); + if (transactional) { + final ActiveMQPrefetchPolicy policy = connectionFactory.getPrefetchPolicy(); + policy.setQueuePrefetch(1); + policy.setTopicPrefetch(1); + policy.setDurableTopicPrefetch(1); + } connection = connectionFactory.createConnection(); connection.setClientID(dest.toString()); session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); @@ -194,7 +201,7 @@ public long getNextExpectedSeqNo() { final int TRANSACITON_BATCH = 500; boolean resumeOnNextOrPreviousIsOk = false; - public void onMessage(Message message) { + public synchronized void onMessage(Message message) { try { final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); if ((seqNum % TRANSACITON_BATCH) == 0) {