Skip to content

fix(consumer): store original topic names for partition reconnection#388

Open
jr011 wants to merge 1 commit intostreamnative:masterfrom
jr011:fail_reconnect_partition_fix
Open

fix(consumer): store original topic names for partition reconnection#388
jr011 wants to merge 1 commit intostreamnative:masterfrom
jr011:fail_reconnect_partition_fix

Conversation

@jr011
Copy link

@jr011 jr011 commented Jan 25, 2026

Fix: MultiTopicConsumer Cannot Discover New Partitions After Topic Expansion

Problem Description

When subscribing to partitioned topics using with_topic() , MultiTopicConsumer fails to discover and connect to new partitions added during runtime partition expansion.

Symptoms

  • Consumer works normally with initial partitions
  • After topic partition expansion, new partitions are not discovered even after multiple refresh cycles
  • Logs show created 0 consumers instead of creating consumers for new partitions

Root Cause

In src/consumer/builder.rs (lines 288-289), the topics field was incorrectly initialized with expanded partition names instead of original topic names:

Before:

let topics: VecDeque<String> = consumers.keys().cloned().collect();
let existing_topics = topics.clone();

This causes two issues:

  1. Cannot discover new partitions: When update_topics() queries using partition names like "test-partition-0", the service discovery detects -partition- suffix and short-circuits returning 0, preventing partition re-expansion.

  2. Consumer lookup fails: If existing_topics contains original names while consumers map uses partition names, poll_next() cannot find consumers.

Solution

Store original topic names in topics and expanded partition names in existing_topics:

After (lines 290-296):

// Store original topic names (before partition expansion) for update_topics to re-lookup
let topics: VecDeque<String> = self.topics
    .clone()
    .unwrap_or_default()
    .into_iter()
    .collect();
// Store expanded partition names (keys of consumers map) to poll existing consumers
let existing_topics: VecDeque<String> = consumers.keys().cloned().collect();
info!("MultiTopicConsumer initialization - topics: {:?}, existing_topics: {:?}", topics, existing_topics);

Field Purposes

Field Content Purpose Example
topics Original topic names For update_topics() to re-query partition count ["test"]
existing_topics Expanded partition names For poll_next() to poll consumers ["test-partition-0", "test-partition-1", ...]

Files Changed

  • src/consumer/builder.rs (lines 290-296)

Related Files

  • src/consumer/multi.rs (lines 144-200) - update_topics() method
  • src/consumer/multi.rs (lines 362-390) - poll_next() method
  • src/service_discovery.rs (lines 175-264) - partition lookup logic

When using explicit topic subscription, MultiTopicConsumer failed to
reconnect to partitions after broker failures because the topics field
stored expanded partition names instead of original topic names.

This change ensures that:
- topics field stores original topic names (e.g., 'my-topic')
- existing_topics field stores expanded partition names
- update_topics() can correctly re-lookup and reconnect to partitions

Fixes partition reconnection issue in production environments.
@jr011 jr011 closed this Jan 28, 2026
@jr011 jr011 reopened this Jan 28, 2026
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants