Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,5 +436,20 @@ class ActorGraphInterpreterSpec extends StreamSpec {
done.future.futureValue // would throw on failure
}

"continue working with subfused interpreters after initial shell reference is released" in {
val mat = Materializer(system)

// flatMapConcat triggers subfusing: the inner Source is materialized via
// SubFusingActorMaterializerImpl, which registers additional shells into
// the same ActorGraphInterpreter actor via registerShell.
// The actor must continue processing subfused shells correctly after
// the initial shell reference is released in preStart().
val result = Source(1 to 3)
.flatMapConcat(i => Source(List(i, i * 10)))
.runWith(Sink.seq)(mat)

result.futureValue should ===(Seq(1, 10, 2, 20, 3, 30))
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ import org.reactivestreams.Subscription
/**
* INTERNAL API
*/
@InternalApi private[pekko] final class ActorGraphInterpreter(_initial: GraphInterpreterShell)
@InternalApi private[pekko] final class ActorGraphInterpreter(private var _initial: GraphInterpreterShell)
extends Actor
with ActorLogging {
import ActorGraphInterpreter._
Expand All @@ -742,7 +742,7 @@ import org.reactivestreams.Subscription
try {
currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit, currentLimit)
if (GraphInterpreter.Debug)
println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}")
println(s"registering new shell in ${shell}\n ${shell.toString.replace("\n", "\n ")}")
if (shell.isTerminated) false
else {
activeInterpreters += shell
Expand Down Expand Up @@ -790,6 +790,9 @@ import org.reactivestreams.Subscription

override def preStart(): Unit = {
tryInit(_initial)
// Release reference to initial shell to avoid keeping it alive after it is shut down
// when the actor is still alive hosting other subfused interpreters
_initial = null
if (activeInterpreters.isEmpty) context.stop(self)
else if (shortCircuitBuffer ne null) shortCircuitBatch()
}
Expand Down