Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public final class ContainerBalancerConfiguration {
"to exclude from balancing. For example \"1, 4, 5\" or \"1,4,5\".")
private String excludeContainers = "";

@Config(key = "hdds.container.balancer.include.containers", type = ConfigType.STRING, defaultValue =
"", tags = {ConfigTag.BALANCER}, description = "List of container IDs " +
"to include in balancing. Only these containers will be included in balancing. " +
"For example \"1, 4, 5\" or \"1,4,5\".")
private String includeContainers = "";

@Config(key = "hdds.container.balancer.move.timeout", type = ConfigType.TIME, defaultValue = "65m",
tags = {ConfigTag.BALANCER}, description =
"The amount of time to allow a single container to move " +
Expand Down Expand Up @@ -311,6 +317,17 @@ public Set<ContainerID> getExcludeContainers() {
}).collect(Collectors.toSet());
}

public Set<ContainerID> getIncludeContainers() {
if (includeContainers.isEmpty()) {
return new HashSet<>();
}
return Arrays.stream(includeContainers.split(","))
.map(s -> {
s = s.trim();
return ContainerID.valueOf(Long.parseLong(s));
}).collect(Collectors.toSet());
}

/**
* Sets containers to exclude from balancing.
* @param excludeContainers String of {@link ContainerID} to exclude. For
Expand All @@ -320,6 +337,16 @@ public void setExcludeContainers(String excludeContainers) {
this.excludeContainers = excludeContainers;
}

/**
* Sets containers to include in balancing. When non-empty, only these
* containers will be considered for balancing.
* @param includeContainers String of {@link ContainerID} to include. For
* example, "1, 4, 5" or "1,4,5".
*/
public void setIncludeContainers(String includeContainers) {
this.includeContainers = includeContainers;
}

