feat: add supervision strategy support for groupedWeightedWithin costFn#3180
Merged
He-Pin merged 1 commit intoJun 27, 2026
Merged
Conversation
Motivation: The groupedWeightedWithin operator's costFn is user-provided and may throw, but it was called without a try-catch. Any exception failed the stream unconditionally, ignoring the configured SupervisionStrategy. This was inconsistent with operators such as map, filter and batch. Modification: Wrap costFn(elem) inside GroupedWeightedWithin.nextElement() with a try-catch that consults the SupervisionStrategy decider. The cost is computed before any state mutation so the happy path is unchanged. Stop fails the stage, Resume skips the offending element while keeping the current group, and Restart drops the in-progress group via a new resetGroupState() helper and reschedules the time-window timer so the fresh group gets a full window. The decider is a lazy val for zero overhead on the happy path. Update the Scala and Java DSL scaladoc (both overloads) and the operator reference page to document supervision adherence. Result: groupedWeightedWithin now adheres to the SupervisionStrategy attribute with well-defined Resume and Restart semantics. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedWithinSpec" -- 21/21 passed References: Refs apache#3110
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Per the stream error handling docs, operators that apply user-provided functions should consult the configured
SupervisionStrategy. ThegroupedWeightedWithinoperator'scostFnwas called without a try-catch, so any exception failed the stream unconditionally — even underSupervision.Resume/Restart.This is part of the meta-issue #3110 (add supervisor strategy support to stream operators that accept user functions). One operator per PR.
Modification
costFn(elem)insideGroupedWeightedWithin.nextElement()with atry/catch(NonFatal) that consults theSupervisionStrategydecider. The cost is computed before any state mutation, so the happy path is unchanged:resetGroupState()helper, and reschedule the time-window timer so the fresh group gets a full windowdecideris alazy val→ zero overhead on the happy path.groupedWeightedWithinoverloads in the Scala/Java DSL scaladoc and the operator reference page.Result
groupedWeightedWithinnow adheres to theSupervisionStrategyattribute with well-defined Resume/Restart semantics.Tests
sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedWithinSpec"— 21/21 passed, including 5 new deterministic directional tests (Stop, default Stop, Resume keeps group, Resume preserves weight accounting, Restart drops group). They use a 30s window that never fires, so they are deterministic and run in PR validation (intentionally not taggedTimingTest).sbt "stream/mimaReportBinaryIssues"— clean (binary compatible)References
Refs #3110
This is a clean-room implementation written directly for Apache Pekko.