From 99d0419860a7a08d288b94205a4e60bf78b96022 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 14:16:42 +0800 Subject: [PATCH] feat: add supervision strategy support for delayWith Motivation: The delayWith operator computes each element's delay via a user-provided DelayStrategy.nextDelay, but it was called without a try-catch. Any exception failed the stream unconditionally, ignoring the configured SupervisionStrategy. Modification: Wrap delayStrategy.nextDelay(element) in Delay.grabAndPull() with a try-catch that consults the SupervisionStrategy decider. Stop fails the stage, Resume drops the offending element while keeping buffered elements and the strategy, and Restart recreates the (potentially stateful) delay strategy from the supplier. The delayStrategy field became a var to allow recreation. Because a dropped element no longer enqueues, onPush and onTimer now guard their buffer access with a non-empty check so a skip cannot dequeue/peek an empty buffer (onTimer previously could NPE when an overflow handler emptied the buffer while a timer was still armed). The decider is a lazy val for zero overhead on the happy path. Update the delayWith Scala/Java DSL scaladoc and the operator reference page. Result: delayWith now adheres to the SupervisionStrategy attribute with well-defined Resume and Restart semantics, with no buffer-underflow on the supervised paths. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDelaySpec" -- 26/26 passed References: Refs #3110 --- .../operators/Source-or-Flow/delayWith.md | 2 + .../pekko/stream/scaladsl/FlowDelaySpec.scala | 128 ++++++++++++++++++ .../apache/pekko/stream/impl/fusing/Ops.scala | 32 ++++- .../apache/pekko/stream/javadsl/Flow.scala | 4 + .../apache/pekko/stream/javadsl/Source.scala | 4 + .../apache/pekko/stream/javadsl/SubFlow.scala | 4 + .../pekko/stream/javadsl/SubSource.scala | 4 + .../apache/pekko/stream/scaladsl/Flow.scala | 4 + 8 files changed, 178 insertions(+), 4 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md index f76efe39a0c..25f361ebca3 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md @@ -14,6 +14,8 @@ Delay every element passed through with a duration that can be controlled dynami Delay every element passed through with a duration that can be controlled dynamically, individually for each elements (via the `DelayStrategy`). +This operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the supplier (already-buffered elements keep their delays). + @@@div { .callout } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala index 42608cb9c9c..e486eb98fc5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDelaySpec.scala @@ -347,5 +347,133 @@ class FlowDelaySpec extends StreamSpec { probe.expectComplete() } + + "fail with stop supervision when delay strategy throws" in { + val ex = new RuntimeException("boom-stop") with NoStackTrace + val failed = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else Duration.Zero + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(Sink.seq) + .failed + .futureValue + + failed should be(ex) + } + + "default to stop supervision when delay strategy throws" in { + val ex = new RuntimeException("boom-default-stop") with NoStackTrace + val failed = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else Duration.Zero + }, + OverflowStrategy.backpressure) + .runWith(Sink.seq) + .failed + .futureValue + + failed should be(ex) + } + + "drop the offending element on resume supervision when delay strategy throws" in { + val ex = new RuntimeException("boom-resume") with NoStackTrace + val result = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else Duration.Zero + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + .futureValue + + result should ===(Seq(1, 3)) + } + + "drop the offending element and recreate stateful strategy on restart supervision" in { + val ex = new RuntimeException("boom-restart") with NoStackTrace + val stateNotResetEx = new RuntimeException("strategy state was not reset") with NoStackTrace + val result = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + private var invocationCount = 0 + + override def nextDelay(elem: Int): FiniteDuration = { + invocationCount += 1 + if (elem == 2) throw ex + else if (invocationCount == 1) Duration.Zero + else throw stateNotResetEx + } + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.seq) + .futureValue + + result should ===(Seq(1, 3)) + } + + "keep the stateful strategy (no reset) on resume supervision" in { + val ex = new RuntimeException("boom-resume-stateful") with NoStackTrace + val stateWasResetEx = new RuntimeException("strategy state was reset") with NoStackTrace + val result = Source(1 to 3) + .delayWith( + () => + new DelayStrategy[Int] { + private var invocationCount = 0 + override def nextDelay(elem: Int): FiniteDuration = { + invocationCount += 1 + elem match { + case 2 => throw ex + case 3 if invocationCount != 3 => throw stateWasResetEx + case _ => Duration.Zero + } + } + }, + OverflowStrategy.backpressure) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.seq) + .futureValue + result should ===(Seq(1, 3)) + } + + "not crash in onTimer when an overflow handler empties the buffer and the strategy throws" taggedAs TimingTest in { + val ex = new RuntimeException("boom-overflow") with NoStackTrace + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + Source + .fromPublisher(upstream) + .delayWith( + () => + new DelayStrategy[Int] { + override def nextDelay(elem: Int): FiniteDuration = + if (elem == 2) throw ex else 200.millis + }, + OverflowStrategy.dropBuffer) + .withAttributes(Attributes.inputBuffer(1, 1) and + ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.request(10) + upstream.sendNext(1) + upstream.sendNext(2) + downstream.expectNoMessage(400.millis) + upstream.sendNext(3) + downstream.expectNext(500.millis, 3) + upstream.sendComplete() + downstream.expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 010260e03d0..39cd84b39a6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1982,7 +1982,8 @@ private[stream] object Collect { private val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max - private val delayStrategy = delayStrategySupplier() + private var delayStrategy = delayStrategySupplier() + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider // buffer has pairs of timestamp of expected push and element private val buffer = BufferImpl[(Long, T)](size, inheritedAttributes) @@ -2033,7 +2034,9 @@ private[stream] object Collect { onPushWhenBufferFull() else { grabAndPull() - if (!isTimerActive(TimerName)) { + // Resume/Restart may drop the current element, leaving the buffer empty. + // Restart also recreates the stateful strategy for subsequent elements. + if (!buffer.isEmpty && !isTimerActive(TimerName)) { val waitTime = nextElementWaitTime() if (waitTime <= DelayPrecisionMS && isAvailable(out)) { push(out, buffer.dequeue()._2) @@ -2051,7 +2054,26 @@ private[stream] object Collect { private def grabAndPull(): Unit = { val element = grab(in) - buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element)) + val delay = + try delayStrategy.nextDelay(element) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + case Supervision.Resume => + // Skip only the offending element, keep buffered elements and current strategy state. + if (shouldPull) pull(in) + return + case Supervision.Restart => + // Skip offending element and recreate the potentially stateful strategy. + delayStrategy = delayStrategySupplier() + if (shouldPull) pull(in) + return + } + } + buffer.enqueue((System.nanoTime() + delay.toNanos, element)) if (shouldPull) pull(in) } @@ -2083,7 +2105,9 @@ private[stream] object Collect { } final override protected def onTimer(key: Any): Unit = { - if (isAvailable(out)) + // A Resume/Restart skip (possibly via an overflow handler that already mutated the buffer) can + // leave the buffer empty while this timer is still armed; guard against dequeuing an empty buffer. + if (isAvailable(out) && !buffer.isEmpty) push(out, buffer.dequeue()._2) completeIfReady() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..a3a0bb0a192 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1786,6 +1786,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..c3279b090b4 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3670,6 +3670,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..6214dae3773 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1180,6 +1180,10 @@ final class SubFlow[In, Out, Mat]( * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..4454d74ceb8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1272,6 +1272,10 @@ final class SubSource[Out, Mat]( * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..e5a83241718 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2287,6 +2287,10 @@ trait FlowOps[+Out, +Mat] { * * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)` * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is dropped; on `Supervision.Restart` the delay strategy is recreated from the + * supplier (already-buffered elements keep their delays). + * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed * * EmitEarly - strategy do not wait to emit element if buffer is full *