diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 87f0e9f17..33f23f7fb 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -509,6 +509,9 @@ public Builder schemaSet(SchemaSet schemaSet) { * By default, an executor is allocated internally using the provided (or default) * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method * to set a custom executor. + * Whenever a custom executor is set, it is the caller's responsibility to close the + * executor after the CuratorFramework closure. + * The internally created executor is closed when CuratorFramework is closed. * * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc * @return this diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index ae4ba3a81..a9bcb53f7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -100,6 +101,7 @@ public final class CuratorFrameworkImpl extends CuratorFrameworkBase { private final EnsembleTracker ensembleTracker; private final SchemaSet schemaSet; private final Executor runSafeService; + private boolean isExternalRunSafeService = false; private final ZookeeperCompatibility zookeeperCompatibility; private volatile ExecutorService executorService; @@ -194,6 +196,7 @@ public void process(WatchedEvent watchedEvent) { private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) { if (builder.getRunSafeService() != null) { + isExternalRunSafeService = true; return builder.getRunSafeService(); } ThreadFactory threadFactory = builder.getThreadFactory(); @@ -373,16 +376,19 @@ public void close() { } }); + Optional> executorServiceClosure = Optional.empty(); if (executorService != null) { - executorService.shutdownNow(); - try { - executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // Interrupted while interrupting; I give up. - Thread.currentThread().interrupt(); - } + executorServiceClosure = Optional.of(shutdownAndAwaitTerminationAsync(executorService)); } + Optional> runSafeServiceClosure = Optional.empty(); + if (!isExternalRunSafeService && runSafeService != null) { + runSafeServiceClosure = + Optional.of(shutdownAndAwaitTerminationAsync(((ExecutorService) runSafeService))); + } + executorServiceClosure.ifPresent(CompletableFuture::join); + runSafeServiceClosure.ifPresent(CompletableFuture::join); + if (ensembleTracker != null) { ensembleTracker.close(); } @@ -400,6 +406,26 @@ public void close() { } } + /** + * Utility method to run the executor service shutdown in a background thread. + * This is in order to ensure we don't extend the wait time above maxCloseWaitMs by waiting on multiple + * executors to terminate. + * + * @param service the ExecutorService to shut down. + * @return the future represents the job closing the executor service and waits on its termination. + */ + private CompletableFuture shutdownAndAwaitTerminationAsync(final ExecutorService service) { + return CompletableFuture.runAsync(() -> { + service.shutdownNow(); + try { + service.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Interrupted while interrupting; I give up. + Thread.currentThread().interrupt(); + } + }); + } + NamespaceImpl getNamespaceImpl() { return namespace; } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java index 0d0da50e3..8035ecd97 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java @@ -25,13 +25,17 @@ import com.google.common.collect.Lists; import com.google.common.collect.Queues; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -45,6 +49,7 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.ACL; import org.junit.jupiter.api.Test; @@ -306,4 +311,49 @@ public void listen(OperationAndData data) { CloseableUtils.closeQuietly(client); } } + + @Test + public void testCloseShutsDownInternalRunSafeService() { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient( + server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + client.runSafe(() -> {}); + assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("SafeNotifyService"))); + + client.close(); + + assertTrue(enumerateThreads().noneMatch(t -> t.getName().contains("SafeNotifyService"))); + } + + @Test + public void testCloseLeavesExternalRunSafeServiceRunning() throws Exception { + Timing timing = new Timing(); + ExecutorService externalRunner = + Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("ExternalSafeNotifyService")); + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .sessionTimeoutMs(timing.session()) + .connectionTimeoutMs(timing.connection()) + .retryPolicy(new RetryOneTime(1)) + .maxCloseWaitMs(timing.forWaiting().milliseconds()) + .runSafeService(externalRunner) + .build(); + client.start(); + client.runSafe(() -> {}); + assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService"))); + + client.close(); + + assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService"))); + + externalRunner.shutdownNow(); + assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS)); + } + + private static Stream enumerateThreads() { + Thread[] threads = new Thread[Thread.activeCount()]; + Thread.enumerate(threads); + return Arrays.stream(threads); + } }