|
| 1 | +--- |
| 2 | +title: Rebalance |
| 3 | +sidebar_position: 2 |
| 4 | +--- |
| 5 | +# Rebalance |
| 6 | + |
| 7 | +## Overview |
| 8 | + |
| 9 | +Fluss provides cluster rebalancing capabilities to redistribute buckets across TabletServers based on workload and cluster topology changes. Rebalancing helps maintain optimal resource utilization and ensures balanced load distribution across the cluster. |
| 10 | + |
| 11 | +Rebalancing primarily occurs in the following scenarios: |
| 12 | +- **Scaling Down**: Offline existing TabletServers from the cluster |
| 13 | +- **Scaling Up**: Adding new TabletServers to the cluster |
| 14 | +- **Load Balancing**: Routine adjustments for load imbalance across TabletServers |
| 15 | + |
| 16 | +## When to Rebalance |
| 17 | + |
| 18 | +You should consider triggering a rebalance operation in these situations: |
| 19 | + |
| 20 | +1. **Server Decommissioning**: Before removing a TabletServer from the cluster permanently |
| 21 | +2. **Server Maintenance**: Before temporarily taking a TabletServer offline for upgrades or maintenance |
| 22 | +3. **Cluster Expansion**: After adding new TabletServers to distribute load to the new nodes |
| 23 | +4. **Load Imbalance**: When monitoring shows uneven distribution of replicas or leaders across TabletServers |
| 24 | + |
| 25 | +## Rebalance Workflow |
| 26 | + |
| 27 | +A typical rebalance workflow consists of the following steps: |
| 28 | + |
| 29 | +### 1. Tag Servers (Optional but Recommended) |
| 30 | + |
| 31 | +Before rebalancing, tag TabletServers that need special handling: |
| 32 | + |
| 33 | +```java |
| 34 | +import org.apache.fluss.cluster.rebalance.ServerTag; |
| 35 | + |
| 36 | +// Mark a server for permanent decommissioning |
| 37 | +admin.addServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get(); |
| 38 | + |
| 39 | +// Mark servers for temporary maintenance |
| 40 | +admin.addServerTag(Arrays.asList(1, 2), ServerTag.TEMPORARY_OFFLINE).get(); |
| 41 | +``` |
| 42 | + |
| 43 | +Server tags help the rebalance algorithm make informed decisions: |
| 44 | +- **PERMANENT_OFFLINE**: All buckets on these servers will be migrated away |
| 45 | +- **TEMPORARY_OFFLINE**: Buckets may be temporarily migrated, with possibility to return after maintenance |
| 46 | + |
| 47 | +### 2. Trigger Rebalance |
| 48 | + |
| 49 | +Initiate a rebalance operation with specified optimization goals: |
| 50 | + |
| 51 | +```java |
| 52 | +import org.apache.fluss.cluster.rebalance.GoalType; |
| 53 | + |
| 54 | +// Trigger rebalance with replica distribution goal |
| 55 | +List<GoalType> goals = Collections.singletonList(GoalType.REPLICA_DISTRIBUTION); |
| 56 | +String rebalanceId = admin.rebalance(goals).get(); |
| 57 | +System.out.println("Rebalance started with ID: " + rebalanceId); |
| 58 | + |
| 59 | +// Trigger rebalance with multiple goals in priority order |
| 60 | +List<GoalType> multipleGoals = Arrays.asList( |
| 61 | + GoalType.REPLICA_DISTRIBUTION, |
| 62 | + GoalType.LEADER_DISTRIBUTION |
| 63 | +); |
| 64 | +String rebalanceId = admin.rebalance(multipleGoals).get(); |
| 65 | +``` |
| 66 | + |
| 67 | +Available rebalance goals: |
| 68 | +- **REPLICA_DISTRIBUTION**: Ensures the number of replicas on each TabletServer is near balanced |
| 69 | +- **LEADER_DISTRIBUTION**: Ensures the number of leader replicas on each TabletServer is near balanced |
| 70 | + |
| 71 | +### 3. Monitor Progress |
| 72 | + |
| 73 | +Track the rebalance operation using the returned rebalance ID: |
| 74 | + |
| 75 | +```java |
| 76 | +import org.apache.fluss.cluster.rebalance.RebalanceProgress; |
| 77 | +import org.apache.fluss.cluster.rebalance.RebalanceStatus; |
| 78 | + |
| 79 | +// Query specific rebalance progress |
| 80 | +Optional<RebalanceProgress> progress = admin.listRebalanceProgress(rebalanceId).get(); |
| 81 | + |
| 82 | +if (progress.isPresent()) { |
| 83 | + RebalanceProgress p = progress.get(); |
| 84 | + System.out.println("Rebalance ID: " + p.rebalanceId()); |
| 85 | + System.out.println("Status: " + p.status()); |
| 86 | + System.out.println("Progress: " + (p.progress() * 100) + "%"); |
| 87 | + |
| 88 | + // Check if rebalance is complete |
| 89 | + if (p.status() == RebalanceStatus.COMPLETED) { |
| 90 | + System.out.println("Rebalance completed successfully!"); |
| 91 | + } |
| 92 | +} |
| 93 | + |
| 94 | +// Query the most recent rebalance progress (if rebalanceId is not provided) |
| 95 | +Optional<RebalanceProgress> latestProgress = admin.listRebalanceProgress(null).get(); |
| 96 | +``` |
| 97 | + |
| 98 | +Rebalance statuses: |
| 99 | +- **NOT_STARTED**: The rebalance has been created but not yet started |
| 100 | +- **REBALANCING**: The rebalance is currently in progress |
| 101 | +- **COMPLETED**: The rebalance has successfully completed |
| 102 | +- **FAILED**: The rebalance has failed |
| 103 | +- **CANCELED**: The rebalance has been canceled |
| 104 | + |
| 105 | +### 4. Cancel Rebalance (If Needed) |
| 106 | + |
| 107 | +Cancel an ongoing rebalance operation if necessary: |
| 108 | + |
| 109 | +```java |
| 110 | +// Cancel a specific rebalance |
| 111 | +admin.cancelRebalance(rebalanceId).get(); |
| 112 | + |
| 113 | +// Cancel the most recent rebalance |
| 114 | +admin.cancelRebalance(null).get(); |
| 115 | +``` |
| 116 | + |
| 117 | +**Important Notes:** |
| 118 | +- Only rebalance operations in `NOT_STARTED` or `REBALANCING` status can be canceled |
| 119 | +- Already completed bucket migrations will not be rolled back |
| 120 | +- After cancellation, the rebalance status will change to `CANCELED` |
| 121 | + |
| 122 | +### 5. Remove Server Tags (After Completion) |
| 123 | + |
| 124 | +After rebalance completes and maintenance is done, remove server tags to restore normal operation: |
| 125 | + |
| 126 | +```java |
| 127 | +// Remove tags from servers that are back online |
| 128 | +admin.removeServerTag(Arrays.asList(1, 2), ServerTag.TEMPORARY_OFFLINE).get(); |
| 129 | +``` |
| 130 | + |
| 131 | +## Using Java Client |
| 132 | + |
| 133 | +Here is a complete example demonstrating the rebalance workflow using the Java Client: |
| 134 | + |
| 135 | +```java |
| 136 | +import org.apache.fluss.client.admin.Admin; |
| 137 | +import org.apache.fluss.cluster.rebalance.GoalType; |
| 138 | +import org.apache.fluss.cluster.rebalance.RebalanceProgress; |
| 139 | +import org.apache.fluss.cluster.rebalance.RebalanceStatus; |
| 140 | +import org.apache.fluss.cluster.rebalance.ServerTag; |
| 141 | + |
| 142 | +import java.util.Arrays; |
| 143 | +import java.util.Collections; |
| 144 | +import java.util.List; |
| 145 | +import java.util.Optional; |
| 146 | + |
| 147 | +public class RebalanceExample { |
| 148 | + public static void main(String[] args) throws Exception { |
| 149 | + Admin admin = // ... create admin client |
| 150 | + |
| 151 | + try { |
| 152 | + // Step 1: Tag servers for decommissioning |
| 153 | + System.out.println("Tagging server 0 for permanent offline..."); |
| 154 | + admin.addServerTag(Collections.singletonList(0), ServerTag.PERMANENT_OFFLINE).get(); |
| 155 | + |
| 156 | + // Step 2: Trigger rebalance |
| 157 | + System.out.println("Triggering rebalance..."); |
| 158 | + List<GoalType> goals = Arrays.asList( |
| 159 | + GoalType.REPLICA_DISTRIBUTION, |
| 160 | + GoalType.LEADER_DISTRIBUTION |
| 161 | + ); |
| 162 | + String rebalanceId = admin.rebalance(goals).get(); |
| 163 | + System.out.println("Rebalance started with ID: " + rebalanceId); |
| 164 | + |
| 165 | + // Step 3: Monitor progress |
| 166 | + boolean isComplete = false; |
| 167 | + while (!isComplete) { |
| 168 | + Thread.sleep(5000); // Poll every 5 seconds |
| 169 | + |
| 170 | + Optional<RebalanceProgress> progress = admin.listRebalanceProgress(rebalanceId).get(); |
| 171 | + if (progress.isPresent()) { |
| 172 | + RebalanceProgress p = progress.get(); |
| 173 | + System.out.printf("Status: %s, Progress: %.2f%%\n", |
| 174 | + p.status(), p.progress() * 100); |
| 175 | + |
| 176 | + if (RebalanceStatus.FINAL_STATUSES.contains(p.status())) { |
| 177 | + isComplete = true; |
| 178 | + System.out.println("Rebalance finished with status: " + p.status()); |
| 179 | + } |
| 180 | + } |
| 181 | + } |
| 182 | + |
| 183 | + // Step 4: Verify completion |
| 184 | + Optional<RebalanceProgress> finalProgress = admin.listRebalanceProgress(rebalanceId).get(); |
| 185 | + if (finalProgress.isPresent() && |
| 186 | + finalProgress.get().status() == RebalanceStatus.COMPLETED) { |
| 187 | + System.out.println("Rebalance completed successfully!"); |
| 188 | + } |
| 189 | + |
| 190 | + } catch (Exception e) { |
| 191 | + System.err.println("Rebalance failed: " + e.getMessage()); |
| 192 | + // Optionally cancel the rebalance on error |
| 193 | + // admin.cancelRebalance(rebalanceId).get(); |
| 194 | + } finally { |
| 195 | + admin.close(); |
| 196 | + } |
| 197 | + } |
| 198 | +} |
| 199 | +``` |
| 200 | + |
| 201 | +## Using Flink Stored Procedures |
| 202 | + |
| 203 | +For rebalancing operations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Rebalance Procedures](/docs/engine-flink/procedures.md#rebalance-procedures) for detailed documentation on using the following procedures: |
| 204 | + |
| 205 | +- **add_server_tag**: Tag servers before rebalancing |
| 206 | +- **remove_server_tag**: Remove tags after rebalancing |
| 207 | +- **rebalance**: Trigger rebalance operation |
| 208 | +- **list_rebalance**: Monitor rebalance progress |
| 209 | +- **cancel_rebalance**: Cancel ongoing rebalance |
| 210 | + |
| 211 | +Example using Flink SQL: |
| 212 | + |
| 213 | +```sql |
| 214 | +-- Tag a server for permanent offline |
| 215 | +CALL sys.add_server_tag('0', 'PERMANENT_OFFLINE'); |
| 216 | + |
| 217 | +-- Trigger rebalance |
| 218 | +CALL sys.rebalance('REPLICA_DISTRIBUTION,LEADER_DISTRIBUTION'); |
| 219 | + |
| 220 | +-- Monitor progress |
| 221 | +CALL sys.list_rebalance(); |
| 222 | + |
| 223 | +-- Cancel if needed |
| 224 | +CALL sys.cancel_rebalance(); |
| 225 | +``` |
| 226 | + |
| 227 | +## Best Practices |
| 228 | + |
| 229 | +1. **Plan Ahead**: Tag servers appropriately before triggering rebalance to guide the algorithm |
| 230 | +2. **Monitor Progress**: Regularly check rebalance status to ensure smooth operation |
| 231 | +3. **Off-Peak Hours**: Schedule rebalance operations during off-peak hours to minimize impact |
| 232 | +4. **Single Rebalance**: Fluss supports only one active rebalance task at a time in the cluster |
| 233 | +5. **Backup First**: For production environments, ensure data is backed up before major topology changes |
| 234 | +6. **Goal Priority**: Order rebalance goals by priority - the system attempts to achieve them in order |
| 235 | +7. **Server Tags**: Use `TEMPORARY_OFFLINE` for maintenance scenarios to allow buckets to return after maintenance |
| 236 | + |
| 237 | +## Troubleshooting |
| 238 | + |
| 239 | +### Rebalance Fails to Start |
| 240 | + |
| 241 | +If rebalance fails to start: |
| 242 | +- Check if another rebalance is already in progress |
| 243 | +- Verify that all TabletServers are healthy and reachable |
| 244 | +- Ensure the specified server IDs exist in the cluster |
| 245 | + |
| 246 | +### Rebalance Takes Too Long |
| 247 | + |
| 248 | +If rebalance is taking longer than expected: |
| 249 | +- Check network bandwidth between TabletServers |
| 250 | +- Verify disk I/O performance on TabletServers |
| 251 | +- Monitor cluster load and resource utilization |
| 252 | +- Consider canceling and retrying with fewer goals |
| 253 | + |
| 254 | +### Rebalance Stuck in REBALANCING Status |
| 255 | + |
| 256 | +If rebalance appears stuck: |
| 257 | +- Check TabletServer logs for errors |
| 258 | +- Verify network connectivity between servers |
| 259 | +- Use `list_rebalance` to check detailed bucket progress |
| 260 | +- If necessary, cancel and restart the rebalance operation |
0 commit comments