Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -94,17 +95,27 @@ public void doTestEviction(ConnectionFactory pooledFactory) throws Exception {
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) {
final ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection();
amqC.addTransportListener(new TransportListener() {
// Intercept exception propagation at the MockTransport level where it fires
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #1666

// synchronously. ActiveMQConnection.addTransportListener() callbacks fire via
// executeAsync(), which silently drops the task if the pool's ExceptionListener
// closes the connection and shuts down the executor first (race condition that
// affects the XA path).
final MockTransport mockTransport = (MockTransport) amqC.getTransportChannel().narrow(MockTransport.class);
final TransportListener originalListener = mockTransport.getTransportListener();
mockTransport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {
originalListener.onCommand(command);
}
public void onException(IOException error) {
// we know connection is dead...
// listeners are fired async
// fires synchronously when MockTransport.onException() is called
gotExceptionEvent.countDown();
originalListener.onException(error);
}
public void transportInterupted() {
originalListener.transportInterupted();
}
public void transportResumed() {
originalListener.transportResumed();
}
});

Expand All @@ -116,18 +127,21 @@ public void transportResumed() {
TestCase.fail("Expected Error");
} catch (JMSException e) {
}
// Wait for async exception event BEFORE the try-with-resources closes the connection.
// ActiveMQConnection.onException() fires TransportListener callbacks via executeAsync(),
// so the callback runs in a separate thread. If we wait after connection.close(), the
// async executor may already be shut down and the callback never fires.
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
}
// If we get another connection now it should be a new connection that
// works.
// After the failure, a new connection from the pool should work.
// The pool eviction is async (ExceptionListener fires via executeAsync),
// so retry until the pool returns a working connection.
LOG.info("expect new connection after failure");
try (final Connection connection2 = pooledFactory.createConnection()) {
sendMessage(connection2);
}
assertTrue("pool should provide working connection after eviction",
Wait.waitFor(() -> {
try (final Connection connection2 = pooledFactory.createConnection()) {
sendMessage(connection2);
return true;
} catch (Exception e) {
return false;
}
}, 5000, 100));
}

private void createConnectionFailure(Connection connection) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.junit.Test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
Expand Down Expand Up @@ -171,6 +172,12 @@ public Receiver(jakarta.jms.Destination dest, boolean transactional) throws JMSE
this.transactional = transactional;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
connectionFactory.setWatchTopicAdvisories(false);
if (transactional) {
final ActiveMQPrefetchPolicy policy = connectionFactory.getPrefetchPolicy();
policy.setQueuePrefetch(1);
policy.setTopicPrefetch(1);
policy.setDurableTopicPrefetch(1);
}
connection = connectionFactory.createConnection();
connection.setClientID(dest.toString());
session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Expand All @@ -194,7 +201,7 @@ public long getNextExpectedSeqNo() {

final int TRANSACITON_BATCH = 500;
boolean resumeOnNextOrPreviousIsOk = false;
public void onMessage(Message message) {
public synchronized void onMessage(Message message) {
try {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % TRANSACITON_BATCH) == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void stopBroker() throws Exception {
}
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}

Expand Down Expand Up @@ -257,6 +258,7 @@ protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exce
configureBroker(newBroker);
newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
newBroker.start();
newBroker.waitUntilStarted();
return newBroker;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,7 @@ public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {

dumpMessages();

Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return proxy.getInFlightCount() == 0l;
}
});
Wait.waitFor(() -> proxy.getInFlightCount() == 0L && proxy.cursorSize() == 0);
assertEquals("prefetch", 0, proxy.getInFlightCount());
assertEquals("size", 10, proxy.getQueueSize());
assertEquals("cursor size", 0, proxy.cursorSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class MaxFrameSizeEnabledTest {
private static final int CONNECTION_COUNT = 3;
private static final int MESSAGE_ATTEMPTS = 3;
private static final int BODY_SIZE = 20000; // large enough to trip 2k limit, compressible enough for 60k
private static final long BROKER_START_TIMEOUT_MS = 30_000;
private static final long BROKER_STOP_TIMEOUT_MS = 30_000;
private static final int TEST_TIMEOUT_MS = 120_000;

private BrokerService broker;
private final String transportType;
Expand Down Expand Up @@ -158,30 +161,32 @@ public void after() throws Exception {
}

public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
BrokerService broker = new BrokerService();
final BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
TransportConnector connector = broker.addConnector(connectorString);
final TransportConnector connector = broker.addConnector(connectorString);
connector.setName(connectorName);
broker.start();
broker.waitUntilStarted();
assertTrue("Broker should start within timeout",
Wait.waitFor(broker::isStarted, BROKER_START_TIMEOUT_MS, 100));
return broker;
}

public void stopBroker(BrokerService broker) throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
assertTrue("Broker should stop within timeout",
Wait.waitFor(broker::isStopped, BROKER_STOP_TIMEOUT_MS, 100));
}
}

@Test
@Test(timeout = TEST_TIMEOUT_MS)
public void testMaxFrameSize() throws Exception {
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false);
}

@Test
@Test(timeout = TEST_TIMEOUT_MS)
public void testMaxFrameSizeCompression() throws Exception {
// Test message body length is 99841 bytes. Compresses to ~ 48000
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);
private static final int TEST_TIMEOUT_MS = 120_000;

public String brokerTransportScheme = "tcp";
public Boolean rejectWork = Boolean.FALSE;
Expand Down Expand Up @@ -86,6 +87,7 @@ public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() {
addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, Boolean.FALSE});
}

@org.junit.Test(timeout = TEST_TIMEOUT_MS)
public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {

URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,23 @@ public void testPriorityMessagesWithJmsBrowser() throws Exception {
assertNotNull(message);
assertEquals(5, message.getJMSPriority());

// consume messages
final ArrayList<Message> consumeList = consumeMessages("TestQ");
// Wait for remaining messages to be fully available after consumeOneMessage closes its connection.
// With lazyDispatch=true + optimizedDispatch=true, messages may briefly be in-flight
// during connection teardown and not yet re-queued for dispatch to a new consumer.
final int remaining = numToSend - 1;
assertTrue("Remaining messages available for dispatch", Wait.waitFor(() -> {
final Queue q = (Queue) broker.getDestination(destination);
return q != null
&& q.getDestinationStatistics().getMessages().getCount() == remaining
&& q.getDestinationStatistics().getInflight().getCount() == 0;
}, 5000, 100));

// consume messages (use timeout-based overload for reliable dispatch on slow CI)
final ArrayList<Message> consumeList = consumeMessages("TestQ", remaining, TimeUnit.SECONDS.toMillis(30));
LOG.info("Consumed list {}", consumeList.size());

// compare lists
// assertEquals("Iteration: " + i
// +", message 1 should be priority high", 5,
// consumeList.get(0).getJMSPriority());
for (int j = 1; j < (numToSend - 1); j++) {
assertEquals("Iteration: " + i + ", all remaining messages consumed", remaining, consumeList.size());
for (int j = 0; j < consumeList.size(); j++) {
assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority());
}
}
Expand Down