diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java index 6fb27c3a77..027a550cf7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java @@ -38,7 +38,22 @@ public enum GoalType { * Goal to generate leadership movement and leader replica movement tasks to ensure that the * number of leader replicas on each tabletServer is near balanced. */ - LEADER_DISTRIBUTION(1); + LEADER_DISTRIBUTION(1), + + /** + * Goal to generate replica movement tasks to ensure that the number of replicas on each + * tabletServer is near balanced and the replicas are distributed across racks. + */ + RACK_AWARE(2), + + /** + * Goal to generate replica movement tasks to ensure that the number of replicas on each + * tabletServer is near balanced and the replicas are distributed across racks. This is a + * relaxed version of RACK_AWARE goal. Contrary to RACK_AWARE goal, as long as replicas of each + * bucket can achieve a perfectly even distribution across the racks, this goal lets placement + * of multiple replicas of a bucket into a single rack. + */ + RACK_AWARE_DISTRIBUTION(3); public final int value; @@ -51,6 +66,10 @@ public static GoalType valueOf(int value) { return REPLICA_DISTRIBUTION; } else if (value == LEADER_DISTRIBUTION.value) { return LEADER_DISTRIBUTION; + } else if (value == RACK_AWARE.value) { + return RACK_AWARE; + } else if (value == RACK_AWARE_DISTRIBUTION.value) { + return RACK_AWARE_DISTRIBUTION; } else { throw new IllegalArgumentException( String.format( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java index 3385964ce6..375e5de657 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java @@ -20,6 +20,7 @@ import org.apache.fluss.cluster.rebalance.GoalType; import org.apache.fluss.server.coordinator.rebalance.ActionType; import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats; import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; @@ -36,6 +37,10 @@ public static Goal getGoalByType(GoalType goalType) { return new ReplicaDistributionGoal(); case LEADER_DISTRIBUTION: return new LeaderReplicaDistributionGoal(); + case RACK_AWARE: + return new RackAwareGoal(); + case RACK_AWARE_DISTRIBUTION: + return new RackAwareDistributionGoal(); default: throw new IllegalArgumentException("Unsupported goal type " + goalType); } @@ -76,4 +81,19 @@ public static Set aliveServers(ClusterModel cluster) { .map(ServerModel::id) .collect(Collectors.toCollection(HashSet::new)); } + + /** A convenience {@link Goal.ClusterModelStatsComparator} for typical hard goals. */ + public static class HardGoalStatsComparator implements Goal.ClusterModelStatsComparator { + @Override + public int compare(ClusterModelStats stats1, ClusterModelStats stats2) { + // Stats are irrelevant to a hard goal. The optimization would already fail if the goal + // requirements are not met. 
+ return 0; + } + + @Override + public String explainLastComparison() { + return null; + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java new file mode 100644 index 0000000000..2ba15353e7 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareAbstractGoal.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance; +import org.apache.fluss.server.coordinator.rebalance.ActionType; +import org.apache.fluss.server.coordinator.rebalance.RebalancingAction; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; +import java.util.SortedSet; + +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT; +import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.SERVER_REJECT; + +/** An abstract class for rack-aware goals. */ +public abstract class RackAwareAbstractGoal extends AbstractGoal { + private static final Logger LOG = LoggerFactory.getLogger(RackAwareAbstractGoal.class); + + @Override + public ClusterModelStatsComparator clusterModelStatsComparator() { + return new GoalUtils.HardGoalStatsComparator(); + } + + @Override + protected boolean selfSatisfied(ClusterModel clusterModel, RebalancingAction action) { + return true; + } + + /** + * Check whether the given action is acceptable by this goal. The following actions are + * acceptable: + * + * + * + * @param action Action to be checked for acceptance. + * @param clusterModel The state of the cluster. + * @return {@link ActionAcceptance#ACCEPT} if the action is acceptable by this goal, {@link + * ActionAcceptance#SERVER_REJECT} if the action is rejected due to violating rack awareness + * in the destination broker after moving source replica to destination broker. 
+ */ + @Override + public ActionAcceptance actionAcceptance(RebalancingAction action, ClusterModel clusterModel) { + switch (action.getActionType()) { + case LEADERSHIP_MOVEMENT: + return ACCEPT; + case REPLICA_MOVEMENT: + if (doesReplicaMoveViolateActionAcceptance( + clusterModel, + clusterModel + .server(action.getSourceServerId()) + .replica(action.getTableBucket()), + clusterModel.server(action.getDestinationServerId()))) { + return SERVER_REJECT; + } + return ACCEPT; + default: + throw new IllegalArgumentException( + "Unsupported rebalance action " + action.getActionType() + " is provided."); + } + } + + /** + * Check whether the given replica move would violate the action acceptance for this rack aware + * goal. + * + * @param clusterModel The state of the cluster. + * @param sourceReplica Source replica + * @param destServer Destination server to receive the given source replica. + * @return {@code true} if the given replica move would violate action acceptance (i.e. the move + * is not acceptable), {@code false} otherwise. + */ + protected abstract boolean doesReplicaMoveViolateActionAcceptance( + ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer); + + /** + * Rebalance the given serverModel without violating the constraints of this rack aware goal and + * optimized goals. + * + * @param serverModel Server to be balanced. + * @param clusterModel The state of the cluster. + * @param optimizedGoals Optimized goals. + * @param throwExceptionIfCannotMove {@code true} to throw an {@link RebalanceFailureException} + * in case a required balancing action for a replica fails for all rack-aware eligible + * servers, {@code false} to just log the failure and return. This parameter enables + * selected goals fail early in case the un-satisfiability of a goal can be determined + * early. + */ + protected void rebalanceForServer( + ServerModel serverModel, + ClusterModel clusterModel, + Set optimizedGoals, + boolean throwExceptionIfCannotMove) + throws RebalanceFailureException { + // TODO maybe use a sorted replicas set + for (ReplicaModel replica : serverModel.replicas()) { + if (!serverModel.isOfflineTagged() + && shouldKeepInTheCurrentServer(replica, clusterModel)) { + continue; + } + // The relevant rack awareness condition is violated. Move replica to an eligible + // serverModel + SortedSet eligibleServers = + rackAwareEligibleServers(replica, clusterModel); + if (maybeApplyBalancingAction( + clusterModel, + replica, + eligibleServers, + ActionType.REPLICA_MOVEMENT, + optimizedGoals) + == null) { + if (throwExceptionIfCannotMove) { + throw new RebalanceFailureException( + String.format( + "[%s] Cannot move %s to %s.", + name(), replica, eligibleServers)); + } + LOG.debug( + "Cannot move replica {} to any serverModel in {}", + replica, + eligibleServers); + } + } + } + + /** + * Check whether the given alive replica should stay in the current server or be moved to + * another server to satisfy the specific requirements of the rack aware goal in the given + * cluster state. + * + * @param replica An alive replica to check whether it should stay in the current server. + * @param clusterModel The state of the cluster. + * @return {@code true} if the given alive replica should stay in the current server, {@code + * false} otherwise. 
+ */ + protected abstract boolean shouldKeepInTheCurrentServer( + ReplicaModel replica, ClusterModel clusterModel); + + /** + * Get a list of eligible brokers for moving the given replica in the given cluster to satisfy + * the specific requirements of the custom rack aware goal. + * + * @param replica Replica for which a set of rack aware eligible servers are requested. + * @param clusterModel The state of the cluster. + * @return A list of rack aware eligible servers for the given replica in the given cluster. + */ + protected abstract SortedSet rackAwareEligibleServers( + ReplicaModel replica, ClusterModel clusterModel); +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoal.java new file mode 100644 index 0000000000..50ded37167 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoal.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Stream; + +import static java.util.Collections.max; +import static java.util.Collections.min; +import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServers; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Generate replica movement proposals to evenly distribute replicas over alive racks not excluded + * for replica moves. + * + *
+ * <p>This is a relaxed version of {@link RackAwareGoal}. Contrary to {@link RackAwareGoal}, as long
+ * as replicas of each bucket can achieve a perfectly even distribution across the racks, this goal
+ * lets placement of multiple replicas of a bucket into a single rack.
+ *
+ * <p>For example, suppose a table with 1 bucket has 4 replicas in a cluster with 2 racks. Then the
+ * following distribution will be acceptable by this goal (but would be unacceptable by {@link
+ * RackAwareGoal}):
+ *
+ * <pre>
+ * Rack A                    | Rack B
+ * -----------------------------------------------------
+ * Server1-rack A  replica-0 | Server2-rack B
+ * Server3-rack A            | Server4-rack B replica-1
+ * Server5-rack A  replica-2 | Server6-rack B
+ * Server7-rack A            | Server8-rack B replica-3
+ * </pre>
+ * + *
+ * <p>However, this goal will yield a {@link RebalanceFailureException} for the same bucket in the
+ * following cluster due to the lack of a second server to place a replica of this bucket in {@code
+ * Rack B}:
+ *
+ * <pre>
+ * Rack A                    | Rack B
+ * -----------------------------------------------------
+ * Server1-rack A  replica-0 | Server2-rack B replica-1
+ * Server3-rack A  replica-3 |
+ * Server5-rack A  replica-2 |
+ * </pre>
+ */ +public class RackAwareDistributionGoal extends RackAwareAbstractGoal { + + private RackRebalanceLimit rackRebalanceLimit; + private Set serversAllowedReplicaMoves; + + @Override + protected boolean doesReplicaMoveViolateActionAcceptance( + ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) { + String destinationRackId = destServer.rack(); + String sourceRackId = sourceReplica.server().rack(); + + if (sourceRackId.equals(destinationRackId)) { + // A replica move within the same rack cannot violate rack aware distribution. + return false; + } + + // The replica move shall not increase the replica distribution imbalance of the bucket + // across racks. + BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found"); + Set bucketServers = bucket.bucketServers(); + Map numReplicasByRack = numBucketReplicasByRackId(bucketServers); + + // Once this goal is optimized, it is guaranteed to have 0 replicas on servers excluded for + // replica moves. + // Hence, no explicit check is necessary for verifying the replica source. + return numReplicasByRack.getOrDefault(destinationRackId, 0) + >= numReplicasByRack.getOrDefault(sourceRackId, 0); + } + + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + serversAllowedReplicaMoves = aliveServers(clusterModel); + if (serversAllowedReplicaMoves.isEmpty()) { + throw new RebalanceFailureException( + String.format( + "[%s] All alive tabletServers are excluded from replica moves.", + name())); + } + + rackRebalanceLimit = + new RackRebalanceLimit( + clusterModel, clusterModel.racksContainServerWithoutOfflineTag().size()); + } + + /** + * Update goal state. Sanity check: After completion of balancing, confirm that replicas of each + * bucket are evenly distributed across the racks. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // One pass is sufficient to satisfy or alert impossibility of this goal. + // Sanity check to confirm that replicas of each bucket are evenly distributed across the + // racks + ensureRackAwareDistribution(clusterModel); + finish(); + } + + @Override + public void finish() { + super.finish(); + // Clean up the memory + rackRebalanceLimit.clear(); + } + + /** + * Violations of rack-aware distribution can be resolved with replica movements. + * + * @param server Server to be balanced. + * @param clusterModel The state of the cluster. + * @param optimizedGoals Optimized goals. + */ + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set optimizedGoals) + throws RebalanceFailureException { + rebalanceForServer(server, clusterModel, optimizedGoals, false); + } + + /** + * Get a list of eligible servers for moving the given replica in the given cluster to satisfy + * the specific requirements of the rack aware goal. A server is rack aware distribution + * eligible for a given replica if it is in a rack + * + *
+ * <ul>
+ *   <li>that needs more replicas -- i.e. has fewer than base limit replicas
+ *   <li>with at most as many as base limit replicas, and the number of racks with at least one
+ *       more replica over the base limit is fewer than the allowed number of such racks
+ * </ul>
+ * + * @param replica Replica for which a set of rack aware eligible servers are requested. + * @param clusterModel The state of the cluster. + * @return A list of rack aware eligible servers for the given replica in the given cluster. + */ + @Override + protected SortedSet rackAwareEligibleServers( + ReplicaModel replica, ClusterModel clusterModel) { + return rackAwareEligibleServers(replica, clusterModel, Collections.emptyList()); + } + + /** + * The same as {@link #rackAwareEligibleServers(ReplicaModel replica, ClusterModel + * clusterModel)}, except that this accepts an extra comparator to sort after the original + * rack-aware sort is performed. The method is intended to be used as a tool for + * extension/customization. + * + *
For example, after the method sorts by number of replicas in the rack, if a customization + * that "if possible, place it in zone-of-racks that has the least replica", one can try to + * provide {@code Collections.singletonList(Comparator.comparingInt((Server b) -> + * SomeZoneUtils.zoneOfRack(b.rack()).totalReplicas()))} + * + * @param replica Replica for which a set of rack aware eligible servers are requested. + * @param clusterModel The state of the cluster. + * @param extraSoftServerPriorityComparators List of comparators to sort eligible servers, after + * comparing the number of replicas in rack + * @return A list of rack aware eligible servers for the given replica in the given cluster. + */ + private SortedSet rackAwareEligibleServers( + ReplicaModel replica, + ClusterModel clusterModel, + List> extraSoftServerPriorityComparators) { + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + Set bucketServers = bucket.bucketServers(); + Map numReplicasByRack = numBucketReplicasByRackId(bucketServers); + + // Decrement the number of replicas in this rack. + numReplicasByRack.merge(replica.server().rack(), -1, Integer::sum); + + int baseLimit = rackRebalanceLimit.baseLimitByRF(bucketServers.size()); + + // A replica is allowed to be moved to a rack at the base limit only if the number of racks + // with at least one more replica over the base limit is fewer than the allowed number of + // such racks. + // For example, suppose a bucket has 5 replicas in a cluster with 3 racks. In the ideal + // distribution, 2 racks has 2 replicas and the other rack has 1 replica from the bucket. + // Suppose that in the current distribution, Rack-1 has 1 offline and 1 online replica, + // Rack-2 has 2 online replicas, and Rack-3 has 1 online replica. In this scenario, + // we can place the offline replica to an alive broker in either Rack-1 or Rack-3, because + // the cluster has only one rack with at least one more replica over the base limit and we + // know that the ideal distribution allows 2 such racks. 
+ boolean canMoveToRacksAtBaseLimit = false; + int numRacksWithOneMoreReplicaLimit = + rackRebalanceLimit.numRacksWithOneMoreReplicaByRF(bucketServers.size()); + int numRacksWithAtLeastOneMoreReplica = + (int) numReplicasByRack.values().stream().filter(r -> r > baseLimit).count(); + if (numRacksWithAtLeastOneMoreReplica < numRacksWithOneMoreReplicaLimit) { + canMoveToRacksAtBaseLimit = true; + } + + final Comparator leastReplicasInRack = + Comparator.comparingInt( + (ServerModel s) -> numReplicasByRack.getOrDefault(s.rack(), 0)); + final SortedSet rackAwareDistributionEligibleServers = + new TreeSet<>( + Stream.concat( + // Prefer servers whose rack has fewer replicas from the + // bucket first + Stream.of(leastReplicasInRack), + // Use the extra comparators as the sub-criteria for sorting + extraSoftServerPriorityComparators.stream()) + // Combine the comparators into one + .reduce(Comparator::thenComparing) + // While the stream won't be null, code check warns for direct + // `.get()` provide a default + .orElse(leastReplicasInRack) + // Compare serverID at the last + .thenComparingInt(ServerModel::id)); + + for (ServerModel destServer : clusterModel.aliveServers()) { + int numReplicasInThisRack = numReplicasByRack.getOrDefault(destServer.rack(), 0); + if (numReplicasInThisRack < baseLimit + || (canMoveToRacksAtBaseLimit && numReplicasInThisRack == baseLimit)) { + // Either the (1) destination rack needs more replicas or (2) the replica is allowed + // to be moved to a rack at the base limit and the destination server is in a rack + // at the base limit + if (!bucketServers.contains(destServer)) { + rackAwareDistributionEligibleServers.add(destServer); + } + } + } + + // Return eligible servers + return rackAwareDistributionEligibleServers; + } + + @Override + protected boolean shouldKeepInTheCurrentServer( + ReplicaModel replica, ClusterModel clusterModel) { + if (isExcludedForReplicaMove(replica.server())) { + // A replica in a server excluded for the replica moves must be relocated to another + // server. + return false; + } + // Rack aware distribution requires perfectly even distribution for replicas of each + // bucket across the racks. + // This permits placement of multiple replicas of a bucket into a single rack. + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + Set bucketServers = bucket.bucketServers(); + int replicationFactor = bucketServers.size(); + Map numReplicasByRack = numBucketReplicasByRackId(bucketServers); + + int baseLimit = rackRebalanceLimit.baseLimitByRF(replicationFactor); + int numRacksWithOneMoreReplicaLimit = + rackRebalanceLimit.numRacksWithOneMoreReplicaByRF(replicationFactor); + int upperLimit = baseLimit + (numRacksWithOneMoreReplicaLimit == 0 ? 0 : 1); + + int numReplicasInThisRack = numReplicasByRack.get(replica.server().rack()); + if (numReplicasInThisRack <= baseLimit) { + // Rack does not have extra replicas to give away + return true; + } else if (numReplicasInThisRack > upperLimit) { + // The rack has extra replica(s) to give away + return false; + } else { + // This is a rack either with an extra replica to give away or keep. + int numRacksWithOneMoreReplica = + (int) numReplicasByRack.values().stream().filter(r -> r > baseLimit).count(); + // If the current number of racks with one more replica over the base limit are more + // than the allowed number of such racks, then this rack has an extra replica to give + // away, otherwise it keeps the replica. 
+ return numRacksWithOneMoreReplica <= numRacksWithOneMoreReplicaLimit; + } + } + + /** + * Check whether the given server is excluded for replica moves. Such a server cannot receive + * replicas, but can give them away. + * + * @param server Server to check for exclusion from replica moves. + * @return {@code true} if the given server is excluded for replica moves, {@code false} + * otherwise. + */ + private boolean isExcludedForReplicaMove(ServerModel server) { + return !serversAllowedReplicaMoves.contains(server.id()); + } + + /** + * Ensures that replicas of all buckets in the cluster satisfy rack-aware distribution. + * + * @param clusterModel A cluster model. + */ + private void ensureRackAwareDistribution(ClusterModel clusterModel) + throws RebalanceFailureException { + for (ReplicaModel leader : clusterModel.leaderReplicas()) { + BucketModel bucket = clusterModel.bucket(leader.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + leader + " is not found"); + Set bucketServers = bucket.bucketServers(); + Map numReplicasByRack = numBucketReplicasByRackId(bucketServers); + + int maxNumReplicasInARack = max(numReplicasByRack.values()); + + // Check if rack(s) have multiple replicas from the same bucket (i.e. otherwise the + // distribution is rack-aware). + if (maxNumReplicasInARack > 1) { + // Check whether there are alive racks allowed replica moves with: + // (1) no replicas despite having RF > number of alive racks allowed replica moves + // (2) more replicas that they could have been placed into other racks. + boolean someAliveRacksHaveNoReplicas = + numReplicasByRack.size() < rackRebalanceLimit.numAliveRacks(); + if (someAliveRacksHaveNoReplicas + || maxNumReplicasInARack - min(numReplicasByRack.values()) > 1) { + throw new RebalanceFailureException( + String.format( + "[%s] Bucket %s is not rack-aware. Servers (%s) and replicas per rack (%s).", + name(), + leader.tableBucket(), + bucketServers, + numReplicasByRack)); + } + } + } + } + + /** + * Given the servers that host replicas of a bucket, retrieves a map containing number of + * replicas by the id of the rack they reside in. + * + * @param bucketServers Servers that host replicas of some bucket + * @return A map containing the number of replicas by rackId that these replicas reside in. + */ + private Map numBucketReplicasByRackId(Set bucketServers) { + Map numBucketReplicasByRackId = new HashMap<>(); + for (ServerModel server : bucketServers) { + numBucketReplicasByRackId.merge(server.rack(), 1, Integer::sum); + } + return numBucketReplicasByRackId; + } + + /** + * A wrapper to facilitate describing per-rack replica count limits for a bucket with the given + * replication factor. + * + *
+ * <p>These limits are expressed in terms of the following:
+ *
+ * <ul>
+ *   <li>{@link #baseLimitByRF}: The minimum number of replicas from the bucket with the given
+ *       replication factor that each alive rack that is allowed replica moves must contain
+ *   <li>{@link #numRacksWithOneMoreReplicaByRF}: The exact number of racks, among those allowed
+ *       replica moves, that must contain one additional replica (i.e. the base limit + 1) from
+ *       the bucket with the given replication factor
+ * </ul>
+ * + *
For example, for a given replication factor (RF), if the base limit is 1 and the number of + * racks (that are allowed replica moves) with one more replica is 3, then 3 racks should have 2 + * replicas and each remaining rack should have 1 replica from the bucket with the given RF. + */ + protected static class RackRebalanceLimit { + private final int numAliveRacks; + private final Map baseLimitByRF; + private final Map numRacksWithOneMoreReplicaByRF; + + public RackRebalanceLimit(ClusterModel clusterModel, int numAliveRacks) + throws RebalanceFailureException { + this.numAliveRacks = numAliveRacks; + if (this.numAliveRacks == 0) { + // Handle the case when all alive racks are excluded from replica moves. + throw new RebalanceFailureException( + "All alive racks are excluded from replica moves."); + } + int maxReplicationFactor = clusterModel.maxReplicationFactor(); + baseLimitByRF = new HashMap<>(); + numRacksWithOneMoreReplicaByRF = new HashMap<>(); + + // Precompute the limits for each possible replication factor up to maximum replication + // factor. + for (int replicationFactor = 1; + replicationFactor <= maxReplicationFactor; + replicationFactor++) { + int baseLimit = replicationFactor / this.numAliveRacks; + baseLimitByRF.put(replicationFactor, baseLimit); + numRacksWithOneMoreReplicaByRF.put( + replicationFactor, replicationFactor % this.numAliveRacks); + } + } + + public int numAliveRacks() { + return numAliveRacks; + } + + public Integer baseLimitByRF(int replicationFactor) { + return baseLimitByRF.get(replicationFactor); + } + + public Integer numRacksWithOneMoreReplicaByRF(int replicationFactor) { + return numRacksWithOneMoreReplicaByRF.get(replicationFactor); + } + + /** Clear balance limits. */ + public void clear() { + baseLimitByRF.clear(); + numRacksWithOneMoreReplicaByRF.clear(); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java new file mode 100644 index 0000000000..d4c9a5deeb --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoal.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.server.coordinator.rebalance.model.BucketModel; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.RackModel; +import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Generate replica movement proposals to provide rack-aware replica distribution, which ensure that + * all replicas of each bucket are assigned in a rack aware manner -- i.e. no more than one replica + * of each bucket resides in the same rack. + */ +public class RackAwareGoal extends RackAwareAbstractGoal { + + @Override + protected boolean doesReplicaMoveViolateActionAcceptance( + ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer) { + // Destination server cannot be in a rack that violates rack awareness. + BucketModel bucket = clusterModel.bucket(sourceReplica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + sourceReplica + " is not found"); + Set bucketServers = bucket.bucketServers(); + bucketServers.remove(sourceReplica.server()); + + // If destination server exists on any of the rack of other replicas, it violates the + // rack-awareness + return bucketServers.stream().map(ServerModel::rack).anyMatch(destServer.rack()::equals); + } + + /** + * This is a hard goal; hence, the proposals are not limited to dead server replicas in case of + * self-healing. Sanity Check: There exists sufficient number of racks for achieving + * rack-awareness. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // Sanity Check: not enough racks to satisfy rack awareness. + // Assumes number of racks doesn't exceed Integer.MAX_VALUE. + int numAvailableRacks = + (int) + clusterModel.racksContainServerWithoutOfflineTag().stream() + .map(RackModel::rack) + .distinct() + .count(); + if (clusterModel.maxReplicationFactor() > numAvailableRacks) { + throw new RebalanceFailureException( + String.format( + "[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).", + name(), numAvailableRacks, clusterModel.maxReplicationFactor())); + } + } + + /** + * Update goal state. Sanity check: After completion of balancing, confirm that replicas of each + * bucket reside at a separate rack. + * + * @param clusterModel The state of the cluster. + */ + @Override + protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException { + // One pass is sufficient to satisfy or alert impossibility of this goal. + // Sanity check to confirm that the final distribution is rack aware. + ensureRackAware(clusterModel); + finish(); + } + + /** + * Rack-awareness violations can be resolved with replica movements. + * + * @param server Server to be rebalanced. + * @param clusterModel The state of the cluster. + * @param optimizedGoals Optimized goals. 
+ */ + @Override + protected void rebalanceForServer( + ServerModel server, ClusterModel clusterModel, Set optimizedGoals) + throws RebalanceFailureException { + rebalanceForServer(server, clusterModel, optimizedGoals, true); + } + + /** + * Get a list of rack aware eligible servers for the given replica in the given cluster. A + * server is rack aware eligible for a given replica if the server resides in a rack where no + * other server in the same rack contains a replica from the same bucket of the given replica. + * + * @param replica Replica for which a set of rack aware eligible servers are requested. + * @param clusterModel The state of the cluster. + * @return A list of rack aware eligible servers for the given replica in the given cluster. + */ + @Override + protected SortedSet rackAwareEligibleServers( + ReplicaModel replica, ClusterModel clusterModel) { + // Populate bucket rackIds. + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + List bucketRackIds = + bucket.bucketServers().stream().map(ServerModel::rack).collect(Collectors.toList()); + + // Remove rackId of the given replica, but if there is any other replica from the bucket + // residing in the same cluster, keep its rackId in the list. + bucketRackIds.remove(replica.server().rack()); + + SortedSet rackAwareEligibleServers = + new TreeSet<>(Comparator.comparingInt(ServerModel::id)); + for (ServerModel server : clusterModel.aliveServers()) { + if (!bucketRackIds.contains(server.rack())) { + rackAwareEligibleServers.add(server); + } + } + // Return eligible servers. + return rackAwareEligibleServers; + } + + @Override + protected boolean shouldKeepInTheCurrentServer( + ReplicaModel replica, ClusterModel clusterModel) { + // Rack awareness requires no more than one replica from a given bucket residing in any + // rack in the cluster + String myRackId = replica.server().rack(); + int myServerId = replica.serverId(); + BucketModel bucket = clusterModel.bucket(replica.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + replica + " is not found"); + for (ServerModel bucketServer : bucket.bucketServers()) { + if (myRackId.equals(bucketServer.rack()) && myServerId != bucketServer.id()) { + return false; + } + } + return true; + } + + private void ensureRackAware(ClusterModel clusterModel) throws RebalanceFailureException { + for (ReplicaModel leader : clusterModel.leaderReplicas()) { + Set replicaServersRackIds = new HashSet<>(); + BucketModel bucket = clusterModel.bucket(leader.tableBucket()); + checkNotNull(bucket, "Bucket for replica " + leader + " is not found"); + Set followerServers = new HashSet<>(bucket.followerServers()); + + // Add rackId of replicas. + for (ServerModel followerServer : followerServers) { + replicaServersRackIds.add(followerServer.rack()); + } + replicaServersRackIds.add(leader.server().rack()); + if (replicaServersRackIds.size() != (followerServers.size() + 1)) { + throw new RebalanceFailureException( + String.format( + "[%s] Bucket %s is not rack-aware. 
Leader (%s) and follower server (%s).", + name(), leader.tableBucket(), leader.server(), followerServers)); + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java index d64c464eba..9cf1897764 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java @@ -62,6 +62,17 @@ public Set bucketServers() { return bucketServers; } + public List followerServers() { + List followerServers = new ArrayList<>(); + replicas.forEach( + replica -> { + if (!replica.isLeader()) { + followerServers.add(replica.server()); + } + }); + return followerServers; + } + public boolean canAssignReplicaToServer(ServerModel candidateServer) { return !ineligibleServers.contains(candidateServer); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java index d55f6a3cb9..8d481ecfd7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java @@ -53,9 +53,14 @@ public class ClusterModel { private final SortedSet servers; private final Map bucketsByTableBucket; + // An integer to keep track of the maximum replication factor that a bucket was ever created + // with. + private int maxReplicationFactor; + public ClusterModel(SortedSet servers) { this.servers = servers; this.bucketsByTableBucket = new HashMap<>(); + this.maxReplicationFactor = 1; this.aliveServers = new HashSet<>(); this.offlineServers = new TreeSet<>(); @@ -76,6 +81,13 @@ public ClusterModel(SortedSet servers) { } } + /** + * @return The maximum replication factor of a bucket that was added to the cluster before. + */ + public int maxReplicationFactor() { + return maxReplicationFactor; + } + public SortedSet offlineServers() { return offlineServers; } @@ -88,6 +100,15 @@ public Set aliveServers() { return Collections.unmodifiableSet(aliveServers); } + /** + * @return Racks that contain a server without offline tag. + */ + public Set racksContainServerWithoutOfflineTag() { + return racksById.values().stream() + .filter(RackModel::rackContainsServerWithoutOfflineTag) + .collect(Collectors.toSet()); + } + public @Nullable BucketModel bucket(TableBucket tableBucket) { return bucketsByTableBucket.get(tableBucket); } @@ -110,6 +131,20 @@ public int numReplicas() { return bucketsByTableBucket.values().stream().mapToInt(p -> p.replicas().size()).sum(); } + /** + * @return All the leader replicas in the cluster. 
+ */ + public Set leaderReplicas() { + Set leaderReplicas = new HashSet<>(); + for (BucketModel bucket : bucketsByTableBucket.values()) { + ReplicaModel leader = bucket.leader(); + if (leader != null) { + leaderReplicas.add(leader); + } + } + return leaderReplicas; + } + public int numLeaderReplicas() { int numLeaderReplicas = 0; for (BucketModel bucket : bucketsByTableBucket.values()) { @@ -193,6 +228,8 @@ public void createReplica(int serverId, TableBucket tableBucket, int index, bool } else { bucket.addFollower(replica, index); } + + maxReplicationFactor = Math.max(maxReplicationFactor, bucket.replicas().size()); } /** diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java index fdf9cbad80..d5a95bbeff 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java @@ -59,6 +59,18 @@ public String rack() { return rack; } + /** + * @return true if the rack contains a server without offline tag. + */ + public boolean rackContainsServerWithoutOfflineTag() { + for (ServerModel server : servers.values()) { + if (!server.isOfflineTagged()) { + return true; + } + } + return false; + } + @Nullable ServerModel server(int serverId) { return servers.get(serverId); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java index 8b853ba32b..116d4b4e17 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java @@ -190,7 +190,7 @@ public int compareTo(ServerModel o) { @Override public String toString() { return String.format( - "ServerModel[id=%s,rack=%s,isAlive=%s,replicaCount=%s]", + "ServerModel[id=%s,rack=%s,isOfflineTagged=%s,replicaCount=%s]", serverId, rack, isOfflineTagged, replicas.size()); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoalTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoalTest.java new file mode 100644 index 0000000000..e27111cf7d --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareDistributionGoalTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import static org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link RackAwareDistributionGoal}. */ +public class RackAwareDistributionGoalTest { + @Test + void testDiffWithRackAwareGoal() { + SortedSet servers = new TreeSet<>(); + servers.add(new ServerModel(1, "rackA", false)); + servers.add(new ServerModel(2, "rackB", false)); + servers.add(new ServerModel(3, "rackA", false)); + servers.add(new ServerModel(4, "rackB", false)); + servers.add(new ServerModel(5, "rackA", false)); + servers.add(new ServerModel(6, "rackB", false)); + servers.add(new ServerModel(7, "rackA", false)); + servers.add(new ServerModel(8, "rackB", false)); + ClusterModel clusterModel = new ClusterModel(servers); + + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(1, 4, 5, 8)); + + RackAwareGoal goal = new RackAwareGoal(); + assertThatThrownBy(() -> goal.optimize(clusterModel, Collections.singleton(goal))) + .isInstanceOf(RebalanceFailureException.class) + .hasMessage( + "[RackAwareGoal] Insufficient number of racks to distribute each replica (Current: 2, Needed: 4)."); + + // No error. + RackAwareDistributionGoal rackAwareDistributionGoal = new RackAwareDistributionGoal(); + rackAwareDistributionGoal.optimize( + clusterModel, Collections.singleton(rackAwareDistributionGoal)); + } + + @Test + void testLackOfSecondServerToPlaceReplica() { + SortedSet servers = new TreeSet<>(); + servers.add(new ServerModel(1, "rackA", false)); + servers.add(new ServerModel(2, "rackB", false)); + servers.add(new ServerModel(3, "rackA", false)); + servers.add(new ServerModel(4, "rackA", false)); + ClusterModel clusterModel = new ClusterModel(servers); + + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(1, 2, 3, 4)); + RackAwareDistributionGoal rackAwareDistributionGoal = new RackAwareDistributionGoal(); + assertThatThrownBy( + () -> + rackAwareDistributionGoal.optimize( + clusterModel, + Collections.singleton(rackAwareDistributionGoal))) + .isInstanceOf(RebalanceFailureException.class) + .hasMessage( + "[RackAwareDistributionGoal] Bucket TableBucket{tableId=1, bucket=0} is not rack-aware. 
" + + "Servers ([ServerModel[id=1,rack=rackA,isOfflineTagged=false,replicaCount=1], " + + "ServerModel[id=2,rack=rackB,isOfflineTagged=false,replicaCount=1], " + + "ServerModel[id=3,rack=rackA,isOfflineTagged=false,replicaCount=1], " + + "ServerModel[id=4,rack=rackA,isOfflineTagged=false,replicaCount=1]]) " + + "and replicas per rack ({rackA=3, rackB=1})."); + } + + @Test + void testReplicaDistributionBalanceAcrossRack() { + ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack(); + + RackAwareDistributionGoal goal = new RackAwareDistributionGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal)); + assertThat(rebalancePlanForBuckets).hasSize(10); + for (int i = 0; i < 10; i++) { + RebalancePlanForBucket planForBucket = rebalancePlanForBuckets.get(i); + assertThat(planForBucket.getNewLeader()).isEqualTo(4); + assertThat(planForBucket.getNewReplicas()).isEqualTo(Arrays.asList(4, 5, 2, 3)); + } + + clusterModel = generateUnbalancedReplicaAcrossServerAndRack(); + // combine with ReplicaDistributionGoal, the replica distribution will be balanced across + // server and rock. + ReplicaDistributionGoal replicaDistributionGoal = new ReplicaDistributionGoal(); + goalOptimizer = new GoalOptimizer(); + rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce( + clusterModel, Arrays.asList(goal, replicaDistributionGoal)); + assertThat(rebalancePlanForBuckets).hasSize(10); + + Map serverIdToReplicaNumber = + getServerIdToReplicaNumber(rebalancePlanForBuckets); + assertThat(serverIdToReplicaNumber.values().stream().filter(n -> n > 6).count()) + .isEqualTo(0); + assertThat(serverIdToReplicaNumber.values().stream().filter(n -> n < 4).count()) + .isEqualTo(0); + assertThat(getRackIdToReplicaNumber(rebalancePlanForBuckets, clusterModel).values()) + .containsExactlyInAnyOrder(20, 20); + } + + private ClusterModel generateUnbalancedReplicaAcrossServerAndRack() { + SortedSet servers = new TreeSet<>(); + servers.add(new ServerModel(0, "rack0", false)); + servers.add(new ServerModel(1, "rack0", false)); + servers.add(new ServerModel(2, "rack0", false)); + servers.add(new ServerModel(3, "rack0", false)); + servers.add(new ServerModel(4, "rack1", false)); + servers.add(new ServerModel(5, "rack1", false)); + servers.add(new ServerModel(6, "rack1", false)); + servers.add(new ServerModel(7, "rack1", false)); + + ClusterModel clusterModel = new ClusterModel(servers); + // all allocate in rack0, rack1, rack2. 
+ for (int i = 0; i < 10; i++) { + TableBucket t1bi = new TableBucket(1, i); + addBucket(clusterModel, t1bi, Arrays.asList(0, 1, 2, 3)); + } + + return clusterModel; + } + + private Map getServerIdToReplicaNumber( + List rebalancePlanForBuckets) { + Map serverIdToLeaderReplicaNumber = new HashMap<>(); + for (RebalancePlanForBucket planForBucket : rebalancePlanForBuckets) { + List newReplicas = planForBucket.getNewReplicas(); + for (Integer serverId : newReplicas) { + serverIdToLeaderReplicaNumber.put( + serverId, serverIdToLeaderReplicaNumber.getOrDefault(serverId, 0) + 1); + } + } + return serverIdToLeaderReplicaNumber; + } + + private Map getRackIdToReplicaNumber( + List rebalancePlanForBuckets, ClusterModel clusterModel) { + Map rackIdToLeaderReplicaNumber = new HashMap<>(); + for (RebalancePlanForBucket planForBucket : rebalancePlanForBuckets) { + List newReplicas = planForBucket.getNewReplicas(); + for (Integer serverId : newReplicas) { + ServerModel server = clusterModel.server(serverId); + rackIdToLeaderReplicaNumber.put( + server.rack(), + rackIdToLeaderReplicaNumber.getOrDefault(server.rack(), 0) + 1); + } + } + return rackIdToLeaderReplicaNumber; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java new file mode 100644 index 0000000000..ee13d556ab --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/goal/RackAwareGoalTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.rebalance.goal; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.exception.RebalanceFailureException; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel; +import org.apache.fluss.server.coordinator.rebalance.model.ServerModel; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import static org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link RackAwareGoal}. */ +public class RackAwareGoalTest { + + @Test + void testReplicaNumExceedsRackNum() { + SortedSet servers = new TreeSet<>(); + servers.add(new ServerModel(0, "rack0", false)); + servers.add(new ServerModel(1, "rack1", false)); + // server2 offline. 
+ servers.add(new ServerModel(2, "rack2", true)); + ClusterModel clusterModel = new ClusterModel(servers); + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 2)); + + RackAwareGoal goal = new RackAwareGoal(); + assertThatThrownBy(() -> goal.optimize(clusterModel, Collections.singleton(goal))) + .isInstanceOf(RebalanceFailureException.class) + .hasMessage( + "[RackAwareGoal] Insufficient number of racks to distribute each replica (Current: 2, Needed: 3)."); + } + + @Test + void testReplicaMove() { + SortedSet servers = new TreeSet<>(); + servers.add(new ServerModel(0, "rack0", false)); + servers.add(new ServerModel(1, "rack1", false)); + servers.add(new ServerModel(2, "rack2", false)); + servers.add(new ServerModel(3, "rack0", false)); + ClusterModel clusterModel = new ClusterModel(servers); + + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(0, 1, 3)); + + // check the follower will be moved to server2. + RackAwareGoal goal = new RackAwareGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal)); + assertThat(rebalancePlanForBuckets).hasSize(1); + assertThat(rebalancePlanForBuckets.get(0)) + .isEqualTo( + new RebalancePlanForBucket( + t1b0, 0, 2, Arrays.asList(0, 1, 3), Arrays.asList(2, 1, 3))); + } + + @Test + void testReplicaDistributionNotBalanceAcrossRackAndServer() { + // RackAwareGoal only requires that replicas of the same bucket cannot be distributed on + // the same rack, but it does not care about the balance of replicas between racks, nor does + // it care about the balance of replicas between servers. + ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack(); + RackAwareGoal goal = new RackAwareGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce(clusterModel, Collections.singletonList(goal)); + assertThat(rebalancePlanForBuckets).hasSize(0); + } + + @Test + void testReplicaDistributionBalanceAcrossServer() { + // the same input of `testReplicaDistributionNotBalanceAcrossRackAndServer`, if we combine + // using RackAwareGoal and ReplicaDistributionGoal, the replica distribution will be + // balanced across servers. 
+ ClusterModel clusterModel = generateUnbalancedReplicaAcrossServerAndRack(); + RackAwareGoal rackAwareGoal = new RackAwareGoal(); + ReplicaDistributionGoal replicaDistributionGoal = new ReplicaDistributionGoal(); + GoalOptimizer goalOptimizer = new GoalOptimizer(); + List rebalancePlanForBuckets = + goalOptimizer.doOptimizeOnce( + clusterModel, Arrays.asList(rackAwareGoal, replicaDistributionGoal)); + // Realance result(ReplicaNum) from server side: server0: 3, server1: 1, server2: 3, + // server3: 1, server4: 1, server5: 1 + assertThat(rebalancePlanForBuckets).hasSize(1); + assertThat(rebalancePlanForBuckets.get(0)) + .isEqualTo( + new RebalancePlanForBucket( + new TableBucket(1, 3), + 0, + 1, + Arrays.asList(0, 3, 5), + Arrays.asList(1, 3, 5))); + } + + private ClusterModel generateUnbalancedReplicaAcrossServerAndRack() { + SortedSet servers = new TreeSet<>(); + servers.add(new ServerModel(0, "rack0", false)); + servers.add(new ServerModel(1, "rack0", false)); + servers.add(new ServerModel(2, "rack1", false)); + servers.add(new ServerModel(3, "rack1", false)); + servers.add(new ServerModel(4, "rack2", false)); + servers.add(new ServerModel(5, "rack3", false)); + ClusterModel clusterModel = new ClusterModel(servers); + + // For the following case, RackAwareGoal will not remove any replicas but the replica + // distribution is not balanced not only in racks but also in servers. + // t1b0 -> 0, 2, 4 + // t1b1 -> 0, 2, 5 + // t1b2 -> 0, 2, 4 + // t1b3 -> 0, 3, 5 + + // Replica num from server side: server0: 4, server1: 0, server2: 3, server3: 1, server4: 1, + // server5: 1 + // Replica num from rack side: rack0: 4, rack1: 4, rack2: 2, rack3: 2 + TableBucket t1b0 = new TableBucket(1, 0); + addBucket(clusterModel, t1b0, Arrays.asList(0, 2, 4)); + TableBucket t1b1 = new TableBucket(1, 1); + addBucket(clusterModel, t1b1, Arrays.asList(0, 2, 5)); + TableBucket t1b2 = new TableBucket(1, 2); + addBucket(clusterModel, t1b2, Arrays.asList(0, 2, 4)); + TableBucket t1b3 = new TableBucket(1, 3); + addBucket(clusterModel, t1b3, Arrays.asList(0, 3, 5)); + return clusterModel; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java index d375923c9f..1908c94b7d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelTest.java @@ -145,4 +145,19 @@ void testRelocateReplica() { .hasMessageContaining( "Requested replica 1 is not a replica of bucket TableBucket{tableId=1, bucket=0}"); } + + @Test + void testMaxReplicaFactor() { + ClusterModel clusterModel = new ClusterModel(servers); + assertThat(clusterModel.maxReplicationFactor()).isEqualTo(1); + + clusterModel.createReplica(0, new TableBucket(1, 0), 0, true); + assertThat(clusterModel.maxReplicationFactor()).isEqualTo(1); + clusterModel.createReplica(1, new TableBucket(1, 0), 1, false); + assertThat(clusterModel.maxReplicationFactor()).isEqualTo(2); + clusterModel.createReplica(2, new TableBucket(1, 0), 2, false); + assertThat(clusterModel.maxReplicationFactor()).isEqualTo(3); + clusterModel.createReplica(0, new TableBucket(2, 0L, 0), 0, true); + assertThat(clusterModel.maxReplicationFactor()).isEqualTo(3); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java index edaff4b76a..3b895fd193 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModelTest.java @@ -81,13 +81,13 @@ void testServerModel() { void testToString() { ServerModel serverModel = new ServerModel(0, "rack0", true); assertThat(serverModel.toString()) - .isEqualTo("ServerModel[id=0,rack=rack0,isAlive=true,replicaCount=0]"); + .isEqualTo("ServerModel[id=0,rack=rack0,isOfflineTagged=true,replicaCount=0]"); serverModel.putReplica( new TableBucket(1L, 0), new ReplicaModel(new TableBucket(1L, 0), serverModel, false)); assertThat(serverModel.toString()) - .isEqualTo("ServerModel[id=0,rack=rack0,isAlive=true,replicaCount=1]"); + .isEqualTo("ServerModel[id=0,rack=rack0,isOfflineTagged=true,replicaCount=1]"); } @Test
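Reviewer note (not part of the patch): below is a minimal sketch contrasting the strict and relaxed goals on the same two-rack cluster, using only classes and test helpers that appear in this diff (`ServerModel`, `ClusterModel`, `RebalanceTestUtils.addBucket`, `GoalUtils.getGoalByType`, `GoalOptimizer`). Import paths and the expected outcomes in the comments are inferred from the code above rather than from any published Fluss API.

```java
import static org.apache.fluss.server.coordinator.rebalance.RebalanceTestUtils.addBucket;

import java.util.Arrays;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer;
import org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;

class RackAwareGoalsSketch {

    void compareStrictAndRelaxedGoals() {
        // Two racks, four alive servers (constructor args: serverId, rack, isOfflineTagged).
        SortedSet<ServerModel> servers = new TreeSet<>();
        servers.add(new ServerModel(1, "rackA", false));
        servers.add(new ServerModel(2, "rackA", false));
        servers.add(new ServerModel(3, "rackB", false));
        servers.add(new ServerModel(4, "rackB", false));
        ClusterModel clusterModel = new ClusterModel(servers);

        // One bucket with replication factor 3: servers 1 and 2 (rackA) plus server 3 (rackB).
        addBucket(clusterModel, new TableBucket(1, 0), Arrays.asList(1, 2, 3));

        // RACK_AWARE is strict: RF = 3 but only 2 racks, so initGoalState fails fast with a
        // RebalanceFailureException ("Insufficient number of racks to distribute each replica").
        Goal strict = GoalUtils.getGoalByType(GoalType.RACK_AWARE);
        // strict.optimize(clusterModel, Collections.singleton(strict));

        // RACK_AWARE_DISTRIBUTION is relaxed: base limit = 3 / 2 = 1 replica per rack, and
        // 3 % 2 = 1 rack may hold one extra, so the existing 2/1 split already satisfies the
        // goal and the optimizer proposes no replica movements.
        Goal relaxed = GoalUtils.getGoalByType(GoalType.RACK_AWARE_DISTRIBUTION);
        new GoalOptimizer().doOptimizeOnce(clusterModel, Collections.singletonList(relaxed));
    }
}
```

With a third rack added (or the bucket's replication factor reduced to 2), the strict goal would pass as well, which is essentially what `RackAwareGoalTest#testReplicaMove` exercises.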