Skip to content

#2020 Add enableExpiry option to PolicyEntry to skip O(n) expiry scan#2019

Open
mwashburn2 wants to merge 6 commits into
apache:mainfrom
mwashburn2:feature/topic-subscription-enable-expiry
Open

#2020 Add enableExpiry option to PolicyEntry to skip O(n) expiry scan#2019
mwashburn2 wants to merge 6 commits into
apache:mainfrom
mwashburn2:feature/topic-subscription-enable-expiry

Conversation

@mwashburn2
Copy link
Copy Markdown

When a slow-consumer's pending-message buffer exceeds the high-water mark TopicSubscription.add() calls removeExpiredMessages() on every single message add. That method iterates every pending message and calls isExpired(), which is an O(n) scan over the full buffer. When messages carry no TTL (isExpired() always returns false), this scan provides no benefit -- it iterates the entire buffer on every add with zero messages removed. With a large pending limit (e.g. 20,000), this adds up to millions of no-op iterations per second on busy topics. This commit adds an enableExpiry boolean (default true) to PolicyEntry. Setting it to false inserts a single guard in TopicSubscription.add():
if (enableExpiry && !matched.isEmpty() && matched.size() > max) {
removeExpiredMessages();
}

Configuration in activemq.xml:





Also adds:

  • TopicSubscriptionEnableExpiryTest: 11 correctness/propagation/integration tests
  • TopicSubscriptionEnableExpiryThroughputTest: throughput comparison test

…picSubscription

When a slow-consumer's pending-message buffer exceeds the high-water mark
(default: 1000 messages), TopicSubscription.add() calls removeExpiredMessages()
on every single message add. That method iterates every pending message and
calls isExpired(), which is an O(n) scan over the full buffer.
When messages carry no TTL (isExpired() always returns false), this scan
provides no benefit -- it iterates the entire buffer on every add with zero
messages removed. With a large pending limit (e.g. 20,000), this adds up to
millions of no-op iterations per second on busy topics.
This commit adds an enableExpiry boolean (default true) to PolicyEntry.
Setting it to false inserts a single guard in TopicSubscription.add():
    if (enableExpiry && !matched.isEmpty() && matched.size() > max) {
        removeExpiredMessages();
    }
The flag lives on PolicyEntry -- the existing home for all subscription
policy configuration -- rather than on individual PendingMessageLimitStrategy
implementations, keeping the strategies as pure limit calculators.
Configuration in activemq.xml:
    <policyEntry topic=">" enableExpiry="false">
        <pendingMessageLimitStrategy>
            <constantPendingMessageLimitStrategy limit="20000"/>
        </pendingMessageLimitStrategy>
    </policyEntry>
Also adds:
- TopicSubscriptionEnableExpiryTest: 11 correctness/propagation/integration tests
- TopicSubscriptionEnableExpiryThroughputTest: throughput comparison test
@mattrpav mattrpav self-requested a review May 14, 2026 16:57

private MessageInterceptorStrategy messageInterceptorStrategy = null;

private boolean enableExpiry = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, setting the expireMessagesPeriod to zero disables expiry checking at the policyEntry level.

If the desire is to have a way to disable exipry only for the evictionStrategy, then the option should be added to that interface and impls

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I should rename the variable expiryEnabledOnEnqueue? I'm just trying to short circuit the O(N) operation on enqueue. Is there a place i'm supposed to update an XSD or the Documentation for this new config?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see now the documentation is auto built

Copy link
Copy Markdown
Contributor

@mattrpav mattrpav May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XSD is automatically generated by the build.

I think a config flag 'expiryEnabled' to MessageEvictionStrategy is the best place. The section of code that this applies is within the guard that essentially tells the topic 'go do message eviction strategy stuff'

Also, the subscription automatically get a handle to the MessageEvictionStrategy, so no need to add anything or change signature on the TopicSubscription itself.

if (maximumPendingMessages > 0) {
...

Updating the doc is always welcome!

website repo: https://github.com/apache/activemq-website
page ref: https://activemq.apache.org/components/classic/documentation/slow-consumer-handling

private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private final AtomicInteger discarded = new AtomicInteger();
private boolean enableExpiry = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good addition to TopicSubscription so it may optimize dispatching.

Nit: please rename 'enableExpiry' to 'expiryEnabled' so we get the isExpiryEnabled() linguistics.

Copy link
Copy Markdown
Contributor

@mattrpav mattrpav left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and created a GH issue for this: #2020.

Please reference [#2020] as prefix to commit message

@mattrpav mattrpav self-assigned this May 14, 2026
@mwashburn2 mwashburn2 changed the title Add enableExpiry option to PolicyEntry to skip O(n) expiry scan #2020 Add enableExpiry option to PolicyEntry to skip O(n) expiry scan May 14, 2026
@mwashburn2 mwashburn2 requested a review from mattrpav May 14, 2026 18:59
this.enableAudit = enableAudit;
}


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove blank space


private MessageInterceptorStrategy messageInterceptorStrategy = null;


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove blank space

this.maximumPendingMessages = maximumPendingMessages;
}


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove blank space

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed all 3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

2 participants