public Duration getMoveTimeout() {
return Duration.ofMillis(moveTimeout);
}
Expand Down Expand Up @@ -422,6 +449,7 @@ public String toString() {
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n", "Key", "Value", "Threshold",
threshold, "Max Datanodes to Involve per Iteration(percent)",
maxDatanodesPercentageToInvolvePerIteration,
Expand All @@ -443,6 +471,8 @@ public String toString() {
networkTopologyEnable,
"Whether to Trigger Refresh Datanode Usage Info",
triggerDuEnable,
"Container IDs to Include in Balancing",
includeContainers.isEmpty() ? "None" : includeContainers,
"Container IDs to Exclude from Balancing",
excludeContainers.equals("") ? "None" : excludeContainers,
"Datanodes Specified to be Balanced",
Expand All @@ -463,6 +493,7 @@ public ContainerBalancerConfigurationProto.Builder toProtobufBuilder() {
.setSizeLeavingSourceMax(maxSizeLeavingSource)
.setIterations(iterations)
.setExcludeContainers(excludeContainers)
.setIncludeContainers(includeContainers)
.setMoveTimeout(moveTimeout)
.setBalancingIterationInterval(balancingInterval)
.setIncludeDatanodes(includeNodes)
Expand Down Expand Up @@ -497,6 +528,9 @@ static ContainerBalancerConfiguration fromProtobuf(
if (proto.hasIterations()) {
config.setIterations(proto.getIterations());
}
if (proto.hasIncludeContainers()) {
config.setIncludeContainers(proto.getIncludeContainers());
}
if (proto.hasExcludeContainers()) {
config.setExcludeContainers(proto.getExcludeContainers());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ StartContainerBalancerResponseProto startContainerBalancer(
Optional<Boolean> networkTopologyEnable,
Optional<String> includeNodes,
Optional<String> excludeNodes,
Optional<String> excludeContainers) throws IOException;
Optional<String> excludeContainers,
Optional<String> includeContainers) throws IOException;

/**
* Stop ContainerBalancer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ StartContainerBalancerResponseProto startContainerBalancer(
Optional<Boolean> networkTopologyEnable,
Optional<String> includeNodes,
Optional<String> excludeNodes,
Optional<String> excludeContainers) throws IOException;
Optional<String> excludeContainers,
Optional<String> includeContainers) throws IOException;

/**
* Stop ContainerBalancer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,8 @@ public StartContainerBalancerResponseProto startContainerBalancer(
Optional<Boolean> networkTopologyEnable,
Optional<String> includeNodes,
Optional<String> excludeNodes,
Optional<String> excludeContainers) throws IOException {
Optional<String> excludeContainers,
Optional<String> includeContainers) throws IOException {
StartContainerBalancerRequestProto.Builder builder =
StartContainerBalancerRequestProto.newBuilder();
builder.setTraceID(TracingUtil.exportCurrentSpan());
Expand Down Expand Up @@ -1057,6 +1058,11 @@ public StartContainerBalancerResponseProto startContainerBalancer(
builder.setExcludeContainers(ec);
}

if (includeContainers.isPresent()) {
String ic = includeContainers.get();
builder.setIncludeContainers(ic);
}

StartContainerBalancerRequestProto request = builder.build();
return submitRequest(Type.StartContainerBalancer,
builder1 -> builder1.setStartContainerBalancerRequest(request))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ message StartContainerBalancerRequestProto {
optional string includeNodes = 14;
optional string excludeNodes = 15;
optional string excludeContainers = 16;
optional string includeContainers = 17;
}

message StartContainerBalancerResponseProto {
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ message ContainerBalancerConfigurationProto {
required bool shouldRun = 18;
optional int32 nextIterationIndex = 19;
optional int64 moveReplicationTimeout = 20;
optional string includeContainers = 21;
}

message TransferLeadershipRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class ContainerBalancerSelectionCriteria {
private ContainerManager containerManager;
private Map<ContainerID, DatanodeDetails> containerToSourceMap;
private Set<ContainerID> excludeContainers;
private Set<ContainerID> includeContainers;
private Set<ContainerID> excludeContainersDueToFailure;
private FindSourceStrategy findSourceStrategy;
private Map<DatanodeDetails, NavigableSet<ContainerID>> setMap;
Expand All @@ -72,6 +73,7 @@ public ContainerBalancerSelectionCriteria(
this.containerToSourceMap = containerToSourceMap;
excludeContainersDueToFailure = new HashSet<>();
excludeContainers = balancerConfiguration.getExcludeContainers();
includeContainers = balancerConfiguration.getIncludeContainers();
this.findSourceStrategy = findSourceStrategy;
this.setMap = new HashMap<>();
}
Expand Down Expand Up @@ -158,6 +160,10 @@ private Comparator<ContainerID> orderContainersByUsedBytes() {
public boolean shouldBeExcluded(ContainerID containerID,
DatanodeDetails node, long sizeMovedAlready) {
ContainerInfo container;
//If includeContainers is specified, exclude containers not in the include list
if (!includeContainers.isEmpty() && !includeContainers.contains(containerID)) {
return true;
}
try {
container = containerManager.getContainer(containerID);
} catch (ContainerNotFoundException e) {
Expand Down Expand Up @@ -263,6 +269,9 @@ private NavigableSet<ContainerID> getCandidateContainers(DatanodeDetails node) {
new TreeSet<>(orderContainersByUsedBytes().reversed());
try {
Set<ContainerID> idSet = nodeManager.getContainers(node);
if (includeContainers != null && !includeContainers.isEmpty()) {
idSet.retainAll(includeContainers);
}
if (excludeContainers != null) {
idSet.removeAll(excludeContainers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,7 @@ public StartContainerBalancerResponseProto startContainerBalancer(
Optional<String> includeNodes = Optional.empty();
Optional<String> excludeNodes = Optional.empty();
Optional<String> excludeContainers = Optional.empty();
Optional<String> includeContainers = Optional.empty();

if (request.hasThreshold()) {
threshold = Optional.of(request.getThreshold());
Expand Down Expand Up @@ -1210,12 +1211,16 @@ public StartContainerBalancerResponseProto startContainerBalancer(
excludeContainers = Optional.of(request.getExcludeContainers());
}

if (request.hasIncludeContainers()) {
includeContainers = Optional.of(request.getIncludeContainers());
}

return impl.startContainerBalancer(threshold, iterations,
maxDatanodesPercentageToInvolvePerIteration,
maxSizeToMovePerIterationInGB, maxSizeEnteringTargetInGB,
maxSizeLeavingSourceInGB, balancingInterval, moveTimeout,
moveReplicationTimeout, networkTopologyEnable, includeNodes,
excludeNodes, excludeContainers);
excludeNodes, excludeContainers, includeContainers);
}

public StopContainerBalancerResponseProto stopContainerBalancer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,8 @@ public StartContainerBalancerResponseProto startContainerBalancer(
Optional<Boolean> networkTopologyEnable,
Optional<String> includeNodes,
Optional<String> excludeNodes,
Optional<String> excludeContainers) throws IOException {
Optional<String> excludeContainers,
Optional<String> includeContainers) throws IOException {
Map<String, String> auditMap = Maps.newHashMap();
try {
getScm().checkAdminAccess(getRemoteUser(), false);
Expand Down Expand Up @@ -1277,6 +1278,12 @@ public StartContainerBalancerResponseProto startContainerBalancer(
cbc.setExcludeContainers(ec);
}

if (includeContainers.isPresent()) {
String ic = includeContainers.get();
auditMap.put("includeContainers", (ic));
cbc.setIncludeContainers(ic);
}

ContainerBalancer containerBalancer = scm.getContainerBalancer();
containerBalancer.startBalancer(cbc);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,23 @@ public void balancerShouldNotSelectConfiguredExcludeContainers(@Nonnull MockedSC
}
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void balancerShouldOnlySelectConfiguredIncludeContainers(@Nonnull MockedSCM mockedSCM) {
ContainerBalancerConfiguration config = new ContainerBalancerConfigBuilder(mockedSCM.getNodeCount()).build();
config.setIncludeContainers("1, 4, 5");

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

Set<ContainerID> includeContainers = config.getIncludeContainers();
assertThat(task.getContainerToSourceMap()).isNotEmpty();
for (ContainerID container : task.getContainerToSourceMap().keySet()) {
assertThat(includeContainers)
.as("Container %s should be in the include list", container)
.contains(container);
}
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void checkIterationResult(@Nonnull MockedSCM mockedSCM) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ public class ContainerBalancerStartSubcommand extends ScmSubcommand {
"(specify \"1,2,3\" for container IDs).")
private Optional<String> excludeContainers;

@Option(names = {"--include-containers"},
description = "A list of container IDs separated by commas. " +
"Only the containers specified in this list will be included in balancing." +
" If --exclude-containers is also specified, those containers will " +
"be excluded. This configuration is empty by default " +
"(specify \"1,2,3\" for container IDs).")
private Optional<String> includeContainers;

@Override
public void execute(ScmClient scmClient) throws IOException {
StartContainerBalancerResponseProto response = scmClient.
Expand All @@ -134,7 +142,7 @@ public void execute(ScmClient scmClient) throws IOException {
maxSizeToMovePerIterationInGB, maxSizeEnteringTargetInGB,
maxSizeLeavingSourceInGB, balancingInterval, moveTimeout,
moveReplicationTimeout, networkTopologyEnable, includeNodes,
excludeNodes, excludeContainers);
excludeNodes, excludeContainers, includeContainers);
if (response.getStart()) {
System.out.println("Container Balancer started successfully.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ String getConfigurationPrettyString(HddsProtos.ContainerBalancerConfigurationPro
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n" +
"%-50s %s%n", "Key", "Value", "Threshold",
configuration.getUtilizationThreshold(), "Max Datanodes to Involve per Iteration(percent)",
configuration.getDatanodesInvolvedMaxPercentagePerIteration(),
Expand All @@ -143,6 +144,8 @@ String getConfigurationPrettyString(HddsProtos.ContainerBalancerConfigurationPro
configuration.getMoveNetworkTopologyEnable(),
"Whether to Trigger Refresh Datanode Usage Info",
configuration.getTriggerDuBeforeMoveEnable(),
"Container IDs to Include in Balancing",
configuration.getIncludeContainers().isEmpty() ? "None" : configuration.getIncludeContainers(),
"Container IDs to Exclude from Balancing",
configuration.getExcludeContainers().isEmpty() ? "None" : configuration.getExcludeContainers(),
"Datanodes Specified to be Balanced",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,14 @@ public StartContainerBalancerResponseProto startContainerBalancer(
Optional<Boolean> networkTopologyEnable,
Optional<String> includeNodes,
Optional<String> excludeNodes,
Optional<String> excludeContainers) throws IOException {
Optional<String> excludeContainers,
Optional<String> includeContainers) throws IOException {
return storageContainerLocationClient.startContainerBalancer(threshold,
iterations, maxDatanodesPercentageToInvolvePerIteration,
maxSizeToMovePerIterationInGB, maxSizeEnteringTargetInGB,
maxSizeLeavingSourceInGB, balancingInterval, moveTimeout,
moveReplicationTimeout, networkTopologyEnable, includeNodes,
excludeNodes, excludeContainers);
excludeNodes, excludeContainers, includeContainers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TestContainerBalancerSubCommand {
"Interval between each Iteration 0min\n" +
"Whether to Enable Network Topology false\n" +
"Whether to Trigger Refresh Datanode Usage Info false\n" +
"Container IDs to Include in Balancing None\n" +
"Container IDs to Exclude from Balancing None\n" +
"Datanodes Specified to be Balanced None\n" +
"Datanodes Excluded from Balancing None";
Expand Down Expand Up @@ -449,7 +450,7 @@ public void testContainerBalancerStartSubcommandWhenBalancerIsNotRunning()
throws IOException {
ScmClient scmClient = mock(ScmClient.class);
when(scmClient.startContainerBalancer(
null, null, null, null, null, null, null, null, null, null, null, null, null))
null, null, null, null, null, null, null, null, null, null, null, null, null, null))
.thenReturn(
StorageContainerLocationProtocolProtos
.StartContainerBalancerResponseProto.newBuilder()
Expand All @@ -465,7 +466,7 @@ public void testContainerBalancerStartSubcommandWhenBalancerIsRunning()
throws IOException {
ScmClient scmClient = mock(ScmClient.class);
when(scmClient.startContainerBalancer(
null, null, null, null, null, null, null, null, null, null, null, null, null))
null, null, null, null, null, null, null, null, null, null, null, null, null, null))
.thenReturn(StorageContainerLocationProtocolProtos
.StartContainerBalancerResponseProto.newBuilder()
.setStart(false)
Expand Down
Loading