diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala index b6919b4f1e..5af288c961 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -436,5 +436,20 @@ class ActorGraphInterpreterSpec extends StreamSpec { done.future.futureValue // would throw on failure } + "continue working with subfused interpreters after initial shell reference is released" in { + val mat = Materializer(system) + + // flatMapConcat triggers subfusing: the inner Source is materialized via + // SubFusingActorMaterializerImpl, which registers additional shells into + // the same ActorGraphInterpreter actor via registerShell. + // The actor must continue processing subfused shells correctly after + // the initial shell reference is released in preStart(). + val result = Source(1 to 3) + .flatMapConcat(i => Source(List(i, i * 10))) + .runWith(Sink.seq)(mat) + + result.futureValue should ===(Seq(1, 10, 2, 20, 3, 30)) + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala index 61eb9b8a29..d11aa15b37 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala @@ -729,7 +729,7 @@ import org.reactivestreams.Subscription /** * INTERNAL API */ -@InternalApi private[pekko] final class ActorGraphInterpreter(_initial: GraphInterpreterShell) +@InternalApi private[pekko] final class ActorGraphInterpreter(private var _initial: GraphInterpreterShell) extends Actor with ActorLogging { import ActorGraphInterpreter._ @@ -742,7 +742,7 @@ import org.reactivestreams.Subscription try { currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit, currentLimit) if (GraphInterpreter.Debug) - println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") + println(s"registering new shell in ${shell}\n ${shell.toString.replace("\n", "\n ")}") if (shell.isTerminated) false else { activeInterpreters += shell @@ -790,6 +790,9 @@ import org.reactivestreams.Subscription override def preStart(): Unit = { tryInit(_initial) + // Release reference to initial shell to avoid keeping it alive after it is shut down + // when the actor is still alive hosting other subfused interpreters + _initial = null if (activeInterpreters.isEmpty) context.stop(self) else if (shortCircuitBuffer ne null) shortCircuitBatch() }