
[server] Implement cross-TP parallel processing for ConsumptionTask #2401

Merged
sushantmane merged 5 commits into linkedin:main from sushantmane:i1-process-records-from-different-TPs-in-Parallel
Feb 4, 2026

Conversation

@sushantmane
Contributor

@sushantmane sushantmane commented Jan 24, 2026

[server] implement cross-TP parallel processing in ConsumptionTask

Problem:
IngestionBatchProcessor only parallelizes records within a single topic-partition
by key. However, ConsumptionTask processes topic-partitions sequentially in its
for-loop. When a single consumer poll returns records from multiple TPs or replicas,
processing happens in iteration order. If an earlier TP is slow, for example due to
many same-key records or expensive write-compute, all subsequent TPs in the batch
are blocked. This head-of-line blocking causes cascading ingestion delays across
unrelated partitions, increases end-to-end latency, and can lead to SLO breaches
when time-sensitive data is delayed by one slow TP.

Solution:
Introduce optional cross-TP parallel processing so topic-partitions within a single
poll can be processed concurrently. A shared ExecutorService is created in
AggKafkaConsumerService and passed to each ConsumptionTask. When enabled,
ConsumptionTask uses CompletableFuture.supplyAsync() to process TPs in parallel
using this shared pool.
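A minimal sketch of the fan-out described above, with simplified signatures (the real `processTopicPartition()` consumes polled records and returns a `TpProcessingResult`; the record-count return type and `String`-keyed map here are placeholders):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class CrossTpSketch {
  // Hypothetical stand-in for ConsumptionTask's per-TP processing.
  static int processTopicPartition(String tp, List<String> records) {
    return records.size();
  }

  // Fan out one task per TP on the shared pool; fall back to the sequential
  // path when the pool is null (feature disabled) or the poll returned
  // records for at most one TP.
  static int processAll(Map<String, List<String>> polledMessages, ExecutorService pool) {
    if (pool == null || polledMessages.size() <= 1) {
      int total = 0;
      for (Map.Entry<String, List<String>> e : polledMessages.entrySet()) {
        total += processTopicPartition(e.getKey(), e.getValue());
      }
      return total;
    }
    List<CompletableFuture<Integer>> futures = polledMessages.entrySet().stream()
        .map(e -> CompletableFuture.supplyAsync(
            () -> processTopicPartition(e.getKey(), e.getValue()), pool))
        .collect(Collectors.toList());
    // Total time is bounded by the slowest TP, not the sum across TPs.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    return futures.stream().mapToInt(CompletableFuture::join).sum();
  }
}
```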

The behavior is controlled via the following configs:

  • SERVER_CROSS_TP_PARALLEL_PROCESSING_ENABLED (default: false)
  • SERVER_CROSS_TP_PARALLEL_PROCESSING_THREAD_POOL_SIZE (default: 4)
  • SERVER_CROSS_TP_PARALLEL_PROCESSING_CURRENT_VERSION_AA_WC_LEADER_ONLY
    (default: false), to optionally restrict parallelism to the write-compute leader
    pool where it provides the most benefit.

Additional changes:

  • Add CrossTpProcessingStats to monitor thread pool usage and behavior.
  • Refactor ConsumptionTask.run() by extracting helper methods to improve readability.
  • Add comprehensive test coverage, including integration tests, to validate behavior
    under parallel and non-parallel execution modes.

Diagram illustrating current processing pipeline

                              ┌─────────────────────────────────────┐
                              │          Kafka poll()               │
                              │  Returns records for multiple TPs   │
                              └─────────────────────────────────────┘
                                               │
                    ┌──────────────────────────┼──────────────────────────┐
                    ▼                          ▼                          ▼
              ┌──────────┐              ┌──────────┐              ┌──────────┐
              │ TP0: 8   │              │ TP1: 5   │              │ TP2: 6   │
              │ records  │              │ records  │              │ records  │
              └──────────┘              └──────────┘              └──────────┘
                                               │
                              ┌────────────────┴────────────────┐
                              │  ConsumptionTask for-loop       │
                              │  (SEQUENTIAL by TP)             │
                              └─────────────────────────────────┘
                                               │
        ══════════════════════════════════════════════════════════════════════
        STEP 1: Process TP0 (BLOCKS TP1, TP2)
        ══════════════════════════════════════════════════════════════════════
                                               │
                              ┌────────────────┴────────────────┐
                              │  TP0: Group by key, parallel    │
                              └─────────────────────────────────┘
                    ┌──────────────┬──────────────┬──────────────┐
                    ▼              ▼              ▼              ▼
               ┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐
               │ Key A  │    │ Key B  │    │ Key C  │    │ Key D  │
               │ rec1,5 │    │ rec2   │    │ rec3,7 │    │ rec4,6,8│
               └────────┘    └────────┘    └────────┘    └────────┘
                Thread 1      Thread 2      Thread 3      Thread 4
                    │              │              │              │
                    └──────────────┴──────────────┴──────────────┘
                                        │
                            CompletableFuture.allOf().get()
                            (wait for slowest key - Key D)
                                        │
                                        ▼
                                   TP0 DONE
                                        │
        ══════════════════════════════════════════════════════════════════════
        STEP 2: Process TP1 (only NOW starts, TP2 still waiting)
        ══════════════════════════════════════════════════════════════════════
                                        │
                              ┌─────────┴─────────┐
                              │ TP1: Group by key │
                              └───────────────────┘
                                       ...
                                        │
                                   TP1 DONE
                                        │
        ══════════════════════════════════════════════════════════════════════
        STEP 3: Process TP2 (finally starts)
        ══════════════════════════════════════════════════════════════════════
                                        │
                              ┌─────────┴─────────┐
                              │ TP2: Group by key │
                              └───────────────────┘
                                       ...
                                        │
                                   TP2 DONE
                                        │
                                        ▼
                              ┌───────────────────┐
                              │  Next poll()      │
                              └───────────────────┘
TP1 and TP2 wait idle while TP0 is processing. If TP0 has a slow key group (e.g., Key D with 3 sequential records), the entire poll batch is delayed.

Proposed change in this PR:

                              ┌─────────────────────────────────────┐
                              │          Kafka poll()               │
                              │  Returns records for multiple TPs   │
                              └─────────────────────────────────────┘
                                               │
                    ┌──────────────────────────┼──────────────────────────┐
                    ▼                          ▼                          ▼
              ┌──────────┐              ┌──────────┐              ┌──────────┐
              │ TP0: 8   │              │ TP1: 5   │              │ TP2: 6   │
              │ records  │              │ records  │              │ records  │
              └──────────┘              └──────────┘              └──────────┘
                                               │
                              ┌────────────────┴────────────────┐
                              │  processAllTopicPartitions()    │
                              │  crossTpProcessingPool != null  │
                              │  AND polledMessages.size() > 1  │
                              └─────────────────────────────────┘
                                               │
        ══════════════════════════════════════════════════════════════════════
        PARALLEL PROCESSING: All TPs processed concurrently
        ══════════════════════════════════════════════════════════════════════
                                               │
              ┌────────────────────────────────┼────────────────────────────────┐
              │                                │                                │
              ▼                                ▼                                ▼
    CompletableFuture              CompletableFuture              CompletableFuture
       .supplyAsync()                 .supplyAsync()                 .supplyAsync()
              │                                │                                │
              ▼                                ▼                                ▼
    ┌─────────────────────┐      ┌─────────────────────┐      ┌─────────────────────┐
    │ crossTpProcessing   │      │ crossTpProcessing   │      │ crossTpProcessing   │
    │ Pool Thread         │      │ Pool Thread         │      │ Pool Thread         │
    │                     │      │                     │      │                     │
    │  ┌───────────────┐  │      │  ┌───────────────┐  │      │  ┌───────────────┐  │
    │  │     TP0       │  │      │  │     TP1       │  │      │  │     TP2       │  │
    │  │   8 records   │  │      │  │   5 records   │  │      │  │   6 records   │  │
    │  └───────────────┘  │      │  └───────────────┘  │      │  └───────────────┘  │
    │         │           │      │         │           │      │         │           │
    │         ▼           │      │         ▼           │      │         ▼           │
    │  processTopicPart-  │      │  processTopicPart-  │      │  processTopicPart-  │
    │  ition(TP0, msgs)   │      │  ition(TP1, msgs)   │      │  ition(TP2, msgs)   │
    │         │           │      │         │           │      │         │           │
    │         ▼           │      │         ▼           │      │         ▼           │
    │  TpProcessingResult │      │  TpProcessingResult │      │  TpProcessingResult │
    └─────────┬───────────┘      └─────────┬───────────┘      └─────────┬───────────┘
              │                            │                            │
              │◄─── RUNNING IN PARALLEL ───┼────────────────────────────┤
              │                            │                            │
              ▼                            ▼                            ▼
           TP0 DONE                     TP1 DONE                     TP2 DONE
              │                            │                            │
              └────────────────────────────┼────────────────────────────┘
                                           │
                                           ▼
                    ┌─────────────────────────────────────────────────┐
                    │     CompletableFuture.allOf(futures).join()     │
                    │     (waits for ALL TPs to complete)             │
                    └─────────────────────────────────────────────────┘
                                           │
                                           ▼
                    ┌─────────────────────────────────────────────────┐
                    │  Aggregate all TpProcessingResult objects       │
                    │  • Total messageCount                           │
                    │  • Total payloadBytesConsumed                   │
                    │  • Collect errors & missing receivers           │
                    └─────────────────────────────────────────────────┘
                                           │
                                           ▼
                              ┌───────────────────┐
                              │  Next poll()      │
                              └───────────────────┘

        TOTAL TIME: max(T(TP0), T(TP1), T(TP2)) instead of sum
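The aggregation step in the diagram can be sketched as follows; the `TpProcessingResult` fields here are assumptions inferred from the aggregates listed above (message count, payload bytes, collected errors), not the actual class:

```java
import java.util.List;

public class TpResultAggregation {
  // Hypothetical per-TP result mirroring the TpProcessingResult described above.
  static class TpProcessingResult {
    final int messageCount;
    final long payloadBytes;
    final Throwable error; // null when the TP processed cleanly

    TpProcessingResult(int messageCount, long payloadBytes, Throwable error) {
      this.messageCount = messageCount;
      this.payloadBytes = payloadBytes;
      this.error = error;
    }
  }

  // Sum counters across TPs and collect (rather than rethrow) per-TP errors,
  // so one failed TP does not discard the others' results.
  static long[] aggregate(List<TpProcessingResult> results, List<Throwable> errorsOut) {
    int totalMessages = 0;
    long totalBytes = 0;
    for (TpProcessingResult r : results) {
      totalMessages += r.messageCount;
      totalBytes += r.payloadBytes;
      if (r.error != null) {
        errorsOut.add(r.error);
      }
    }
    return new long[] { totalMessages, totalBytes };
  }
}
```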

Hybrid of Cross-TP Parallel Processing and IngestionBatchProcessor

                              ┌─────────────────────────────────────┐
                              │          Kafka poll()               │
                              │  Returns records for multiple TPs   │
                              └─────────────────────────────────────┘
                                               │
                    ┌──────────────────────────┼──────────────────────────┐
                    ▼                          ▼                          ▼
              ┌──────────┐              ┌──────────┐              ┌──────────┐
              │ TP0: 8   │              │ TP1: 5   │              │ TP2: 6   │
              │ records  │              │ records  │              │ records  │
              └──────────┘              └──────────┘              └──────────┘
                                               │
        ══════════════════════════════════════════════════════════════════════
        LEVEL 1: Cross-TP Parallel Processing (ConsumptionTask)
                 crossTpProcessingPool - shared across KafkaConsumerServices
        ══════════════════════════════════════════════════════════════════════
                                               │
              ┌────────────────────────────────┼────────────────────────────────┐
              │                                │                                │
              ▼                                ▼                                ▼
    CompletableFuture              CompletableFuture              CompletableFuture
       .supplyAsync()                 .supplyAsync()                 .supplyAsync()
              │                                │                                │
              ▼                                ▼                                ▼
    ┌─────────────────────┐      ┌─────────────────────┐      ┌─────────────────────┐
    │ crossTpProcessing   │      │ crossTpProcessing   │      │ crossTpProcessing   │
    │ Pool Thread         │      │ Pool Thread         │      │ Pool Thread         │
    │                     │      │                     │      │                     │
    │   processTopicPart- │      │   processTopicPart- │      │   processTopicPart- │
    │   ition(TP0)        │      │   ition(TP1)        │      │   ition(TP2)        │
    │         │           │      │         │           │      │         │           │
    └─────────┼───────────┘      └─────────┼───────────┘      └─────────┼───────────┘
              │◄─────────── RUNNING IN PARALLEL ───────────────┼────────────────────┤
              │                                                │                    │
              ▼                                                ▼                    ▼
        ══════════════════════════════════════════════════════════════════════════════
        LEVEL 2: Within-TP Parallel Processing (IngestionBatchProcessor)
                 batchProcessingThreadPool - per StoreIngestionTask
                 Groups records by KEY, processes different keys in parallel
        ══════════════════════════════════════════════════════════════════════════════
              │
              ▼
    ┌─────────────────────────────────────────────────────────────────────┐
    │  TP0: Group 8 records by key                                        │
    │  keyGroupMap = { KeyA: [rec1,rec5], KeyB: [rec2], KeyC: [rec3,rec7], │
    │                  KeyD: [rec4,rec6,rec8] }                           │
    └─────────────────────────────────────────────────────────────────────┘
                                       │
           ┌───────────────────────────┼───────────────────────────┐
           │                           │                           │
           ▼                           ▼                           ▼
    CompletableFuture          CompletableFuture          CompletableFuture
       .runAsync()                .runAsync()                .runAsync()
           │                           │                           │
           ▼                           ▼                           ▼
    ┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
    │ batchProcessing │      │ batchProcessing │      │ batchProcessing │
    │ Pool Thread 1   │      │ Pool Thread 2   │      │ Pool Thread 3   │
    │                 │      │                 │      │                 │
    │ ┌─────────────┐ │      │ ┌─────────────┐ │      │ ┌─────────────┐ │
    │ │   Key A     │ │      │ │   Key B     │ │      │ │   Key C     │ │
    │ │  rec1 ──►   │ │      │ │  rec2 ──►   │ │      │ │  rec3 ──►   │ │
    │ │  rec5 ──►   │ │      │ │   DONE      │ │      │ │  rec7 ──►   │ │
    │ │   DONE      │ │      │ └─────────────┘ │      │ │   DONE      │ │
    │ └─────────────┘ │      └─────────────────┘      │ └─────────────┘ │
    └────────┬────────┘                               └────────┬────────┘
             │                                                  │
             │                 ┌─────────────────┐              │
             │                 │ batchProcessing │              │
             │                 │ Pool Thread 4   │              │
             │                 │                 │              │
             │                 │ ┌─────────────┐ │              │
             │                 │ │   Key D     │ │              │
             │                 │ │  rec4 ──►   │ │              │
             │                 │ │  rec6 ──►   │ │              │
             │                 │ │  rec8 ──►   │ │              │
             │                 │ │   DONE      │ │ (slowest)    │
             │                 │ └─────────────┘ │              │
             │                 └────────┬────────┘              │
             │                          │                       │
             │◄── KEYS IN PARALLEL ─────┼───────────────────────┤
             │    (same key sequential) │                       │
             └──────────────────────────┼───────────────────────┘
                                        │
                                        ▼
                    ┌─────────────────────────────────────────────────┐
                    │     CompletableFuture.allOf(futureList).get()   │
                    │     (waits for slowest KEY GROUP - Key D)       │
                    └─────────────────────────────────────────────────┘
                                        │
                                        ▼
                    ┌─────────────────────────────────────────────────┐
                    │           TP0 processing DONE                   │
                    │           Return TpProcessingResult             │
                    └─────────────────────────────────────────────────┘
                                        │
        ════════════════════════════════════════════════════════════════
        Back to LEVEL 1: All TPs complete
        ════════════════════════════════════════════════════════════════
                                        │
                                        ▼
                    ┌─────────────────────────────────────────────────┐
                    │  CompletableFuture.allOf(futures).join()        │
                    │  (waits for slowest TP)                         │
                    └─────────────────────────────────────────────────┘
                                        │
                                        ▼
                              ┌───────────────────┐
                              │  Next poll()      │
                              └───────────────────┘
                              

Code changes

  • Added new code behind a config. If so list the config names and their default values in the PR description.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

Copilot AI review requested due to automatic review settings January 24, 2026 11:16

Copilot AI left a comment


Pull request overview

This PR implements cross-TP (topic-partition) parallel processing within the ConsumptionTask to prevent slow topic-partitions from blocking others in the same poll batch. The feature is disabled by default and can be enabled via configuration.

Changes:

  • Added configuration options SERVER_CROSS_TP_PARALLEL_PROCESSING_ENABLED (default: false) and SERVER_CROSS_TP_PARALLEL_PROCESSING_THREAD_POOL_SIZE (default: 4)
  • Created a single shared thread pool in AggKafkaConsumerService that is passed down to ConsumptionTask
  • Modified ConsumptionTask to use CompletableFuture.supplyAsync() for parallel TP processing when enabled, falling back to sequential processing when the pool is null or only 1 TP exists
  • Added CrossTpProcessingStats for thread pool monitoring with metrics for active_thread_number, max_thread_number, and queued_task_count
  • Updated all KafkaConsumerService constructors to accept ExecutorService parameter
  • Added comprehensive unit tests in ConsumptionTaskTest and integration test TestActiveActiveIngestionWithCrossTpParallelProcessing

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated no comments.

Show a summary per file
File Description
ConfigKeys.java Added two new configuration keys for enabling cross-TP parallel processing and setting thread pool size
VeniceServerConfig.java Added configuration parsing and getter methods for the new config options
AggKafkaConsumerService.java Creates and manages the shared thread pool lifecycle, passes it to KafkaConsumerService instances
KafkaConsumerService.java Accepts and stores the cross-TP processing pool, passes it to ConsumptionTask
PartitionWiseKafkaConsumerService.java Updated constructors to propagate the ExecutorService parameter
StoreAwarePartitionWiseKafkaConsumerService.java Updated constructor to propagate the ExecutorService parameter
ConsumptionTask.java Implements parallel processing logic using CompletableFuture, includes TpProcessingResult class for aggregating results
CrossTpProcessingStats.java Extends ThreadPoolStats to provide metrics for the cross-TP processing thread pool
ConsumptionTaskTest.java Comprehensive unit tests covering parallel processing, sequential fallback, single TP, and missing receiver scenarios
TestActiveActiveIngestionWithCrossTpParallelProcessing.java Integration test extending TestActiveActiveIngestion with the feature enabled
cross-tp-parallel-processing-plan.md Documentation explaining the feature, configuration, architecture, and constraints
StoreIngestionTaskTest.java, KafkaConsumerServiceTest.java, KafkaConsumerServiceDelegatorTest.java Updated test constructors to pass null for the new ExecutorService parameter


Copilot AI review requested due to automatic review settings January 25, 2026 01:09

Copilot AI left a comment


Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.



@sushantmane sushantmane force-pushed the i1-process-records-from-different-TPs-in-Parallel branch from 9eaa000 to ed00575 Compare January 25, 2026 01:22
Copilot AI review requested due to automatic review settings January 25, 2026 03:50
@sushantmane sushantmane force-pushed the i1-process-records-from-different-TPs-in-Parallel branch from ed00575 to a6eaa7f Compare January 25, 2026 03:50

Copilot AI left a comment


Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 5 comments.



@sushantmane sushantmane force-pushed the i1-process-records-from-different-TPs-in-Parallel branch 2 times, most recently from ffce755 to edad5a8 Compare January 25, 2026 09:15
Copilot AI review requested due to automatic review settings January 25, 2026 09:15

Copilot AI left a comment


Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated no new comments.



@sixpluszero
Contributor

Implementation-wise I think it is not wrong, as you explained, but now I actually think today's case might not be as bad. I might not have thought the idea through, but if we look beyond the single consumer, all the consumers in the leader pool(s) will share the same parallel processing thread pool. So even if we are sequentially processing batches inside one consumer, as long as together they saturate the AA/WC processing/lookup capability in parallel across the consumer pool, I think it should be fine. More parallelism might end up increasing everyone's processing latency together if other resources are not tuned up.

It would be great if we could have an offline discussion to evaluate the actual overall throughput bottleneck. For the case you describe in the description section, it might not help much: if a TP has a problem producing to the PubSub broker (as we observed in production), the whole poll() is still slowed down, blocking the next poll...

@sushantmane
Contributor Author

> Implementation-wise I think it is not wrong, as you explained, but now I actually think today's case might not be as bad. I might not have thought the idea through, but if we look beyond the single consumer, all the consumers in the leader pool(s) will share the same parallel processing thread pool. So even if we are sequentially processing batches inside one consumer, as long as together they saturate the AA/WC processing/lookup capability in parallel across the consumer pool, I think it should be fine. More parallelism might end up increasing everyone's processing latency together if other resources are not tuned up.
>
> It would be great if we could have an offline discussion to evaluate the actual overall throughput bottleneck. For the case you describe in the description section, it might not help much: if a TP has a problem producing to the PubSub broker (as we observed in production), the whole poll() is still slowed down, blocking the next poll...

Let's experiment with real workloads

sixpluszero previously approved these changes Feb 4, 2026
Contributor

@sixpluszero sixpluszero left a comment


lgtm!
Let's try this out separately and compare with other discussed tuning to see how it works out in our real workload.

Feature: Parallelize processing of topic-partitions within ConsumptionTask to prevent
slow TPs from blocking others in the same poll batch.

Configuration:
- SERVER_CROSS_TP_PARALLEL_PROCESSING_ENABLED (default: false)
- SERVER_CROSS_TP_PARALLEL_PROCESSING_THREAD_POOL_SIZE (default: 4)

Key Changes:
- Add config keys in ConfigKeys.java and parsing in VeniceServerConfig.java
- Create single shared thread pool in AggKafkaConsumerService (not per KafkaConsumerService)
- ConsumptionTask uses CompletableFuture.supplyAsync() for parallel TP processing when enabled
- Add TpProcessingResult class to aggregate results from parallel processing
- Falls back to sequential processing when pool is null or only 1 TP

Architecture:
- Thread pool created once in AggKafkaConsumerService and passed down to ConsumptionTask
- Update KCSConstructor interface and all subclass constructors to accept ExecutorService
- Shutdown handled centrally in AggKafkaConsumerService.stopInner()

Metrics:
- Add CrossTpProcessingStats extending ThreadPoolStats for thread pool monitoring
- Exposes active_thread_number, max_thread_number, queued_task_count metrics
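The three gauges listed above map directly onto standard `java.util.concurrent.ThreadPoolExecutor` accessors, so a stats class like `CrossTpProcessingStats` can plausibly read them as in this sketch (the mapping comments are assumptions; only the JDK method names are guaranteed):

```java
import java.util.concurrent.ThreadPoolExecutor;

public class PoolMetricsSketch {
  // Snapshot the three pool gauges from a ThreadPoolExecutor:
  //   [0] active_thread_number  -> getActiveCount()
  //   [1] max_thread_number     -> getMaximumPoolSize()
  //   [2] queued_task_count     -> getQueue().size()
  static int[] snapshot(ThreadPoolExecutor pool) {
    return new int[] {
        pool.getActiveCount(),
        pool.getMaximumPoolSize(),
        pool.getQueue().size()
    };
  }
}
```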

Testing:
- Add ConsumptionTaskTest.java with 4 test cases covering parallel/sequential paths
- Add TestActiveActiveIngestionWithCrossTpParallelProcessing integration test
- Update existing tests to use new constructor signatures

Extract common per-topic-partition processing logic into reusable methods
to eliminate code duplication between parallel and sequential paths:

- Add processAllTopicPartitions() to orchestrate parallel vs sequential
- Add processTopicPartition() for common per-TP processing logic
- Unify result aggregation for both processing paths
- Add consistent error handling: both paths now set addSomeDelay and
  call recordTotalPollError() when any TP encounters an error
- Remove unused variables from run() method

Behavior change: Sequential path now catches per-TP exceptions and
continues processing other TPs (matching parallel path behavior)
instead of failing fast on first error.
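The described behavior change can be illustrated with a hypothetical sequential loop (the `Function`-based TP map is a stand-in for real per-TP processing, and the error list is an analogue of `recordTotalPollError()`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SequentialErrorHandlingSketch {
  // Sequential path: an exception from one TP is recorded and the loop
  // moves on to the remaining TPs, matching the parallel path's behavior
  // instead of failing fast on the first error.
  static List<String> processSequentially(
      Map<String, Function<String, String>> tps, List<Throwable> errorsOut) {
    List<String> results = new ArrayList<>();
    for (Map.Entry<String, Function<String, String>> e : tps.entrySet()) {
      try {
        results.add(e.getValue().apply(e.getKey()));
      } catch (Exception ex) {
        errorsOut.add(ex); // recorded (recordTotalPollError analogue), not rethrown
      }
    }
    return results;
  }
}
```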

Refactor ConsumptionTask.run() for improved readability

- Extract helper methods: waitForNextCycleIfNeeded(), processAndClearUnsubscriptions(),
  processPollResults(), recordAggregateStats(), applyThrottling(), cleanupUnsubscribedPartitions()
- Reduce run() from 117 lines to ~35 lines
- Reuse storePollCounterMap across iterations (avoid repeated HashMap allocation)
- Remove unused payloadBytesConsumed parameter from recordAggregateStats()
- No behavioral changes - all existing tests pass

Consolidate ConsumptionTaskTest - reduce duplication while maintaining coverage

- Merge overlapping tests: 9 tests -> 7 tests
- Add helper methods: createTask(), createOneShotPollFunction(), runTaskUntilLatch(),
  createMockReceiver(), createPollResult()
- Rename tests for clarity: testSequentialProcessingAndMetrics,
  testMissingReceiverHandlingAndMetrics, testEmptyPollMetrics, testPollErrorMetrics,
  testPerStoreMetrics
- Reduce test file from 589 to 405 lines (-31%)

Add config to enable cross-TP parallel processing only for CURRENT_VERSION_AA_WC_LEADER_POOL

- Add SERVER_CROSS_TP_PARALLEL_PROCESSING_CURRENT_VERSION_AA_WC_LEADER_ONLY config key
- Update VeniceServerConfig to read the new config (default: false)
- Update AggKafkaConsumerService.getCrossTpProcessingPoolForPoolType() to conditionally
  return the pool based on pool type when the new config is enabled

Configuration behavior:
- Feature disabled by default (server.cross.tp.parallel.processing.enabled=false)
- When enabled with current.version.aa.wc.leader.only=false: applies to all pools
- When enabled with current.version.aa.wc.leader.only=true: only applies to
  CURRENT_VERSION_AA_WC_LEADER_POOL
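The gating logic in `getCrossTpProcessingPoolForPoolType()` presumably reduces to something like the following sketch; the enum and method here are hypothetical stand-ins for Venice's actual pool-type handling (a null return means the ConsumptionTask takes the sequential path):

```java
import java.util.concurrent.ExecutorService;

public class PoolGatingSketch {
  // Hypothetical stand-in for Venice's consumer pool types.
  enum PoolType { CURRENT_VERSION_AA_WC_LEADER_POOL, DEFAULT_POOL }

  static ExecutorService poolFor(
      PoolType type, ExecutorService shared, boolean enabled, boolean aaWcLeaderOnly) {
    if (!enabled) {
      return null; // feature off: every ConsumptionTask runs sequentially
    }
    if (aaWcLeaderOnly && type != PoolType.CURRENT_VERSION_AA_WC_LEADER_POOL) {
      return null; // restricted mode: only the AA/WC leader pool parallelizes
    }
    return shared;
  }
}
```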

Add unit test for cross-TP parallel processing configuration options

Test coverage for:
- Default values (disabled, 4 threads, not AA_WC_LEADER only)
- Enabling cross-TP parallel processing
- Custom thread pool size
- Enabling/disabling CURRENT_VERSION_AA_WC_LEADER_ONLY mode

Fix 'Wait not in loop' warning in ConsumptionTask

- Wrap wait() in a while loop to handle spurious wakeups correctly
- Add early exit check after wait returns to stop promptly when task is stopped
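The wait-in-loop pattern behind this fix can be sketched as follows (a minimal standalone sketch; the `running`/`workReady` fields are hypothetical, not actual ConsumptionTask members). As a later commit below notes, ConsumptionTask ultimately inlined this back into a single timed wait, but the guarded-loop pattern itself is the standard remedy for the IDE warning:

```java
// Sketch of a spurious-wakeup-safe wait: the guard condition is rechecked
// every time wait() returns, and a stop flag allows a prompt early exit.
public class GuardedWait {
  private final Object lock = new Object();
  private volatile boolean running = true;
  private boolean workReady = false;

  // Waits until work is ready, the task stops, or timeoutMs elapses.
  void awaitWork(long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    synchronized (lock) {
      while (running && !workReady) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          return; // full delay elapsed
        }
        try {
          lock.wait(remaining); // may return spuriously; the loop rechecks the guard
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        if (!running) {
          return; // early exit when the task is stopped
        }
      }
    }
  }

  void signalWork() {
    synchronized (lock) {
      workReady = true;
      lock.notifyAll();
    }
  }

  void stop() {
    synchronized (lock) {
      running = false;
      lock.notifyAll(); // wake the waiter so it can exit promptly
    }
  }
}
```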

Fix flaky test

Fix flaky tests

Reduce cached getLatestPosition calls

Fix flaky tests in StoreIngestionTaskTest after ConsumptionTask refactoring

- testRecordLevelMetricForCurrentVersion: Added timeout() to metric
  verifications (recordTotalBytesConsumed, recordTotalRecordsConsumed)
  to allow async operations to complete

- testIngestionTaskForCurrentVersionResetExceptionReportError: Added
  timeout() to reportIngestionNotifier() verification

- testReportIfCatchUpVersionTopicOffset: Moved measureLagWithCallToPubSub
  stubbing to setBeforeStartingConsumption() to avoid UnfinishedStubbingException
  from concurrent mock access by the SIT thread

Root cause: the ConsumptionTask refactoring changed the timing of message
processing (the sequential path now catches per-TP exceptions and continues),
exposing pre-existing race conditions in tests that used exact times()
verifications without timeout() for async operations.

Fix ConsumptionTask wait loop blocking notifyAll() from setDataReceiver()

The refactored waitForNextCycleIfNeeded() method used a while loop that
forced the full delay to elapse even when notifyAll() was called. This
broke the resubscription flow - when setDataReceiver() notified the
consumer to wake up for new subscriptions, the consumer ignored it and
kept waiting.

Inlined the wait back into run() with simple wait(readCycleDelayMs) that
returns immediately on notification, matching the original behavior.
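The restored behavior amounts to a single timed wait that returns as soon as `notifyAll()` fires (a minimal sketch under assumed names; the monitor object and method names are illustrative, not the actual ConsumptionTask members):

```java
// Sketch of the restored behavior: one timed wait() that returns on
// notifyAll() (e.g. from setDataReceiver() on resubscription), on a
// spurious wakeup, or after readCycleDelayMs. A spurious wakeup here only
// costs one extra cheap poll cycle, which is why no guarding loop is needed.
public class ReadCycleWait {
  private final Object monitor = new Object();

  // Returns the elapsed wait in milliseconds.
  long waitForNextCycle(long readCycleDelayMs) {
    long start = System.currentTimeMillis();
    synchronized (monitor) {
      try {
        monitor.wait(readCycleDelayMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return System.currentTimeMillis() - start;
  }

  void wakeUp() {
    synchronized (monitor) {
      monitor.notifyAll(); // wakes the consumer immediately for new subscriptions
    }
  }
}
```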
Copilot AI review requested due to automatic review settings February 4, 2026 03:25
@sushantmane sushantmane force-pushed the i1-process-records-from-different-TPs-in-Parallel branch from 2e2209b to 89e538c Compare February 4, 2026 03:25

Copilot AI left a comment


Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.



@sushantmane sushantmane enabled auto-merge (squash) February 4, 2026 10:19
@sushantmane sushantmane merged commit 7b60de4 into linkedin:main Feb 4, 2026
107 of 154 checks passed
@sushantmane
Contributor Author

Thanks for the review, @sixpluszero!

@sushantmane sushantmane deleted the i1-process-records-from-different-TPs-in-Parallel branch February 4, 2026 18:30
sushantmane added a commit to sushantmane/venice that referenced this pull request Feb 8, 2026
…inkedin#2401)

IngestionBatchProcessor only parallelizes records within a single topic-partition  
by key. However, ConsumptionTask processes topic-partitions sequentially in its  
for-loop. When a single consumer poll returns records from multiple TPs or replicas,  
processing happens in iteration order. If an earlier TP is slow, for example due to  
many same-key records or expensive write-compute, all subsequent TPs in the batch  
are blocked. This head-of-line blocking causes cascading ingestion delays across  
unrelated partitions, increases end-to-end latency, and can lead to SLO breaches  
when time-sensitive data is delayed by one slow TP.  

Solution:  
Introduce optional cross-TP parallel processing so topic-partitions within a single  
poll can be processed concurrently. A shared ExecutorService is created in  
AggKafkaConsumerService and passed to each ConsumptionTask. When enabled,  
ConsumptionTask uses CompletableFuture.supplyAsync() to process TPs in parallel  
using this shared pool.  

The behavior is controlled via the following configs:  
- SERVER_CROSS_TP_PARALLEL_PROCESSING_ENABLED (default: false)  
- SERVER_CROSS_TP_PARALLEL_PROCESSING_THREAD_POOL_SIZE (default: 4)  
- SERVER_CROSS_TP_PARALLEL_PROCESSING_CURRENT_VERSION_AA_WC_LEADER_ONLY  
  (default: false), to optionally restrict parallelism to the write-compute leader  
  pool where it provides the most benefit.  

Additional changes:  
- Add CrossTpProcessingStats to monitor thread pool usage and behavior.  
- Refactor ConsumptionTask.run() by extracting helper methods to improve readability.  
- Add comprehensive test coverage, including integration tests, to validate behavior  
  under parallel and non-parallel execution modes.
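The supplyAsync fan-out described above can be sketched roughly as follows (the class, method, and topic-partition names are hypothetical illustrations, not the actual Venice wiring):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch: fan out per-TP batches onto a shared executor with
// CompletableFuture.supplyAsync(), then join so the poll loop still finishes
// one full batch before the next poll. A slow TP no longer blocks the others.
public class CrossTpParallelSketch {
  static Map<String, Integer> processBatch(Map<String, List<String>> recordsByTp,
                                           Function<List<String>, Integer> perTpProcessor,
                                           ExecutorService sharedPool) {
    Map<String, CompletableFuture<Integer>> futures = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : recordsByTp.entrySet()) {
      // Each TP's batch is submitted independently to the shared pool.
      futures.put(entry.getKey(),
          CompletableFuture.supplyAsync(() -> perTpProcessor.apply(entry.getValue()), sharedPool));
    }
    Map<String, Integer> processedCounts = new LinkedHashMap<>();
    futures.forEach((tp, f) -> processedCounts.put(tp, f.join()));
    return processedCounts;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4); // mirrors the default pool size
    Map<String, List<String>> batch = new LinkedHashMap<>();
    batch.put("store_v1-0", List.of("a", "b"));
    batch.put("store_v1-1", List.of("c"));
    System.out.println(processBatch(batch, List::size, pool));
    pool.shutdown();
  }
}
```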
misyel pushed a commit to misyel/venice that referenced this pull request Feb 17, 2026

3 participants