Skip to content

fix: add supervision strategy support for expand and extrapolate#3185

Open
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-expand-supervision
Open

fix: add supervision strategy support for expand and extrapolate#3185
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-expand-supervision

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 25, 2026

Copy link
Copy Markdown
Member

Motivation

Issue #3110 tracks stream operators that should honor SupervisionStrategy for user functions.

expand called the user-provided expansion logic without supervision, so exceptions failed the stream unconditionally. Since extrapolate is implemented via Expand, the same gap applied there too.

Modification

  • Added supervision handling in Expand for:
    • failures while creating extrapolation from an incoming element
    • failures while evaluating iterator paths (hasNext / next)
  • Supervision semantics:
    • Stop: fail stage
    • Resume: drop failed element and continue
    • Restart: drop failed element, reset current extrapolation state, continue
  • Updated Scala/Java API docs (Flow, Source, SubFlow, SubSource) to describe supervision behavior and clarify Resume vs Restart for expand.
  • Updated operator docs:
    • expand.md
    • extrapolate.md
  • Added directional regression tests:
    • FlowExpandSpec: Stop/default Stop/Resume/Restart + iterator-evaluation failure paths
    • FlowExtrapolateSpec: Stop/Resume/Restart for extrapolator failures during iterator evaluation

Result

expand and extrapolate now honor ActorAttributes.SupervisionStrategy for user-function failures with explicit regression coverage for exceptional iterator paths.

which is now Apache licensed

Tests

  • sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowExpandSpec org.apache.pekko.stream.scaladsl.FlowExtrapolateSpec" (23/23 passed)
  • sbt "stream/mimaReportBinaryIssues" (clean)
  • sbt "docs/paradox" (passed)

References

Refs #3110

He-Pin added 2 commits June 25, 2026 19:11
Motivation:
The expand operator evaluated user-provided expansion logic without consulting
SupervisionStrategy, so exceptions failed the stream unconditionally. This also
affected extrapolate because it is implemented on top of Expand.

Modification:
Add supervision handling in Expand for exceptions from the expander function and
from iterator evaluation paths (hasNext/next). Stop fails the stage, Resume
drops the failed element and continues, and Restart resets extrapolation state
before continuing.

Update Scala/Java API docs (Flow/Source/SubFlow/SubSource) and operator docs to
reflect supervision behavior. Expand docs now explicitly describe Resume vs
Restart state semantics.

Add directional tests in FlowExpandSpec and FlowExtrapolateSpec for Stop/default
Stop/Resume/Restart and iterator-evaluation failures.

Result:
expand and extrapolate now honor ActorAttributes.SupervisionStrategy for
user-function failures, with deterministic regression coverage for exceptional
iterator paths.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowExpandSpec org.apache.pekko.stream.scaladsl.FlowExtrapolateSpec" -- 23/23 passed
- sbt "stream/mimaReportBinaryIssues" -- clean
- sbt "docs/paradox" -- passed

References:
Refs apache#3110
…mantics

Motivation:
Review of the expand/extrapolate supervision support identified two
consistency and documentation gaps.

Modification:
- Change `private lazy val decider` to `private def decider` in Expand
  to match the established pattern in Map, Filter, Collect and other
  fusing operators.
- Collapse the Resume and Restart branches in `handleIteratorFailure`
  since a corrupt iterator must be discarded regardless of the
  supervision decision, and add a comment explaining why.
- Clarify in expand.md that iterator evaluation failures necessarily
  discard extrapolation state under Resume, while expander-function
  failures preserve it.

Result:
Expand's supervision handling is consistent with neighbouring operators
and its docs disambiguate the two failure sites.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowExpandSpec org.apache.pekko.stream.scaladsl.FlowExtrapolateSpec" (23/23 passed)

References:
Refs apache#3110
@He-Pin He-Pin marked this pull request as ready for review June 26, 2026 19:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant