diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md index 0d970777c2..61c7d4ec24 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md @@ -14,6 +14,8 @@ Partitions this stream into chunks by a delimiter function. Partitions this stream into chunks by a delimiter function. +Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to the key function). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + See also: * @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant that groups with weight limit too. diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md index 4aaa68020c..c9b6b4ad23 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md @@ -11,7 +11,9 @@ Partitions this stream into chunks by a delimiter function and a weight limit. ## Description -Partitions this stream into chunks by a delimiter function. +Partitions this stream into chunks by a delimiter function and a weight limit. + +Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. See also: diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala index 0996f40514..4a18b5e9a7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala @@ -18,6 +18,7 @@ package org.apache.pekko.stream.scaladsl import org.apache.pekko +import pekko.stream.{ ActorAttributes, Supervision } import pekko.stream.testkit.{ ScriptedTest, StreamSpec } import pekko.stream.testkit.scaladsl.TestSink @@ -95,6 +96,92 @@ class FlowGroupedAdjacentByWeightedSpec extends StreamSpec(""" .expectComplete() } + "fail when costFn throws and supervision decides to stop" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 2) throw ex else 1L) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectError(ex) + } + + "fail when costFn throws with default supervision strategy" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 2) throw ex else 1L) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectError(ex) + } + + "resume when costFn throws and keep surrounding groups" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3, 4, 5)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 3) throw ex else 1L) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(1, 2)) + .expectNext(Seq(4, 5)) + .expectComplete() + } + + "restart when costFn throws and drop current in-progress group" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3, 4, 5)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 3) throw ex else 1L) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(4, 5)) + .expectComplete() + } + + "fail when groupedAdjacentBy key function throws and supervision decides to stop" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2, 3)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectError(ex) + } + + "resume when groupedAdjacentBy key function throws and skip offending elements" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2, 2, 3, 3)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(1, 1)) + .expectNext(Seq(3, 3)) + .expectComplete() + } + + "resume when the key function throws on the last element and flush the kept group at completion" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(1, 1)) + .expectComplete() + } + + "restart when groupedAdjacentBy key function throws and drop current group" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2, 2, 3, 3)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(3, 3)) + .expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala index 36739ff524..5bed5f5f2c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala @@ -18,10 +18,13 @@ package org.apache.pekko.stream.impl.fusing import scala.collection.immutable +import scala.util.control.NonFatal import org.apache.pekko import pekko.annotation.InternalApi +import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.Supervision import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import pekko.util.OptionVal @@ -54,17 +57,30 @@ private[pekko] final case class GroupedAdjacentByWeighted[T, R]( private var hasElements: Boolean = false private var currentKey: OptionVal[R] = OptionVal.none private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def onPush(): Unit = { val elem = grab(in) - val cost = costFn(elem) + val cost = + try costFn(elem) + catch { + case NonFatal(ex) => + superviseUserFn(ex) + return + } if (cost < 0L) { failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) return } - val elemKey = f(elem) + val elemKey = + try f(elem) + catch { + case NonFatal(ex) => + superviseUserFn(ex) + return + } require(elemKey != null, "Element key must not be null") if (shouldPushDirectly(cost)) { @@ -78,6 +94,12 @@ private[pekko] final case class GroupedAdjacentByWeighted[T, R]( } } + private def superviseUserFn(ex: Throwable): Unit = decider(ex) match { + case Supervision.Stop => failStage(ex) + case Supervision.Resume => tryPullIfNeeded() + case Supervision.Restart => resetGroup(); tryPullIfNeeded() + } + private def shouldPushDirectly(cost: Long): Boolean = { cost >= maxWeight && !hasElements } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..faaf91f1ac 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1340,6 +1340,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -1362,6 +1365,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..523ee758e1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3236,6 +3236,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -3257,6 +3260,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..02f5e78381 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -742,6 +742,9 @@ final class SubFlow[In, Out, Mat]( * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -763,6 +766,8 @@ final class SubFlow[In, Out, Mat]( * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..b4f2dd4c55 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -732,6 +732,9 @@ final class SubSource[Out, Mat]( * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -753,6 +756,8 @@ final class SubSource[Out, Mat]( * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..60e383b76f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1857,6 +1857,9 @@ trait FlowOps[+Out, +Mat] { * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -1878,6 +1881,8 @@ trait FlowOps[+Out, +Mat] { * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures