Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerConfiguration;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
import org.apache.ratis.util.ReflectionUtils;

/**
Expand All @@ -48,9 +46,10 @@ public static VolumeChoosingPolicy getPolicy(ConfigurationSource conf) {
return ReflectionUtils.newInstance(policyClass, new Class<?>[] {ReentrantLock.class}, LOCK);
}

public static DiskBalancerVolumeChoosingPolicy getDiskBalancerPolicy(ConfigurationSource conf) {
Class<?> policyClass = conf.getObject(DiskBalancerConfiguration.class).getVolumeChoosingPolicyClass();
return (DiskBalancerVolumeChoosingPolicy) ReflectionUtils.newInstance(
policyClass, new Class<?>[]{ReentrantLock.class}, LOCK);
/**
* Returns the shared lock used for volume space reservation.
*/
public static ReentrantLock getVolumeSpaceReservationLock() {
return LOCK;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.container.diskbalancer;

import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerConfiguration.HDDS_DATANODE_DISKBALANCER_CONTAINER_CHOOSING_POLICY;

import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import org.apache.ratis.util.ReflectionUtils;

/**
* A factory to create {@link ContainerChoosingPolicy} instances for the DiskBalancer.
* The policy class is configured via
* {@link DiskBalancerConfiguration#getContainerChoosingPolicyClass()}.
*/
public final class ContainerChoosingPolicyFactory {

private static final Class<? extends ContainerChoosingPolicy>
DEFAULT_CONTAINER_CHOOSING_POLICY = DefaultContainerChoosingPolicy.class;

private ContainerChoosingPolicyFactory() {
}

/**
* Creates a ContainerChoosingPolicy instance from configuration.
*
* @param conf the configuration source
* @return a configured ContainerChoosingPolicy instance
*/
public static ContainerChoosingPolicy getDiskBalancerPolicy(ConfigurationSource conf) {
Class<? extends ContainerChoosingPolicy> policyClass = conf.getClass(
HDDS_DATANODE_DISKBALANCER_CONTAINER_CHOOSING_POLICY,
DEFAULT_CONTAINER_CHOOSING_POLICY, ContainerChoosingPolicy.class);
return ReflectionUtils.newInstance(policyClass, new Class<?>[]{ReentrantLock.class},
VolumeChoosingPolicyFactory.getVolumeSpaceReservationLock());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,15 @@ public final class DiskBalancerConfiguration {
)
private long diskBalancerTimeout = Duration.ofSeconds(300).toMillis();

@Config(key = "hdds.datanode.disk.balancer.volume.choosing.policy", type = ConfigType.CLASS,
defaultValue = "org.apache.hadoop.ozone.container.diskbalancer.policy" +
".DefaultVolumeChoosingPolicy",
tags = {ConfigTag.DISKBALANCER},
description = "The volume choosing policy of the disk balancer service.")
private Class<?> volumeChoosingPolicyClass;
public static final String HDDS_DATANODE_DISKBALANCER_CONTAINER_CHOOSING_POLICY =
"hdds.datanode.disk.balancer.container.choosing.policy";

@Config(key = "hdds.datanode.disk.balancer.container.choosing.policy", type = ConfigType.CLASS,
@Config(key = HDDS_DATANODE_DISKBALANCER_CONTAINER_CHOOSING_POLICY, type = ConfigType.CLASS,
defaultValue = "org.apache.hadoop.ozone.container.diskbalancer.policy" +
".DefaultContainerChoosingPolicy",
tags = {ConfigTag.DISKBALANCER},
description = "The container choosing policy of the disk balancer " +
"service.")
description = "The policy for selecting source/destination volumes and " +
"containers to move for disk balancing.")
private Class<?> containerChoosingPolicyClass;

@Config(key = "hdds.datanode.disk.balancer.stop.after.disk.even",
Expand Down Expand Up @@ -174,10 +170,6 @@ public void setDiskBalancerTimeout(Duration duration) {
this.diskBalancerTimeout = duration.toMillis();
}

public Class<?> getVolumeChoosingPolicyClass() {
return volumeChoosingPolicyClass;
}

public Class<?> getContainerChoosingPolicyClass() {
return containerChoosingPolicyClass;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DiskBalancerRunningStatus;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.server.ServerUtils;
Expand All @@ -67,11 +64,10 @@
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerVolumeCalculation.VolumeFixedUsage;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerCandidate;
import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
Expand Down Expand Up @@ -126,12 +122,10 @@ public class DiskBalancerService extends BackgroundService {
private Map<HddsVolume, Long> deltaSizes;
private MutableVolumeSet volumeSet;

private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy;
private ContainerChoosingPolicy containerChoosingPolicy;
private ContainerChoosingPolicy volumeContainerChoosingPolicy;
private final File diskBalancerInfoFile;

private DiskBalancerServiceMetrics metrics;
private long containerDefaultSize;

public DiskBalancerService(OzoneContainer ozoneContainer,
long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
Expand All @@ -148,15 +142,9 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
inProgressContainers = ConcurrentHashMap.newKeySet();
deltaSizes = new ConcurrentHashMap<>();
volumeSet = ozoneContainer.getVolumeSet();
containerDefaultSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);

try {
volumeChoosingPolicy = VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf);
containerChoosingPolicy = (ContainerChoosingPolicy)
conf.getObject(DiskBalancerConfiguration.class)
.getContainerChoosingPolicyClass().newInstance();
volumeContainerChoosingPolicy = ContainerChoosingPolicyFactory.getDiskBalancerPolicy(conf);
} catch (Exception e) {
LOG.error("Got exception when initializing DiskBalancerService", e);
throw new IOException(e);
Expand Down Expand Up @@ -405,25 +393,20 @@ public BackgroundTaskQueue getTasks() {
}

for (int i = 0; i < availableTaskCount; i++) {
Pair<HddsVolume, HddsVolume> pair = volumeChoosingPolicy
.chooseVolume(volumeSet, threshold, deltaSizes, containerDefaultSize);
if (pair == null) {
continue;
}
HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight();
ContainerData toBalanceContainer = containerChoosingPolicy
.chooseContainer(ozoneContainer, sourceVolume, destVolume,
inProgressContainers, threshold, volumeSet, deltaSizes);
if (toBalanceContainer != null) {
DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume,
destVolume);
queue.add(task);
inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
- toBalanceContainer.getBytesUsed());
} else {
// release destVolume committed bytes
destVolume.incCommittedBytes(0 - containerDefaultSize);
ContainerCandidate candidate = volumeContainerChoosingPolicy.chooseVolumesAndContainer(
ozoneContainer, volumeSet, deltaSizes, inProgressContainers, threshold);
if (candidate != null) {
HddsVolume sourceVolume = candidate.getSourceVolume();
HddsVolume destVolume = candidate.getDestVolume();
ContainerData toBalanceContainer = candidate.getContainerData();
if (toBalanceContainer != null) {
DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume,
destVolume);
queue.add(task);
inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
- toBalanceContainer.getBytesUsed());
}
}
}

Expand Down Expand Up @@ -640,7 +623,7 @@ private void postCall(boolean success, long startTime) {
inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) +
containerData.getBytesUsed());
destVolume.incCommittedBytes(0 - containerDefaultSize);
destVolume.incCommittedBytes(0 - containerData.getBytesUsed());
long endTime = Time.monotonicNow();
if (success) {
metrics.incrSuccessCount(1);
Expand Down Expand Up @@ -749,22 +732,13 @@ public void setBalancedBytesInLastWindow(long bytes) {
this.balancedBytesInLastWindow.set(bytes);
}

public ContainerChoosingPolicy getContainerChoosingPolicy() {
return containerChoosingPolicy;
}

public DiskBalancerVolumeChoosingPolicy getVolumeChoosingPolicy() {
return volumeChoosingPolicy;
}

@VisibleForTesting
public void setVolumeChoosingPolicy(DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy) {
this.volumeChoosingPolicy = volumeChoosingPolicy;
public ContainerChoosingPolicy getVolumeContainerChoosingPolicy() {
return volumeContainerChoosingPolicy;
}

@VisibleForTesting
public void setContainerChoosingPolicy(ContainerChoosingPolicy containerChoosingPolicy) {
this.containerChoosingPolicy = containerChoosingPolicy;
public void setVolumeContainerChoosingPolicy(ContainerChoosingPolicy volumeContainerChoosingPolicy) {
this.volumeContainerChoosingPolicy = volumeContainerChoosingPolicy;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,34 @@

package org.apache.hadoop.ozone.container.diskbalancer.policy;

import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;

/**
* This interface specifies the policy for choosing volumes to balance.
* Result of consolidated volume and container selection for disk balancing.
* Contains the container to move and its source and destination volumes.
*/
public interface DiskBalancerVolumeChoosingPolicy {
/**
* Choose a pair of volumes for balancing.
*
* @param volumeSet - volumes to choose from.
* @param thresholdPercentage the threshold percentage in range (0, 100) to choose the source volume.
* @param deltaSizes - the sizes changes of inProgress balancing jobs.
* @param containerSize - the estimated size of container to be moved.
* @return Source volume and Dest volume.
*/
Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
double thresholdPercentage, Map<HddsVolume, Long> deltaSizes, long containerSize);
public final class ContainerCandidate {
private final ContainerData containerData;
private final HddsVolume sourceVolume;
private final HddsVolume destVolume;

public ContainerCandidate(ContainerData containerData,
HddsVolume sourceVolume, HddsVolume destVolume) {
this.containerData = containerData;
this.sourceVolume = sourceVolume;
this.destVolume = destVolume;
}

public ContainerData getContainerData() {
return containerData;
}

public HddsVolume getSourceVolume() {
return sourceVolume;
}

public HddsVolume getDestVolume() {
return destVolume;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,33 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;

/**
* This interface specifies the policy for choosing containers to balance.
* This interface specifies the policy for choosing volumes and containers to balance.
* It provides consolidated volume selection (source/destination pair) and container selection
* into a single operation to avoid recalculating ideal utilization and disk usage.
*/
public interface ContainerChoosingPolicy {
/**
* Choose a container for balancing.
* @param ozoneContainer the OzoneContainer instance to get all containers of a particular volume.
* @param srcVolume the HddsVolume instance to choose containers from.
* @param destVolume the destination volume to which container is being moved.
* @param inProgressContainerIDs containerIDs present in this set should be
- avoided as these containers are already under move by diskBalancer.
* @param thresholdPercentage the threshold percentage in range (0, 100)
* Choose a container and its source/destination volumes for balancing.
* Performs both volume pair selection and container selection in one call,
* computing ideal usage and volume utilizations only once.
* Space is reserved on the destination only when a container is chosen,
* using the actual container size.
*
* @param ozoneContainer the OzoneContainer instance to get all containers
* @param volumeSet the volumeSet instance
* @param deltaMap the deltaMap instance of source volume
* @return a Container
* @param deltaMap the deltaMap for in-progress balancing jobs (negative = space to be freed)
* @param inProgressContainerIDs containerIDs to avoid (already under move)
* @param thresholdPercentage the threshold percentage in range (0, 100)
* @return a DiskBalancerVolumeContainerCandidate with container and volumes, or null if none found
*/
ContainerData chooseContainer(OzoneContainer ozoneContainer,
HddsVolume srcVolume, HddsVolume destVolume,
ContainerCandidate chooseVolumesAndContainer(OzoneContainer ozoneContainer,
MutableVolumeSet volumeSet,
Map<HddsVolume, Long> deltaMap,
Set<ContainerID> inProgressContainerIDs,
double thresholdPercentage, MutableVolumeSet volumeSet,
Map<HddsVolume, Long> deltaMap);
double thresholdPercentage);
}
Loading