From 6bc01426582dd5415768ba019af4dff7228ff434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 16 Jun 2026 20:14:49 +0800 Subject: [PATCH] fix: release reference to initial interpreter shell after it is shut down Motivation: ActorGraphInterpreter retains a strong reference to its initial GraphInterpreterShell via the _initial constructor val. When the actor outlives the initial shell (e.g., while hosting subfused interpreters registered via registerShell), the initial shell and all its logics cannot be garbage collected for the lifetime of the actor. Modification: - Convert _initial from a constructor val to a private var - Set _initial = null at the end of preStart() after tryInit completes - Fix debug println in tryInit to reference the shell parameter instead of _initial (which may be null after preStart) - Add behavioral test using flatMapConcat which triggers subfusing via SubFusingActorMaterializerImpl, verifying the actor continues processing subfused shells after the initial shell is released Result: The initial GraphInterpreterShell and its stage logics become eligible for garbage collection once the initial shell shuts down, reducing memory retention in long-lived ActorGraphInterpreter actors. Tests: - stream-tests/testOnly ActorGraphInterpreterSpec: 12/12 passed References: None - memory leak fix --- .../impl/fusing/ActorGraphInterpreterSpec.scala | 15 +++++++++++++++ .../impl/fusing/ActorGraphInterpreter.scala | 7 +++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala index b6919b4f1e..5af288c961 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -436,5 +436,20 @@ class ActorGraphInterpreterSpec extends StreamSpec { done.future.futureValue // would throw on failure } + "continue working with subfused interpreters after initial shell reference is released" in { + val mat = Materializer(system) + + // flatMapConcat triggers subfusing: the inner Source is materialized via + // SubFusingActorMaterializerImpl, which registers additional shells into + // the same ActorGraphInterpreter actor via registerShell. + // The actor must continue processing subfused shells correctly after + // the initial shell reference is released in preStart(). + val result = Source(1 to 3) + .flatMapConcat(i => Source(List(i, i * 10))) + .runWith(Sink.seq)(mat) + + result.futureValue should ===(Seq(1, 10, 2, 20, 3, 30)) + } + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala index 61eb9b8a29..d11aa15b37 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala @@ -729,7 +729,7 @@ import org.reactivestreams.Subscription /** * INTERNAL API */ -@InternalApi private[pekko] final class ActorGraphInterpreter(_initial: GraphInterpreterShell) +@InternalApi private[pekko] final class ActorGraphInterpreter(private var _initial: GraphInterpreterShell) extends Actor with ActorLogging { import ActorGraphInterpreter._ @@ -742,7 +742,7 @@ import org.reactivestreams.Subscription try { currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit, currentLimit) if (GraphInterpreter.Debug) - println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") + println(s"registering new shell in ${shell}\n ${shell.toString.replace("\n", "\n ")}") if (shell.isTerminated) false else { activeInterpreters += shell @@ -790,6 +790,9 @@ import org.reactivestreams.Subscription override def preStart(): Unit = { tryInit(_initial) + // Release reference to initial shell to avoid keeping it alive after it is shut down + // when the actor is still alive hosting other subfused interpreters + _initial = null if (activeInterpreters.isEmpty) context.stop(self) else if (shortCircuitBuffer ne null) shortCircuitBatch() }