diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md index 0512ff1b22c..1a54c919c50 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md @@ -15,6 +15,10 @@ Combines elements from multiple sources through a `combine` function and passes Combines elements from multiple sources through a `combine` function and passes the returned value downstream. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the +`combine` function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` +the failing zipped element is dropped and the stream continues. + See also: * @ref:[zip](zip.md) diff --git a/docs/src/main/paradox/stream/operators/Source/zipWithN.md b/docs/src/main/paradox/stream/operators/Source/zipWithN.md index 63c688fdc05..16527f3c4af 100644 --- a/docs/src/main/paradox/stream/operators/Source/zipWithN.md +++ b/docs/src/main/paradox/stream/operators/Source/zipWithN.md @@ -15,6 +15,10 @@ Combine the elements of multiple streams into a stream of sequences using a comb This operator is essentially the same as using @ref:[zipN](zipN.md) followed by @ref[map](../Source-or-Flow/map.md) to turn the zipped sequence into an arbitrary object to emit downstream. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `zipper` +function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` the failing zipped +element is dropped and the stream continues. + See also: * @ref:[zipN](zipN.md) @@ -48,4 +52,3 @@ Note how it stops as soon as any of the original sources reaches its end. @@@ - diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala index cb4827d126a..526d619c9db 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala @@ -14,11 +14,13 @@ package org.apache.pekko.stream.scaladsl import org.apache.pekko +import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import org.reactivestreams.Publisher +import scala.concurrent.Await import scala.concurrent.duration._ -import pekko.testkit.EventFilter import scala.annotation.nowarn @nowarn // keep unused imports @@ -68,15 +70,46 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - EventFilter[ArithmeticException](occurrences = 1).intercept { - subscription.request(2) - } + subscription.request(2) probe.expectError() match { case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero") } probe.expectNoMessage(200.millis) } + "fail stream when zipper throws and supervision is Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw ex else a + b) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "fail stream when zipper throws and supervision defaults to Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw ex else a + b) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "resume when zipper throws and drop failed zipped element" in { + val future = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw new RuntimeException("boom") else a + b) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + + "restart when zipper throws and drop failed zipped element" in { + val future = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw new RuntimeException("boom") else a + b) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + commonTests() "work with one immediately completed and one nonempty publisher" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala index deacff1f2ba..260456aba12 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala @@ -14,12 +14,13 @@ package org.apache.pekko.stream.scaladsl import scala.collection.immutable +import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko import pekko.stream._ -import pekko.stream.testkit._ -import pekko.testkit.EventFilter +import pekko.stream.testkit.TestSubscriber +import pekko.stream.testkit.TwoStreamsSetup class GraphZipWithNSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ @@ -86,9 +87,7 @@ class GraphZipWithNSpec extends TwoStreamsSetup { probe.expectNext(1 / 1 / -2) probe.expectNext(1 / 2 / -1) - EventFilter[ArithmeticException](occurrences = 1).intercept { - subscription.request(2) - } + subscription.request(2) probe.expectError() match { case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero") case unexpected => throw new RuntimeException(s"Unexpected: $unexpected") @@ -96,6 +95,41 @@ class GraphZipWithNSpec extends TwoStreamsSetup { probe.expectNoMessage(200.millis) } + "fail stream when zipper throws and supervision is Stop" in { + val ex = new RuntimeException("boom") + val result = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw ex else s.sum)(immutable.Seq(Source(1 to 4), Source(1 to 4))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "fail stream when zipper throws and supervision defaults to Stop" in { + val ex = new RuntimeException("boom") + val result = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw ex else s.sum)(immutable.Seq(Source(1 to 4), Source(1 to 4))) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "resume when zipper throws and drop failed zipped element" in { + val future = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw new RuntimeException("boom") else s.sum)( + immutable.Seq(Source(1 to 4), Source(1 to 4))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + + "restart when zipper throws and drop failed zipped element" in { + val future = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw new RuntimeException("boom") else s.sum)( + immutable.Seq(Source(1 to 4), Source(1 to 4))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + commonTests() "work with one immediately completed and one nonempty publisher" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala index b2a9d69856d..5428dce8c7d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala @@ -18,7 +18,6 @@ import scala.concurrent.duration._ import org.apache.pekko import pekko.stream._ import pekko.stream.testkit._ -import pekko.testkit.EventFilter class GraphZipWithSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ @@ -85,9 +84,7 @@ class GraphZipWithSpec extends TwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - EventFilter[ArithmeticException](occurrences = 1).intercept { - subscription.request(2) - } + subscription.request(2) probe.expectError() match { case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero") case unexpected => throw new RuntimeException(s"Unexpected: $unexpected") diff --git a/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template b/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template index a1239db97a0..2cdcdf5aaab 100644 --- a/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template +++ b/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template @@ -13,7 +13,10 @@ package org.apache.pekko.stream.scaladsl +import scala.util.control.NonFatal + import org.apache.pekko.stream._ +import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy import org.apache.pekko.stream.impl.ContextPropagation import org.apache.pekko.stream.stage._ @@ -41,6 +44,7 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh ] override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var pending = ##0 // Without this field the completion signalling would take one extra pull var willShutDown = false @@ -48,7 +52,22 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh private def pushAll(): Unit = { contextPropagation.resumeContext() - push(out, zipper([#grab(in0)#])) + try push(out, zipper([#grab(in0)#])) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume | Supervision.Restart => + if (willShutDown) completeStage() + else { + pending += shape.inlets.size + [#pull(in0)# + ] + } + } + return + } if (willShutDown) completeStage() else { [#pull(in0)# diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..be72b2373e2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3831,6 +3831,12 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs have an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..19fdb55d8f9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -589,6 +589,12 @@ object Source { /** * Combine the elements of multiple streams into a stream of lists using a combiner function. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. */ def zipWithN[T, O]( zipper: function.Function[java.util.List[T], O], @@ -1936,6 +1942,12 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * Put together the elements of current [[Source]] and the given one * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs has an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..f83e3b7d722 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2554,6 +2554,12 @@ final class SubFlow[In, Out, Mat]( * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs has an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..9f6a9b895d7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2521,6 +2521,12 @@ final class SubSource[Out, Mat]( * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs has an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..e191298fbe5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3396,6 +3396,12 @@ trait FlowOps[+Out, +Mat] { * Put together the elements of current flow and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs have an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 6cde3ffa9f2..415af7ea12e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -1096,6 +1096,12 @@ final class ZipLatest[A, B](eagerComplete: Boolean) extends ZipLatestWith2[A, B, * '''Completes when''' any upstream completes * * '''Cancels when''' downstream cancels + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. */ object ZipWith extends ZipWithApply @@ -1219,6 +1225,12 @@ object ZipWithN { * '''Completes when''' any upstream completes * * '''Cancels when''' downstream cancels + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. */ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] { override def initialAttributes = DefaultAttributes.zipWithN @@ -1227,6 +1239,7 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var pending = 0 // Without this field the completion signalling would take one extra pull var willShutDown = false @@ -1238,7 +1251,21 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U private def pushAll(): Unit = { contextPropagation.resumeContext() - push(out, zipper(shape.inlets.map(grabInlet))) + try push(out, zipper(shape.inlets.map(grabInlet))) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume | Supervision.Restart => + if (willShutDown) completeStage() + else { + pending += n + shape.inlets.foreach(pullInlet) + } + } + return + } if (willShutDown) completeStage() else shape.inlets.foreach(pullInlet) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 4cd4594e1d0..8b4e60d539f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -887,6 +887,12 @@ object Source { /** * Combine the elements of multiple streams into a stream of sequences using a combiner function. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. */ def zipWithN[T, O](zipper: immutable.Seq[T] => O)(sources: immutable.Seq[Source[T, ?]]): Source[O, NotUsed] = { val source = sources match {