Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** A watcher to watch the table changes(create/delete) in zookeeper. */
public class TableChangeWatcher {
Expand All @@ -59,6 +61,7 @@ public class TableChangeWatcher {

private final EventManager eventManager;
private final ZooKeeperClient zooKeeperClient;
private final CountDownLatch initializedLatch = new CountDownLatch(1);

public TableChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) {
this.zooKeeperClient = zooKeeperClient;
Expand All @@ -73,6 +76,15 @@ public void start() {
curatorCache.start();
}

/**
* Waits until the CuratorCache has completed its initial sync with ZooKeeper. This should be
* called after {@link #start()} to ensure the cache is fully warmed up before relying on change
* events.
*/
public boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
return initializedLatch.await(timeout, unit);
}

public void stop() {
if (!running) {
return;
Expand All @@ -85,6 +97,12 @@ public void stop() {
/** A listener to monitor the changes of table nodes in zookeeper. */
private final class TablePathChangeListener implements CuratorCacheListener {

@Override
public void initialized() {
LOG.info("CuratorCache initial sync completed for TableChangeWatcher.");
initializedLatch.countDown();
}

@Override
public void event(Type type, ChildData oldData, ChildData newData) {
if (newData != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static void beforeAll() {
}

@BeforeEach
void before() {
void before() throws Exception {
// Clean up ZK state from previous tests to prevent CuratorCache initial sync
// from picking up leftover data
try {
Expand All @@ -111,6 +111,15 @@ void before() {
eventManager = new TestingEventManager();
tableChangeWatcher = new TableChangeWatcher(zookeeperClient, eventManager);
tableChangeWatcher.start();
// Wait for CuratorCache to complete its initial sync before creating tables.
// Without this, the cache may fire NODE_CHANGED events from the initial tree
// scan that race with table creation, causing processCreateTable() to read
// stale or incomplete ZK state.
assertThat(tableChangeWatcher.awaitInitialized(30, java.util.concurrent.TimeUnit.SECONDS))
.as("CuratorCache should complete initial sync within timeout")
.isTrue();
// Clear any events generated during initial sync (e.g., from leftover ZK nodes)
eventManager.clearEvents();
}

@AfterEach
Expand Down