Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ Aggregate and emit until custom boundary condition met.

This operator can be customized into a broad class of aggregate/group/fold operators, based on custom state or timer conditions.

This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by `allocate`,
`aggregate`, `harvest`, or timer predicate functions. On `Supervision.Stop` the stream fails; on
`Supervision.Resume` and `Supervision.Restart` the failing element or aggregate is dropped and the stream continues.

## Reactive Streams semantics

@@@div { .callout }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.ActorAttributes
import pekko.stream.OverflowStrategy
import pekko.stream.Supervision
import pekko.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import pekko.testkit.{ ExplicitlyTriggeredScheduler, PekkoSpec }

Expand Down Expand Up @@ -76,6 +78,94 @@ class AggregateWithBoundarySpec extends StreamSpec {
Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7)))
}

"resume and reset aggregate state when aggregate throws" in {
val result = Source(1 to 4)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer += i
if (i == 3) throw new RuntimeException("boom")
(buffer, buffer.size >= 3)
},
harvest = _.toSeq,
emitOnTimer = None)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.collection)

Await.result(result, 30.seconds) shouldBe Seq(Seq(4))
}

"restart and reset aggregate state when aggregate throws" in {
val result = Source(1 to 4)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer += i
if (i == 3) throw new RuntimeException("boom")
(buffer, buffer.size >= 3)
},
harvest = _.toSeq,
emitOnTimer = None)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.collection)

Await.result(result, 30.seconds) shouldBe Seq(Seq(4))
}

"resume when allocate throws and drop failing element" in {
var allocationCount = 0
val result = Source(1 to 5)
.aggregateWithBoundary(allocate = () => {
allocationCount += 1
if (allocationCount == 1) throw new RuntimeException("boom")
ListBuffer.empty[Int]
})(
aggregate = (buffer, i) => {
buffer += i
(buffer, buffer.size >= 2)
},
harvest = _.toSeq,
emitOnTimer = None)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.collection)

Await.result(result, 30.seconds) shouldBe Seq(Seq(2, 3), Seq(4, 5))
}

"resume when harvest throws and drop failing aggregate" in {
val result = Source(1 to 5)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer += i
(buffer, buffer.size >= 2)
},
harvest = buffer => {
if (buffer.contains(3)) throw new RuntimeException("boom")
buffer.toSeq
},
emitOnTimer = None)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.collection)

Await.result(result, 30.seconds) shouldBe Seq(Seq(1, 2), Seq(5))
}

"resume when harvest throws on upstream finish and complete" in {
val result = Source(1 to 3)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer += i
(buffer, buffer.size >= 2)
},
harvest = buffer => {
if (buffer.contains(3)) throw new RuntimeException("boom")
buffer.toSeq
},
emitOnTimer = None)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.collection)

Await.result(result, 30.seconds) shouldBe Seq(Seq(1, 2))
}

}

// To run multiple tests in parallel using simulated timer,
Expand Down Expand Up @@ -284,4 +374,28 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with
downstream.request(2).expectNext(Seq(4, 5, 6), Seq(7)) // clean up the emit queue and complete downstream
downstream.expectComplete()
}

"drop current aggregate when timer predicate throws and supervision is Resume" in {
implicit val actorSystem = createActorSystem("4")

val p = TestPublisher.probe[Int]()
val result = Source
.fromPublisher(p)
.aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])(
aggregate = (buffer, i) => {
buffer += i
(buffer, false)
},
harvest = _.toSeq,
emitOnTimer = Some((((_: ListBuffer[Int]) => throw new RuntimeException("boom")), 1.second)))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.collection)

p.sendNext(1)
timePasses(1.second)
p.sendNext(2)
p.sendComplete()

Await.result(result, 30.seconds) shouldBe Seq(Seq(2))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package org.apache.pekko.stream.impl.fusing

import scala.concurrent.duration._
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Supervision
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }

Expand All @@ -41,6 +44,7 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out](

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

private var aggregated: Agg = null.asInstanceOf[Agg]

Expand All @@ -52,26 +56,95 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out](

override protected def onTimer(timerKey: Any): Unit = {
emitOnTimer.foreach {
case (isReadyOnTimer, _) => if (aggregated != null && isReadyOnTimer(aggregated)) harvestAndEmit()
case (isReadyOnTimer, _) =>
if (aggregated != null) {
val maybeReadyToEmit =
try isReadyOnTimer(aggregated)
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume =>
aggregated = null.asInstanceOf[Agg]
case Supervision.Restart =>
restartState()
}
false
}
if (maybeReadyToEmit) harvestAndEmit(pullOnRecover = false)
}
}
}

