Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Delay every element passed through with a duration that can be controlled dynami

Delay every element passed through with a duration that can be controlled dynamically, individually for each elements (via the `DelayStrategy`).

This operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the supplier (already-buffered elements keep their delays).


@@@div { .callout }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,5 +347,133 @@ class FlowDelaySpec extends StreamSpec {
probe.expectComplete()

}

"fail with stop supervision when delay strategy throws" in {
val ex = new RuntimeException("boom-stop") with NoStackTrace
val failed = Source(1 to 3)
.delayWith(
() =>
new DelayStrategy[Int] {
override def nextDelay(elem: Int): FiniteDuration =
if (elem == 2) throw ex else Duration.Zero
},
OverflowStrategy.backpressure)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(Sink.seq)
.failed
.futureValue

failed should be(ex)
}

"default to stop supervision when delay strategy throws" in {
val ex = new RuntimeException("boom-default-stop") with NoStackTrace
val failed = Source(1 to 3)
.delayWith(
() =>
new DelayStrategy[Int] {
override def nextDelay(elem: Int): FiniteDuration =
if (elem == 2) throw ex else Duration.Zero
},
OverflowStrategy.backpressure)
.runWith(Sink.seq)
.failed
.futureValue

failed should be(ex)
}

"drop the offending element on resume supervision when delay strategy throws" in {
val ex = new RuntimeException("boom-resume") with NoStackTrace
val result = Source(1 to 3)
.delayWith(
() =>
new DelayStrategy[Int] {
override def nextDelay(elem: Int): FiniteDuration =
if (elem == 2) throw ex else Duration.Zero
},
OverflowStrategy.backpressure)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
.futureValue

result should ===(Seq(1, 3))
}

"drop the offending element and recreate stateful strategy on restart supervision" in {
val ex = new RuntimeException("boom-restart") with NoStackTrace
val stateNotResetEx = new RuntimeException("strategy state was not reset") with NoStackTrace
val result = Source(1 to 3)
.delayWith(
() =>
new DelayStrategy[Int] {
private var invocationCount = 0

override def nextDelay(elem: Int): FiniteDuration = {
invocationCount += 1
if (elem == 2) throw ex
else if (invocationCount == 1) Duration.Zero
else throw stateNotResetEx
}
},
OverflowStrategy.backpressure)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
.futureValue

result should ===(Seq(1, 3))
}

"keep the stateful strategy (no reset) on resume supervision" in {
val ex = new RuntimeException("boom-resume-stateful") with NoStackTrace
val stateWasResetEx = new RuntimeException("strategy state was reset") with NoStackTrace
val result = Source(1 to 3)
.delayWith(
() =>
new DelayStrategy[Int] {
private var invocationCount = 0
override def nextDelay(elem: Int): FiniteDuration = {
invocationCount += 1
elem match {
case 2 => throw ex
case 3 if invocationCount != 3 => throw stateWasResetEx
case _ => Duration.Zero
}
}
},
OverflowStrategy.backpressure)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
.futureValue
result should ===(Seq(1, 3))
}

"not crash in onTimer when an overflow handler empties the buffer and the strategy throws" taggedAs TimingTest in {
val ex = new RuntimeException("boom-overflow") with NoStackTrace
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source
.fromPublisher(upstream)
.delayWith(
() =>
new DelayStrategy[Int] {
override def nextDelay(elem: Int): FiniteDuration =
if (elem == 2) throw ex else 200.millis
},
OverflowStrategy.dropBuffer)
.withAttributes(Attributes.inputBuffer(1, 1) and
ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.fromSubscriber(downstream))
.run()

downstream.request(10)
upstream.sendNext(1)
upstream.sendNext(2)
downstream.expectNoMessage(400.millis)
upstream.sendNext(3)
downstream.expectNext(500.millis, 3)
upstream.sendComplete()
downstream.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,8 @@ private[stream] object Collect {

private val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max

private val delayStrategy = delayStrategySupplier()
private var delayStrategy = delayStrategySupplier()
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

// buffer has pairs of timestamp of expected push and element
private val buffer = BufferImpl[(Long, T)](size, inheritedAttributes)
Expand Down Expand Up @@ -2033,7 +2034,9 @@ private[stream] object Collect {
onPushWhenBufferFull()
else {
grabAndPull()
if (!isTimerActive(TimerName)) {
// Resume/Restart may drop the current element, leaving the buffer empty.
// Restart also recreates the stateful strategy for subsequent elements.
if (!buffer.isEmpty && !isTimerActive(TimerName)) {
val waitTime = nextElementWaitTime()
if (waitTime <= DelayPrecisionMS && isAvailable(out)) {
push(out, buffer.dequeue()._2)
Expand All @@ -2051,7 +2054,26 @@ private[stream] object Collect {

private def grabAndPull(): Unit = {
val element = grab(in)
buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element))
val delay =
try delayStrategy.nextDelay(element)
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
return
case Supervision.Resume =>
// Skip only the offending element, keep buffered elements and current strategy state.
if (shouldPull) pull(in)
return
case Supervision.Restart =>
// Skip offending element and recreate the potentially stateful strategy.
delayStrategy = delayStrategySupplier()
if (shouldPull) pull(in)
return
}
}
buffer.enqueue((System.nanoTime() + delay.toNanos, element))
if (shouldPull) pull(in)
}

Expand Down Expand Up @@ -2083,7 +2105,9 @@ private[stream] object Collect {
}

final override protected def onTimer(key: Any): Unit = {
if (isAvailable(out))
// A Resume/Restart skip (possibly via an overflow handler that already mutated the buffer) can
// leave the buffer empty while this timer is still armed; guard against dequeuing an empty buffer.
if (isAvailable(out) && !buffer.isEmpty)
push(out, buffer.dequeue()._2)

completeIfReady()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the
* supplier (already-buffered elements keep their delays).
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3670,6 +3670,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the
* supplier (already-buffered elements keep their delays).
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,10 @@ final class SubFlow[In, Out, Mat](
*
* Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the
* supplier (already-buffered elements keep their delays).
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,10 @@ final class SubSource[Out, Mat](
*
* Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the
* supplier (already-buffered elements keep their delays).
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2287,6 +2287,10 @@ trait FlowOps[+Out, +Mat] {
*
* Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the
* supplier (already-buffered elements keep their delays).
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
*
Expand Down