fix: add supervision strategy support for aggregateWithBoundary callbacks#3187
Open
He-Pin wants to merge 2 commits into
Open
fix: add supervision strategy support for aggregateWithBoundary callbacks#3187He-Pin wants to merge 2 commits into
He-Pin wants to merge 2 commits into
Conversation
…acks Motivation: aggregateWithBoundary invokes multiple user callbacks (`allocate`, `aggregate`, `harvest`, and timer predicate), but exceptions were not supervised and failed the stream unconditionally. Modification: Add decider-based exception handling in AggregateWithBoundary for all callback sites: - allocate/aggregate in onPush - timer predicate and harvest in onTimer - harvest in onUpstreamFinish Stop fails the stage. Resume/Restart drop failing element/aggregate and continue. For aggregate callback failures, Resume now resets aggregate state to avoid retaining partially-mutated mutable state. Update Scala/Java API docs and operator docs to document supervision behavior. Add directional regression tests in AggregateWithBoundarySpec covering: - aggregate callback failures under Resume/Restart - allocate callback failure under Resume - harvest callback failure under Resume - timer predicate failure under Resume (explicit scheduler) Result: aggregateWithBoundary now honors ActorAttributes.SupervisionStrategy across all user callback failure points with deterministic regression coverage. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.AggregateWithBoundarySpec org.apache.pekko.stream.scaladsl.AggregateWithTimeBoundaryAndSimulatedTimeSpec" -- 11/11 passed - sbt "stream/mimaReportBinaryIssues" -- clean - sbt "docs/paradox" -- passed References: Refs apache#3110
…ision test Motivation: The supervision handling introduced two near-identical harvest methods (harvestAndEmitFromOnPush / harvestAndEmitOnTimer) differing only in whether pullIfPossible is called after recovery, and the onUpstreamFinish harvest recovery path lacked directional test coverage. Modification: - Unify the two harvest methods into a single harvestAndEmit(pullOnRecover) that conditionally pulls on recovery; onPush passes true, onTimer passes false to preserve the original timer semantics of not managing pull state. - Add a directional test for harvest failure on upstream completion (Resume): the failing final aggregate is dropped and the stream completes normally. Result: Less duplication in the supervision recovery code and regression coverage for the previously untested onUpstreamFinish harvest path. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.AggregateWithBoundarySpec org.apache.pekko.stream.scaladsl.AggregateWithTimeBoundaryAndSimulatedTimeSpec" -- 12/12 passed - sbt "stream/mimaReportBinaryIssues" -- clean - scalafmt --test --mode diff-ref=origin/main -- all formatted References: Refs apache#3110
Contributor
There was a problem hiding this comment.
Pull request overview
This PR updates the aggregateWithBoundary stream operator so that exceptions thrown by user callbacks (allocate, aggregate, harvest, and optional timer predicate) are handled according to the configured ActorAttributes.SupervisionStrategy, rather than unconditionally failing the stream.
Changes:
- Added supervision-aware exception handling to all user callback invocation sites in
AggregateWithBoundary(push, timer, and upstream-finish paths). - Updated Scala/Java API scaladoc and operator documentation to explicitly describe the supervision semantics.
- Added regression tests covering
Resume/Restartbehavior across callback failure points, including a simulated-timer scenario.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | Documents supervision support and semantics for the Scala DSL aggregateWithBoundary operator. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala | Documents supervision support and semantics for the Java DSL SubSource API surface. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala | Documents supervision support and semantics for the Java DSL SubFlow API surface. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala | Documents supervision support and semantics for the Java DSL Source API surface. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala | Documents supervision support and semantics for the Java DSL Flow API surface. |
| stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala | Implements supervision strategy handling for all callback sites (allocate/aggregate/harvest/timer predicate). |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala | Adds directional tests for supervised callback failures, including simulated-time timer coverage. |
| docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md | Updates operator docs to state and explain supervision behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Issue #3110 tracks stream operators that should honor
SupervisionStrategyfor user callbacks.aggregateWithBoundaryexecutes several user callbacks (allocate,aggregate,harvest, timer predicate), but callback exceptions were not supervised and failed the stream unconditionally.Modification
AggregateWithBoundaryfor all callback sites:allocate/aggregateinonPushharvestinonTimerharvestinonUpstreamFinishStop→ fail stageResume/Restart→ drop failing element or aggregate and continueaggregatecallback failures underResume, reset aggregate state to avoid retaining partially-mutated mutable state.aggregateWithBoundary.md) to describe supervision behavior.AggregateWithBoundarySpec(including simulated-timer coverage) for:Resume/Restart)Resume)Resume)Resume)Result
aggregateWithBoundarynow honorsActorAttributes.SupervisionStrategyacross all user callback failure points with regression coverage for push/timer/finish paths.which is now Apache licensed
Tests
sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.AggregateWithBoundarySpec org.apache.pekko.stream.scaladsl.AggregateWithTimeBoundaryAndSimulatedTimeSpec"(11/11 passed)sbt "stream/mimaReportBinaryIssues"(clean)sbt "docs/paradox"(passed)References
Refs #3110