From bd87a8d36adfbae04aa1291d0c8fac01ce406f30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 20:24:58 +0800 Subject: [PATCH 1/2] fix: add supervision strategy support for aggregateWithBoundary callbacks Motivation: aggregateWithBoundary invokes multiple user callbacks (`allocate`, `aggregate`, `harvest`, and timer predicate), but exceptions were not supervised and failed the stream unconditionally. Modification: Add decider-based exception handling in AggregateWithBoundary for all callback sites: - allocate/aggregate in onPush - timer predicate and harvest in onTimer - harvest in onUpstreamFinish Stop fails the stage. Resume/Restart drop failing element/aggregate and continue. For aggregate callback failures, Resume now resets aggregate state to avoid retaining partially-mutated mutable state. Update Scala/Java API docs and operator docs to document supervision behavior. Add directional regression tests in AggregateWithBoundarySpec covering: - aggregate callback failures under Resume/Restart - allocate callback failure under Resume - harvest callback failure under Resume - timer predicate failure under Resume (explicit scheduler) Result: aggregateWithBoundary now honors ActorAttributes.SupervisionStrategy across all user callback failure points with deterministic regression coverage. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.AggregateWithBoundarySpec org.apache.pekko.stream.scaladsl.AggregateWithTimeBoundaryAndSimulatedTimeSpec" -- 11/11 passed - sbt "stream/mimaReportBinaryIssues" -- clean - sbt "docs/paradox" -- passed References: Refs #3110 --- .../Source-or-Flow/aggregateWithBoundary.md | 4 + .../scaladsl/AggregateWithBoundarySpec.scala | 96 +++++++++++++ .../impl/fusing/AggregateWithBoundary.scala | 136 ++++++++++++++++-- .../apache/pekko/stream/javadsl/Flow.scala | 7 + .../apache/pekko/stream/javadsl/Source.scala | 7 + .../apache/pekko/stream/javadsl/SubFlow.scala | 7 + .../pekko/stream/javadsl/SubSource.scala | 7 + .../apache/pekko/stream/scaladsl/Flow.scala | 7 + 8 files changed, 257 insertions(+), 14 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md index 4c053db52a9..304575216f3 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md @@ -16,6 +16,10 @@ Aggregate and emit until custom boundary condition met. This operator can be customized into a broad class of aggregate/group/fold operators, based on custom state or timer conditions. +This operator adheres to the `ActorAttributes.SupervisionStrategy` attribute for exceptions thrown by `allocate`, +`aggregate`, `harvest`, or timer predicate functions. On `Supervision.Stop` the stream fails; on +`Supervision.Resume` and `Supervision.Restart` the failing element or aggregate is dropped and the stream continues. + ## Reactive Streams semantics @@@div { .callout } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala index bc04ff384e5..4e10e562adc 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala @@ -19,7 +19,9 @@ import scala.concurrent.duration._ import org.apache.pekko import pekko.actor.ActorSystem +import pekko.stream.ActorAttributes import pekko.stream.OverflowStrategy +import pekko.stream.Supervision import pekko.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import pekko.testkit.{ ExplicitlyTriggeredScheduler, PekkoSpec } @@ -76,6 +78,76 @@ class AggregateWithBoundarySpec extends StreamSpec { Await.result(result, 30.seconds) should be(Seq(Seq(1, 2, 3, 4), Seq(5, 6), Seq(7))) } + "resume and reset aggregate state when aggregate throws" in { + val result = Source(1 to 4) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer += i + if (i == 3) throw new RuntimeException("boom") + (buffer, buffer.size >= 3) + }, + harvest = _.toSeq, + emitOnTimer = None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.collection) + + Await.result(result, 30.seconds) shouldBe Seq(Seq(4)) + } + + "restart and reset aggregate state when aggregate throws" in { + val result = Source(1 to 4) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer += i + if (i == 3) throw new RuntimeException("boom") + (buffer, buffer.size >= 3) + }, + harvest = _.toSeq, + emitOnTimer = None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(Sink.collection) + + Await.result(result, 30.seconds) shouldBe Seq(Seq(4)) + } + + "resume when allocate throws and drop failing element" in { + var allocationCount = 0 + val result = Source(1 to 5) + .aggregateWithBoundary(allocate = () => { + allocationCount += 1 + if (allocationCount == 1) throw new RuntimeException("boom") + ListBuffer.empty[Int] + })( + aggregate = (buffer, i) => { + buffer += i + (buffer, buffer.size >= 2) + }, + harvest = _.toSeq, + emitOnTimer = None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.collection) + + Await.result(result, 30.seconds) shouldBe Seq(Seq(2, 3), Seq(4, 5)) + } + + "resume when harvest throws and drop failing aggregate" in { + val result = Source(1 to 5) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer += i + (buffer, buffer.size >= 2) + }, + harvest = buffer => { + if (buffer.contains(3)) throw new RuntimeException("boom") + buffer.toSeq + }, + emitOnTimer = None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.collection) + + Await.result(result, 30.seconds) shouldBe Seq(Seq(1, 2), Seq(5)) + } + } // To run multiple tests in parallel using simulated timer, @@ -284,4 +356,28 @@ class AggregateWithTimeBoundaryAndSimulatedTimeSpec extends AnyWordSpecLike with downstream.request(2).expectNext(Seq(4, 5, 6), Seq(7)) // clean up the emit queue and complete downstream downstream.expectComplete() } + + "drop current aggregate when timer predicate throws and supervision is Resume" in { + implicit val actorSystem = createActorSystem("4") + + val p = TestPublisher.probe[Int]() + val result = Source + .fromPublisher(p) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer += i + (buffer, false) + }, + harvest = _.toSeq, + emitOnTimer = Some((((_: ListBuffer[Int]) => throw new RuntimeException("boom")), 1.second))) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.collection) + + p.sendNext(1) + timePasses(1.second) + p.sendNext(2) + p.sendComplete() + + Await.result(result, 30.seconds) shouldBe Seq(Seq(2)) + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala index 7d886a8ed09..c10f162a9b0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala @@ -14,9 +14,12 @@ package org.apache.pekko.stream.impl.fusing import scala.concurrent.duration._ +import scala.util.control.NonFatal import org.apache.pekko import pekko.annotation.InternalApi +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.Supervision import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } @@ -41,6 +44,7 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var aggregated: Agg = null.asInstanceOf[Agg] @@ -52,26 +56,95 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( override protected def onTimer(timerKey: Any): Unit = { emitOnTimer.foreach { - case (isReadyOnTimer, _) => if (aggregated != null && isReadyOnTimer(aggregated)) harvestAndEmit() + case (isReadyOnTimer, _) => + if (aggregated != null) { + val maybeReadyToEmit = + try isReadyOnTimer(aggregated) + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + aggregated = null.asInstanceOf[Agg] + case Supervision.Restart => + restartState() + } + false + } + if (maybeReadyToEmit) harvestAndEmitOnTimer() + } } } // at onPush, isAvailable(in)=true hasBeenPulled(in)=false, isAvailable(out) could be true or false due to timer triggered emit override def onPush(): Unit = { - if (aggregated == null) aggregated = allocate() - val (updated, result) = aggregate(aggregated, grab(in)) - aggregated = updated - if (result) harvestAndEmit() - // the decision to pull entirely depend on isAvailable(out)=true, regardless of result of aggregate - // 1. aggregate=true: isAvailable(out) will be false - // 2. aggregate=false: if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure - if (isAvailable(out)) pull(in) + val inElem = grab(in) + if (aggregated == null) + try aggregated = allocate() + catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + pullIfPossible() + case Supervision.Restart => + restartState() + pullIfPossible() + } + return + } + + val shouldEmit = + try { + val (updated, result) = aggregate(aggregated, inElem) + aggregated = updated + result + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + restartState() + pullIfPossible() + case Supervision.Restart => + restartState() + pullIfPossible() + } + return + } + + if (shouldEmit) harvestAndEmitFromOnPush() + else { + // the decision to pull entirely depend on isAvailable(out)=true + // if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure + pullIfPossible() + } } override def onUpstreamFinish(): Unit = { // Note that emit is asynchronous, it will keep the stage alive until downstream actually take the element - if (aggregated != null) emit(out, harvest(aggregated)) - completeStage() + if (aggregated != null) + try { + emit(out, harvest(aggregated)) + aggregated = null.asInstanceOf[Agg] + completeStage() + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + aggregated = null.asInstanceOf[Agg] + completeStage() + case Supervision.Restart => + restartState() + completeStage() + } + } + else completeStage() } // at onPull, isAvailable(out) is always true indicating downstream is waiting @@ -80,10 +153,45 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( setHandlers(in, out, this) - private def harvestAndEmit(): Unit = { - emit(out, harvest(aggregated)) + private def restartState(): Unit = aggregated = null.asInstanceOf[Agg] - } + + private def pullIfPossible(): Unit = + if (isAvailable(out) && !isClosed(in) && !hasBeenPulled(in)) pull(in) + + private def harvestAndEmitFromOnPush(): Unit = + try { + emit(out, harvest(aggregated)) + aggregated = null.asInstanceOf[Agg] + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + aggregated = null.asInstanceOf[Agg] + pullIfPossible() + case Supervision.Restart => + restartState() + pullIfPossible() + } + } + + private def harvestAndEmitOnTimer(): Unit = + try { + emit(out, harvest(aggregated)) + aggregated = null.asInstanceOf[Agg] + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Resume => + aggregated = null.asInstanceOf[Agg] + case Supervision.Restart => + restartState() + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..a29dd00a6bc 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -4509,6 +4509,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * when custom condition is met which can be triggered by aggregate or timer. * It can be thought of a more general [[groupedWeightedWithin]]. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is + * [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is + * [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or + * aggregate is dropped and the stream continues. + * * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true * * '''Backpressures when''' downstream backpressures and the aggregate is complete diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..6db06d32efc 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -4976,6 +4976,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * when custom condition is met which can be triggered by aggregate or timer. * It can be thought of a more general [[groupedWeightedWithin]]. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is + * [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is + * [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or + * aggregate is dropped and the stream continues. + * * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true * * '''Backpressures when''' downstream backpressures and the aggregate is complete diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..2deaa316ac9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -3082,6 +3082,13 @@ final class SubFlow[In, Out, Mat]( * when custom condition is met which can be triggered by aggregate or timer. * It can be thought of a more general [[groupedWeightedWithin]]. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is + * [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is + * [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or + * aggregate is dropped and the stream continues. + * * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true * * '''Backpressures when''' downstream backpressures and the aggregate is complete diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..75aef72eead 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -3049,6 +3049,13 @@ final class SubSource[Out, Mat]( * when custom condition is met which can be triggered by aggregate or timer. * It can be thought of a more general [[groupedWeightedWithin]]. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is + * [[pekko.stream.Supervision#stop]] the stream fails. If the supervision decision is + * [[pekko.stream.Supervision#resume]] or [[pekko.stream.Supervision#restart]], the failing element or + * aggregate is dropped and the stream continues. + * * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true * * '''Backpressures when''' downstream backpressures and the aggregate is complete diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..f5f487dbafe 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -4116,6 +4116,13 @@ trait FlowOps[+Out, +Mat] { * when custom condition is met which can be triggered by aggregate or timer. * It can be thought of a more general [[groupedWeightedWithin]]. * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * If `allocate`, `aggregate`, `harvest`, or timer predicate throws and the supervision decision is + * [[pekko.stream.Supervision.Stop]] the stream fails. If the supervision decision is + * [[pekko.stream.Supervision.Resume]] or [[pekko.stream.Supervision.Restart]], the failing element or + * aggregate is dropped and the stream continues. + * * '''Emits when''' the aggregation function decides the aggregate is complete or the timer function returns true * * '''Backpressures when''' downstream backpressures and the aggregate is complete From 9a4f0b9934d82e0582ab50cd16cb71e29094d0ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 02:11:58 +0800 Subject: [PATCH 2/2] refactor: deduplicate harvest emission and add upstream-finish supervision test Motivation: The supervision handling introduced two near-identical harvest methods (harvestAndEmitFromOnPush / harvestAndEmitOnTimer) differing only in whether pullIfPossible is called after recovery, and the onUpstreamFinish harvest recovery path lacked directional test coverage. Modification: - Unify the two harvest methods into a single harvestAndEmit(pullOnRecover) that conditionally pulls on recovery; onPush passes true, onTimer passes false to preserve the original timer semantics of not managing pull state. - Add a directional test for harvest failure on upstream completion (Resume): the failing final aggregate is dropped and the stream completes normally. Result: Less duplication in the supervision recovery code and regression coverage for the previously untested onUpstreamFinish harvest path. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.AggregateWithBoundarySpec org.apache.pekko.stream.scaladsl.AggregateWithTimeBoundaryAndSimulatedTimeSpec" -- 12/12 passed - sbt "stream/mimaReportBinaryIssues" -- clean - scalafmt --test --mode diff-ref=origin/main -- all formatted References: Refs #3110 --- .../scaladsl/AggregateWithBoundarySpec.scala | 18 +++++++++++++ .../impl/fusing/AggregateWithBoundary.scala | 26 ++++--------------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala index 4e10e562adc..023baf3d679 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/AggregateWithBoundarySpec.scala @@ -148,6 +148,24 @@ class AggregateWithBoundarySpec extends StreamSpec { Await.result(result, 30.seconds) shouldBe Seq(Seq(1, 2), Seq(5)) } + "resume when harvest throws on upstream finish and complete" in { + val result = Source(1 to 3) + .aggregateWithBoundary(allocate = () => ListBuffer.empty[Int])( + aggregate = (buffer, i) => { + buffer += i + (buffer, buffer.size >= 2) + }, + harvest = buffer => { + if (buffer.contains(3)) throw new RuntimeException("boom") + buffer.toSeq + }, + emitOnTimer = None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(Sink.collection) + + Await.result(result, 30.seconds) shouldBe Seq(Seq(1, 2)) + } + } // To run multiple tests in parallel using simulated timer, diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala index c10f162a9b0..296ac9f548c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/AggregateWithBoundary.scala @@ -72,7 +72,7 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( } false } - if (maybeReadyToEmit) harvestAndEmitOnTimer() + if (maybeReadyToEmit) harvestAndEmit(pullOnRecover = false) } } } @@ -116,7 +116,7 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( return } - if (shouldEmit) harvestAndEmitFromOnPush() + if (shouldEmit) harvestAndEmit(pullOnRecover = true) else { // the decision to pull entirely depend on isAvailable(out)=true // if isAvailable(out)=false, this means timer has caused emit, cannot pull or it could emit indefinitely bypassing back pressure @@ -159,25 +159,7 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( private def pullIfPossible(): Unit = if (isAvailable(out) && !isClosed(in) && !hasBeenPulled(in)) pull(in) - private def harvestAndEmitFromOnPush(): Unit = - try { - emit(out, harvest(aggregated)) - aggregated = null.asInstanceOf[Agg] - } catch { - case NonFatal(ex) => - decider(ex) match { - case Supervision.Stop => - failStage(ex) - case Supervision.Resume => - aggregated = null.asInstanceOf[Agg] - pullIfPossible() - case Supervision.Restart => - restartState() - pullIfPossible() - } - } - - private def harvestAndEmitOnTimer(): Unit = + private def harvestAndEmit(pullOnRecover: Boolean): Unit = try { emit(out, harvest(aggregated)) aggregated = null.asInstanceOf[Agg] @@ -188,8 +170,10 @@ private[pekko] final case class AggregateWithBoundary[In, Agg, Out]( failStage(ex) case Supervision.Resume => aggregated = null.asInstanceOf[Agg] + if (pullOnRecover) pullIfPossible() case Supervision.Restart => restartState() + if (pullOnRecover) pullIfPossible() } }