diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 8a1a300daec..2af6c174d60 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -25,7 +25,6 @@ import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; import org.apache.activemq.command.*; -import org.apache.activemq.management.MessageFlowStats; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transport.TransmitCallback; @@ -166,7 +165,7 @@ public void add(MessageReference node) throws Exception { if (maximumPendingMessages > 0 && maximumPendingMessages < max) { max = maximumPendingMessages; } - if (!matched.isEmpty() && matched.size() > max) { + if (messageEvictionStrategy.isExpiryScanEnabled() && !matched.isEmpty() && matched.size() > max) { removeExpiredMessages(); } // lets discard old messages as we are a slow consumer diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java index 334cb3730ff..62ad2d7e8c4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java @@ -42,4 +42,18 @@ public interface MessageEvictionStrategy { */ int getEvictExpiredMessagesHighWatermark(); + /** + * Returns whether the eager expired-message scan is enabled. + *
+ * When {@code false}, the O(n) scan inside + * {@link org.apache.activemq.broker.region.TopicSubscription#add} is skipped entirely. + * Set to {@code false} when messages carry no TTL, or when the scan cost outweighs + * the benefit of eagerly evicting expired messages from slow-consumer buffers. + *
+ * See {@link MessageEvictionStrategySupport} for the default implementation that returns {@code true}. + * + * @return {@code true} if the expiry scan is enabled (default), {@code false} if skipped + */ + boolean isExpiryScanEnabled(); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java index 32f0c6f0c03..b6ac52367aa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java @@ -24,6 +24,7 @@ public abstract class MessageEvictionStrategySupport implements MessageEvictionStrategy { private int evictExpiredMessagesHighWatermark = 1000; + private boolean expiryScanEnabled = true; public int getEvictExpiredMessagesHighWatermark() { return evictExpiredMessagesHighWatermark; @@ -35,6 +36,27 @@ public int getEvictExpiredMessagesHighWatermark() { public void setEvictExpiredMessagesHighWatermark(int evictExpiredMessagesHighWaterMark) { this.evictExpiredMessagesHighWatermark = evictExpiredMessagesHighWaterMark; } - - + + @Override + public boolean isExpiryScanEnabled() { + return expiryScanEnabled; + } + + /** + * Controls whether the broker performs an eager expired-message scan when a + * non-durable topic subscription's pending slow-consumer buffer exceeds + * {@link #getEvictExpiredMessagesHighWatermark()}. + *
+ * Set to {@code false} when messages carry no TTL, or when the O(n) scan cost + * outweighs the benefit of eagerly evicting expired messages from slow-consumer + * buffers. When messages have no TTL, every scan iterates the full buffer without + * removing anything, adding latency to every enqueue once the buffer exceeds the + * high-water mark. + * + * @param expiryScanEnabled {@code false} to skip the scan; {@code true} to enable it (default) + */ + public void setExpiryScanEnabled(boolean expiryScanEnabled) { + this.expiryScanEnabled = expiryScanEnabled; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java new file mode 100644 index 00000000000..f29fe80c3e9 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.util.ArrayList; +import java.util.List; + +import jakarta.jms.Connection; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import jakarta.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; + +import junit.framework.TestCase; +import org.junit.experimental.categories.Category; +import org.apache.activemq.test.annotations.ParallelTest; + +/** + * Tests correctness of the {@code expiryScanEnabled} feature on + * {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} and its effect on + * {@link TopicSubscription}. + * + *
Background: when a slow-consumer queue exceeds + * {@code evictExpiredMessagesHighWatermark} (default: 1000), ActiveMQ calls + * {@code TopicSubscription.removeExpiredMessages()} on every single + * {@code add()} call. That method iterates every pending message checking + * {@code isExpired()} — an O(n) scan. Setting {@code expiryScanEnabled=false} + * on the {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} skips that + * scan entirely via a single boolean check guarding the call site. + */ +@Category(ParallelTest.class) +public class TopicSubscriptionEnableExpiryTest extends TestCase { + + // ------------------------------------------------------------------------- + // Unit tests — no broker needed + // ------------------------------------------------------------------------- + + /** + * {@link OldestMessageEvictionStrategy#isExpiryScanEnabled()} must default to {@code true} so + * that existing deployments that do not set the property are unaffected. + */ + public void testEvictionStrategyExpiryScanDefaultsToTrue() { + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + assertTrue("expiryScanEnabled must default to true (preserves existing behaviour)", + strategy.isExpiryScanEnabled()); + } + + public void testEvictionStrategySetExpiryScanFalse() { + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + strategy.setExpiryScanEnabled(false); + assertFalse("expiryScanEnabled should be false after setter call", + strategy.isExpiryScanEnabled()); + } + + public void testEvictionStrategySetExpiryScanRoundTrip() { + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + strategy.setExpiryScanEnabled(false); + assertFalse(strategy.isExpiryScanEnabled()); + strategy.setExpiryScanEnabled(true); + assertTrue(strategy.isExpiryScanEnabled()); + } + + /** + * {@link TopicSubscription} must pick up the eviction strategy flag — when + * {@code expiryScanEnabled=false} is set on the strategy the scan is skipped. + */ + public void testTopicSubscriptionUsesStrategyExpiryScanFlag() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + TopicSubscription sub = buildMinimalTopicSubscription(broker); + // default strategy — expiry scan enabled + assertTrue("default strategy must have expiryScanEnabled=true", + sub.getMessageEvictionStrategy().isExpiryScanEnabled()); + + OldestMessageEvictionStrategy strategy = new OldestMessageEvictionStrategy(); + strategy.setExpiryScanEnabled(false); + sub.setMessageEvictionStrategy(strategy); + assertFalse("strategy with expiryScanEnabled=false must reflect on subscription", + sub.getMessageEvictionStrategy().isExpiryScanEnabled()); + } finally { + broker.stop(); + } + } + + // ------------------------------------------------------------------------- + // PolicyEntry propagation tests + // ------------------------------------------------------------------------- + + /** + * When a {@link PolicyEntry} is configured with an eviction strategy that has + * {@code expiryScanEnabled=false}, {@code PolicyEntry.configure(TopicSubscription)} + * must propagate the strategy so the O(n) expiry scan is skipped. + */ + public void testPolicyEntryPropagatesEvictionStrategyToSubscription() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy(); + limitStrategy.setLimit(2000); + + OldestMessageEvictionStrategy evictionStrategy = new OldestMessageEvictionStrategy(); + evictionStrategy.setExpiryScanEnabled(false); + + PolicyEntry entry = new PolicyEntry(); + entry.setPendingMessageLimitStrategy(limitStrategy); + entry.setMessageEvictionStrategy(evictionStrategy); + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + assertTrue(sub.getMessageEvictionStrategy().isExpiryScanEnabled()); // default + + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertFalse("PolicyEntry.configure() must propagate eviction strategy with expiryScanEnabled=false", + sub.getMessageEvictionStrategy().isExpiryScanEnabled()); + } finally { + broker.stop(); + } + } + + /** + * When the default eviction strategy is used (no override on PolicyEntry), + * the subscription's expiry scan must remain enabled. + */ + public void testDefaultPolicyEntryLeavesExpiryScanEnabled() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy(); + limitStrategy.setLimit(2000); + + PolicyEntry entry = new PolicyEntry(); + entry.setPendingMessageLimitStrategy(limitStrategy); + // no messageEvictionStrategy override — default OldestMessageEvictionStrategy used + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertTrue("subscription must still have expiryScanEnabled=true when using the default eviction strategy", + sub.getMessageEvictionStrategy().isExpiryScanEnabled()); + } finally { + broker.stop(); + } + } + + /** + * When no {@link org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} is + * set at all, the subscription's expiry scan flag must remain at its default ({@code true}). + */ + public void testPolicyEntryWithNoStrategyLeavesExpiryScanEnabled() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + PolicyEntry entry = new PolicyEntry(); // no strategy, no eviction strategy override + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertTrue("subscription must keep expiryScanEnabled=true when no strategy is configured", + sub.getMessageEvictionStrategy().isExpiryScanEnabled()); + } finally { + broker.stop(); + } + } + + /** + * A custom {@link org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} with no + * eviction strategy override must leave the subscription's expiry scan enabled. + */ + public void testCustomLimitStrategyWithDefaultEvictionLeavesExpiryScanEnabled() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + try { + org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy customStrategy = + subscription -> 500; + + PolicyEntry entry = new PolicyEntry(); + entry.setPendingMessageLimitStrategy(customStrategy); + // no eviction strategy override — default OldestMessageEvictionStrategy(expiryScanEnabled=true) + + TopicSubscription sub = buildMinimalTopicSubscription(broker); + entry.configure(broker.getBroker(), broker.getSystemUsage(), sub); + + assertTrue("A custom limit strategy with default eviction strategy must leave expiryScanEnabled=true", + sub.getMessageEvictionStrategy().isExpiryScanEnabled()); + } finally { + broker.stop(); + } + } + + // ------------------------------------------------------------------------- + // Integration tests — embedded broker, real JMS + // ------------------------------------------------------------------------- + + /** + * With {@code expiryScanEnabled=false} on the eviction strategy, messages + * with an explicit TTL that has passed must NOT be removed by the eager expiry + * scan. The messages remain in the pending queue and are only evicted by the + * normal eviction strategy when the limit is exceeded. + * + *
We verify this by: + *
When a topic consumer is slow (its pending queue exceeds + * {@code evictExpiredMessagesHighWatermark = 1000} by default), ActiveMQ calls + * {@link TopicSubscription#removeExpiredMessages()} on every single + * {@code add()} call. That method iterates every pending message to call + * {@code isExpired()} — an O(n) operation. With a pending limit of 5,000 + * that scan runs up to 5,000 iterations per message send, dominated by the + * Java heap iteration cost. + * + *
With {@code expiryScanEnabled=false} on the {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} + * the scan body is skipped entirely via a single boolean check, reducing + * per-send work back to O(1). + * + *
The test asserts that the {@code expiryScanEnabled=false} run completes at + * least {@code MIN_SPEEDUP_FACTOR}× faster than the {@code expiryScanEnabled=true} + * run. A factor of 3 is deliberately conservative — in practice the + * improvement is often 50–200× for large queues with pure in-memory messages. + * + *
The test is annotated {@code @Category(ParallelTest.class)} so it runs
+ * in the normal CI suite, but uses a modest message count to keep wall-clock
+ * time acceptable on slow machines.
+ */
+@Category(ParallelTest.class)
+public class TopicSubscriptionEnableExpiryThroughputTest extends TestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionEnableExpiryThroughputTest.class);
+
+ /** Number of messages to send during the warm-up phase (fills queue above highWatermark). */
+ private static final int WARMUP_COUNT = 1200;
+
+ /**
+ * Number of messages timed during the measurement phase.
+ * Sending happens after the queue is already above 1000 (highWatermark),
+ * so every message triggers the expiry-scan code path.
+ */
+ private static final int TIMED_COUNT = 2000;
+
+ /** Pending message limit — large enough that O(n) scan is expensive. */
+ private static final int PENDING_LIMIT = 5000;
+
+ /**
+ * Minimum speedup factor we require for the test to pass.
+ * Conservative: real-world improvement is typically 50–200×.
+ */
+ private static final double MIN_SPEEDUP_FACTOR = 3.0;
+
+ // -------------------------------------------------------------------------
+
+ public void testEnableExpiryFalseIsFasterForSlowConsumer() throws Exception {
+ long msWithExpiry = measureSendTime(true);
+ long msWithoutExpiry = measureSendTime(false);
+
+ LOG.info("=== expiryScanEnabled throughput results ===");
+ LOG.info(" expiryScanEnabled=true : {} ms for {} timed messages ({} msg/s)",
+ msWithExpiry, TIMED_COUNT,
+ msWithExpiry > 0 ? (TIMED_COUNT * 1000L / msWithExpiry) : "n/a");
+ LOG.info(" expiryScanEnabled=false : {} ms for {} timed messages ({} msg/s)",
+ msWithoutExpiry, TIMED_COUNT,
+ msWithoutExpiry > 0 ? (TIMED_COUNT * 1000L / msWithoutExpiry) : "n/a");
+ LOG.info(" Speedup factor : {}", msWithExpiry > 0 ? String.format("%.1f×", (double) msWithExpiry / msWithoutExpiry) : "n/a");
+
+ // Guard against pathological results (e.g. CI machine starved)
+ // — only assert if the expiry run was genuinely slow (> 200 ms).
+ if (msWithExpiry > 200) {
+ double speedup = (double) msWithExpiry / Math.max(1, msWithoutExpiry);
+ assertTrue(
+ String.format(
+ "Expected expiryScanEnabled=false to be at least %.0f× faster than expiryScanEnabled=true, "
+ + "but got %.1f× (%d ms vs %d ms). "
+ + "This likely means the O(n) expiry scan is no longer being skipped.",
+ MIN_SPEEDUP_FACTOR, speedup, msWithoutExpiry, msWithExpiry),
+ speedup >= MIN_SPEEDUP_FACTOR);
+ } else {
+ LOG.warn("expiryScanEnabled=true run finished in only {} ms — machine may be too fast "
+ + "or warm-up count is too low to trigger the O(n) path reliably on this hardware. "
+ + "Skipping ratio assertion.", msWithExpiry);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ /**
+ * Starts a broker with the given {@code expiryScanEnabled} setting, creates a
+ * slow consumer (prefetch=1, never reads), sends {@code WARMUP_COUNT}
+ * messages to fill the pending queue above the eviction high-water mark,
+ * then times sending {@code TIMED_COUNT} additional messages.
+ *
+ * @return wall-clock milliseconds for the timed phase
+ */
+ private long measureSendTime(boolean expiryScanEnabled) throws Exception {
+ String brokerName = "perf-" + (expiryScanEnabled ? "expiry-on" : "expiry-off");
+ BrokerService broker = buildBroker(brokerName, PENDING_LIMIT, expiryScanEnabled);
+ try {
+ ActiveMQConnectionFactory cf =
+ new ActiveMQConnectionFactory("vm://" + brokerName + "?create=false");
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQTopic topic = new ActiveMQTopic("PERF.TOPIC");
+
+ // Create a consumer but never call receive() — this makes it slow.
+ // prefetch=1 so messages pile up in the broker's pending queue.
+ ActiveMQTopic topicWithPrefetch = new ActiveMQTopic("PERF.TOPIC?consumer.prefetchSize=1");
+ MessageConsumer consumer = session.createConsumer(topicWithPrefetch);
+
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ // ---- Warm-up phase: fill pending queue above the high-water mark (1000) ----
+ for (int i = 0; i < WARMUP_COUNT; i++) {
+ producer.send(session.createTextMessage("warmup-" + i));
+ }
+
+ // ---- Timed phase: every add() triggers the expiry-scan code path ----
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < TIMED_COUNT; i++) {
+ producer.send(session.createTextMessage("timed-" + i));
+ }
+ long elapsed = System.currentTimeMillis() - start;
+
+ conn.close();
+ return elapsed;
+ } finally {
+ broker.stop();
+ }
+ }
+
+ private BrokerService buildBroker(String brokerName, int pendingLimit, boolean expiryScanEnabled)
+ throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setBrokerName(brokerName);
+ broker.addConnector("vm://" + brokerName);
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
+ strategy.setLimit(pendingLimit);
+
+ OldestMessageEvictionStrategy evictionStrategy = new OldestMessageEvictionStrategy();
+ evictionStrategy.setExpiryScanEnabled(expiryScanEnabled);
+
+ PolicyEntry entry = new PolicyEntry();
+ entry.setTopic(">");
+ entry.setTopicPrefetch(1);
+ entry.setPendingMessageLimitStrategy(strategy);
+ entry.setMessageEvictionStrategy(evictionStrategy);
+ entry.setDeadLetterStrategy(null);
+
+ List