
fix: Correctly handle consumer errors and removal due to pulsar cluster restarts by unconditionally update_topics in MultiTopicConsumer #384

Merged

BewareMyPower merged 1 commit into streamnative:master from chamons:fix_another_multi_consumer_drop on Jan 13, 2026


Conversation

@chamons (Contributor) commented Jan 9, 2026

While testing #379, it was found that in cases where the entire pulsar broker was restarted, we'd hit a similar case:

  • Instead of hitting `Some(Err(e))` at

```rust
Poll::Ready(Some(Err(e))) => {
    error!(
        "Unexpected error consuming from pulsar topic {}: {}",
        &topic, e
    );
    // Only remove topic from MultiTopicConsumer on error if they
    // can be re-added later by regex
    if self.topic_regex.is_some() {
        topics_to_remove.push(topic.clone());
    }
}
```

  • we hit `None` at

```rust
Poll::Ready(None) => {
    error!("Unexpected end of stream for pulsar topic {}", &topic);
    topics_to_remove.push(topic.clone());
}
```
This means we still rip the topic out of the MultiTopicConsumer forever.

I was originally going to apply a similar fix to #379, but reviewing the code made me realize that there was already code at:

https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L145C12-L145C25

that actually handles keeping the initial topics around by recreating them.

However, it wasn't being called here because the call was guarded by a `topic_regex` existence check.

I believe it is necessary for this code to be called unconditionally, because errors in consumers can drop them from a MultiTopicConsumer and we need to recreate them.
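
To make the recovery behavior concrete, here is a minimal, self-contained sketch of the idea; the names (`MultiConsumer`, `wanted`, `active`, `on_consumer_error`) are illustrative stand-ins, not the actual pulsar-rs internals:

```rust
// Illustrative sketch only; not the pulsar-rs implementation.
use std::collections::HashSet;

struct MultiConsumer {
    wanted: Vec<String>,     // topics requested when the consumer was built
    active: HashSet<String>, // topics that currently have a live sub-consumer
}

impl MultiConsumer {
    // Runs on the periodic refresh tick. The bug: this was only invoked
    // when a topic_regex was configured, so static topic lists never healed.
    fn update_topics(&mut self) {
        for topic in &self.wanted {
            if !self.active.contains(topic) {
                // Recreate the dropped sub-consumer.
                self.active.insert(topic.clone());
            }
        }
    }

    // Error / end-of-stream path: the sub-consumer is ripped out, and only
    // the (now unconditional) refresh above can bring it back.
    fn on_consumer_error(&mut self, topic: &str) {
        self.active.remove(topic);
    }
}

fn main() {
    let topic = "persistent://public/default/t1".to_string();
    let mut c = MultiConsumer { wanted: vec![topic.clone()], active: HashSet::new() };
    c.update_topics();           // initial creation
    c.on_consumer_error(&topic); // broker restart kills the sub-consumer
    c.update_topics();           // with the fix, the topic is re-added here
    assert!(c.active.contains(&topic));
}
```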

Local testing confirmed that it works: 5 full restarts of our staging pulsar cluster while under load, without it hanging once.

@chamons (Contributor, Author) commented Jan 9, 2026

@BewareMyPower - You reviewed my previous PR on this issue. I believe this is the real, proper fix.

@chamons (Contributor, Author) commented Jan 9, 2026

@mdeltito - FYI

@mdeltito commented

@chamons Excellent find! This clears up my confusion as well - the removal of topics on fatal errors makes a lot more sense when there is a consistent, out-of-band mechanism for ensuring a connection/consumer exists for all topics.

I suppose this does mean that for the (likely much more common) case of static topics, we end up hitting the broker a lot more often to fetch the partitioned topic metadata. Unclear whether that is a "problem" or not; there is not much to go on in the PR or commit that introduced the original change, as I'm sure you saw.

@chamons (Contributor, Author) commented Jan 12, 2026

It is concerning that this was originally removed for what appears to be performance reasons (?). However, if any broker issue can cause the topics to be removed, then I think we strictly need to do those refreshes, else you can drop some (or, in one case locally, all) topics out of a multi consumer.

A query once every 60 seconds for all of the topics can't be that bad, right? (Famous last words)
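
For a sense of scale, a refresh on that cadence is just a timer-driven metadata query. A minimal sketch, assuming a tokio runtime and a hypothetical `fetch_all_topic_metadata` helper:

```rust
use std::time::Duration;

// Hypothetical stand-in for the broker lookup / partitioned-metadata request.
async fn fetch_all_topic_metadata() { /* ... */ }

async fn refresh_loop() {
    let mut tick = tokio::time::interval(Duration::from_secs(60));
    loop {
        tick.tick().await; // note: the first tick completes immediately
        // One metadata query per interval for every subscribed topic,
        // re-adding any consumer that was dropped in the meantime.
        fetch_all_topic_metadata().await;
    }
}
```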

@mdeltito commented

Totally agree; this behavior, erring on the side of being chatty with the broker, seems preferable to the current state.

> A query once every 60 seconds for all of the topics can't be that bad, right?

I too want to believe 😄 This also handles the case where a topic for a static subscription has the number of partitions increased. For our use case we are forced to either use a regex subscription and leverage that refresh interval to pick up the new partition(s), or restart consumers with a static subscription.

@chamons (Contributor, Author) commented Jan 12, 2026

> has the number of partitions increased

That is a very good point: that can happen at an arbitrary time, is not an error, and is something we need to handle as well.
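
A sketch of how the same refresh pass could pick this up, assuming the metadata query returns the current partition count; `add_consumer_for` is hypothetical, and partition sub-topics follow Pulsar's `-partition-N` naming:

```rust
fn reconcile_partitions(topic: &str, known: &mut u32, current: u32) {
    // Pulsar only ever grows the partition count, so anything beyond
    // `known` is a new partition that needs its own sub-consumer.
    for p in *known..current {
        add_consumer_for(&format!("{topic}-partition-{p}"));
    }
    *known = (*known).max(current);
}

// Hypothetical: create a sub-consumer for one partition sub-topic.
fn add_consumer_for(partition_topic: &str) {
    println!("subscribing to {partition_topic}");
}

fn main() {
    let mut known = 3;
    reconcile_partitions("persistent://public/default/t1", &mut known, 5);
    assert_eq!(known, 5);
}
```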

@BewareMyPower merged commit 50baf97 into streamnative:master on Jan 13, 2026
8 checks passed
@BewareMyPower (Contributor) commented

FYI, the Pulsar Java client has two configs for such periodic tasks:

  • `patternAutoDiscoveryPeriod` for the pattern consumer to detect changes to topics that match the regex
  • `autoUpdatePartitionsInterval` for the multi-topics consumer to find increased partitions

Though it seems reasonable to use a single configuration for both.
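
A hypothetical sketch of what that single configuration could look like on the Rust side; this option does not exist in pulsar-rs today:

```rust
use std::time::Duration;

// Hypothetical: one interval driving both regex topic discovery and
// partition-count updates, instead of the Java client's two settings.
struct RefreshConfig {
    topic_refresh_interval: Duration,
}

impl Default for RefreshConfig {
    fn default() -> Self {
        // Matches the 60-second cadence discussed above.
        Self { topic_refresh_interval: Duration::from_secs(60) }
    }
}
```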
