diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md index d1ea059226..6dab793c98 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md @@ -17,6 +17,13 @@ element, allowing for it to be rewritten and/or filtered. See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `expander` +function or during iterator evaluation (`hasNext`/`next`). On `Supervision.Stop` the stream fails; on +`Supervision.Resume` the failed element is dropped and the current extrapolation state is kept when the failure +occurred in the `expander` function (a previously active iterator is retained), but is necessarily discarded when +the failure occurred during iterator evaluation; on `Supervision.Restart` the failed element is dropped and the +current extrapolation state is reset. + ## Example Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit @@ -43,4 +50,3 @@ Java **completes** when upstream completes @@@ - diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md index 97ca17edab..3e06fd9d97 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md @@ -22,6 +22,10 @@ Includes an optional `initial` argument to prevent blocking the entire stream wh See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `extrapolator` +function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` the failed element is +dropped and the stream continues. + ## Example Imagine a videoconference client decoding a video feed from a colleague working remotely. It is possible diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala index 56e8accff8..e23497e58a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration._ import org.apache.pekko import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource @@ -150,6 +151,108 @@ class FlowExpandSpec extends StreamSpec(""" source.sendNext(2).sendComplete() sink.expectNext(2 -> 0).expectComplete() } + + "fail stream when expander throws and supervision is Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 5) + .expand(i => if (i == 3) throw ex else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.ignore) + result.failed.futureValue shouldBe ex + } + + "fail stream when expander throws and supervision defaults to Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 5) + .expand(i => if (i == 3) throw ex else Iterator.single(i)) + .runWith(Sink.ignore) + result.failed.futureValue shouldBe ex + } + + "resume and keep current extrapolation when expander throws" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .expand(i => if (i == 1) Iterator(1, 10, 11) else if (i == 2) throw ex else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(1) + subscriber.requestNext(1) + publisher.sendNext(2) // throws in expander, element 2 is dropped under Resume + subscriber.requestNext(10) // continue from current extrapolation + subscriber.cancel() + } + + "complete immediately on upstream finish if expanded is true" in { + val subscriber = TestSubscriber.probe[Int]() + + Source.single(1) + .expand(_ => Iterator(1, 2, 3)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + subscriber.requestNext(1) // expanded becomes true + subscriber.expectComplete() // Does it complete immediately? + } + + "restart and reset current extrapolation when expander throws" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .expand(i => if (i == 1) Iterator(1, 10, 11) else if (i == 2) throw ex else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(1) + subscriber.requestNext(1) + publisher.sendNext(2) // throws in expander, restart resets iterator state + subscriber.request(1) + subscriber.expectNoMessage(300.millis) + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.cancel() + } + + "resume when iterator produced by expander throws during iteration" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .expand(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = true + override def next(): Int = throw ex + } + else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + + Await.result(result, 3.seconds) shouldBe Seq(1, 2, 4) + } + + "restart when iterator produced by expander throws during iteration" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .expand(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = true + override def next(): Int = throw ex + } + else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + + Await.result(result, 3.seconds) shouldBe Seq(1, 2, 4) + } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala index d3dbc7eebf..69e2cee951 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala @@ -19,6 +19,8 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko +import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource @@ -170,6 +172,84 @@ class FlowExtrapolateSpec extends StreamSpec(""" source.sendNext(2).sendComplete() sink.expectNext(2 -> 0).expectComplete() } + + "stop when extrapolator throws during iterator evaluation" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .extrapolate(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = throw ex + override def next(): Int = throw ex + } + else Iterator.empty) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.request(1) + subscriber.expectError(ex) + } + + "resume when extrapolator throws during iterator evaluation" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .extrapolate(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = throw ex + override def next(): Int = throw ex + } + else Iterator.empty) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.request(1) + subscriber.expectNoMessage(300.millis) + publisher.sendNext(4) + subscriber.requestNext(4) + subscriber.cancel() + } + + "restart when extrapolator throws during iterator evaluation" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .extrapolate(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = throw ex + override def next(): Int = throw ex + } + else Iterator.empty) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.request(1) + subscriber.expectNoMessage(300.millis) + publisher.sendNext(4) + subscriber.requestNext(4) + subscriber.cancel() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 010260e03d..6f55ee08be 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1200,6 +1200,7 @@ private[stream] object Collect { override val shape = FlowShape(in, out) override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + private def decider = attr.mandatoryAttribute[SupervisionStrategy].decider private var iterator: Iterator[Out] = Iterator.empty private var expanded = false private val contextPropagation = ContextPropagation() @@ -1207,39 +1208,81 @@ private[stream] object Collect { override def preStart(): Unit = pull(in) def onPush(): Unit = { - iterator = extrapolate(grab(in)) - if (iterator.hasNext) { - contextPropagation.suspendContext() - if (isAvailable(out)) { - expanded = true - pull(in) - push(out, iterator.next()) - } else expanded = false - } else pull(in) + val elem = grab(in) + try iterator = extrapolate(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + if (!isClosed(in) && !hasBeenPulled(in)) pull(in) + case Supervision.Restart => + restartState() + if (!isClosed(in) && !hasBeenPulled(in)) pull(in) + } + return + } + try { + if (iterator.hasNext) { + contextPropagation.suspendContext() + if (isAvailable(out)) { + expanded = true + pull(in) + push(out, iterator.next()) + } else expanded = false + } else pull(in) + } catch { + case NonFatal(ex) => handleIteratorFailure(ex) + } } override def onUpstreamFinish(): Unit = { - if (iterator.hasNext && !expanded) () // need to wait - else completeStage() + try { + if (iterator.hasNext && !expanded) () // need to wait + else completeStage() + } catch { + case NonFatal(ex) => handleIteratorFailure(ex) + } } def onPull(): Unit = { - if (iterator.hasNext) { - contextPropagation.resumeContext() - if (!expanded) { - expanded = true - if (isClosed(in)) { - push(out, iterator.next()) - completeStage() - } else { - // expand needs to pull first to be “fair” when upstream is not actually slow - pull(in) - push(out, iterator.next()) - } - } else push(out, iterator.next()) + try { + if (iterator.hasNext) { + contextPropagation.resumeContext() + if (!expanded) { + expanded = true + if (isClosed(in)) { + push(out, iterator.next()) + completeStage() + } else { + // expand needs to pull first to be “fair” when upstream is not actually slow + pull(in) + push(out, iterator.next()) + } + } else push(out, iterator.next()) + } + } catch { + case NonFatal(ex) => handleIteratorFailure(ex) } } + private def restartState(): Unit = { + iterator = Iterator.empty + expanded = false + } + + private def handleIteratorFailure(ex: Throwable): Unit = + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume | Supervision.Restart => + // iterator is corrupt after any hasNext/next failure; must reset for both Resume and Restart + restartState() + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) pull(in) + } + setHandler(in, this) setHandler(out, this) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..abbf5021b5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2566,12 +2566,16 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -2596,8 +2600,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -2624,8 +2631,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..fc66696d4b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3969,12 +3969,16 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -3999,8 +4003,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -4027,8 +4034,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..4b9d4495f0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1891,12 +1891,16 @@ final class SubFlow[In, Out, Mat]( * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -1922,8 +1926,11 @@ final class SubFlow[In, Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -1950,8 +1957,11 @@ final class SubFlow[In, Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..ecb21b35b6 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1859,12 +1859,16 @@ final class SubSource[Out, Mat]( * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -1889,8 +1893,11 @@ final class SubSource[Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -1917,8 +1924,11 @@ final class SubSource[Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..d2ee127248 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2501,12 +2501,16 @@ trait FlowOps[+Out, +Mat] { * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision.Restart]] and [[pekko.stream.Supervision.Resume]]. - * Exceptions from the `seed` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision.Restart]] the failed element is dropped and the current extrapolation state is reset. * * '''Emits when''' downstream stops backpressuring * @@ -2529,8 +2533,11 @@ trait FlowOps[+Out, +Mat] { * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision.Restart]] and [[pekko.stream.Supervision.Resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the failed element is dropped and the stream continues. * * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR * `extrapolate` is non-empty and applicable