Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Chunk up this stream into groups of elements received within a time window, or l
Chunk up this stream into groups of elements received within a time window, or limited by the weight and number of
the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.
The `groupedWeightedWithin` operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the offending element is skipped and the current group is kept; on `Supervision.Restart` the current group is dropped.

See also:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import scala.collection.immutable
import scala.concurrent.duration._

import org.apache.pekko
import pekko.stream.ActorAttributes
import pekko.stream.Supervision
import pekko.stream.ThrottleMode
import pekko.stream.testkit._
import pekko.testkit.TimingTest
Expand Down Expand Up @@ -336,5 +338,130 @@ class FlowGroupedWithinSpec extends StreamSpec with ScriptedTest {
downstream.expectNext(Vector(7, 2): immutable.Seq[Long])
downstream.expectComplete()
}

// These supervision tests use a 30.seconds window that never fires, so they are deterministic and
// intentionally NOT tagged TimingTest — they must run in PR validation as regression guards.
"stop when cost function throws and supervision is Stop" in {
val ex = new RuntimeException("cost function boom")
val upstream = TestPublisher.probe[Long]()
val downstream = TestSubscriber.probe[immutable.Seq[Long]]()
Source
.fromPublisher(upstream)
.groupedWeightedWithin(100, 3, 30.seconds) { elem =>
if (elem == 99L) throw ex
else elem
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.to(Sink.fromSubscriber(downstream))
.run()

downstream.ensureSubscription()
downstream.request(1)
upstream.sendNext(1L)
upstream.sendNext(99L)
downstream.expectError(ex)
}

"stop when cost function throws with default supervision" in {
val ex = new RuntimeException("cost function boom")
Source(List(1L, 99L, 2L))
.groupedWeightedWithin(100, 3, 30.seconds) { elem =>
if (elem == 99L) throw ex
else elem
}
.runWith(Sink.ignore)
.failed
.futureValue shouldBe ex
}

"resume when cost function throws and keep current group" in {
val ex = new RuntimeException("cost function boom")
val upstream = TestPublisher.probe[Long]()
val downstream = TestSubscriber.probe[immutable.Seq[Long]]()
Source
.fromPublisher(upstream)
.groupedWeightedWithin(100, 3, 30.seconds) { elem =>
if (elem == 99L) throw ex
else elem
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.fromSubscriber(downstream))
.run()

downstream.ensureSubscription()
upstream.sendNext(1L)
upstream.sendNext(2L)
upstream.sendNext(99L)
upstream.sendNext(3L)
upstream.sendNext(4L)
upstream.sendNext(5L)
upstream.sendComplete()

downstream.request(1)
downstream.expectNext(Vector(1L, 2L, 3L): immutable.Seq[Long])
downstream.request(1)
downstream.expectNext(Vector(4L, 5L): immutable.Seq[Long])
downstream.expectComplete()
}

"resume keeps weight accounting when cost function throws" in {
val ex = new RuntimeException("cost function boom")
val upstream = TestPublisher.probe[Long]()
val downstream = TestSubscriber.probe[immutable.Seq[Long]]()
Source
.fromPublisher(upstream)
// maxWeight=5, large maxNumber so grouping is weight-driven; identity cost
.groupedWeightedWithin(5, 100, 30.seconds) { elem =>
if (elem == 99L) throw ex
else elem
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.fromSubscriber(downstream))
.run()

downstream.ensureSubscription()
upstream.sendNext(2L)
upstream.sendNext(2L)
upstream.sendNext(99L) // skipped; its weight must NOT be counted, [2,2] (weight 4) kept
upstream.sendNext(3L) // 4 + 3 = 7 > maxWeight 5 -> [2,2] flushed, 3 starts a new group
upstream.sendComplete()

downstream.request(1)
downstream.expectNext(Vector(2L, 2L): immutable.Seq[Long])
downstream.request(1)
downstream.expectNext(Vector(3L): immutable.Seq[Long])
downstream.expectComplete()
}

"restart when cost function throws and drop current group" in {
val ex = new RuntimeException("cost function boom")
val upstream = TestPublisher.probe[Long]()
val downstream = TestSubscriber.probe[immutable.Seq[Long]]()
Source
.fromPublisher(upstream)
.groupedWeightedWithin(100, 3, 30.seconds) { elem =>
if (elem == 99L) throw ex
else elem
}
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.to(Sink.fromSubscriber(downstream))
.run()

downstream.ensureSubscription()
upstream.sendNext(1L)
upstream.sendNext(2L)
upstream.sendNext(99L)
upstream.sendNext(3L)
upstream.sendNext(4L)
upstream.sendNext(5L)
upstream.sendNext(6L)
upstream.sendComplete()

downstream.request(1)
downstream.expectNext(Vector(3L, 4L, 5L): immutable.Seq[Long])
downstream.request(1)
downstream.expectNext(Vector(6L): immutable.Seq[Long])
downstream.expectComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1845,6 +1845,7 @@ private[stream] object Collect {
private var totalWeight = 0L
private var totalNumber = 0
private var hasElements = false
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private val contextPropagation = ContextPropagation()

override def preStart() = {
Expand All @@ -1853,8 +1854,28 @@ private[stream] object Collect {
}

private def nextElement(elem: T): Unit = {
// costFn is user code and may throw; honor supervision strategy.
val cost =
try costFn(elem)
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
return
case Supervision.Resume =>
// skip the offending element and keep the current group
if (!hasBeenPulled(in)) pull(in)
return
case Supervision.Restart =>
// drop the current group and reset the time window, then continue from a fresh accumulator
resetGroupState()
scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
if (!hasBeenPulled(in)) pull(in)
return
}
}
groupEmitted = false
val cost = costFn(elem)
if (cost < 0L)
failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
else {
Expand Down Expand Up @@ -1897,6 +1918,17 @@ private[stream] object Collect {
}
}

private def resetGroupState(): Unit = {
builder.clear()
pending = null.asInstanceOf[T]
pendingWeight = 0L
pushEagerly = false
groupEmitted = true
totalWeight = 0L
totalNumber = 0
hasElements = false
}

private def tryCloseGroup(): Unit = {
if (isAvailable(out)) emitGroup()
else if (pending != null || finished) pushEagerly = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
Expand All @@ -1723,6 +1727,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3583,6 +3583,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
Expand All @@ -3607,6 +3611,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,10 @@ final class SubFlow[In, Out, Mat](
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
Expand All @@ -1117,6 +1121,10 @@ final class SubFlow[In, Out, Mat](
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,10 @@ final class SubSource[Out, Mat](
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
Expand All @@ -1103,6 +1107,10 @@ final class SubSource[Out, Mat](
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2207,6 +2207,10 @@ trait FlowOps[+Out, +Mat] {
* `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`
Expand All @@ -2228,6 +2232,10 @@ trait FlowOps[+Out, +Mat] {
* `maxWeight` must be positive, `maxNumber` must be positive, and `d` must be greater than 0 seconds,
* otherwise IllegalArgumentException is thrown.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* offending element is skipped and the current group is kept; on `Supervision.Restart` the current
* group is dropped.
*
* '''Emits when''' the configured time elapses since the last group has been emitted or weight limit reached
*
* '''Backpressures when''' downstream backpressures, and buffered group (+ pending element) weighs more than
Expand Down