diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java index 596eb00fe3b..3a0fc679437 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.test.TestSupport; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.mock.MockTransport; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,17 +95,27 @@ public void doTestEviction(ConnectionFactory pooledFactory) throws Exception { final CountDownLatch gotExceptionEvent = new CountDownLatch(1); try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) { final ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection(); - amqC.addTransportListener(new TransportListener() { + // Intercept exception propagation at the MockTransport level where it fires + // synchronously. ActiveMQConnection.addTransportListener() callbacks fire via + // executeAsync(), which silently drops the task if the pool's ExceptionListener + // closes the connection and shuts down the executor first (race condition that + // affects the XA path). + final MockTransport mockTransport = (MockTransport) amqC.getTransportChannel().narrow(MockTransport.class); + final TransportListener originalListener = mockTransport.getTransportListener(); + mockTransport.setTransportListener(new TransportListener() { public void onCommand(Object command) { + originalListener.onCommand(command); } public void onException(IOException error) { - // we know connection is dead... - // listeners are fired async + // fires synchronously when MockTransport.onException() is called gotExceptionEvent.countDown(); + originalListener.onException(error); } public void transportInterupted() { + originalListener.transportInterupted(); } public void transportResumed() { + originalListener.transportResumed(); } }); @@ -116,18 +127,21 @@ public void transportResumed() { TestCase.fail("Expected Error"); } catch (JMSException e) { } - // Wait for async exception event BEFORE the try-with-resources closes the connection. - // ActiveMQConnection.onException() fires TransportListener callbacks via executeAsync(), - // so the callback runs in a separate thread. If we wait after connection.close(), the - // async executor may already be shut down and the callback never fires. - TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS)); + TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS)); } - // If we get another connection now it should be a new connection that - // works. + // After the failure, a new connection from the pool should work. + // The pool eviction is async (ExceptionListener fires via executeAsync), + // so retry until the pool returns a working connection. LOG.info("expect new connection after failure"); - try (final Connection connection2 = pooledFactory.createConnection()) { - sendMessage(connection2); - } + assertTrue("pool should provide working connection after eviction", + Wait.waitFor(() -> { + try (final Connection connection2 = pooledFactory.createConnection()) { + sendMessage(connection2); + return true; + } catch (Exception e) { + return false; + } + }, 5000, 100)); } private void createConnectionFailure(Connection connection) throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index 37b6cdeb260..4ec8c592318 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -41,6 +41,7 @@ import org.junit.Test; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationStatistics; @@ -171,6 +172,12 @@ public Receiver(jakarta.jms.Destination dest, boolean transactional) throws JMSE this.transactional = transactional; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); connectionFactory.setWatchTopicAdvisories(false); + if (transactional) { + final ActiveMQPrefetchPolicy policy = connectionFactory.getPrefetchPolicy(); + policy.setQueuePrefetch(1); + policy.setTopicPrefetch(1); + policy.setDurableTopicPrefetch(1); + } connection = connectionFactory.createConnection(); connection.setClientID(dest.toString()); session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); @@ -194,7 +201,7 @@ public long getNextExpectedSeqNo() { final int TRANSACITON_BATCH = 500; boolean resumeOnNextOrPreviousIsOk = false; - public void onMessage(Message message) { + public synchronized void onMessage(Message message) { try { final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); if ((seqNum % TRANSACITON_BATCH) == 0) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java index 58c853e1bad..8dfe3da99b6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java @@ -120,6 +120,7 @@ public void stopBroker() throws Exception { } if (broker != null) { broker.stop(); + broker.waitUntilStopped(); } } @@ -257,6 +258,7 @@ protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exce configureBroker(newBroker); newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup); newBroker.start(); + newBroker.waitUntilStarted(); return newBroker; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java index 5b7a3a417fe..26e5784087e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java @@ -275,12 +275,7 @@ public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception { dumpMessages(); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return proxy.getInFlightCount() == 0l; - } - }); + Wait.waitFor(() -> proxy.getInFlightCount() == 0L && proxy.cursorSize() == 0); assertEquals("prefetch", 0, proxy.getInFlightCount()); assertEquals("size", 10, proxy.getQueueSize()); assertEquals("cursor size", 0, proxy.cursorSize()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java index ed6635480d5..77aafacbab1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java @@ -55,6 +55,9 @@ public class MaxFrameSizeEnabledTest { private static final int CONNECTION_COUNT = 3; private static final int MESSAGE_ATTEMPTS = 3; private static final int BODY_SIZE = 20000; // large enough to trip 2k limit, compressible enough for 60k + private static final long BROKER_START_TIMEOUT_MS = 30_000; + private static final long BROKER_STOP_TIMEOUT_MS = 30_000; + private static final int TEST_TIMEOUT_MS = 120_000; private BrokerService broker; private final String transportType; @@ -158,30 +161,32 @@ public void after() throws Exception { } public BrokerService createBroker(String connectorName, String connectorString) throws Exception { - BrokerService broker = new BrokerService(); + final BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(false); - TransportConnector connector = broker.addConnector(connectorString); + final TransportConnector connector = broker.addConnector(connectorString); connector.setName(connectorName); broker.start(); - broker.waitUntilStarted(); + assertTrue("Broker should start within timeout", + Wait.waitFor(broker::isStarted, BROKER_START_TIMEOUT_MS, 100)); return broker; } public void stopBroker(BrokerService broker) throws Exception { if (broker != null) { broker.stop(); - broker.waitUntilStopped(); + assertTrue("Broker should stop within timeout", + Wait.waitFor(broker::isStopped, BROKER_STOP_TIMEOUT_MS, 100)); } } - @Test + @Test(timeout = TEST_TIMEOUT_MS) public void testMaxFrameSize() throws Exception { broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams()); testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false); } - @Test + @Test(timeout = TEST_TIMEOUT_MS) public void testMaxFrameSizeCompression() throws Exception { // Test message body length is 99841 bytes. Compresses to ~ 48000 broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java index 7eaefd23bde..bf3ab74a9ae 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java @@ -35,6 +35,7 @@ public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport { private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class); + private static final int TEST_TIMEOUT_MS = 120_000; public String brokerTransportScheme = "tcp"; public Boolean rejectWork = Boolean.FALSE; @@ -86,6 +87,7 @@ public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() { addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, Boolean.FALSE}); } + @org.junit.Test(timeout = TEST_TIMEOUT_MS) public void testThreadsInvolvedInXInactivityTimeouts() throws Exception { URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java index 4c5b1937413..7ebe48cc1d3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java @@ -199,15 +199,23 @@ public void testPriorityMessagesWithJmsBrowser() throws Exception { assertNotNull(message); assertEquals(5, message.getJMSPriority()); - // consume messages - final ArrayList consumeList = consumeMessages("TestQ"); + // Wait for remaining messages to be fully available after consumeOneMessage closes its connection. + // With lazyDispatch=true + optimizedDispatch=true, messages may briefly be in-flight + // during connection teardown and not yet re-queued for dispatch to a new consumer. + final int remaining = numToSend - 1; + assertTrue("Remaining messages available for dispatch", Wait.waitFor(() -> { + final Queue q = (Queue) broker.getDestination(destination); + return q != null + && q.getDestinationStatistics().getMessages().getCount() == remaining + && q.getDestinationStatistics().getInflight().getCount() == 0; + }, 5000, 100)); + + // consume messages (use timeout-based overload for reliable dispatch on slow CI) + final ArrayList consumeList = consumeMessages("TestQ", remaining, TimeUnit.SECONDS.toMillis(30)); LOG.info("Consumed list {}", consumeList.size()); - // compare lists - // assertEquals("Iteration: " + i - // +", message 1 should be priority high", 5, - // consumeList.get(0).getJMSPriority()); - for (int j = 1; j < (numToSend - 1); j++) { + assertEquals("Iteration: " + i + ", all remaining messages consumed", remaining, consumeList.size()); + for (int j = 0; j < consumeList.size(); j++) { assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority()); } }