From 8d3113755e0a0ee94a541aa973d633d0208ec29a Mon Sep 17 00:00:00 2001 From: Istvan Fajth Date: Mon, 8 Sep 2025 02:19:17 +0200 Subject: [PATCH 1/2] GH-1277 SafeNotifyService threads leak in CuratorFrameworkImpl CURATOR-495 introduced a new runSafeService field in the CuratorFrameworkImpl class, and this field is either initialized by an external ExecutorService via the builder, or it is created internally within the class. In the CuratorFrameworkImpl#close method though, this Executor is never shut down, so by default the threads started by each instance linger until the VM exits. Worse, if someone specifies a thread factory to the framework implementation via the builder that produces non-daemon threads, the VM never exits due to the unstopped single thread executor. --- .../framework/imps/CuratorFrameworkImpl.java | 12 +++++ .../imps/TestFrameworkBackground.java | 50 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index ae4ba3a81..f1d8032f7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -100,6 +100,7 @@ public final class CuratorFrameworkImpl extends CuratorFrameworkBase { private final EnsembleTracker ensembleTracker; private final SchemaSet schemaSet; private final Executor runSafeService; + private boolean isExternalRunSafeService = false; private final ZookeeperCompatibility zookeeperCompatibility; private volatile ExecutorService executorService; @@ -194,6 +195,7 @@ public void process(WatchedEvent watchedEvent) { private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) { if (builder.getRunSafeService() != null) { + isExternalRunSafeService = true; return
builder.getRunSafeService(); } ThreadFactory threadFactory = builder.getThreadFactory(); @@ -383,6 +385,16 @@ public void close() { } } + if (!isExternalRunSafeService) { + ((ExecutorService) runSafeService).shutdownNow(); + try { + ((ExecutorService) runSafeService).awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Interrupted while interrupting; I give up. + Thread.currentThread().interrupt(); + } + } + if (ensembleTracker != null) { ensembleTracker.close(); } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java index 0d0da50e3..8035ecd97 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java @@ -25,13 +25,17 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -45,6 +49,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.ACL; import 
org.junit.jupiter.api.Test;
@@ -306,4 +311,70 @@ public void listen(OperationAndData data) {
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    /**
+     * Verifies that closing the client also stops the internally created SafeNotifyService thread.
+     */
+    @Test
+    public void testCloseShutsDownInternalRunSafeService() {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(
+                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try {
+            client.start();
+            client.runSafe(() -> {});
+            assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("SafeNotifyService")));
+
+            client.close();
+
+            assertTrue(enumerateThreads().noneMatch(t -> t.getName().contains("SafeNotifyService")));
+        } finally {
+            // Do not leak a started client (and its threads) into other tests if an assertion fails.
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Verifies that closing the client leaves an executor supplied through the builder running.
+     */
+    @Test
+    public void testCloseLeavesExternalRunSafeServiceRunning() throws Exception {
+        Timing timing = new Timing();
+        ExecutorService externalRunner =
+                Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("ExternalSafeNotifyService"));
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .sessionTimeoutMs(timing.session())
+                .connectionTimeoutMs(timing.connection())
+                .retryPolicy(new RetryOneTime(1))
+                .maxCloseWaitMs(timing.forWaiting().milliseconds())
+                .runSafeService(externalRunner)
+                .build();
+        try {
+            client.start();
+            client.runSafe(() -> {});
+            assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
+
+            client.close();
+
+            assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
+        } finally {
+            CloseableUtils.closeQuietly(client);
+            // The test owns the external executor, so it has to stop it even when an assertion fails.
+            externalRunner.shutdownNow();
+        }
+        assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Returns the active threads of the current thread's thread group and its subgroups.
+     * {@link Thread#activeCount()} is only an estimate, so the array can keep unused (null) slots when
+     * threads terminate in the meantime; only the entries actually filled in by
+     * {@link Thread#enumerate(Thread[])} are streamed, so the predicates never see a null element.
+     */
+    private static Stream<Thread> enumerateThreads() {
+        Thread[] threads = new Thread[Thread.activeCount()];
+        int count = Thread.enumerate(threads);
+        return Arrays.stream(threads, 0, count);
+    }
 }
From d158c08db883e41e5fca6174949b839e54d5699e Mon Sep 17 00:00:00 2001
From: Istvan Fajth
Date: Tue, 9 Sep 2025 16:34:58 +0200
Subject: [PATCH 2/2] Addressing review comments.
--- .../framework/CuratorFrameworkFactory.java | 3 ++ .../framework/imps/CuratorFrameworkImpl.java | 44 ++++++++++++------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 87f0e9f17..33f23f7fb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -509,6 +509,9 @@ public Builder schemaSet(SchemaSet schemaSet) { * By default, an executor is allocated internally using the provided (or default) * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method * to set a custom executor. + * Whenever a custom executor is set, it is the caller's responsibility to shut that + * executor down after the CuratorFramework has been closed. + * The internally created executor is shut down when the CuratorFramework is closed.
* * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc * @return this
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index f1d8032f7..a9bcb53f7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -375,25 +376,18 @@ public void close() {
                 }
             });
 
+            Optional<CompletableFuture<Void>> executorServiceClosure = Optional.empty();
             if (executorService != null) {
-                executorService.shutdownNow();
-                try {
-                    executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    // Interrupted while interrupting; I give up.
-                    Thread.currentThread().interrupt();
-                }
+                executorServiceClosure = Optional.of(shutdownAndAwaitTerminationAsync(executorService));
             }
 
-            if (!isExternalRunSafeService) {
-                ((ExecutorService) runSafeService).shutdownNow();
-                try {
-                    ((ExecutorService) runSafeService).awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    // Interrupted while interrupting; I give up.
-                    Thread.currentThread().interrupt();
-                }
+            Optional<CompletableFuture<Void>> runSafeServiceClosure = Optional.empty();
+            if (!isExternalRunSafeService && runSafeService != null) {
+                runSafeServiceClosure =
+                        Optional.of(shutdownAndAwaitTerminationAsync(((ExecutorService) runSafeService)));
             }
+            executorServiceClosure.ifPresent(CompletableFuture::join);
+            runSafeServiceClosure.ifPresent(CompletableFuture::join);
 
             if (ensembleTracker != null) {
                 ensembleTracker.close();
@@ -412,6 +406,26 @@ public void close() {
         }
     }
 
+    /**
+     * Shuts the executor service down immediately, then waits for its termination on a background thread.
+     * Only the wait is asynchronous, so that waiting on multiple executors to terminate overlaps and does
+     * not extend the total wait time above maxCloseWaitMs.
+     *
+     * @param service the ExecutorService to shut down.
+     * @return a future that completes once the executor has terminated or the maxCloseWaitMs wait has ended.
+     */
+    private CompletableFuture<Void> shutdownAndAwaitTerminationAsync(final ExecutorService service) {
+        service.shutdownNow();
+        return CompletableFuture.runAsync(() -> {
+            try {
+                service.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Interrupted while waiting for termination; give up but keep the interrupt status.
+                Thread.currentThread().interrupt();
+            }
+        });
+    }
+
     NamespaceImpl getNamespaceImpl() {
         return namespace;
     }