From 98e34dab352050d9b2e4eca8e0073a22ce366663 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Thu, 15 Jan 2026 20:40:40 +0000 Subject: [PATCH 1/2] Fix UnstabletestListRebalanceProgress --- .../flink/procedure/FlinkProcedureITCase.java | 13 +++++- .../testutils/FlussClusterExtension.java | 41 ++++++++++++++++--- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index cc15d5ccd1..aaef8d399c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -719,6 +719,11 @@ void testListRebalanceProgress() throws Exception { } // first create some unbalance assignment table. + // Note: We create 10 tables with server 3 marked PERMANENT_OFFLINE, concentrating + // all replicas on servers 0, 1, 2. This creates high coordinator load as all + // NotifyLeaderAndIsr RPCs serialize behind replicaStateChangeLock. We add delays + // between table creations to reduce lock contention and use a longer timeout to + // handle legitimate slowness during KV snapshot downloads and log recovery. for (int i = 0; i < 10; i++) { String tableName = "reblance_test_tab_" + i; tEnv.executeSql( @@ -728,7 +733,13 @@ void testListRebalanceProgress() throws Exception { tableName)); long tableId = admin.getTableInfo(TablePath.of(DEFAULT_DB, tableName)).get().getTableId(); - FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId); + // Use 90 second timeout to handle high load with PERMANENT_OFFLINE server + FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId, Duration.ofSeconds(90)); + + // Add delay between table creations to reduce coordinator lock contention + if (i < 9) { // Don't delay after last table + Thread.sleep(100); + } } // remove tag after crated table. diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index b63138ea8c..7362ab3e05 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -529,11 +529,16 @@ public void waitUntilAllGatewayHasSameMetadata() { } } - /** Wait until all the table assignments buckets are ready for table. */ - public void waitUntilTableReady(long tableId) { + /** + * Wait until all replicas in table are ready with configurable timeout. + * + * @param tableId the table id to wait for + * @param timeout the maximum time to wait + */ + public void waitUntilTableReady(long tableId, Duration timeout) { ZooKeeperClient zkClient = getZooKeeperClient(); retry( - Duration.ofMinutes(1), + timeout, () -> { Optional tableAssignmentOpt = zkClient.getTableAssignment(tableId); @@ -542,6 +547,15 @@ public void waitUntilTableReady(long tableId) { }); } + /** + * Wait until all replicas in table are ready with default 1 minute timeout. + * + * @param tableId the table id to wait for + */ + public void waitUntilTableReady(long tableId) { + waitUntilTableReady(tableId, Duration.ofMinutes(1)); + } + /** * Wait until all authorization are synchronized to all tablet servers. * @@ -576,10 +590,17 @@ public void waitUntilAuthenticationSync(Collection aclBindings, bool }); } - public void waitUntilTablePartitionReady(long tableId, long partitionId) { + /** + * Wait until all replicas in partition are ready with configurable timeout. + * + * @param tableId the table id + * @param partitionId the partition id + * @param timeout the maximum time to wait + */ + public void waitUntilTablePartitionReady(long tableId, long partitionId, Duration timeout) { ZooKeeperClient zkClient = getZooKeeperClient(); retry( - Duration.ofMinutes(1), + timeout, () -> { Optional partitionAssignmentOpt = zkClient.getPartitionAssignment(partitionId); @@ -589,6 +610,16 @@ public void waitUntilTablePartitionReady(long tableId, long partitionId) { }); } + /** + * Wait until all replicas in partition are ready with default 1 minute timeout. + * + * @param tableId the table id + * @param partitionId the partition id + */ + public void waitUntilTablePartitionReady(long tableId, long partitionId) { + waitUntilTablePartitionReady(tableId, partitionId, Duration.ofMinutes(1)); + } + private void waitReplicaInAssignmentReady( ZooKeeperClient zkClient, TableAssignment tableAssignment, From e36b2bfaa2ea564b5bd7aa6e314bcc698bcd567a Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Thu, 15 Jan 2026 20:55:42 +0000 Subject: [PATCH 2/2] Removing non necessary comments --- .../apache/fluss/flink/procedure/FlinkProcedureITCase.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index aaef8d399c..831be95079 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -719,11 +719,6 @@ void testListRebalanceProgress() throws Exception { } // first create some unbalance assignment table. - // Note: We create 10 tables with server 3 marked PERMANENT_OFFLINE, concentrating - // all replicas on servers 0, 1, 2. This creates high coordinator load as all - // NotifyLeaderAndIsr RPCs serialize behind replicaStateChangeLock. We add delays - // between table creations to reduce lock contention and use a longer timeout to - // handle legitimate slowness during KV snapshot downloads and log recovery. for (int i = 0; i < 10; i++) { String tableName = "reblance_test_tab_" + i; tEnv.executeSql(