From 126c82f1fa65f5530665a4b86b8b16317d0dee1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 19:11:28 +0800 Subject: [PATCH 1/2] fix: add supervision strategy support for expand and extrapolate Motivation: The expand operator evaluated user-provided expansion logic without consulting SupervisionStrategy, so exceptions failed the stream unconditionally. This also affected extrapolate because it is implemented on top of Expand. Modification: Add supervision handling in Expand for exceptions from the expander function and from iterator evaluation paths (hasNext/next). Stop fails the stage, Resume drops the failed element and continues, and Restart resets extrapolation state before continuing. Update Scala/Java API docs (Flow/Source/SubFlow/SubSource) and operator docs to reflect supervision behavior. Expand docs now explicitly describe Resume vs Restart state semantics. Add directional tests in FlowExpandSpec and FlowExtrapolateSpec for Stop/default Stop/Resume/Restart and iterator-evaluation failures. Result: expand and extrapolate now honor ActorAttributes.SupervisionStrategy for user-function failures, with deterministic regression coverage for exceptional iterator paths. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowExpandSpec org.apache.pekko.stream.scaladsl.FlowExtrapolateSpec" -- 23/23 passed - sbt "stream/mimaReportBinaryIssues" -- clean - sbt "docs/paradox" -- passed References: Refs #3110 --- .../stream/operators/Source-or-Flow/expand.md | 6 +- .../operators/Source-or-Flow/extrapolate.md | 4 + .../stream/scaladsl/FlowExpandSpec.scala | 103 ++++++++++++++++++ .../stream/scaladsl/FlowExtrapolateSpec.scala | 80 ++++++++++++++ .../apache/pekko/stream/impl/fusing/Ops.scala | 94 ++++++++++++---- .../apache/pekko/stream/javadsl/Flow.scala | 28 +++-- .../apache/pekko/stream/javadsl/Source.scala | 28 +++-- .../apache/pekko/stream/javadsl/SubFlow.scala | 28 +++-- .../pekko/stream/javadsl/SubSource.scala | 28 +++-- .../apache/pekko/stream/scaladsl/Flow.scala | 21 ++-- 10 files changed, 352 insertions(+), 68 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md index d1ea059226b..37098333140 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md @@ -17,6 +17,11 @@ element, allowing for it to be rewritten and/or filtered. See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `expander` +function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` the failed element is dropped and current +extrapolation state is kept when available; on `Supervision.Restart` the failed element is dropped and the current +extrapolation state is reset. + ## Example Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit @@ -43,4 +48,3 @@ Java **completes** when upstream completes @@@ - diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md index 97ca17edab3..3e06fd9d972 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md @@ -22,6 +22,10 @@ Includes an optional `initial` argument to prevent blocking the entire stream wh See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `extrapolator` +function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` the failed element is +dropped and the stream continues. + ## Example Imagine a videoconference client decoding a video feed from a colleague working remotely. It is possible diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala index 56e8accff85..e23497e58a4 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExpandSpec.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration._ import org.apache.pekko import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource @@ -150,6 +151,108 @@ class FlowExpandSpec extends StreamSpec(""" source.sendNext(2).sendComplete() sink.expectNext(2 -> 0).expectComplete() } + + "fail stream when expander throws and supervision is Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 5) + .expand(i => if (i == 3) throw ex else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.ignore) + result.failed.futureValue shouldBe ex + } + + "fail stream when expander throws and supervision defaults to Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 5) + .expand(i => if (i == 3) throw ex else Iterator.single(i)) + .runWith(Sink.ignore) + result.failed.futureValue shouldBe ex + } + + "resume and keep current extrapolation when expander throws" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .expand(i => if (i == 1) Iterator(1, 10, 11) else if (i == 2) throw ex else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(1) + subscriber.requestNext(1) + publisher.sendNext(2) // throws in expander, element 2 is dropped under Resume + subscriber.requestNext(10) // continue from current extrapolation + subscriber.cancel() + } + + "complete immediately on upstream finish if expanded is true" in { + val subscriber = TestSubscriber.probe[Int]() + + Source.single(1) + .expand(_ => Iterator(1, 2, 3)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + subscriber.requestNext(1) // expanded becomes true + subscriber.expectComplete() // Does it complete immediately? + } + + "restart and reset current extrapolation when expander throws" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .expand(i => if (i == 1) Iterator(1, 10, 11) else if (i == 2) throw ex else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(1) + subscriber.requestNext(1) + publisher.sendNext(2) // throws in expander, restart resets iterator state + subscriber.request(1) + subscriber.expectNoMessage(300.millis) + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.cancel() + } + + "resume when iterator produced by expander throws during iteration" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .expand(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = true + override def next(): Int = throw ex + } + else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + + Await.result(result, 3.seconds) shouldBe Seq(1, 2, 4) + } + + "restart when iterator produced by expander throws during iteration" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .expand(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = true + override def next(): Int = throw ex + } + else Iterator.single(i)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + + Await.result(result, 3.seconds) shouldBe Seq(1, 2, 4) + } } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala index d3dbc7eebf7..69e2cee951b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowExtrapolateSpec.scala @@ -19,6 +19,8 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko +import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.TestSink import pekko.stream.testkit.scaladsl.TestSource @@ -170,6 +172,84 @@ class FlowExtrapolateSpec extends StreamSpec(""" source.sendNext(2).sendComplete() sink.expectNext(2 -> 0).expectComplete() } + + "stop when extrapolator throws during iterator evaluation" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .extrapolate(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = throw ex + override def next(): Int = throw ex + } + else Iterator.empty) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.request(1) + subscriber.expectError(ex) + } + + "resume when extrapolator throws during iterator evaluation" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .extrapolate(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = throw ex + override def next(): Int = throw ex + } + else Iterator.empty) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.request(1) + subscriber.expectNoMessage(300.millis) + publisher.sendNext(4) + subscriber.requestNext(4) + subscriber.cancel() + } + + "restart when extrapolator throws during iterator evaluation" in { + val ex = new RuntimeException("boom") + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source + .fromPublisher(publisher) + .extrapolate(i => + if (i == 3) + new Iterator[Int] { + override def hasNext: Boolean = throw ex + override def next(): Int = throw ex + } + else Iterator.empty) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .to(Sink.fromSubscriber(subscriber)) + .run() + + publisher.sendNext(3) + subscriber.requestNext(3) + subscriber.request(1) + subscriber.expectNoMessage(300.millis) + publisher.sendNext(4) + subscriber.requestNext(4) + subscriber.cancel() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 010260e03d0..5529539847a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1200,6 +1200,7 @@ private[stream] object Collect { override val shape = FlowShape(in, out) override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + private lazy val decider = attr.mandatoryAttribute[SupervisionStrategy].decider private var iterator: Iterator[Out] = Iterator.empty private var expanded = false private val contextPropagation = ContextPropagation() @@ -1207,39 +1208,84 @@ private[stream] object Collect { override def preStart(): Unit = pull(in) def onPush(): Unit = { - iterator = extrapolate(grab(in)) - if (iterator.hasNext) { - contextPropagation.suspendContext() - if (isAvailable(out)) { - expanded = true - pull(in) - push(out, iterator.next()) - } else expanded = false - } else pull(in) + val elem = grab(in) + try iterator = extrapolate(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + if (!isClosed(in) && !hasBeenPulled(in)) pull(in) + case Supervision.Restart => + restartState() + if (!isClosed(in) && !hasBeenPulled(in)) pull(in) + } + return + } + try { + if (iterator.hasNext) { + contextPropagation.suspendContext() + if (isAvailable(out)) { + expanded = true + pull(in) + push(out, iterator.next()) + } else expanded = false + } else pull(in) + } catch { + case NonFatal(ex) => handleIteratorFailure(ex) + } } override def onUpstreamFinish(): Unit = { - if (iterator.hasNext && !expanded) () // need to wait - else completeStage() + try { + if (iterator.hasNext && !expanded) () // need to wait + else completeStage() + } catch { + case NonFatal(ex) => handleIteratorFailure(ex) + } } def onPull(): Unit = { - if (iterator.hasNext) { - contextPropagation.resumeContext() - if (!expanded) { - expanded = true - if (isClosed(in)) { - push(out, iterator.next()) - completeStage() - } else { - // expand needs to pull first to be “fair” when upstream is not actually slow - pull(in) - push(out, iterator.next()) - } - } else push(out, iterator.next()) + try { + if (iterator.hasNext) { + contextPropagation.resumeContext() + if (!expanded) { + expanded = true + if (isClosed(in)) { + push(out, iterator.next()) + completeStage() + } else { + // expand needs to pull first to be “fair” when upstream is not actually slow + pull(in) + push(out, iterator.next()) + } + } else push(out, iterator.next()) + } + } catch { + case NonFatal(ex) => handleIteratorFailure(ex) } } + private def restartState(): Unit = { + iterator = Iterator.empty + expanded = false + } + + private def handleIteratorFailure(ex: Throwable): Unit = + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + restartState() + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) pull(in) + case Supervision.Restart => + restartState() + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) pull(in) + } + setHandler(in, this) setHandler(out, this) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..abbf5021b53 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2566,12 +2566,16 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -2596,8 +2600,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -2624,8 +2631,11 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..fc66696d4b5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3969,12 +3969,16 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -3999,8 +4003,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -4027,8 +4034,11 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..4b9d4495f0b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1891,12 +1891,16 @@ final class SubFlow[In, Out, Mat]( * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -1922,8 +1926,11 @@ final class SubFlow[In, Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -1950,8 +1957,11 @@ final class SubFlow[In, Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..ecb21b35b6d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1859,12 +1859,16 @@ final class SubSource[Out, Mat]( * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `expander` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the current extrapolation state is reset. * * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * @@ -1889,8 +1893,11 @@ final class SubSource[Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * @@ -1917,8 +1924,11 @@ final class SubSource[Out, Mat]( * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision#restart]] and [[pekko.stream.Supervision#resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the failed element is dropped and the stream continues. * * See also [[#expand]] for a version that can overwrite the original element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..d2ee1272488 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2501,12 +2501,16 @@ trait FlowOps[+Out, +Mat] { * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * - * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. - * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream - * subscriber. + * Under normal operation all upstream elements go through at least one extrapolation step. + * If supervision drops a failed element, that element is not emitted. + * If the upstream is actually faster than the upstream it will be backpressured by the downstream subscriber. * - * Expand does not support [[pekko.stream.Supervision.Restart]] and [[pekko.stream.Supervision.Resume]]. - * Exceptions from the `seed` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `expander` function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] the failed element is dropped + * and the stream continues, keeping current extrapolation state when available. If the supervision decision is + * [[pekko.stream.Supervision.Restart]] the failed element is dropped and the current extrapolation state is reset. * * '''Emits when''' downstream stops backpressuring * @@ -2529,8 +2533,11 @@ trait FlowOps[+Out, +Mat] { * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream * signals demand. * - * Extrapolate does not support [[pekko.stream.Supervision.Restart]] and [[pekko.stream.Supervision.Resume]]. - * Exceptions from the `extrapolate` function will complete the stream with failure. + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the `extrapolator` function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the failed element is dropped and the stream continues. * * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR * `extrapolate` is non-empty and applicable From 35b7281cb64c0c22f15e41e9dbfee8fdf3232754 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 03:27:23 +0800 Subject: [PATCH 2/2] fix: align Expand decider and clarify iterator-failure supervision semantics Motivation: Review of the expand/extrapolate supervision support identified two consistency and documentation gaps. Modification: - Change `private lazy val decider` to `private def decider` in Expand to match the established pattern in Map, Filter, Collect and other fusing operators. - Collapse the Resume and Restart branches in `handleIteratorFailure` since a corrupt iterator must be discarded regardless of the supervision decision, and add a comment explaining why. - Clarify in expand.md that iterator evaluation failures necessarily discard extrapolation state under Resume, while expander-function failures preserve it. Result: Expand's supervision handling is consistent with neighbouring operators and its docs disambiguate the two failure sites. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowExpandSpec org.apache.pekko.stream.scaladsl.FlowExtrapolateSpec" (23/23 passed) References: Refs #3110 --- .../paradox/stream/operators/Source-or-Flow/expand.md | 8 +++++--- .../scala/org/apache/pekko/stream/impl/fusing/Ops.scala | 9 +++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md index 37098333140..6dab793c980 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md @@ -18,9 +18,11 @@ See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understandi and examples. This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `expander` -function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` the failed element is dropped and current -extrapolation state is kept when available; on `Supervision.Restart` the failed element is dropped and the current -extrapolation state is reset. +function or during iterator evaluation (`hasNext`/`next`). On `Supervision.Stop` the stream fails; on +`Supervision.Resume` the failed element is dropped and the current extrapolation state is kept when the failure +occurred in the `expander` function (a previously active iterator is retained), but is necessarily discarded when +the failure occurred during iterator evaluation; on `Supervision.Restart` the failed element is dropped and the +current extrapolation state is reset. ## Example diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 5529539847a..6f55ee08bee 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1200,7 +1200,7 @@ private[stream] object Collect { override val shape = FlowShape(in, out) override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { - private lazy val decider = attr.mandatoryAttribute[SupervisionStrategy].decider + private def decider = attr.mandatoryAttribute[SupervisionStrategy].decider private var iterator: Iterator[Out] = Iterator.empty private var expanded = false private val contextPropagation = ContextPropagation() @@ -1276,11 +1276,8 @@ private[stream] object Collect { decider(ex) match { case Supervision.Stop => failStage(ex) - case Supervision.Resume => - restartState() - if (isClosed(in)) completeStage() - else if (!hasBeenPulled(in)) pull(in) - case Supervision.Restart => + case Supervision.Resume | Supervision.Restart => + // iterator is corrupt after any hasNext/next failure; must reset for both Resume and Restart restartState() if (isClosed(in)) completeStage() else if (!hasBeenPulled(in)) pull(in)