Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,11 @@ public void launchFrame(final RunFrame frame, final VirtualProc proc) {
public void setTestMode(boolean testMode) {
this.testMode = testMode;
}

public void shutdown() {
if (channelCache != null) {
logger.info("Shutting down RqdClientGrpc channel cache");
channelCache.invalidateAll();
}
Comment on lines +220 to +224
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Prevent channel recreation after shutdown starts.

shutdown() currently closes existing cached channels, but new channels can still be created afterward via channelCache.get(host) if any teardown-time call races in. That can reintroduce the same leak warning during shutdown.

Suggested fix
+import java.util.concurrent.atomic.AtomicBoolean;
...
+    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
...
     private RqdInterfaceGrpc.RqdInterfaceBlockingStub getStub(String host)
             throws ExecutionException {
+        if (shuttingDown.get()) {
+            throw new IllegalStateException("RqdClientGrpc is shutting down");
+        }
         if (channelCache == null) {
             buildChannelCache();
         }
         ManagedChannel channel = channelCache.get(host);
         return RqdInterfaceGrpc.newBlockingStub(channel).withDeadlineAfter(rqdTaskDeadlineSeconds,
                 TimeUnit.SECONDS);
     }

     private RunningFrameGrpc.RunningFrameBlockingStub getRunningFrameStub(String host)
             throws ExecutionException {
+        if (shuttingDown.get()) {
+            throw new IllegalStateException("RqdClientGrpc is shutting down");
+        }
         if (channelCache == null) {
             buildChannelCache();
         }
         ManagedChannel channel = channelCache.get(host);
         return RunningFrameGrpc.newBlockingStub(channel).withDeadlineAfter(rqdTaskDeadlineSeconds,
                 TimeUnit.SECONDS);
     }
...
     public void shutdown() {
-        if (channelCache != null) {
+        if (!shuttingDown.compareAndSet(false, true)) {
+            return;
+        }
+        if (channelCache != null) {
             logger.info("Shutting down RqdClientGrpc channel cache");
             channelCache.invalidateAll();
+            channelCache.cleanUp();
         }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@cuebot/src/main/java/com/imageworks/spcue/rqd/RqdClientGrpc.java` around
lines 220 - 224, The shutdown() method currently only invalidates channelCache
but does not prevent new channels from being created via channelCache.get(host);
add a shutdown guard (e.g., an atomic boolean like isShutdown) that shutdown()
sets before invalidating/invalidationComplete and update any code that calls
channelCache.get(host) to check this guard and refuse or short-circuit creation
if shutdown has begun; ensure shutdown() also prevents further puts/gets by
either making channelCache null after setting the flag or having creation paths
throw/return early when isShutdown is true so no new channels are recreated
during teardown.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<!-- Non-Transactional Service Domain -->
<!-- ##################################################################################### -->

<bean id="rqdClient" class="com.imageworks.spcue.rqd.RqdClientGrpc">
<bean id="rqdClient" class="com.imageworks.spcue.rqd.RqdClientGrpc" destroy-method="shutdown">
<constructor-arg index="0" type="int">
<value>${grpc.rqd_server_port}</value>
</constructor-arg>
Expand Down Expand Up @@ -66,7 +66,7 @@
<property name="queueCapacity" value="500" />
</bean>

<bean id="bookingQueue" class="com.imageworks.spcue.dispatcher.BookingQueue" lazy-init="true" destroy-method="shutdown" depends-on="cueDataSource">
<bean id="bookingQueue" class="com.imageworks.spcue.dispatcher.BookingQueue" lazy-init="true" destroy-method="shutdown" depends-on="cueDataSource,rqdClient">
<constructor-arg index="0" type="int">
<value>${booking_queue.threadpool.health_threshold}</value>
</constructor-arg>
Expand All @@ -85,7 +85,7 @@
<property name="shutdownDrainMs" value="${healthy_threadpool.shutdown_drain_ms:60000}"/>
</bean>

<bean id="dispatchQueue" class="com.imageworks.spcue.dispatcher.DispatchQueue" destroy-method="shutdown" depends-on="cueDataSource">
<bean id="dispatchQueue" class="com.imageworks.spcue.dispatcher.DispatchQueue" destroy-method="shutdown" depends-on="cueDataSource,rqdClient">
<constructor-arg index="0" type="java.lang.String">
<value>DispatchQueue</value>
</constructor-arg>
Expand All @@ -107,7 +107,7 @@
<property name="shutdownDrainMs" value="${healthy_threadpool.shutdown_drain_ms:60000}"/>
</bean>

<bean id="manageQueue" class="com.imageworks.spcue.dispatcher.DispatchQueue" destroy-method="shutdown" depends-on="cueDataSource">
<bean id="manageQueue" class="com.imageworks.spcue.dispatcher.DispatchQueue" destroy-method="shutdown" depends-on="cueDataSource,rqdClient">
<constructor-arg index="0" type="java.lang.String">
<value>ManageQueue</value>
</constructor-arg>
Expand All @@ -128,7 +128,7 @@
</constructor-arg>
<property name="shutdownDrainMs" value="${healthy_threadpool.shutdown_drain_ms:60000}"/>
</bean>
<bean id="reportQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown" depends-on="cueDataSource">
<bean id="reportQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown" depends-on="cueDataSource,rqdClient">
<constructor-arg index="0" type="int">
<value>${report_queue.threadPoolSizeInitial}</value>
</constructor-arg>
Expand All @@ -140,7 +140,7 @@
</constructor-arg>
<property name="shutdownDrainMs" value="${host_report_queue.shutdown_drain_ms:60000}"/>
</bean>
<bean id="killQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown" depends-on="cueDataSource">
<bean id="killQueue" class="com.imageworks.spcue.dispatcher.HostReportQueue" destroy-method="shutdown" depends-on="cueDataSource,rqdClient">
<constructor-arg index="0" type="int">
<value>${kill_queue.threadPoolSizeInitial}</value>
</constructor-arg>
Expand Down
Loading