diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index 51e934c2a4..1de38ba9e2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java @@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** A watcher to watch the table changes(create/delete) in zookeeper. */ public class TableChangeWatcher { @@ -59,6 +61,7 @@ public class TableChangeWatcher { private final EventManager eventManager; private final ZooKeeperClient zooKeeperClient; + private final CountDownLatch initializedLatch = new CountDownLatch(1); public TableChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) { this.zooKeeperClient = zooKeeperClient; @@ -73,6 +76,15 @@ public void start() { curatorCache.start(); } + /** + * Waits until the CuratorCache has completed its initial sync with ZooKeeper. This should be + * called after {@link #start()} to ensure the cache is fully warmed up before relying on change + * events. + */ + public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { + return initializedLatch.await(timeout, unit); + } + public void stop() { if (!running) { return; @@ -85,6 +97,12 @@ public void stop() { /** A listener to monitor the changes of table nodes in zookeeper. */ private final class TablePathChangeListener implements CuratorCacheListener { + @Override + public void initialized() { + LOG.info("CuratorCache initial sync completed for TableChangeWatcher."); + initializedLatch.countDown(); + } + @Override public void event(Type type, ChildData oldData, ChildData newData) { if (newData != null) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index bb0cb8a7bf..6c2a228d90 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -99,7 +99,7 @@ static void beforeAll() { } @BeforeEach - void before() { + void before() throws Exception { // Clean up ZK state from previous tests to prevent CuratorCache initial sync // from picking up leftover data try { @@ -111,6 +111,15 @@ void before() { eventManager = new TestingEventManager(); tableChangeWatcher = new TableChangeWatcher(zookeeperClient, eventManager); tableChangeWatcher.start(); + // Wait for CuratorCache to complete its initial sync before creating tables. + // Without this, the cache may fire NODE_CHANGED events from the initial tree + // scan that race with table creation, causing processCreateTable() to read + // stale or incomplete ZK state. + assertThat(tableChangeWatcher.awaitInitialized(30, java.util.concurrent.TimeUnit.SECONDS)) + .as("CuratorCache should complete initial sync within timeout") + .isTrue(); + // Clear any events generated during initial sync (e.g., from leftover ZK nodes) + eventManager.clearEvents(); } @AfterEach