diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index cc15d5ccd1..831be95079 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -728,7 +728,13 @@ void testListRebalanceProgress() throws Exception { tableName)); long tableId = admin.getTableInfo(TablePath.of(DEFAULT_DB, tableName)).get().getTableId(); - FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + // Use a 90-second timeout to handle high load with a PERMANENT_OFFLINE server + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId, Duration.ofSeconds(90)); + + // Add a delay between table creations to reduce coordinator lock contention + if (i < 9) { // Don't delay after the last table + Thread.sleep(100); + } } // remove tag after crated table. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index b63138ea8c..7362ab3e05 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -529,11 +529,16 @@ public void waitUntilAllGatewayHasSameMetadata() { } } - /** Wait until all the table assignments buckets are ready for table. */ - public void waitUntilTableReady(long tableId) { + /** + * Wait until all replicas in the table are ready, with a configurable timeout. 
+ * + * @param tableId the table id to wait for + * @param timeout the maximum time to wait + */ + public void waitUntilTableReady(long tableId, Duration timeout) { ZooKeeperClient zkClient = getZooKeeperClient(); retry( - Duration.ofMinutes(1), + timeout, () -> { Optional tableAssignmentOpt = zkClient.getTableAssignment(tableId); @@ -542,6 +547,15 @@ public void waitUntilTableReady(long tableId) { }); } + /** + * Wait until all replicas in the table are ready, with the default 1-minute timeout. + * + * @param tableId the table id to wait for + */ + public void waitUntilTableReady(long tableId) { + waitUntilTableReady(tableId, Duration.ofMinutes(1)); + } + /** * Wait until all authorization are synchronized to all tablet servers. * @@ -576,10 +590,17 @@ public void waitUntilAuthenticationSync(Collection aclBindings, bool }); } - public void waitUntilTablePartitionReady(long tableId, long partitionId) { + /** + * Wait until all replicas in the partition are ready, with a configurable timeout. + * + * @param tableId the table id + * @param partitionId the partition id + * @param timeout the maximum time to wait + */ + public void waitUntilTablePartitionReady(long tableId, long partitionId, Duration timeout) { ZooKeeperClient zkClient = getZooKeeperClient(); retry( - Duration.ofMinutes(1), + timeout, () -> { Optional partitionAssignmentOpt = zkClient.getPartitionAssignment(partitionId); @@ -589,6 +610,16 @@ public void waitUntilTablePartitionReady(long tableId, long partitionId) { }); } + /** + * Wait until all replicas in the partition are ready, with the default 1-minute timeout. + * + * @param tableId the table id + * @param partitionId the partition id + */ + public void waitUntilTablePartitionReady(long tableId, long partitionId) { + waitUntilTablePartitionReady(tableId, partitionId, Duration.ofMinutes(1)); + } + private void waitReplicaInAssignmentReady( ZooKeeperClient zkClient, TableAssignment tableAssignment,