Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ Combines elements from multiple sources through a `combine` function and passes
Combines elements from multiple sources through a `combine` function and passes the
returned value downstream.

This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the
`combine` function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart`
the failing zipped element is dropped and the stream continues.

See also:

* @ref:[zip](zip.md)
Expand Down
5 changes: 4 additions & 1 deletion docs/src/main/paradox/stream/operators/Source/zipWithN.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ Combine the elements of multiple streams into a stream of sequences using a comb
This operator is essentially the same as using @ref:[zipN](zipN.md) followed by @ref[map](../Source-or-Flow/map.md)
to turn the zipped sequence into an arbitrary object to emit downstream.

This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `zipper`
function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` the failing zipped
element is dropped and the stream continues.

See also:

* @ref:[zipN](zipN.md)
Expand Down Expand Up @@ -48,4 +52,3 @@ Note how it stops as soon as any of the original sources reaches its end.

@@@


Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package org.apache.pekko.stream.scaladsl

import org.apache.pekko
import pekko.stream.ActorAttributes
import pekko.stream.Supervision
import pekko.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber }
import org.reactivestreams.Publisher

import scala.concurrent.Await
import scala.concurrent.duration._
import pekko.testkit.EventFilter
import scala.annotation.nowarn

@nowarn // keep unused imports
Expand Down Expand Up @@ -68,15 +70,46 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup {
probe.expectNext(1 / -2)
probe.expectNext(2 / -1)

EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
subscription.request(2)
probe.expectError() match {
case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero")
}
probe.expectNoMessage(200.millis)
}

"fail stream when zipper throws and supervision is Stop" in {
val ex = new RuntimeException("boom")
val result = Source(1 to 4)
.zipWith(Source(1 to 4))((a, b) => if (a == 3) throw ex else a + b)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(Sink.seq)
result.failed.futureValue shouldBe ex
}

"fail stream when zipper throws and supervision defaults to Stop" in {
val ex = new RuntimeException("boom")
val result = Source(1 to 4)
.zipWith(Source(1 to 4))((a, b) => if (a == 3) throw ex else a + b)
.runWith(Sink.seq)
result.failed.futureValue shouldBe ex
}

"resume when zipper throws and drop failed zipped element" in {
val future = Source(1 to 4)
.zipWith(Source(1 to 4))((a, b) => if (a == 3) throw new RuntimeException("boom") else a + b)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8)
}

"restart when zipper throws and drop failed zipped element" in {
val future = Source(1 to 4)
.zipWith(Source(1 to 4))((a, b) => if (a == 3) throw new RuntimeException("boom") else a + b)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8)
}

commonTests()

"work with one immediately completed and one nonempty publisher" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
package org.apache.pekko.stream.scaladsl

import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko
import pekko.stream._
import pekko.stream.testkit._
import pekko.testkit.EventFilter
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.TwoStreamsSetup

class GraphZipWithNSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
Expand Down Expand Up @@ -86,16 +87,49 @@ class GraphZipWithNSpec extends TwoStreamsSetup {
probe.expectNext(1 / 1 / -2)
probe.expectNext(1 / 2 / -1)

EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
subscription.request(2)
probe.expectError() match {
case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero")
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
}
probe.expectNoMessage(200.millis)
}

"fail stream when zipper throws and supervision is Stop" in {
val ex = new RuntimeException("boom")
val result = Source
.zipWithN[Int, Int](s => if (s.head == 3) throw ex else s.sum)(immutable.Seq(Source(1 to 4), Source(1 to 4)))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(Sink.seq)
result.failed.futureValue shouldBe ex
}

"fail stream when zipper throws and supervision defaults to Stop" in {
val ex = new RuntimeException("boom")
val result = Source
.zipWithN[Int, Int](s => if (s.head == 3) throw ex else s.sum)(immutable.Seq(Source(1 to 4), Source(1 to 4)))
.runWith(Sink.seq)
result.failed.futureValue shouldBe ex
}

"resume when zipper throws and drop failed zipped element" in {
val future = Source
.zipWithN[Int, Int](s => if (s.head == 3) throw new RuntimeException("boom") else s.sum)(
immutable.Seq(Source(1 to 4), Source(1 to 4)))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8)
}

"restart when zipper throws and drop failed zipped element" in {
val future = Source
.zipWithN[Int, Int](s => if (s.head == 3) throw new RuntimeException("boom") else s.sum)(
immutable.Seq(Source(1 to 4), Source(1 to 4)))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
Await.result(future, 3.seconds) shouldBe Seq(2, 4, 8)
}

