From ab247329b089fce5e35d2cf3184c2f73831cb22a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 13:16:42 +0800 Subject: [PATCH] feat: add supervision strategy support for groupedWeightedWithin costFn Motivation: The groupedWeightedWithin operator's costFn is user-provided and may throw, but it was called without a try-catch. Any exception failed the stream unconditionally, ignoring the configured SupervisionStrategy. This was inconsistent with operators such as map, filter and batch. Modification: Wrap costFn(elem) inside GroupedWeightedWithin.nextElement() with a try-catch that consults the SupervisionStrategy decider. The cost is computed before any state mutation so the happy path is unchanged. Stop fails the stage, Resume skips the offending element while keeping the current group, and Restart drops the in-progress group via a new resetGroupState() helper and reschedules the time-window timer so the fresh group gets a full window. The decider is a lazy val for zero overhead on the happy path. Update the Scala and Java DSL scaladoc (both overloads) and the operator reference page to document supervision adherence. Result: groupedWeightedWithin now adheres to the SupervisionStrategy attribute with well-defined Resume and Restart semantics. 
Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedWithinSpec" -- 21/21 passed References: Refs #3110 --- .../Source-or-Flow/groupedWeightedWithin.md | 1 + .../scaladsl/FlowGroupedWithinSpec.scala | 127 ++++++++++++++++++ .../apache/pekko/stream/impl/fusing/Ops.scala | 34 ++++- .../apache/pekko/stream/javadsl/Flow.scala | 8 ++ .../apache/pekko/stream/javadsl/Source.scala | 8 ++ .../apache/pekko/stream/javadsl/SubFlow.scala | 8 ++ .../pekko/stream/javadsl/SubSource.scala | 8 ++ .../apache/pekko/stream/scaladsl/Flow.scala | 8 ++ 8 files changed, 201 insertions(+), 1 deletion(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md index bbf9a94d18e..e7c15491df0 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedWeightedWithin.md @@ -15,6 +15,7 @@ Chunk up this stream into groups of elements received within a time window, or l Chunk up this stream into groups of elements received within a time window, or limited by the weight and number of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group. +The `groupedWeightedWithin` operator adheres to the ActorAttributes.SupervisionStrategy attribute when the cost function throws. On `Supervision.Resume` the offending element is skipped and the current group is kept; on `Supervision.Restart` the offending element and the current group are dropped and the time window is restarted. 
See also: diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala index 53092a3de86..53165db4324 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedWithinSpec.scala @@ -19,6 +19,8 @@ import scala.collection.immutable import scala.concurrent.duration._ import org.apache.pekko +import pekko.stream.ActorAttributes +import pekko.stream.Supervision import pekko.stream.ThrottleMode import pekko.stream.testkit._ import pekko.testkit.TimingTest @@ -336,5 +338,130 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest { downstream.expectNext(Vector(7, 2): immutable.Seq[Long]) downstream.expectComplete() } + + // These supervision tests use a 30.seconds window that never fires, so they are deterministic and + // intentionally NOT tagged TimingTest — they must run in PR validation as regression guards. 
+ "stop when cost function throws and supervision is Stop" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + downstream.request(1) + upstream.sendNext(1L) + upstream.sendNext(99L) + downstream.expectError(ex) + } + + "stop when cost function throws with default supervision" in { + val ex = new RuntimeException("cost function boom") + Source(List(1L, 99L, 2L)) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .runWith(Sink.ignore) + .failed + .futureValue shouldBe ex + } + + "resume when cost function throws and keep current group" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(1L) + upstream.sendNext(2L) + upstream.sendNext(99L) + upstream.sendNext(3L) + upstream.sendNext(4L) + upstream.sendNext(5L) + upstream.sendComplete() + + downstream.request(1) + downstream.expectNext(Vector(1L, 2L, 3L): immutable.Seq[Long]) + downstream.request(1) + downstream.expectNext(Vector(4L, 5L): immutable.Seq[Long]) + downstream.expectComplete() + } + + "resume keeps weight accounting when cost function throws" in { + val ex = new RuntimeException("cost function boom") + val upstream = 
TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + // maxWeight=5, large maxNumber so grouping is weight-driven; identity cost + .groupedWeightedWithin(5, 100, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(2L) + upstream.sendNext(2L) + upstream.sendNext(99L) // skipped; its weight must NOT be counted, [2,2] (weight 4) kept + upstream.sendNext(3L) // 4 + 3 = 7 > maxWeight 5 -> [2,2] flushed, 3 starts a new group + upstream.sendComplete() + + downstream.request(1) + downstream.expectNext(Vector(2L, 2L): immutable.Seq[Long]) + downstream.request(1) + downstream.expectNext(Vector(3L): immutable.Seq[Long]) + downstream.expectComplete() + } + + "restart when cost function throws and drop current group" in { + val ex = new RuntimeException("cost function boom") + val upstream = TestPublisher.probe[Long]() + val downstream = TestSubscriber.probe[immutable.Seq[Long]]() + Source + .fromPublisher(upstream) + .groupedWeightedWithin(100, 3, 30.seconds) { elem => + if (elem == 99L) throw ex + else elem + } + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .to(Sink.fromSubscriber(downstream)) + .run() + + downstream.ensureSubscription() + upstream.sendNext(1L) + upstream.sendNext(2L) + upstream.sendNext(99L) + upstream.sendNext(3L) + upstream.sendNext(4L) + upstream.sendNext(5L) + upstream.sendNext(6L) + upstream.sendComplete() + + downstream.request(1) + downstream.expectNext(Vector(3L, 4L, 5L): immutable.Seq[Long]) + downstream.request(1) + downstream.expectNext(Vector(6L): immutable.Seq[Long]) + downstream.expectComplete() + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 010260e03d0..6f1d82af268 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -1845,6 +1845,7 @@ private[stream] object Collect { private var totalWeight = 0L private var totalNumber = 0 private var hasElements = false + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private val contextPropagation = ContextPropagation() override def preStart() = { @@ -1853,8 +1854,28 @@ private[stream] object Collect { } private def nextElement(elem: T): Unit = { + // costFn is user code and may throw; honor supervision strategy. + val cost = + try costFn(elem) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + return + case Supervision.Resume => + // skip the offending element and keep the current group + if (!hasBeenPulled(in)) pull(in) + return + case Supervision.Restart => + // drop the current group and reset the time window, then continue from a fresh accumulator + resetGroupState() + scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval) + if (!hasBeenPulled(in)) pull(in) + return + } + } groupEmitted = false - val cost = costFn(elem) if (cost < 0L) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) else { @@ -1897,6 +1918,17 @@ private[stream] object Collect { } } + private def resetGroupState(): Unit = { + builder.clear() + pending = null.asInstanceOf[T] + pendingWeight = 0L + pushEagerly = false + groupEmitted = true + totalWeight = 0L + totalNumber = 0 + hasElements = false + } + private def tryCloseGroup(): Unit = { if (isAvailable(out)) emitGroup() else if (pending != null || finished) pushEagerly = true diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..1fda31b964b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1699,6 +1699,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -1723,6 +1727,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. 
+ * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..e889fbf5eaa 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3583,6 +3583,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -3607,6 +3611,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. 
+ * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..e54eebec08a 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1093,6 +1093,10 @@ final class SubFlow[In, Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -1117,6 +1121,10 @@ final class SubFlow[In, Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. 
+ * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..5e16f5d39ff 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1079,6 +1079,10 @@ final class SubSource[Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -1103,6 +1107,10 @@ final class SubSource[Out, Mat]( * The last group before end-of-stream will contain the buffered elements * since the previously emitted group. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. 
+ * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..6e3c0ffe4da 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -2207,6 +2207,10 @@ trait FlowOps[+Out, +Mat] { * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` @@ -2228,6 +2232,10 @@ trait FlowOps[+Out, +Mat] { * `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds, * otherwise IllegalArgumentException is thrown. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the + * offending element is skipped and the current group is kept; on `Supervision.Restart` the current + * group is dropped. + * * '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached * * '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than