// at onPush, isAvailable(in)=true hasBeenPulled(in)=false, isAvailable(out) could be true or false due to timer triggered emit
override def onPush(): Unit = {
if (aggregated == null) aggregated = allocate()
val (updated, result) = aggregate(aggregated, grab(in))
aggregated = updated
if (result) harvestAndEmit()
// the decision to pull entirely depend on isAvailable(out)=true, regardless of result of aggregate
// 1. aggregate=true: isAvailable(out) will be false
// 2. aggregate=false: if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure
if (isAvailable(out)) pull(in)
val inElem = grab(in)
if (aggregated == null)
try aggregated = allocate()
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume =>
pullIfPossible()
case Supervision.Restart =>
restartState()
pullIfPossible()
}
return
}

val shouldEmit =
try {
val (updated, result) = aggregate(aggregated, inElem)
aggregated = updated
result
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume =>
restartState()
pullIfPossible()
case Supervision.Restart =>
restartState()
pullIfPossible()
}
return
}

if (shouldEmit) harvestAndEmit(pullOnRecover = true)
else {
// the decision to pull entirely depend on isAvailable(out)=true
// if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure
pullIfPossible()
}
}

override def onUpstreamFinish(): Unit = {
// Note that emit is asynchronous, it will keep the stage alive until downstream actually take the element
if (aggregated != null) emit(out, harvest(aggregated))
completeStage()
if (aggregated != null)
try {
emit(out, harvest(aggregated))
aggregated = null.asInstanceOf[Agg]
completeStage()
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume =>
aggregated = null.asInstanceOf[Agg]
completeStage()
case Supervision.Restart =>
restartState()
completeStage()
}
}
else completeStage()
}

// at onPull, isAvailable(out) is always true indicating downstream is waiting
Expand All @@ -80,10 +153,29 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out](

setHandlers(in, out, this)

private def harvestAndEmit(): Unit = {
emit(out, harvest(aggregated))
private def restartState(): Unit =
aggregated = null.asInstanceOf[Agg]
}

private def pullIfPossible(): Unit =
if (isAvailable(out) && !isClosed(in) && !hasBeenPulled(in)) pull(in)

private def harvestAndEmit(pullOnRecover: Boolean): Unit =
try {
emit(out, harvest(aggregated))
aggregated = null.asInstanceOf[Agg]
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Resume =>
aggregated = null.asInstanceOf[Agg]
if (pullOnRecover) pullIfPossible()
case Supervision.Restart =>
restartState()
if (pullOnRecover) pullIfPossible()
}
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4509,6 +4509,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* when custom condition is met which can be triggered by aggregate or timer.
* It can be thought of a more general [[groupedWeightedWithin]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is
* [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is
* [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or
* aggregate is dropped and the stream continues.
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4976,6 +4976,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* when custom condition is met which can be triggered by aggregate or timer.
* It can be thought of a more general [[groupedWeightedWithin]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is
* [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is
* [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or
* aggregate is dropped and the stream continues.
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3082,6 +3082,13 @@ final class SubFlow[In, Out, Mat](
* when custom condition is met which can be triggered by aggregate or timer.
* It can be thought of a more general [[groupedWeightedWithin]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is
* [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is
* [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or
* aggregate is dropped and the stream continues.
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,13 @@ final class SubSource[Out, Mat](
* when custom condition is met which can be triggered by aggregate or timer.
* It can be thought of a more general [[groupedWeightedWithin]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is
* [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is
* [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or
* aggregate is dropped and the stream continues.
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4116,6 +4116,13 @@ trait FlowOps[+Out, +Mat] {
* when custom condition is met which can be triggered by aggregate or timer.
* It can be thought of a more general [[groupedWeightedWithin]].
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is
* [[pekko.stream.Supervision.Stop]] the stream fails. If the supervision decision is
* [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]], the failing element or
* aggregate is dropped and the stream continues.
*
* '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true
*
* '''Backpressures when''' downstream backpressures and the aggregate is complete
Expand Down