diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md index f76efe39a0..25f361ebca 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md @@ -14,6 +14,8 @@ Delay every element passed through with a duration that can be controlled dynami Delay every element passed through with a duration that can be controlled dynamically, individually for each elements (via the `DelayStrategy`). +This operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the supplier (already-buffered elements keep their delays). + @@@div { .callout } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala index 42608cb9c9..e486eb98fc 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala @@ -347,5 +347,133 @@ class FlowDelaySpec extends StreamSpec { probe.expectComplete() } + + "fail with stop supervision when delay strategy throws" in { + val ex = new RuntimeException("boom-stop") with NoStackTrace + val failed = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else Duration.Zero + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.seq) + .failed + .futureValue + + failed should be(ex) + } + + "default to stop supervision when delay strategy throws" in { + val ex = new RuntimeException("boom-default-stop") with NoStackTrace + val failed = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else Duration.Zero + }, + OverflowStrategy.backpressure) + .runWith(Sink.seq) + .failed + .futureValue + + failed should be(ex) + } + + "drop the offending element on resume supervision when delay strategy throws" in { + val ex = new RuntimeException("boom-resume") with NoStackTrace + val result = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else Duration.Zero + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + .futureValue + + result should ===(Seq(1, 3)) + } + + "drop the offending element and recreate stateful strategy on restart supervision" in { + val ex = new RuntimeException("boom-restart") with NoStackTrace + val stateNotResetEx = new RuntimeException("strategy state was not reset") with NoStackTrace + val result = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + private var invocationCount = 0 + + override def nextDelay(elem: Int): FiniteDuration = { + invocationCount += 1 + if (elem == 2) throw ex + else if (invocationCount == 1) Duration.Zero + else throw stateNotResetEx + } + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + .futureValue + + result should ===(Seq(1, 3)) + } + + "keep the stateful strategy (no reset) on resume supervision" in { + val ex = new RuntimeException("boom-resume-stateful") with NoStackTrace + val stateWasResetEx = new RuntimeException("strategy state was reset") with NoStackTrace + val result = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + private var invocationCount = 0 + override def nextDelay(elem: Int): FiniteDuration = { + invocationCount += 1 + elem match { + case 2 => throw ex + case 3 if invocationCount != 3 => throw stateWasResetEx + case _ => Duration.Zero + } + } + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + .futureValue + result should ===(Seq(1, 3)) + } + + "not crash in onTimer when an overflow handler empties the buffer and the strategy throws" taggedAs TimingTest in { + val ex = new RuntimeException("boom-overflow") with NoStackTrace + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source + .fromPublisher(upstream) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else 200.millis + }, + OverflowStrategy.dropBuffer) + .withAttributes(Attributes.inputBuffer(1, 1) and + ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.request(10) + upstream.sendNext(1) + upstream.sendNext(2) + downstream.expectNoMessage(400.millis) + upstream.sendNext(3) + downstream.expectNext(500.millis, 3) + upstream.sendComplete() + downstream.expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 010260e03d..39cd84b39a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1982,7 +1982,8 @@ private[stream] object Collect { private val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max - private val delayStrategy = delayStrategySupplier() + private var delayStrategy = delayStrategySupplier() + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider // buffer has pairs of timestamp of expected push and element private val buffer = BufferImpl[(Long, T)](size, inheritedAttributes) @@ -2033,7 +2034,9 @@ private[stream] object Collect { onPushWhenBufferFull() else { grabAndPull() - if (!isTimerActive(TimerName)) { + // Resume/Restart may drop the current element, leaving the buffer empty. + // Restart also recreates the stateful strategy for subsequent elements. + if (!buffer.isEmpty && !isTimerActive(TimerName)) { val waitTime = nextElementWaitTime() if (waitTime <= DelayPrecisionMS && isAvailable(out)) { push(out, buffer.dequeue()._2) @@ -2051,7 +2054,26 @@ private[stream] object Collect { private def grabAndPull(): Unit = { val element = grab(in) - buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element)) + val delay = + try delayStrategy.nextDelay(element) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + case Supervision.Resume => + // Skip only the offending element, keep buffered elements and current strategy state. + if (shouldPull) pull(in) + return + case Supervision.Restart => + // Skip offending element and recreate the potentially stateful strategy. + delayStrategy = delayStrategySupplier() + if (shouldPull) pull(in) + return + } + } + buffer.enqueue((System.nanoTime() + delay.toNanos, element)) if (shouldPull) pull(in) } @@ -2083,7 +2105,9 @@ private[stream] object Collect { } final override protected def onTimer(key: Any): Unit = { - if (isAvailable(out)) + // A Resume/Restart skip (possibly via an overflow handler that already mutated the buffer) can + // leave the buffer empty while this timer is still armed; guard against dequeuing an empty buffer. + if (isAvailable(out) && !buffer.isEmpty) push(out, buffer.dequeue()._2) completeIfReady() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..a3a0bb0a19 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1786,6 +1786,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..c3279b090b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3670,6 +3670,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..6214dae377 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1180,6 +1180,10 @@ final class SubFlow[In, Out, Mat]( * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..4454d74ceb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1272,6 +1272,10 @@ final class SubSource[Out, Mat]( * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..e5a8324171 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2287,6 +2287,10 @@ trait FlowOps[+Out, +Mat] { * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full *