From f6df975e7485585da25e41c0f3f4b4d5edad584a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 19:51:25 +0800 Subject: [PATCH 1/2] fix: add supervision strategy support for zipWith and zipWithN zipper functions Motivation: zipWith and zipWithN accepted user zipper/combiner functions but did not consult SupervisionStrategy. Zipper exceptions failed the stream unconditionally, even when Resume/Restart was configured. Modification: Add zipper exception supervision handling in both implementations: - ZipWith boilerplate template (all arities) - Graph.ZipWithN Stop fails the stage; Resume/Restart drop the failing zipped element and continue by pulling new inputs. Pending bookkeeping is updated on resumed/ restarted failures so the stage can satisfy the current downstream demand without stalling. Update Scala/Java API docs and operator docs (`zipWith.md`, `zipWithN.md`) to document supervision behavior for combiner function failures. Add directional tests in: - FlowZipWithSpec - GraphZipWithNSpec covering explicit Stop, default Stop, Resume, and Restart. Result: zipWith/zipWithN now honor ActorAttributes.SupervisionStrategy for zipper failures with deterministic regression coverage for dropped-element paths. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowZipWithSpec org.apache.pekko.stream.scaladsl.GraphZipWithNSpec" -- 37/37 passed - sbt "stream/mimaReportBinaryIssues" -- clean - sbt "docs/paradox" -- passed References: Refs #3110 --- .../operators/Source-or-Flow/zipWith.md | 4 ++ .../stream/operators/Source/zipWithN.md | 5 ++- .../stream/scaladsl/FlowZipWithSpec.scala | 41 +++++++++++++++-- .../stream/scaladsl/GraphZipWithNSpec.scala | 44 ++++++++++++++++--- .../scaladsl/ZipWithApply.scala.template | 21 ++++++++- .../apache/pekko/stream/javadsl/Flow.scala | 6 +++ .../apache/pekko/stream/javadsl/Source.scala | 12 +++++ .../apache/pekko/stream/javadsl/SubFlow.scala | 6 +++ .../pekko/stream/javadsl/SubSource.scala | 6 +++ .../apache/pekko/stream/scaladsl/Flow.scala | 6 +++ .../apache/pekko/stream/scaladsl/Graph.scala | 29 +++++++++++- .../apache/pekko/stream/scaladsl/Source.scala | 6 +++ 12 files changed, 174 insertions(+), 12 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md index 0512ff1b22c..1a54c919c50 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md @@ -15,6 +15,10 @@ Combines elements from multiple sources through a `combine` function and passes Combines elements from multiple sources through a `combine` function and passes the returned value downstream. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the +`combine` function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` +the failing zipped element is dropped and the stream continues. + See also: * @ref:[zip](zip.md) diff --git a/docs/src/main/paradox/stream/operators/Source/zipWithN.md b/docs/src/main/paradox/stream/operators/Source/zipWithN.md index 63c688fdc05..16527f3c4af 100644 --- a/docs/src/main/paradox/stream/operators/Source/zipWithN.md +++ b/docs/src/main/paradox/stream/operators/Source/zipWithN.md @@ -15,6 +15,10 @@ Combine the elements of multiple streams into a stream of sequences using a comb This operator is essentially the same as using @ref:[zipN](zipN.md) followed by @ref[map](../Source-or-Flow/map.md) to turn the zipped sequence into an arbitrary object to emit downstream. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `zipper` +function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` the failing zipped +element is dropped and the stream continues. + See also: * @ref:[zipN](zipN.md) @@ -48,4 +52,3 @@ Note how it stops as soon as any of the original sources reaches its end. @@@ - diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala index cb4827d126a..526d619c9db 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowZipWithSpec.scala @@ -14,11 +14,13 @@ package org.apache.pekko.stream.scaladsl import org.apache.pekko +import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import org.reactivestreams.Publisher +import scala.concurrent.Await import scala.concurrent.duration._ -import pekko.testkit.EventFilter import scala.annotation.nowarn @nowarn // keep unused imports @@ -68,15 +70,46 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - EventFilter[ArithmeticException](occurrences = 1).intercept { - subscription.request(2) - } + subscription.request(2) probe.expectError() match { case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero") } probe.expectNoMessage(200.millis) } + "fail stream when zipper throws and supervision is Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw ex else a + b) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "fail stream when zipper throws and supervision defaults to Stop" in { + val ex = new RuntimeException("boom") + val result = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw ex else a + b) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "resume when zipper throws and drop failed zipped element" in { + val future = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw new RuntimeException("boom") else a + b) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + + "restart when zipper throws and drop failed zipped element" in { + val future = Source(1 to 4) + .zipWith(Source(1 to 4))((a, b) => if (a == 3) throw new RuntimeException("boom") else a + b) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + commonTests() "work with one immediately completed and one nonempty publisher" in { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala index deacff1f2ba..260456aba12 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithNSpec.scala @@ -14,12 +14,13 @@ package org.apache.pekko.stream.scaladsl import scala.collection.immutable +import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko import pekko.stream._ -import pekko.stream.testkit._ -import pekko.testkit.EventFilter +import pekko.stream.testkit.TestSubscriber +import pekko.stream.testkit.TwoStreamsSetup class GraphZipWithNSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ @@ -86,9 +87,7 @@ class GraphZipWithNSpec extends TwoStreamsSetup { probe.expectNext(1 / 1 / -2) probe.expectNext(1 / 2 / -1) - EventFilter[ArithmeticException](occurrences = 1).intercept { - subscription.request(2) - } + subscription.request(2) probe.expectError() match { case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero") case unexpected => throw new RuntimeException(s"Unexpected: $unexpected") @@ -96,6 +95,41 @@ class GraphZipWithNSpec extends TwoStreamsSetup { probe.expectNoMessage(200.millis) } + "fail stream when zipper throws and supervision is Stop" in { + val ex = new RuntimeException("boom") + val result = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw ex else s.sum)(immutable.Seq(Source(1 to 4), Source(1 to 4))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "fail stream when zipper throws and supervision defaults to Stop" in { + val ex = new RuntimeException("boom") + val result = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw ex else s.sum)(immutable.Seq(Source(1 to 4), Source(1 to 4))) + .runWith(Sink.seq) + result.failed.futureValue shouldBe ex + } + + "resume when zipper throws and drop failed zipped element" in { + val future = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw new RuntimeException("boom") else s.sum)( + immutable.Seq(Source(1 to 4), Source(1 to 4))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + + "restart when zipper throws and drop failed zipped element" in { + val future = Source + .zipWithN[Int, Int](s => if (s.head == 3) throw new RuntimeException("boom") else s.sum)( + immutable.Seq(Source(1 to 4), Source(1 to 4))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8) + } + commonTests() "work with one immediately completed and one nonempty publisher" in { diff --git a/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template b/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template index a1239db97a0..2cdcdf5aaab 100644 --- a/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template +++ b/stream/src/main/boilerplate/org/apache/pekko/stream/scaladsl/ZipWithApply.scala.template @@ -13,7 +13,10 @@ package org.apache.pekko.stream.scaladsl +import scala.util.control.NonFatal + import org.apache.pekko.stream._ +import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy import org.apache.pekko.stream.impl.ContextPropagation import org.apache.pekko.stream.stage._ @@ -41,6 +44,7 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh ] override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var pending = ##0 // Without this field the completion signalling would take one extra pull var willShutDown = false @@ -48,7 +52,22 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh private def pushAll(): Unit = { contextPropagation.resumeContext() - push(out, zipper([#grab(in0)#])) + try push(out, zipper([#grab(in0)#])) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume | Supervision.Restart => + if (willShutDown) completeStage() + else { + pending += shape.inlets.size + [#pull(in0)# + ] + } + } + return + } if (willShutDown) completeStage() else { [#pull(in0)# diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..be72b2373e2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3831,6 +3831,12 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs have an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..19fdb55d8f9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -589,6 +589,12 @@ object Source { /** * Combine the elements of multiple streams into a stream of lists using a combiner function. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. */ def zipWithN[T, O]( zipper: function.Function[java.util.List[T], O], @@ -1936,6 +1942,12 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * Put together the elements of current [[Source]] and the given one * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs has an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..f83e3b7d722 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2554,6 +2554,12 @@ final class SubFlow[In, Out, Mat]( * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs has an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..9f6a9b895d7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2521,6 +2521,12 @@ final class SubSource[Out, Mat]( * Put together the elements of current [[Flow]] and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or + * [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs has an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..e191298fbe5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3396,6 +3396,12 @@ trait FlowOps[+Out, +Mat] { * Put together the elements of current flow and the given [[Source]] * into a stream of combined elements using a combiner function. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. + * * '''Emits when''' all of the inputs have an element available * * '''Backpressures when''' downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 6cde3ffa9f2..415af7ea12e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -1096,6 +1096,12 @@ final class ZipLatest[A, B](eagerComplete: Boolean) extends ZipLatestWith2[A, B, * '''Completes when''' any upstream completes * * '''Cancels when''' downstream cancels + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. */ object ZipWith extends ZipWithApply @@ -1219,6 +1225,12 @@ object ZipWithN { * '''Completes when''' any upstream completes * * '''Cancels when''' downstream cancels + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. */ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] { override def initialAttributes = DefaultAttributes.zipWithN @@ -1227,6 +1239,7 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider var pending = 0 // Without this field the completion signalling would take one extra pull var willShutDown = false @@ -1238,7 +1251,21 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U private def pushAll(): Unit = { contextPropagation.resumeContext() - push(out, zipper(shape.inlets.map(grabInlet))) + try push(out, zipper(shape.inlets.map(grabInlet))) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume | Supervision.Restart => + if (willShutDown) completeStage() + else { + pending += n + shape.inlets.foreach(pullInlet) + } + } + return + } if (willShutDown) completeStage() else shape.inlets.foreach(pullInlet) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 4cd4594e1d0..8b4e60d539f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -887,6 +887,12 @@ object Source { /** * Combine the elements of multiple streams into a stream of sequences using a combiner function. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]] + * the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or + * [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues. */ def zipWithN[T, O](zipper: immutable.Seq[T] => O)(sources: immutable.Seq[Source[T, ?]]): Source[O, NotUsed] = { val source = sources match { From 258c0ba3ca7fe1b2727cfe0791039e91980a2ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 02:57:37 +0800 Subject: [PATCH 2/2] fix: remove stale EventFilter from GraphZipWithSpec Motivation: The supervision support added in the previous commit causes ZipWith zipper exceptions to be handled by the decider instead of being logged as ERROR. GraphZipWithSpec still wrapped its sad-case test in EventFilter[ArithmeticException], which now times out waiting for an ERROR log message that is never emitted. Modification: Remove the EventFilter wrapping and unused import from GraphZipWithSpec's "work in the sad case" test, matching the same cleanup already applied to FlowZipWithSpec and GraphZipWithNSpec. Result: GraphZipWithSpec sad-case test passes without EventFilter timeout. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.GraphZipWithSpec" -- 15/15 passed References: Refs #3110 --- .../org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala index b2a9d69856d..5428dce8c7d 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphZipWithSpec.scala @@ -18,7 +18,6 @@ import scala.concurrent.duration._ import org.apache.pekko import pekko.stream._ import pekko.stream.testkit._ -import pekko.testkit.EventFilter class GraphZipWithSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ @@ -85,9 +84,7 @@ class GraphZipWithSpec extends TwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - EventFilter[ArithmeticException](occurrences = 1).intercept { - subscription.request(2) - } + subscription.request(2) probe.expectError() match { case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero") case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")