Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ element, allowing for it to be rewritten and/or filtered.
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
and examples.

This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `expander`
function or during iterator evaluation (`hasNext`/`next`). On `Supervision.Stop` the stream fails; on
`Supervision.Resume` the failed element is dropped and the current extrapolation state is kept when the failure
occurred in the `expander` function (a previously active iterator is retained), but is necessarily discarded when
the failure occurred during iterator evaluation; on `Supervision.Restart` the failed element is dropped and the
current extrapolation state is reset.

## Example

Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit
Expand All @@ -43,4 +50,3 @@ Java
**completes** when upstream completes

@@@

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ Includes an optional `initial` argument to prevent blocking the entire stream wh
See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information
and examples.

This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by the `extrapolator`
function. On `Supervision.Stop` the stream fails; on `Supervision.Resume` and `Supervision.Restart` the failed element is
dropped and the stream continues.

## Example

Imagine a videoconference client decoding a video feed from a colleague working remotely. It is possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import scala.concurrent.duration._

import org.apache.pekko
import pekko.stream.ActorAttributes
import pekko.stream.Supervision
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource
Expand Down Expand Up @@ -150,6 +151,108 @@ class FlowExpandSpec extends StreamSpec("""
source.sendNext(2).sendComplete()
sink.expectNext(2 -> 0).expectComplete()
}

"fail stream when expander throws and supervision is Stop" in {
val ex = new RuntimeException("boom")
val result = Source(1 to 5)
.expand(i => if (i == 3) throw ex else Iterator.single(i))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(Sink.ignore)
result.failed.futureValue shouldBe ex
}

"fail stream when expander throws and supervision defaults to Stop" in {
val ex = new RuntimeException("boom")
val result = Source(1 to 5)
.expand(i => if (i == 3) throw ex else Iterator.single(i))
.runWith(Sink.ignore)
result.failed.futureValue shouldBe ex
}

"resume and keep current extrapolation when expander throws" in {
val ex = new RuntimeException("boom")
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()

Source
.fromPublisher(publisher)
.expand(i => if (i == 1) Iterator(1, 10, 11) else if (i == 2) throw ex else Iterator.single(i))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.fromSubscriber(subscriber))
.run()

publisher.sendNext(1)
subscriber.requestNext(1)
publisher.sendNext(2) // throws in expander, element 2 is dropped under Resume
subscriber.requestNext(10) // continue from current extrapolation
subscriber.cancel()
}

"complete immediately on upstream finish if expanded is true" in {
val subscriber = TestSubscriber.probe[Int]()

Source.single(1)
.expand(_ => Iterator(1, 2, 3))
.to(Sink.fromSubscriber(subscriber))
.run()

subscriber.requestNext(1) // expanded becomes true
subscriber.expectComplete() // Does it complete immediately?
}

"restart and reset current extrapolation when expander throws" in {
val ex = new RuntimeException("boom")
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()

Source
.fromPublisher(publisher)
.expand(i => if (i == 1) Iterator(1, 10, 11) else if (i == 2) throw ex else Iterator.single(i))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.to(Sink.fromSubscriber(subscriber))
.run()

publisher.sendNext(1)
subscriber.requestNext(1)
publisher.sendNext(2) // throws in expander, restart resets iterator state
subscriber.request(1)
subscriber.expectNoMessage(300.millis)
publisher.sendNext(3)
subscriber.requestNext(3)
subscriber.cancel()
}

"resume when iterator produced by expander throws during iteration" in {
val ex = new RuntimeException("boom")
val result = Source(1 to 4)
.expand(i =>
if (i == 3)
new Iterator[Int] {
override def hasNext: Boolean = true
override def next(): Int = throw ex
}
else Iterator.single(i))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)

Await.result(result, 3.seconds) shouldBe Seq(1, 2, 4)
}

"restart when iterator produced by expander throws during iteration" in {
val ex = new RuntimeException("boom")
val result = Source(1 to 4)
.expand(i =>
if (i == 3)
new Iterator[Int] {
override def hasNext: Boolean = true
override def next(): Int = throw ex
}
else Iterator.single(i))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)

Await.result(result, 3.seconds) shouldBe Seq(1, 2, 4)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko
import pekko.stream.ActorAttributes
import pekko.stream.Supervision
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource
Expand Down Expand Up @@ -170,6 +172,84 @@ class FlowExtrapolateSpec extends StreamSpec("""
source.sendNext(2).sendComplete()
sink.expectNext(2 -> 0).expectComplete()
}

