Skip to content

fix: add supervision strategy support for zipWith and zipWithN zipper functions#3186

Open
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-zipwith-supervision
Open

fix: add supervision strategy support for zipWith and zipWithN zipper functions#3186
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-zipwith-supervision

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 25, 2026

Copy link
Copy Markdown
Member

Motivation

Issue #3110 tracks stream operators that should honor SupervisionStrategy for user functions.

zipWith / zipWithN invoke user-provided zipper functions, but failures were not supervised: exceptions failed the stream unconditionally.

Modification

  • Added zipper exception supervision handling in both implementations:
    • ZipWithApply.scala.template (covers all ZipWith arities)
    • Graph.scala (ZipWithN)
  • Supervision semantics for zipper failures:
    • Stop → fail stage
    • Resume / Restart → drop failing zipped element and continue
  • Updated pending accounting on resumed/restarted failures so the stage can continue satisfying current demand without stalling.
  • Updated Scala/Java API docs and operator docs:
    • zipWith.md
    • zipWithN.md
  • Added directional tests in:
    • FlowZipWithSpec
    • GraphZipWithNSpec
      covering explicit Stop, default Stop, Resume, Restart.

Result

zipWith and zipWithN now honor ActorAttributes.SupervisionStrategy for zipper function failures, with regression coverage for dropped-element paths.

which is now Apache licensed

Tests

  • sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowZipWithSpec org.apache.pekko.stream.scaladsl.GraphZipWithNSpec" (37/37 passed)
  • sbt "stream/mimaReportBinaryIssues" (clean)
  • sbt "docs/paradox" (passed)

References

Refs #3110

… functions

Motivation:
zipWith and zipWithN accepted user zipper/combiner functions but did not consult
SupervisionStrategy. Zipper exceptions failed the stream unconditionally, even
when Resume/Restart was configured.

Modification:
Add zipper exception supervision handling in both implementations:
- ZipWith boilerplate template (all arities)
- Graph.ZipWithN

Stop fails the stage; Resume/Restart drop the failing zipped element and
continue by pulling new inputs. Pending bookkeeping is updated on resumed/
restarted failures so the stage can satisfy the current downstream demand
without stalling.

Update Scala/Java API docs and operator docs (`zipWith.md`, `zipWithN.md`) to
document supervision behavior for combiner function failures.

Add directional tests in:
- FlowZipWithSpec
- GraphZipWithNSpec
covering explicit Stop, default Stop, Resume, and Restart.

Result:
zipWith/zipWithN now honor ActorAttributes.SupervisionStrategy for zipper
failures with deterministic regression coverage for dropped-element paths.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowZipWithSpec org.apache.pekko.stream.scaladsl.GraphZipWithNSpec" -- 37/37 passed
- sbt "stream/mimaReportBinaryIssues" -- clean
- sbt "docs/paradox" -- passed

References:
Refs apache#3110
@He-Pin He-Pin marked this pull request as ready for review June 26, 2026 18:47
Motivation:
The supervision support added in the previous commit causes ZipWith
zipper exceptions to be handled by the decider instead of being logged
as ERROR. GraphZipWithSpec still wrapped its sad-case test in
EventFilter[ArithmeticException], which now times out waiting for an
ERROR log message that is never emitted.

Modification:
Remove the EventFilter wrapping and unused import from
GraphZipWithSpec's "work in the sad case" test, matching the same
cleanup already applied to FlowZipWithSpec and GraphZipWithNSpec.

Result:
GraphZipWithSpec sad-case test passes without EventFilter timeout.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.GraphZipWithSpec" -- 15/15 passed

References:
Refs apache#3110
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant