From b44c75fbe8b7e94b7af3dc55e7d1c80f4724ddfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E9=B5=BA?= Date: Wed, 25 Mar 2026 18:07:25 +0800 Subject: [PATCH] [hotfix] Fix flaky TableChangeWatcherTest by awaiting CuratorCache init TableChangeWatcherTest.testSchemaChanges intermittently fails because tables are created before CuratorCache completes its initial sync. During initial sync, NODE_CHANGED events from the tree scan can race with table creation, causing processCreateTable() to observe incomplete ZK state and silently drop the CreateTableEvent. Add awaitInitialized() to TableChangeWatcher using CuratorCacheListener .initialized() callback, and wait for it in beforeEach before creating tables. Clear any stale events from initial sync afterward. --- .../event/watcher/TableChangeWatcher.java | 18 ++++++++++++++++++ .../event/watcher/TableChangeWatcherTest.java | 11 ++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index 51e934c2a4..1de38ba9e2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java @@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** A watcher to watch the table changes(create/delete) in zookeeper. */ public class TableChangeWatcher { @@ -59,6 +61,7 @@ public class TableChangeWatcher { private final EventManager eventManager; private final ZooKeeperClient zooKeeperClient; + private final CountDownLatch initializedLatch = new CountDownLatch(1); public TableChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) { this.zooKeeperClient = zooKeeperClient; @@ -73,6 +76,15 @@ public void start() { curatorCache.start(); } + /** + * Waits until the CuratorCache has completed its initial sync with ZooKeeper. This should be + * called after {@link #start()} to ensure the cache is fully warmed up before relying on change + * events. + */ + public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { + return initializedLatch.await(timeout, unit); + } + public void stop() { if (!running) { return; @@ -85,6 +97,12 @@ public void stop() { /** A listener to monitor the changes of table nodes in zookeeper. */ private final class TablePathChangeListener implements CuratorCacheListener { + @Override + public void initialized() { + LOG.info("CuratorCache initial sync completed for TableChangeWatcher."); + initializedLatch.countDown(); + } + @Override public void event(Type type, ChildData oldData, ChildData newData) { if (newData != null) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index bb0cb8a7bf..6c2a228d90 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -99,7 +99,7 @@ static void beforeAll() { } @BeforeEach - void before() { + void before() throws Exception { // Clean up ZK state from previous tests to prevent CuratorCache initial sync // from picking up leftover data try { @@ -111,6 +111,15 @@ void before() { eventManager = new TestingEventManager(); tableChangeWatcher = new TableChangeWatcher(zookeeperClient, eventManager); tableChangeWatcher.start(); + // Wait for CuratorCache to complete its initial sync before creating tables. + // Without this, the cache may fire NODE_CHANGED events from the initial tree + // scan that race with table creation, causing processCreateTable() to read + // stale or incomplete ZK state. + assertThat(tableChangeWatcher.awaitInitialized(30, java.util.concurrent.TimeUnit.SECONDS)) + .as("CuratorCache should complete initial sync within timeout") + .isTrue(); + // Clear any events generated during initial sync (e.g., from leftover ZK nodes) + eventManager.clearEvents(); } @AfterEach