diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md index bbf9a94d18..e7c15491df 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md @@ -15,6 +15,7 @@ Chunk up this stream into groups of elements received within a time window, or l Chunk up this stream into groups of elements received within a time window, or limited by the weight and number of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group. +The `groupedWeightedWithin` operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the offending element is skipped and the current group is kept; on `Supervision.Restart` the current group is dropped. See also: diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala index 53092a3de8..53165db432 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -19,6 +19,8 @@ import scala.collection.immutable import scala.concurrent.duration._ import org.apache.pekko +import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.ThrottleMode import pekko.stream.testkit._ import pekko.testkit.TimingTest @@ -336,5 +338,130 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { downstream.expectNext(Vector(7, 2): immutable.Seq[Long]) downstream.expectComplete() } + + // These supervision tests use a 30.seconds window that never fires, so they are deterministic and + // intentionally NOT tagged TimingTest — they must run in PR validation as regression guards. + "stop when cost function throws and supervision is Stop" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + downstream.request(1) + upstream.sendNext(1L) + upstream.sendNext(99L) + downstream.expectError(ex) + } + + "stop when cost function throws with default supervision" in { + val ex = new RuntimeException("cost function boom") + Source(List(1L, 99L, 2L)) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .runWith(Sink.ignore) + .failed + .futureValue shouldBe ex + } + + "resume when cost function throws and keep current group" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(1L) + upstream.sendNext(2L) + upstream.sendNext(99L) + upstream.sendNext(3L) + upstream.sendNext(4L) + upstream.sendNext(5L) + upstream.sendComplete() + + downstream.request(1) + downstream.expectNext(Vector(1L, 2L, 3L): immutable.Seq[Long]) + downstream.request(1) + downstream.expectNext(Vector(4L, 5L): immutable.Seq[Long]) + downstream.expectComplete() + } + + "resume keeps weight accounting when cost function throws" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + // maxWeight=5, large maxNumber so grouping is weight-driven; identity cost + .groupedWeightedWithin(5, 100, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(2L) + upstream.sendNext(2L) + upstream.sendNext(99L) // skipped; its weight must NOT be counted, [2,2] (weight 4) kept + upstream.sendNext(3L) // 4 + 3 = 7 > maxWeight 5 -> [2,2] flushed, 3 starts a new group + upstream.sendComplete() + + downstream.request(1) + downstream.expectNext(Vector(2L, 2L): immutable.Seq[Long]) + downstream.request(1) + downstream.expectNext(Vector(3L): immutable.Seq[Long]) + downstream.expectComplete() + } + + "restart when cost function throws and drop current group" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(1L) + upstream.sendNext(2L) + upstream.sendNext(99L) + upstream.sendNext(3L) + upstream.sendNext(4L) + upstream.sendNext(5L) + upstream.sendNext(6L) + upstream.sendComplete() + + downstream.request(1) + downstream.expectNext(Vector(3L, 4L, 5L): immutable.Seq[Long]) + downstream.request(1) + downstream.expectNext(Vector(6L): immutable.Seq[Long]) + downstream.expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 010260e03d..6f1d82af26 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1845,6 +1845,7 @@ private[stream] object Collect { private var totalWeight = 0L private var totalNumber = 0 private var hasElements = false + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private val contextPropagation = ContextPropagation() override def preStart() = { @@ -1853,8 +1854,28 @@ private[stream] object Collect { } private def nextElement(elem: T): Unit = { + // costFn is user code and may throw; honor supervision strategy. + val cost = + try costFn(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + case Supervision.Resume => + // skip the offending element and keep the current group + if (!hasBeenPulled(in)) pull(in) + return + case Supervision.Restart => + // drop the current group and reset the time window, then continue from a fresh accumulator + resetGroupState() + scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval) + if (!hasBeenPulled(in)) pull(in) + return + } + } groupEmitted = false - val cost = costFn(elem) if (cost < 0L) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) else { @@ -1897,6 +1918,17 @@ private[stream] object Collect { } } + private def resetGroupState(): Unit = { + builder.clear() + pending = null.asInstanceOf[T] + pendingWeight = 0L + pushEagerly = false + groupEmitted = true + totalWeight = 0L + totalNumber = 0 + hasElements = false + } + private def tryCloseGroup(): Unit = { if (isAvailable(out)) emitGroup() else if (pending != null || finished) pushEagerly = true diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..1fda31b964 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1699,6 +1699,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -1723,6 +1727,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..e889fbf5ea 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3583,6 +3583,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -3607,6 +3611,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..e54eebec08 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1093,6 +1093,10 @@ final class SubFlow[In, Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -1117,6 +1121,10 @@ final class SubFlow[In, Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..5e16f5d39f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1079,6 +1079,10 @@ final class SubSource[Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -1103,6 +1107,10 @@ final class SubSource[Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..6e3c0ffe4d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2207,6 +2207,10 @@ trait FlowOps[+Out, +Mat] { * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -2228,6 +2232,10 @@ trait FlowOps[+Out, +Mat] { * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, * otherwise IllegalArgumentException is thrown. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than