Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,13 @@ void testListRebalanceProgress() throws Exception {
tableName));
long tableId =
admin.getTableInfo(TablePath.of(DEFAULT_DB, tableName)).get().getTableId();
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
// Use 90 second timeout to handle high load with PERMANENT_OFFLINE server
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId, Duration.ofSeconds(90));

// Add delay between table creations to reduce coordinator lock contention
if (i < 9) { // Don't delay after last table
Thread.sleep(100);
}
}

// remove tag after crated table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,11 +529,16 @@ public void waitUntilAllGatewayHasSameMetadata() {
}
}

/** Wait until all the table assignments buckets are ready for table. */
public void waitUntilTableReady(long tableId) {
/**
* Wait until all replicas in table are ready with configurable timeout.
*
* @param tableId the table id to wait for
* @param timeout the maximum time to wait
*/
public void waitUntilTableReady(long tableId, Duration timeout) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
timeout,
() -> {
Optional<TableAssignment> tableAssignmentOpt =
zkClient.getTableAssignment(tableId);
Expand All @@ -542,6 +547,15 @@ public void waitUntilTableReady(long tableId) {
});
}

/**
* Wait until all replicas in table are ready with default 1 minute timeout.
*
* @param tableId the table id to wait for
*/
public void waitUntilTableReady(long tableId) {
waitUntilTableReady(tableId, Duration.ofMinutes(1));
}

/**
* Wait until all authorization are synchronized to all tablet servers.
*
Expand Down Expand Up @@ -576,10 +590,17 @@ public void waitUntilAuthenticationSync(Collection<AclBinding> aclBindings, bool
});
}

public void waitUntilTablePartitionReady(long tableId, long partitionId) {
/**
* Wait until all replicas in partition are ready with configurable timeout.
*
* @param tableId the table id
* @param partitionId the partition id
* @param timeout the maximum time to wait
*/
public void waitUntilTablePartitionReady(long tableId, long partitionId, Duration timeout) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
timeout,
() -> {
Optional<PartitionAssignment> partitionAssignmentOpt =
zkClient.getPartitionAssignment(partitionId);
Expand All @@ -589,6 +610,16 @@ public void waitUntilTablePartitionReady(long tableId, long partitionId) {
});
}

/**
* Wait until all replicas in partition are ready with default 1 minute timeout.
*
* @param tableId the table id
* @param partitionId the partition id
*/
public void waitUntilTablePartitionReady(long tableId, long partitionId) {
waitUntilTablePartitionReady(tableId, partitionId, Duration.ofMinutes(1));
}

private void waitReplicaInAssignmentReady(
ZooKeeperClient zkClient,
TableAssignment tableAssignment,
Expand Down