"stop when extrapolator throws during iterator evaluation" in {
val ex = new RuntimeException("boom")
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()

Source
.fromPublisher(publisher)
.extrapolate(i =>
if (i == 3)
new Iterator[Int] {
override def hasNext: Boolean = throw ex
override def next(): Int = throw ex
}
else Iterator.empty)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.to(Sink.fromSubscriber(subscriber))
.run()

publisher.sendNext(3)
subscriber.requestNext(3)
subscriber.request(1)
subscriber.expectError(ex)
}

"resume when extrapolator throws during iterator evaluation" in {
val ex = new RuntimeException("boom")
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()

Source
.fromPublisher(publisher)
.extrapolate(i =>
if (i == 3)
new Iterator[Int] {
override def hasNext: Boolean = throw ex
override def next(): Int = throw ex
}
else Iterator.empty)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.fromSubscriber(subscriber))
.run()

publisher.sendNext(3)
subscriber.requestNext(3)
subscriber.request(1)
subscriber.expectNoMessage(300.millis)
publisher.sendNext(4)
subscriber.requestNext(4)
subscriber.cancel()
}

"restart when extrapolator throws during iterator evaluation" in {
val ex = new RuntimeException("boom")
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()

Source
.fromPublisher(publisher)
.extrapolate(i =>
if (i == 3)
new Iterator[Int] {
override def hasNext: Boolean = throw ex
override def next(): Int = throw ex
}
else Iterator.empty)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.to(Sink.fromSubscriber(subscriber))
.run()

publisher.sendNext(3)
subscriber.requestNext(3)
subscriber.request(1)
subscriber.expectNoMessage(300.millis)
publisher.sendNext(4)
subscriber.requestNext(4)
subscriber.cancel()
}
}

}
91 changes: 67 additions & 24 deletions stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1200,46 +1200,89 @@ private[stream] object Collect {
override val shape = FlowShape(in, out)

override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
private def decider = attr.mandatoryAttribute[SupervisionStrategy].decider
private var iterator: Iterator[Out] = Iterator.empty
private var expanded = false
private val contextPropagation = ContextPropagation()

override def preStart(): Unit = pull(in)

def onPush(): Unit = {
iterator = extrapolate(grab(in))
if (iterator.hasNext) {
contextPropagation.suspendContext()
if (isAvailable(out)) {
expanded = true
pull(in)
push(out, iterator.next())
} else expanded = false
} else pull(in)
val elem = grab(in)
try iterator = extrapolate(elem)
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume =>
if (!isClosed(in) && !hasBeenPulled(in)) pull(in)
case Supervision.Restart =>
restartState()
if (!isClosed(in) && !hasBeenPulled(in)) pull(in)
}
return
}
try {
if (iterator.hasNext) {
contextPropagation.suspendContext()
if (isAvailable(out)) {
expanded = true
pull(in)
push(out, iterator.next())
} else expanded = false
} else pull(in)
} catch {
case NonFatal(ex) => handleIteratorFailure(ex)
}
}

override def onUpstreamFinish(): Unit = {
if (iterator.hasNext && !expanded) () // need to wait
else completeStage()
try {
if (iterator.hasNext && !expanded) () // need to wait
else completeStage()
} catch {
case NonFatal(ex) => handleIteratorFailure(ex)
}
}

def onPull(): Unit = {
if (iterator.hasNext) {
contextPropagation.resumeContext()
if (!expanded) {
expanded = true
if (isClosed(in)) {
push(out, iterator.next())
completeStage()
} else {
// expand needs to pull first to be “fair” when upstream is not actually slow
pull(in)
push(out, iterator.next())
}
} else push(out, iterator.next())
try {
if (iterator.hasNext) {
contextPropagation.resumeContext()
if (!expanded) {
expanded = true
if (isClosed(in)) {
push(out, iterator.next())
completeStage()
} else {
// expand needs to pull first to be “fair” when upstream is not actually slow
pull(in)
push(out, iterator.next())
}
} else push(out, iterator.next())
}
} catch {
case NonFatal(ex) => handleIteratorFailure(ex)
}
}

private def restartState(): Unit = {
iterator = Iterator.empty
expanded = false
}

private def handleIteratorFailure(ex: Throwable): Unit =
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume | Supervision.Restart =>
// iterator is corrupt after any hasNext/next failure; must reset for both Resume and Restart
restartState()
if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in)
}

setHandler(in, this)
setHandler(out, this)
}
Expand Down
Loading