Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,22 @@ public enum GoalType {
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
* number of leader replicas on each tabletServer is near balanced.
*/
LEADER_DISTRIBUTION(1);
LEADER_DISTRIBUTION(1),

/**
* Goal to generate replica movement tasks to ensure that the number of replicas on each
* tabletServer is near balanced and the replicas are distributed across racks.
*/
RACK_AWARE(2),

/**
* Goal to generate replica movement tasks to ensure that the number of replicas on each
* tabletServer is near balanced and the replicas are distributed across racks. This is a
* relaxed version of RACK_AWARE goal. Contrary to RACK_AWARE goal, as long as replicas of each
* bucket can achieve a perfectly even distribution across the racks, this goal lets placement
* of multiple replicas of a bucket into a single rack.
*/
RACK_AWARE_DISTRIBUTION(3);

public final int value;

Expand All @@ -51,6 +66,10 @@ public static GoalType valueOf(int value) {
return REPLICA_DISTRIBUTION;
} else if (value == LEADER_DISTRIBUTION.value) {
return LEADER_DISTRIBUTION;
} else if (value == RACK_AWARE.value) {
return RACK_AWARE;
} else if (value == RACK_AWARE_DISTRIBUTION.value) {
return RACK_AWARE_DISTRIBUTION;
} else {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.server.coordinator.rebalance.ActionType;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;

Expand All @@ -36,6 +37,10 @@ public static Goal getGoalByType(GoalType goalType) {
return new ReplicaDistributionGoal();
case LEADER_DISTRIBUTION:
return new LeaderReplicaDistributionGoal();
case RACK_AWARE:
return new RackAwareGoal();
case RACK_AWARE_DISTRIBUTION:
return new RackAwareDistributionGoal();
default:
throw new IllegalArgumentException("Unsupported goal type " + goalType);
}
Expand Down Expand Up @@ -76,4 +81,19 @@ public static Set<Integer> aliveServers(ClusterModel cluster) {
.map(ServerModel::id)
.collect(Collectors.toCollection(HashSet::new));
}

/** A convenience {@link Goal.ClusterModelStatsComparator} for typical hard goals. */
public static class HardGoalStatsComparator implements Goal.ClusterModelStatsComparator {
@Override
public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
// Stats are irrelevant to a hard goal. The optimization would already fail if the goal
// requirements are not met.
return 0;
}

@Override
public String explainLastComparison() {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.coordinator.rebalance.goal;

import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
import org.apache.fluss.server.coordinator.rebalance.ActionType;
import org.apache.fluss.server.coordinator.rebalance.RebalancingAction;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;
import java.util.SortedSet;

import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.SERVER_REJECT;

/** An abstract class for rack-aware goals. */
public abstract class RackAwareAbstractGoal extends AbstractGoal {
private static final Logger LOG = LoggerFactory.getLogger(RackAwareAbstractGoal.class);

@Override
public ClusterModelStatsComparator clusterModelStatsComparator() {
return new GoalUtils.HardGoalStatsComparator();
}

@Override
protected boolean selfSatisfied(ClusterModel clusterModel, RebalancingAction action) {
return true;
}

/**
* Check whether the given action is acceptable by this goal. The following actions are
* acceptable:
*
* <ul>
* <li>All leadership moves
* <li>Replica moves that do not violate {@link
* #doesReplicaMoveViolateActionAcceptance(ClusterModel, ReplicaModel, ServerModel)}
* </ul>
*
* @param action Action to be checked for acceptance.
* @param clusterModel The state of the cluster.
* @return {@link ActionAcceptance#ACCEPT} if the action is acceptable by this goal, {@link
* ActionAcceptance#SERVER_REJECT} if the action is rejected due to violating rack awareness
* in the destination broker after moving source replica to destination broker.
*/
@Override
public ActionAcceptance actionAcceptance(RebalancingAction action, ClusterModel clusterModel) {
switch (action.getActionType()) {
case LEADERSHIP_MOVEMENT:
return ACCEPT;
case REPLICA_MOVEMENT:
if (doesReplicaMoveViolateActionAcceptance(
clusterModel,
clusterModel
.server(action.getSourceServerId())
.replica(action.getTableBucket()),
clusterModel.server(action.getDestinationServerId()))) {
return SERVER_REJECT;
}
return ACCEPT;
default:
throw new IllegalArgumentException(
"Unsupported rebalance action " + action.getActionType() + " is provided.");
}
}

/**
* Check whether the given replica move would violate the action acceptance for this rack aware
* goal.
*
* @param clusterModel The state of the cluster.
* @param sourceReplica Source replica
* @param destServer Destination server to receive the given source replica.
* @return {@code true} if the given replica move would violate action acceptance (i.e. the move
* is not acceptable), {@code false} otherwise.
*/
protected abstract boolean doesReplicaMoveViolateActionAcceptance(
ClusterModel clusterModel, ReplicaModel sourceReplica, ServerModel destServer);

/**
* Rebalance the given serverModel without violating the constraints of this rack aware goal and
* optimized goals.
*
* @param serverModel Server to be balanced.
* @param clusterModel The state of the cluster.
* @param optimizedGoals Optimized goals.
* @param throwExceptionIfCannotMove {@code true} to throw an {@link RebalanceFailureException}
* in case a required balancing action for a replica fails for all rack-aware eligible
* servers, {@code false} to just log the failure and return. This parameter enables
* selected goals fail early in case the un-satisfiability of a goal can be determined
* early.
*/
protected void rebalanceForServer(
ServerModel serverModel,
ClusterModel clusterModel,
Set<Goal> optimizedGoals,
boolean throwExceptionIfCannotMove)
throws RebalanceFailureException {
// TODO maybe use a sorted replicas set
for (ReplicaModel replica : serverModel.replicas()) {
if (!serverModel.isOfflineTagged()
&& shouldKeepInTheCurrentServer(replica, clusterModel)) {
continue;
}
// The relevant rack awareness condition is violated. Move replica to an eligible
// serverModel
SortedSet<ServerModel> eligibleServers =
rackAwareEligibleServers(replica, clusterModel);
if (maybeApplyBalancingAction(
clusterModel,
replica,
eligibleServers,
ActionType.REPLICA_MOVEMENT,
optimizedGoals)
== null) {
if (throwExceptionIfCannotMove) {
throw new RebalanceFailureException(
String.format(
"[%s] Cannot move %s to %s.",
name(), replica, eligibleServers));
}
LOG.debug(
"Cannot move replica {} to any serverModel in {}",
replica,
eligibleServers);
}
}
}

/**
* Check whether the given alive replica should stay in the current server or be moved to
* another server to satisfy the specific requirements of the rack aware goal in the given
* cluster state.
*
* @param replica An alive replica to check whether it should stay in the current server.
* @param clusterModel The state of the cluster.
* @return {@code true} if the given alive replica should stay in the current server, {@code
* false} otherwise.
*/
protected abstract boolean shouldKeepInTheCurrentServer(
ReplicaModel replica, ClusterModel clusterModel);

/**
* Get a list of eligible brokers for moving the given replica in the given cluster to satisfy
* the specific requirements of the custom rack aware goal.
*
* @param replica Replica for which a set of rack aware eligible servers are requested.
* @param clusterModel The state of the cluster.
* @return A list of rack aware eligible servers for the given replica in the given cluster.
*/
protected abstract SortedSet<ServerModel> rackAwareEligibleServers(
ReplicaModel replica, ClusterModel clusterModel);
}
Loading