Skip to content

feat: add supervision strategy support for doOnFirst#3179

Open
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-doonfirst-supervision
Open

feat: add supervision strategy support for doOnFirst#3179
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-doonfirst-supervision

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 25, 2026

Copy link
Copy Markdown
Member

Motivation

Per the stream error handling docs, operators that apply user-provided functions should consult the configured SupervisionStrategy. The doOnFirst operator runs a user side-effect function f on the first element, but it was called without a try-catch, so any exception failed the stream unconditionally — even under Supervision.Resume/Restart.

This is part of the meta-issue #3110 (add supervisor strategy support to stream operators that accept user functions). One operator per PR.

Modification

  • Wrap f(elem) in DoOnFirst with a try/catch (NonFatal) that consults the SupervisionStrategy decider:
    • Stop → fail the stage (unchanged fail-fast behavior)
    • Resume → drop the failing first element and switch to pass-through; f is not retried (the one-shot is consumed)
    • Restart → re-arm first-element detection so the next element is treated as the first (f is retried on it)
  • The first-element InHandler was extracted into a named val firstHandler so it can be re-armed on Restart. On success, the switch to pass-through now happens after f (behaviorally identical on the happy path — push does not synchronously re-enter onPush).
  • decider is a lazy val → zero overhead on the happy path.
  • Fix a fully-qualified type reference in the createLogic signature to use the project's two-line import style.
  • Document supervision adherence (including the Resume/Restart semantics) in the Scala/Java DSL scaladoc and the operator reference page.

Result

doOnFirst now adheres to the SupervisionStrategy attribute with clear, distinguishable Resume (drop first, no retry) and Restart (next becomes first) semantics.

Tests

  • sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDoOnFirstSpec"6/6 passed, including 4 new directional tests:
    • fails on f throw with Supervision.Stop
    • fails with no supervision attribute set (default Stop) — back-compat guard
    • Resume drops the first element, does not retry f, passes the rest through
    • Restart re-arms so f is retried on each subsequent "first" element
  • sbt "stream/mimaReportBinaryIssues" — clean (binary compatible)

References

Refs #3110

This is a clean-room implementation written directly for Apache Pekko.

He-Pin added 2 commits June 25, 2026 12:54
Motivation:
The doOnFirst operator runs a user-provided side-effect function on the first
element, but it was called without a try-catch. Any exception failed the
stream unconditionally, ignoring the configured SupervisionStrategy.

Modification:
Wrap f(elem) in DoOnFirst with a try-catch that consults the
SupervisionStrategy decider. Stop fails the stage (unchanged), Resume drops
the failing first element and switches to pass-through without retrying the
function, and Restart re-arms first-element detection so the next element is
treated as the first. The decider is a lazy val for zero overhead on the
happy path. The first-element handler was extracted into a named val so it
can be re-armed on Restart. Also fix a fully-qualified type reference in the
createLogic signature to use the project's two-line import style, and update
the Scala/Java DSL scaladoc and the operator reference page.

Result:
doOnFirst now adheres to the SupervisionStrategy attribute with clear,
distinguishable Resume and Restart semantics.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDoOnFirstSpec" -- 6/6 passed

References:
Refs apache#3110
Motivation:
The existing restart test only covered the path where f throws for
every element. The more common restart scenario is a transient failure
on the first element followed by success, after which the operator
must switch to pass-through.

Modification:
Add "restart and continue with pass-through once f succeeds on retry"
which asserts that the second element is treated as the first (f runs
again), and that once f succeeds the operator transitions to the
pass-through handler so later elements do not re-invoke f.

Result:
The doOnFirst restart semantics are now covered for both the
all-failing and the succeed-on-retry paths.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDoOnFirstSpec" (7/7 passed)

References:
Refs apache#3110
@He-Pin He-Pin marked this pull request as ready for review June 26, 2026 21:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant