From 65785258085ecfb458680f2ef1a213a50364e1c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 12:43:46 +0800 Subject: [PATCH 1/2] feat: add supervision strategy support for doOnFirst Motivation: The doOnFirst operator runs a user-provided side-effect function on the first element, but it was called without a try-catch. Any exception failed the stream unconditionally, ignoring the configured SupervisionStrategy. Modification: Wrap f(elem) in DoOnFirst with a try-catch that consults the SupervisionStrategy decider. Stop fails the stage (unchanged), Resume drops the failing first element and switches to pass-through without retrying the function, and Restart re-arms first-element detection so the next element is treated as the first. The decider is a lazy val for zero overhead on the happy path. The first-element handler was extracted into a named val so it can be re-armed on Restart. Also fix a fully-qualified type reference in the createLogic signature to use the project's two-line import style, and update the Scala/Java DSL scaladoc and the operator reference page. Result: doOnFirst now adheres to the SupervisionStrategy attribute with clear, distinguishable Resume and Restart semantics. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDoOnFirstSpec" -- 6/6 passed References: Refs #3110 --- .../operators/Source-or-Flow/doOnFirst.md | 2 + .../stream/scaladsl/FlowDoOnFirstSpec.scala | 64 +++++++++++++++++-- .../pekko/stream/impl/fusing/DoOnFirst.scala | 42 +++++++++--- .../apache/pekko/stream/javadsl/Flow.scala | 4 ++ .../apache/pekko/stream/javadsl/Source.scala | 4 ++ .../apache/pekko/stream/javadsl/SubFlow.scala | 4 ++ .../pekko/stream/javadsl/SubSource.scala | 4 ++ .../apache/pekko/stream/scaladsl/Flow.scala | 4 ++ 8 files changed, 115 insertions(+), 13 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md index 13a57301b3b..42ac2e53558 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md @@ -13,6 +13,8 @@ Run the given function when the first element is received. Run the given function when the first element is received. +The `doOnFirst` operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the failing first element is dropped and the function is not retried; on `Supervision.Restart` the next element is treated as the first. + ## Examples Scala diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala index 9c832d80033..87854730aa1 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala @@ -17,8 +17,12 @@ package org.apache.pekko.stream.scaladsl -import org.apache.pekko.Done -import org.apache.pekko.stream.testkit._ +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.pekko +import pekko.Done +import pekko.stream.{ ActorAttributes, Supervision } +import pekko.stream.testkit._ class FlowDoOnFirstSpec extends StreamSpec(""" pekko.stream.materializer.initial-input-buffer-size = 2 @@ -27,7 +31,7 @@ class FlowDoOnFirstSpec extends StreamSpec(""" "A DoOnFirst" must { "can only invoke on first" in { - val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + val invoked = new AtomicInteger(0) Source(1 to 10) .via(Flow[Int].doOnFirst(invoked.addAndGet)) .runWith(Sink.ignore) @@ -37,7 +41,7 @@ class FlowDoOnFirstSpec extends StreamSpec(""" } "will not invoke on empty stream" in { - val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + val invoked = new AtomicInteger(0) Source.empty .via(Flow[Int].doOnFirst(invoked.addAndGet)) .runWith(Sink.ignore) @@ -46,6 +50,58 @@ class FlowDoOnFirstSpec extends StreamSpec(""" invoked.get() shouldBe 0 } + "fail the stream when f throws and supervision is Stop" in { + val ex = new RuntimeException("doOnFirst-stop") + val result = Source(1 to 5) + .via(Flow[Int].doOnFirst(_ => throw ex)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.ignore) + + result.failed.futureValue shouldBe ex + } + + "fail the stream when f throws and no supervision attribute is set (default Stop)" in { + val ex = new RuntimeException("doOnFirst-default") + val result = Source(1 to 5) + .via(Flow[Int].doOnFirst(_ => throw ex)) + .runWith(Sink.ignore) + + result.failed.futureValue shouldBe ex + } + + "resume drops the first element, does not retry f, and passes the rest through" in { + val invocationCount = new AtomicInteger(0) + val ex = new RuntimeException("doOnFirst-resume") + + val result = Source(1 to 5) + .via(Flow[Int].doOnFirst { _ => + if (invocationCount.incrementAndGet() == 1) throw ex + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq(2, 3, 4, 5) + invocationCount.get() shouldBe 1 + } + + "restart re-arms so f is retried on the next element" in { + val invocationCount = new AtomicInteger(0) + val ex = new RuntimeException("doOnFirst-restart") + + val result = Source(1 to 3) + .via(Flow[Int].doOnFirst { _ => + invocationCount.incrementAndGet() + throw ex + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq.empty + invocationCount.get() shouldBe 3 + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala index 009d15b5dc0..5eebe576550 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala @@ -17,10 +17,14 @@ package org.apache.pekko.stream.impl.fusing +import scala.util.control.NonFatal + import org.apache.pekko import pekko.annotation.InternalApi +import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } import pekko.stream.Attributes.SourceLocation +import pekko.stream.Supervision import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } @@ -34,20 +38,40 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } override def initialAttributes: Attributes = DefaultAttributes.doOnFirst and SourceLocation.forLambda(f) - override def createLogic(inheritedAttributes: org.apache.pekko.stream.Attributes) = + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { self => + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + final override def onPush(): Unit = push(out, grab(in)) final override def onPull(): Unit = pull(in) setHandler(out, this) - setHandler(in, - new InHandler { - override def onPush(): Unit = { - setHandler(in, self) - val elem = grab(in) - f(elem) - push(out, elem) + + // Resume consumes the one-shot and switches to pass-through; Restart keeps this handler armed for retry. + val firstHandler: InHandler = new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + try f(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + case Supervision.Resume => + setHandler(in, self) + pull(in) + return + case Supervision.Restart => + pull(in) + return + } } - }) + setHandler(in, self) + push(out, elem) + } + } + + setHandler(in, firstHandler) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..913abf53a16 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1150,6 +1150,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..8e654add94c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3046,6 +3046,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..952c1840da3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -552,6 +552,10 @@ final class SubFlow[In, Out, Mat]( /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..d4cd7265597 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -543,6 +543,10 @@ final class SubSource[Out, Mat]( /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..66c9c3649aa 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1594,6 +1594,10 @@ trait FlowOps[+Out, +Mat] { /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures From 764b7f17f6e911ca4834c362b3c1865468ed1fd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 05:34:49 +0800 Subject: [PATCH 2/2] test: add directional test for doOnFirst restart that succeeds on retry Motivation: The existing restart test only covered the path where f throws for every element. The more common restart scenario is a transient failure on the first element followed by success, after which the operator must switch to pass-through. Modification: Add "restart and continue with pass-through once f succeeds on retry" which asserts that the second element is treated as the first (f runs again), and that once f succeeds the operator transitions to the pass-through handler so later elements do not re-invoke f. Result: The doOnFirst restart semantics are now covered for both the all-failing and the succeed-on-retry paths. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDoOnFirstSpec" (7/7 passed) References: Refs #3110 --- .../stream/scaladsl/FlowDoOnFirstSpec.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala index 87854730aa1..6ced5fa70db 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala @@ -102,6 +102,25 @@ class FlowDoOnFirstSpec extends StreamSpec(""" invocationCount.get() shouldBe 3 } + "restart and continue with pass-through once f succeeds on retry" in { + // Element 1 triggers f which throws -> Restart drops element 1 and re-arms. + // Element 2 triggers f which succeeds -> handler switches to pass-through. + // Elements 3 and 4 pass through without invoking f. + val invocationCount = new AtomicInteger(0) + val ex = new RuntimeException("doOnFirst-restart-then-ok") + + val result = Source(1 to 4) + .via(Flow[Int].doOnFirst { _ => + if (invocationCount.incrementAndGet() == 1) throw ex + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq(2, 3, 4) + invocationCount.get() shouldBe 2 + } + } }