diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md index 13a57301b3..42ac2e5355 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md @@ -13,6 +13,8 @@ Run the given function when the first element is received. Run the given function when the first element is received. +The `doOnFirst` operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the failing first element is dropped and the function is not retried; on `Supervision.Restart` the next element is treated as the first. + ## Examples Scala diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala index 9c832d8003..6ced5fa70d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala @@ -17,8 +17,12 @@ package org.apache.pekko.stream.scaladsl -import org.apache.pekko.Done -import org.apache.pekko.stream.testkit._ +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.pekko +import pekko.Done +import pekko.stream.{ ActorAttributes, Supervision } +import pekko.stream.testkit._ class FlowDoOnFirstSpec extends StreamSpec(""" pekko.stream.materializer.initial-input-buffer-size = 2 @@ -27,7 +31,7 @@ class FlowDoOnFirstSpec extends StreamSpec(""" "A DoOnFirst" must { "can only invoke on first" in { - val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + val invoked = new AtomicInteger(0) Source(1 to 10) .via(Flow[Int].doOnFirst(invoked.addAndGet)) .runWith(Sink.ignore) @@ -37,7 +41,7 @@ class FlowDoOnFirstSpec extends StreamSpec(""" } "will not invoke on empty stream" in { - val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + val invoked = new AtomicInteger(0) Source.empty .via(Flow[Int].doOnFirst(invoked.addAndGet)) .runWith(Sink.ignore) @@ -46,6 +50,77 @@ class FlowDoOnFirstSpec extends StreamSpec(""" invoked.get() shouldBe 0 } + "fail the stream when f throws and supervision is Stop" in { + val ex = new RuntimeException("doOnFirst-stop") + val result = Source(1 to 5) + .via(Flow[Int].doOnFirst(_ => throw ex)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.ignore) + + result.failed.futureValue shouldBe ex + } + + "fail the stream when f throws and no supervision attribute is set (default Stop)" in { + val ex = new RuntimeException("doOnFirst-default") + val result = Source(1 to 5) + .via(Flow[Int].doOnFirst(_ => throw ex)) + .runWith(Sink.ignore) + + result.failed.futureValue shouldBe ex + } + + "resume drops the first element, does not retry f, and passes the rest through" in { + val invocationCount = new AtomicInteger(0) + val ex = new RuntimeException("doOnFirst-resume") + + val result = Source(1 to 5) + .via(Flow[Int].doOnFirst { _ => + if (invocationCount.incrementAndGet() == 1) throw ex + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq(2, 3, 4, 5) + invocationCount.get() shouldBe 1 + } + + "restart re-arms so f is retried on the next element" in { + val invocationCount = new AtomicInteger(0) + val ex = new RuntimeException("doOnFirst-restart") + + val result = Source(1 to 3) + .via(Flow[Int].doOnFirst { _ => + invocationCount.incrementAndGet() + throw ex + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq.empty + invocationCount.get() shouldBe 3 + } + + "restart and continue with pass-through once f succeeds on retry" in { + // Element 1 triggers f which throws -> Restart drops element 1 and re-arms. + // Element 2 triggers f which succeeds -> handler switches to pass-through. + // Elements 3 and 4 pass through without invoking f. + val invocationCount = new AtomicInteger(0) + val ex = new RuntimeException("doOnFirst-restart-then-ok") + + val result = Source(1 to 4) + .via(Flow[Int].doOnFirst { _ => + if (invocationCount.incrementAndGet() == 1) throw ex + }) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq(2, 3, 4) + invocationCount.get() shouldBe 2 + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala index 009d15b5dc..5eebe57655 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala @@ -17,10 +17,14 @@ package org.apache.pekko.stream.impl.fusing +import scala.util.control.NonFatal + import org.apache.pekko import pekko.annotation.InternalApi +import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } import pekko.stream.Attributes.SourceLocation +import pekko.stream.Supervision import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } @@ -34,20 +38,40 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } override def initialAttributes: Attributes = DefaultAttributes.doOnFirst and SourceLocation.forLambda(f) - override def createLogic(inheritedAttributes: org.apache.pekko.stream.Attributes) = + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { self => + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + final override def onPush(): Unit = push(out, grab(in)) final override def onPull(): Unit = pull(in) setHandler(out, this) - setHandler(in, - new InHandler { - override def onPush(): Unit = { - setHandler(in, self) - val elem = grab(in) - f(elem) - push(out, elem) + + // Resume consumes the one-shot and switches to pass-through; Restart keeps this handler armed for retry. + val firstHandler: InHandler = new InHandler { + override def onPush(): Unit = { + val elem = grab(in) + try f(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + case Supervision.Resume => + setHandler(in, self) + pull(in) + return + case Supervision.Restart => + pull(in) + return + } } - }) + setHandler(in, self) + push(out, elem) + } + } + + setHandler(in, firstHandler) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..913abf53a1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1150,6 +1150,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..8e654add94 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3046,6 +3046,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..952c1840da 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -552,6 +552,10 @@ final class SubFlow[In, Out, Mat]( /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..d4cd726559 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -543,6 +543,10 @@ final class SubSource[Out, Mat]( /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..66c9c3649a 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1594,6 +1594,10 @@ trait FlowOps[+Out, +Mat] { /** * Run the given function when the first element is received. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * first element is dropped and the function is not retried; on `Supervision.Restart` the next + * element is treated as the first. + * * '''Emits when''' upstream emits an element * * '''Backpressures when''' downstream backpressures