commonTests()

"work with one immediately completed and one nonempty publisher" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import scala.concurrent.duration._
import org.apache.pekko
import pekko.stream._
import pekko.stream.testkit._
import pekko.testkit.EventFilter

class GraphZipWithSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
Expand Down Expand Up @@ -85,9 +84,7 @@ class GraphZipWithSpec extends TwoStreamsSetup {
probe.expectNext(1 / -2)
probe.expectNext(2 / -1)

EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
subscription.request(2)
probe.expectError() match {
case a: java.lang.ArithmeticException => a.getMessage should be("/ by zero")
case unexpected => throw new RuntimeException(s"Unexpected: $unexpected")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package org.apache.pekko.stream.scaladsl

import scala.util.control.NonFatal

import org.apache.pekko.stream._
import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy
import org.apache.pekko.stream.impl.ContextPropagation
import org.apache.pekko.stream.stage._

Expand Down Expand Up @@ -41,14 +44,30 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh
]

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var pending = ##0
// Without this field the completion signalling would take one extra pull
var willShutDown = false
private val contextPropagation = ContextPropagation()

private def pushAll(): Unit = {
contextPropagation.resumeContext()
push(out, zipper([#grab(in0)#]))
try push(out, zipper([#grab(in0)#]))
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume | Supervision.Restart =>
if (willShutDown) completeStage()
else {
pending += shape.inlets.size
[#pull(in0)#
]
}
}
return
}
if (willShutDown) completeStage()
else {
[#pull(in0)#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3831,6 +3831,12 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or
* [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues.
*
* '''Emits when''' all of the inputs have an element available
*
* '''Backpressures when''' downstream backpressures
Expand Down
12 changes: 12 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,12 @@ object Source {

/**
* Combine the elements of multiple streams into a stream of lists using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or
* [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues.
*/
def zipWithN[T, O](
zipper: function.Function[java.util.List[T], O],
Expand Down Expand Up @@ -1936,6 +1942,12 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* Put together the elements of current [[Source]] and the given one
* into a stream of combined elements using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or
* [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2554,6 +2554,12 @@ final class SubFlow[In, Out, Mat](
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or
* [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2521,6 +2521,12 @@ final class SubSource[Out, Mat](
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision#stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision#resume]] or
* [[pekko.stream.Supervision#restart]] the zipped element is dropped and the stream continues.
*
* '''Emits when''' all of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3396,6 +3396,12 @@ trait FlowOps[+Out, +Mat] {
* Put together the elements of current flow and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or
* [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues.
*
* '''Emits when''' all of the inputs have an element available
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,12 @@ final class ZipLatest[A, B](eagerComplete: Boolean) extends ZipLatestWith2[A, B,
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or
* [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues.
*/
object ZipWith extends ZipWithApply

Expand Down Expand Up @@ -1219,6 +1225,12 @@ object ZipWithN {
* '''Completes when''' any upstream completes
*
* '''Cancels when''' downstream cancels
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or
* [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues.
*/
class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[UniformFanInShape[A, O]] {
override def initialAttributes = DefaultAttributes.zipWithN
Expand All @@ -1227,6 +1239,7 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var pending = 0
// Without this field the completion signalling would take one extra pull
var willShutDown = false
Expand All @@ -1238,7 +1251,21 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U

private def pushAll(): Unit = {
contextPropagation.resumeContext()
push(out, zipper(shape.inlets.map(grabInlet)))
try push(out, zipper(shape.inlets.map(grabInlet)))
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume | Supervision.Restart =>
if (willShutDown) completeStage()
else {
pending += n
shape.inlets.foreach(pullInlet)
}
}
return
}
if (willShutDown) completeStage()
else shape.inlets.foreach(pullInlet)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,12 @@ object Source {

/**
* Combine the elements of multiple streams into a stream of sequences using a combiner function.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If the combiner function throws and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream fails. If the supervision decision is [[pekko.stream.Supervision.Resume]] or
* [[pekko.stream.Supervision.Restart]] the zipped element is dropped and the stream continues.
*/
def zipWithN[T, O](zipper: immutable.Seq[T] => O)(sources: immutable.Seq[Source[T, ?]]): Source[O, NotUsed] = {
val source = sources match {
Expand Down