Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Partitions this stream into chunks by a delimiter function.

Partitions this stream into chunks by a delimiter function.

Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to the key function). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.

See also:

* @ref[groupedAdjacentByWeighted](groupedAdjacentByWeighted.md) for a variant that groups with weight limit too.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ Partitions this stream into chunks by a delimiter function and a weight limit.

## Description

Partitions this stream into chunks by a delimiter function.
Partitions this stream into chunks by a delimiter function and a weight limit.

Adheres to the ActorAttributes.SupervisionStrategy attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.

See also:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.pekko.stream.scaladsl

import org.apache.pekko
import pekko.stream.{ ActorAttributes, Supervision }
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
import pekko.stream.testkit.scaladsl.TestSink

Expand Down Expand Up @@ -95,6 +96,92 @@ class FlowGroupedAdjacentByWeightedSpec extends StreamSpec("""
.expectComplete()
}

"fail when costFn throws and supervision decides to stop" in {
val ex = new RuntimeException("cost boom")
Source(List(1, 2, 3))
.groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 2) throw ex else 1L)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectError(ex)
}

"fail when costFn throws with default supervision strategy" in {
val ex = new RuntimeException("cost boom")
Source(List(1, 2, 3))
.groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 2) throw ex else 1L)
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectError(ex)
}

"resume when costFn throws and keep surrounding groups" in {
val ex = new RuntimeException("cost boom")
Source(List(1, 2, 3, 4, 5))
.groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 3) throw ex else 1L)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectNext(Seq(1, 2))
.expectNext(Seq(4, 5))
.expectComplete()
}

"restart when costFn throws and drop current in-progress group" in {
val ex = new RuntimeException("cost boom")
Source(List(1, 2, 3, 4, 5))
.groupedAdjacentByWeighted(_ => "k", 2)(elem => if (elem == 3) throw ex else 1L)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectNext(Seq(4, 5))
.expectComplete()
}

"fail when groupedAdjacentBy key function throws and supervision decides to stop" in {
val ex = new RuntimeException("key boom")
Source(List(1, 1, 2, 3))
.groupedAdjacentBy(elem => if (elem == 2) throw ex else elem)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectError(ex)
}

"resume when groupedAdjacentBy key function throws and skip offending elements" in {
val ex = new RuntimeException("key boom")
Source(List(1, 1, 2, 2, 3, 3))
.groupedAdjacentBy(elem => if (elem == 2) throw ex else elem)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectNext(Seq(1, 1))
.expectNext(Seq(3, 3))
.expectComplete()
}

"resume when the key function throws on the last element and flush the kept group at completion" in {
val ex = new RuntimeException("key boom")
Source(List(1, 1, 2))
.groupedAdjacentBy(elem => if (elem == 2) throw ex else elem)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectNext(Seq(1, 1))
.expectComplete()
}

"restart when groupedAdjacentBy key function throws and drop current group" in {
val ex = new RuntimeException("key boom")
Source(List(1, 1, 2, 2, 3, 3))
.groupedAdjacentBy(elem => if (elem == 2) throw ex else elem)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(TestSink[Seq[Int]]())
.request(10)
.expectNext(Seq(3, 3))
.expectComplete()
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.pekko.stream.impl.fusing

import scala.collection.immutable
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import pekko.stream.Supervision
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import pekko.util.OptionVal
Expand Down Expand Up @@ -54,17 +57,30 @@ private[pekko] final case class GroupedAdjacentByWeighted[T, R](
private var hasElements: Boolean = false
private var currentKey: OptionVal[R] = OptionVal.none
private var pendingGroup: OptionVal[immutable.Seq[T]] = OptionVal.none
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

override def onPush(): Unit = {
val elem = grab(in)
val cost = costFn(elem)
val cost =
try costFn(elem)
catch {
case NonFatal(ex) =>
superviseUserFn(ex)
return
}

if (cost < 0L) {
failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
return
}

val elemKey = f(elem)
val elemKey =
try f(elem)
catch {
case NonFatal(ex) =>
superviseUserFn(ex)
return
}
require(elemKey != null, "Element key must not be null")

if (shouldPushDirectly(cost)) {
Expand All @@ -78,6 +94,12 @@ private[pekko] final case class GroupedAdjacentByWeighted[T, R](
}
}

private def superviseUserFn(ex: Throwable): Unit = decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Resume => tryPullIfNeeded()
case Supervision.Restart => resetGroup(); tryPullIfNeeded()
}

private def shouldPushDirectly(cost: Long): Boolean = {
cost >= maxWeight && !hasElements
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function).
* On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand All @@ -1362,6 +1365,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3236,6 +3236,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function).
* On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand All @@ -3257,6 +3260,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,9 @@ final class SubFlow[In, Out, Mat](
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function).
* On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand All @@ -763,6 +766,8 @@ final class SubFlow[In, Out, Mat](
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,9 @@ final class SubSource[Out, Mat](
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function).
* On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand All @@ -753,6 +756,8 @@ final class SubSource[Out, Mat](
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,9 @@ trait FlowOps[+Out, +Mat] {
*
* The `f` function must return a non-null value for all elements, otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to the key function).
* On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand All @@ -1878,6 +1881,8 @@ trait FlowOps[+Out, +Mat] {
* The `f` function must return a non-null value , and the `costFn` must return a non-negative result for all inputs,
* otherwise the stage will fail.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute (applied to both the key and cost functions). On `Supervision.Resume` the offending element is skipped; on `Supervision.Restart` the current group is dropped.
*
* '''Emits when''' the delimiter function returns a different value than the previous element's result, or exceeds the `maxWeight`.
*
* '''Backpressures when''' a chunk has been assembled and downstream backpressures
Expand Down