From c1c1288bbe6a309cce33febe32099133303c9fd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 13:48:27 +0800 Subject: [PATCH 1/2] feat: add supervision strategy support for groupedAdjacentBy/Weighted Motivation: The groupedAdjacentBy and groupedAdjacentByWeighted operators apply two user-provided functions, a key function and a cost function, but both were called without a try-catch. Any exception failed the stream unconditionally, ignoring the configured SupervisionStrategy. Modification: Wrap both the costFn and the key function calls in GroupedAdjacentByWeighted with a try-catch that consults the SupervisionStrategy decider via a shared superviseUserFn helper. Stop fails the stage, Resume skips the offending element while keeping the current group, and Restart drops the in-progress group. The negative-cost and null-key contract checks are left unchanged (they remain unconditional failures, since they are output-contract violations rather than user-function exceptions). The decider is a lazy val for zero overhead on the happy path. Update the Scala and Java DSL scaladoc for both operators and the operator reference pages. Result: groupedAdjacentBy and groupedAdjacentByWeighted now adhere to the SupervisionStrategy attribute for both user functions. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedAdjacentByWeightedSpec" -- 14/14 passed References: Refs #3110 --- .../Source-or-Flow/groupedAdjacentBy.md | 2 + .../groupedAdjacentByWeighted.md | 4 +- .../FlowGroupedAdjacentByWeightedSpec.scala | 87 +++++++++++++++++++ .../fusing/GroupedAdjacentByWeighted.scala | 26 +++++- .../apache/pekko/stream/javadsl/Flow.scala | 4 + .../apache/pekko/stream/javadsl/Source.scala | 4 + .../apache/pekko/stream/javadsl/SubFlow.scala | 4 + .../pekko/stream/javadsl/SubSource.scala | 4 + .../apache/pekko/stream/scaladsl/Flow.scala | 4 + 9 files changed, 136 insertions(+), 3 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md index 0d970777c2f..6ff72663007 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md @@ -14,6 +14,8 @@ Partitions this stream into chunks by a delimiter function. Partitions this stream into chunks by a delimiter function. +Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to the key function). + See also: * @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant that groups with weight limit too. diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md index 4aaa68020cd..c9b6b4ad23b 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentByWeighted.md @@ -11,7 +11,9 @@ Partitions this stream into chunks by a delimiter function and a weight limit. ## Description -Partitions this stream into chunks by a delimiter function. +Partitions this stream into chunks by a delimiter function and a weight limit. + +Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. See also: diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala index 0996f405141..4a18b5e9a76 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupedAdjacentByWeightedSpec.scala @@ -18,6 +18,7 @@ package org.apache.pekko.stream.scaladsl import org.apache.pekko +import pekko.stream.{ ActorAttributes, Supervision } import pekko.stream.testkit.{ ScriptedTest, StreamSpec } import pekko.stream.testkit.scaladsl.TestSink @@ -95,6 +96,92 @@ class FlowGroupedAdjacentByWeightedSpec extends StreamSpec(""" .expectComplete() } + "fail when costFn throws and supervision decides to stop" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 2) throw ex else 1L) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectError(ex) + } + + "fail when costFn throws with default supervision strategy" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 2) throw ex else 1L) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectError(ex) + } + + "resume when costFn throws and keep surrounding groups" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3, 4, 5)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 3) throw ex else 1L) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(1, 2)) + .expectNext(Seq(4, 5)) + .expectComplete() + } + + "restart when costFn throws and drop current in-progress group" in { + val ex = new RuntimeException("cost boom") + Source(List(1, 2, 3, 4, 5)) + .groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 3) throw ex else 1L) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(4, 5)) + .expectComplete() + } + + "fail when groupedAdjacentBy key function throws and supervision decides to stop" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2, 3)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectError(ex) + } + + "resume when groupedAdjacentBy key function throws and skip offending elements" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2, 2, 3, 3)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(1, 1)) + .expectNext(Seq(3, 3)) + .expectComplete() + } + + "resume when the key function throws on the last element and flush the kept group at completion" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(1, 1)) + .expectComplete() + } + + "restart when groupedAdjacentBy key function throws and drop current group" in { + val ex = new RuntimeException("key boom") + Source(List(1, 1, 2, 2, 3, 3)) + .groupedAdjacentBy(elem => if (elem == 2) throw ex else elem) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink[Seq[Int]]()) + .request(10) + .expectNext(Seq(3, 3)) + .expectComplete() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala index 36739ff5247..5bed5f5f2c4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GroupedAdjacentByWeighted.scala @@ -18,10 +18,13 @@ package org.apache.pekko.stream.impl.fusing import scala.collection.immutable +import scala.util.control.NonFatal import org.apache.pekko import pekko.annotation.InternalApi +import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.Supervision import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import pekko.util.OptionVal @@ -54,17 +57,30 @@ private[pekko] final case class GroupedAdjacentByWeighted[T, R]( private var hasElements: Boolean = false private var currentKey: OptionVal[R] = OptionVal.none private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider override def onPush(): Unit = { val elem = grab(in) - val cost = costFn(elem) + val cost = + try costFn(elem) + catch { + case NonFatal(ex) => + superviseUserFn(ex) + return + } if (cost < 0L) { failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed")) return } - val elemKey = f(elem) + val elemKey = + try f(elem) + catch { + case NonFatal(ex) => + superviseUserFn(ex) + return + } require(elemKey != null, "Element key must not be null") if (shouldPushDirectly(cost)) { @@ -78,6 +94,12 @@ private[pekko] final case class GroupedAdjacentByWeighted[T, R]( } } + private def superviseUserFn(ex: Throwable): Unit = decider(ex) match { + case Supervision.Stop => failStage(ex) + case Supervision.Resume => tryPullIfNeeded() + case Supervision.Restart => resetGroup(); tryPullIfNeeded() + } + private def shouldPushDirectly(cost: Long): Boolean = { cost >= maxWeight && !hasElements } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..0f7ecd1385d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1340,6 +1340,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -1362,6 +1364,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..03af92cee55 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3236,6 +3236,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -3257,6 +3259,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..64920242bf7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -742,6 +742,8 @@ final class SubFlow[In, Out, Mat]( * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -763,6 +765,8 @@ final class SubFlow[In, Out, Mat]( * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..acf06e68193 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -732,6 +732,8 @@ final class SubSource[Out, Mat]( * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -753,6 +755,8 @@ final class SubSource[Out, Mat]( * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..1466e5c3bf8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1857,6 +1857,8 @@ trait FlowOps[+Out, +Mat] { * * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * * '''Emits when''' the delimiter function returns a different value than the previous element's result * * '''Backpressures when''' a chunk has been assembled and downstream backpressures @@ -1878,6 +1880,8 @@ trait FlowOps[+Out, +Mat] { * The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs, * otherwise the stage will fail. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. + * * '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`. * * '''Backpressures when''' a chunk has been assembled and downstream backpressures From 7726b13297c2356a43c3127161006483775fb7c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 05:01:47 +0800 Subject: [PATCH 2/2] docs: describe Resume/Restart semantics for groupedAdjacentBy Motivation: The weighted variant's documentation explicitly describes what Resume and Restart do, but the non-weighted groupedAdjacentBy only said "adheres to SupervisionStrategy" without the detail. Modification: Extend the supervision note in groupedAdjacentBy.md and the groupedAdjacentBy Scaladoc on FlowOps and all four Java DSL classes (Flow, Source, SubFlow, SubSource) to state that Resume skips the offending element while Restart drops the current in-progress group, matching the weighted variant. Result: Both groupedAdjacentBy operators document their supervision semantics with the same level of detail across all DSLs. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedAdjacentByWeightedSpec" (14/14 passed) References: Refs #3110 --- .../stream/operators/Source-or-Flow/groupedAdjacentBy.md | 2 +- .../src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala | 1 + .../src/main/scala/org/apache/pekko/stream/javadsl/Source.scala | 1 + .../main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala | 1 + .../main/scala/org/apache/pekko/stream/javadsl/SubSource.scala | 1 + .../src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | 1 + 6 files changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md index 6ff72663007..61c7d4ec247 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/groupedAdjacentBy.md @@ -14,7 +14,7 @@ Partitions this stream into chunks by a delimiter function. Partitions this stream into chunks by a delimiter function. -Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to the key function). +Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to the key function). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. See also: diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 0f7ecd1385d..faaf91f1acb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1341,6 +1341,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. * * '''Emits when''' the delimiter function returns a different value than the previous element's result * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 03af92cee55..523ee758e1b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3237,6 +3237,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. * * '''Emits when''' the delimiter function returns a different value than the previous element's result * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 64920242bf7..02f5e783814 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -743,6 +743,7 @@ final class SubFlow[In, Out, Mat]( * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. * * '''Emits when''' the delimiter function returns a different value than the previous element's result * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index acf06e68193..b4f2dd4c558 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -733,6 +733,7 @@ final class SubSource[Out, Mat]( * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. * * '''Emits when''' the delimiter function returns a different value than the previous element's result * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 1466e5c3bf8..60e383b76f4 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1858,6 +1858,7 @@ trait FlowOps[+Out, +Mat] { * The `f` function must return a non-null value for all elements, otherwise the stage will fail. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function). + * On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped. * * '''Emits when''' the delimiter function returns a different value than the previous element's result *