Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ a function has to be provided to calculate the individual cost of each element.

The throttle operator combines well with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair.

When a cost-calculation function is supplied, this operator adheres to the ActorAttributes.SupervisionStrategy attribute. On Supervision.Resume the offending element is dropped; Supervision.Restart behaves the same as Supervision.Resume because throttle keeps no accumulated per-element state.

See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators.

## Example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,71 @@ class FlowThrottleSpec extends StreamSpec("""
.expectComplete()
}

"handle rate calculation function exception" in {
val ex = new RuntimeException with NoStackTrace
"stop on cost calculation function exception with explicit stopping decider" in {
val ex = new RuntimeException("boom") with NoStackTrace
Source(1 to 5)
.throttle(2, 200.millis, 0, _ => { throw ex }, Shaping)
.throttle(1, 100.millis, 5, Enforcing)
.throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(TestSink[Int]())
.request(5)
.expectNext(1, 2)
.expectError(ex)
}

"stop on cost calculation function exception by default" in {
val ex = new RuntimeException("boom") with NoStackTrace
Source(1 to 5)
.throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping)
.runWith(TestSink[Int]())
.request(5)
.expectNext(1, 2)
.expectError(ex)
}

"resume on cost calculation function exception in shaping mode" in {
val ex = new RuntimeException("boom") with NoStackTrace
Source(1 to 5)
.throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink[Int]())
.request(5)
.expectNext(1, 2, 4, 5)
.expectComplete()
}

"resume and complete when the last element fails cost calculation" in {
val ex = new RuntimeException("boom") with NoStackTrace
Source(1 to 5)
.throttle(1, 1.second, 100, n => if (n == 5) throw ex else 1, Shaping)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink[Int]())
.request(5)
.expectNext(1, 2, 3, 4)
.expectComplete()
}

"resume on cost calculation function exception in enforcing mode" in {
val ex = new RuntimeException("boom") with NoStackTrace
Source(1 to 5)
.throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Enforcing)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink[Int]())
.request(5)
.expectNext(1, 2, 4, 5)
.expectComplete()
}

"restart on cost calculation function exception like resume" in {
val ex = new RuntimeException("boom") with NoStackTrace
Source(1 to 5)
.throttle(1, 1.second, 100, n => if (n == 3) throw ex else 1, Shaping)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(TestSink[Int]())
.request(5)
.expectNext(1, 2, 4, 5)
.expectComplete()
}

"work for real scenario with automatic burst size" taggedAs TimingTest in {
val startTime = System.nanoTime()
val counter1 = new AtomicInteger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package org.apache.pekko.stream.impl

import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream._
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.ThrottleMode.Enforcing
import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import pekko.stream.stage._
Expand Down Expand Up @@ -59,6 +61,7 @@ import pekko.util.NanoTimeTokenBucket
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private val tokenBucket = new NanoTimeTokenBucket(effectiveMaximumBurst, nanosBetweenTokens)
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var currentElement: T = _

override def preStart(): Unit = tokenBucket.init()
Expand All @@ -70,7 +73,21 @@ import pekko.util.NanoTimeTokenBucket

override def onPush(): Unit = {
val elem = grab(in)
val cost = costCalculation(elem)
val cost =
try costCalculation(elem)
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
return
// Throttle keeps no accumulated per-element state, so Restart behaves like Resume:
// skip the offending element. The rate-limiting token bucket is deliberately not reset.
case Supervision.Resume | Supervision.Restart =>
pull(in)
return
}
}
val delayNanos = tokenBucket.offer(cost)

if (delayNanos == 0L) push(out, elem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4131,6 +4131,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down Expand Up @@ -4175,6 +4179,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4619,6 +4619,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down Expand Up @@ -4663,6 +4667,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,10 @@ final class SubFlow[In, Out, Mat](
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down Expand Up @@ -2824,6 +2828,10 @@ final class SubFlow[In, Out, Mat](
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2747,6 +2747,10 @@ final class SubSource[Out, Mat](
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down Expand Up @@ -2791,6 +2795,10 @@ final class SubSource[Out, Mat](
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3178,6 +3178,10 @@ trait FlowOps[+Out, +Mat] {
* The throttle `mode` is [[pekko.stream.ThrottleMode.Shaping]], which makes pauses before emitting messages to
* meet throttle rate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down Expand Up @@ -3219,6 +3223,10 @@ trait FlowOps[+Out, +Mat] {
* In other words the throttler always enforces the rate limit when `maximumBurst` parameter is given, but in
* certain cases (mostly due to limited scheduler resolution) it enforces a tighter bound than what was prescribed.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; `Supervision.Restart` behaves the same as `Supervision.Resume`
* because throttle keeps no accumulated per-element state.
*
* '''Emits when''' upstream emits an element and configured time per each element elapsed
*
* '''Backpressures when''' downstream backpressures or the incoming rate is higher than the speed limit
Expand Down