diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md index efc1a1ed39..9f663dca02 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md @@ -16,6 +16,8 @@ a function has to be provided to calculate the individual cost of each element. The throttle operator combines well with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair. +When a cost-calculation function is supplied, this operator adheres to the ActorAttributes.SupervisionStrategy attribute. On Supervision.Resume the offending element is dropped; Supervision.Restart behaves the same as Supervision.Resume because throttle keeps no accumulated per-element state. + See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. ## Example diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowThrottleSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowThrottleSpec.scala index 92a5570864..19d693d4e5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowThrottleSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowThrottleSpec.scala @@ -337,16 +337,71 @@ class FlowThrottleSpec extends StreamSpec(""" .expectComplete() } - "handle rate calculation function exception" in { - val ex = new RuntimeException with NoStackTrace + "stop on cost calculation function exception with explicit stopping decider" in { + val ex = new RuntimeException("boom") with NoStackTrace Source(1 to 5) - .throttle(2, 200.millis, 0, _ => { throw ex }, Shaping) - .throttle(1, 100.millis, 5, Enforcing) + .throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[Int]()) + .request(5) + .expectNext(1, 2) + .expectError(ex) + } + + "stop on cost calculation function exception by default" in { + val ex = new RuntimeException("boom") with NoStackTrace + Source(1 to 5) + .throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping) .runWith(TestSink[Int]()) .request(5) + .expectNext(1, 2) .expectError(ex) } + "resume on cost calculation function exception in shaping mode" in { + val ex = new RuntimeException("boom") with NoStackTrace + Source(1 to 5) + .throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(5) + .expectNext(1, 2, 4, 5) + .expectComplete() + } + + "resume and complete when the last element fails cost calculation" in { + val ex = new RuntimeException("boom") with NoStackTrace + Source(1 to 5) + .throttle(1, 1.second, 100, n => if (n == 5) throw ex else 1, Shaping) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(5) + .expectNext(1, 2, 3, 4) + .expectComplete() + } + + "resume on cost calculation function exception in enforcing mode" in { + val ex = new RuntimeException("boom") with NoStackTrace + Source(1 to 5) + .throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Enforcing) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Int]()) + .request(5) + .expectNext(1, 2, 4, 5) + .expectComplete() + } + + "restart on cost calculation function exception like resume" in { + val ex = new RuntimeException("boom") with NoStackTrace + Source(1 to 5) + .throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[Int]()) + .request(5) + .expectNext(1, 2, 4, 5) + .expectComplete() + } + "work for real scenario with automatic burst size" taggedAs TimingTest in { val startTime = System.nanoTime() val counter1 = new AtomicInteger diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Throttle.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Throttle.scala index c2c04aede9..9b241d1377 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Throttle.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Throttle.scala @@ -14,10 +14,12 @@ package org.apache.pekko.stream.impl import scala.concurrent.duration.{ FiniteDuration, _ } +import scala.util.control.NonFatal import org.apache.pekko import pekko.annotation.InternalApi import pekko.stream._ +import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.ThrottleMode.Enforcing import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import pekko.stream.stage._ @@ -59,6 +61,7 @@ import pekko.util.NanoTimeTokenBucket override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens) + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var currentElement: T = _ override def preStart(): Unit = tokenBucket.init() @@ -70,7 +73,21 @@ import pekko.util.NanoTimeTokenBucket override def onPush(): Unit = { val elem = grab(in) - val cost = costCalculation(elem) + val cost = + try costCalculation(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + // Throttle keeps no accumulated per-element state, so Restart behaves like Resume: + // skip the offending element. The rate-limiting token bucket is deliberately not reset. + case Supervision.Resume | Supervision.Restart => + pull(in) + return + } + } val delayNanos = tokenBucket.offer(cost) if (delayNanos == 0L) push(out, elem) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..cec29dd823 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -4131,6 +4131,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to * meet throttle rate. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -4175,6 +4179,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..1fdeef8272 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -4619,6 +4619,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to * meet throttle rate. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -4663,6 +4667,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..117ef1fa64 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2780,6 +2780,10 @@ final class SubFlow[In, Out, Mat]( * The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to * meet throttle rate. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -2824,6 +2828,10 @@ final class SubFlow[In, Out, Mat]( * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..b0566afdf9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2747,6 +2747,10 @@ final class SubSource[Out, Mat]( * The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to * meet throttle rate. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -2791,6 +2795,10 @@ final class SubSource[Out, Mat]( * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..d9f39fdda8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3178,6 +3178,10 @@ trait FlowOps[+Out, +Mat] { * The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to * meet throttle rate. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit @@ -3219,6 +3223,10 @@ trait FlowOps[+Out, +Mat] { * In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in * certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume` + * because throttle keeps no accumulated per-element state. + * * '''Emits when''' upstream emits an element and configured time per each element elapsed * * '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit