Skip to content

Commit 413af29

Browse files
committed
Storage tiering: create containers on a volume matching the requested storage type
1 parent b79d034 commit 413af29

11 files changed

Lines changed: 478 additions & 12 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ public static void createRecoveringContainer(XceiverClientSpi client,
556556
*/
557557
public static void createContainer(XceiverClientSpi client, long containerID,
558558
String encodedToken) throws IOException {
559-
createContainer(client, containerID, encodedToken, null, 0);
559+
createContainer(client, containerID, encodedToken, null, 0, null);
560560
}
561561

562562
/**
@@ -571,6 +571,24 @@ public static void createContainer(XceiverClientSpi client,
571571
long containerID, String encodedToken,
572572
ContainerProtos.ContainerDataProto.State state, int replicaIndex)
573573
throws IOException {
574+
createContainer(client, containerID, encodedToken, state, replicaIndex,
575+
null);
576+
}
577+
578+
/**
579+
* createContainer call that creates a container on the datanode.
580+
* @param client - client
581+
* @param containerID - ID of container
582+
* @param encodedToken - encodedToken if security is enabled
583+
* @param state - state of the container
584+
* @param replicaIndex - index position of the container replica
585+
* @param storageType - storage type for volume selection on the datanode
586+
*/
587+
public static void createContainer(XceiverClientSpi client,
588+
long containerID, String encodedToken,
589+
ContainerProtos.ContainerDataProto.State state, int replicaIndex,
590+
ContainerProtos.StorageTypeProto storageType)
591+
throws IOException {
574592
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
575593
ContainerProtos.CreateContainerRequestProto.newBuilder();
576594
createRequest
@@ -581,6 +599,9 @@ public static void createContainer(XceiverClientSpi client,
581599
if (replicaIndex > 0) {
582600
createRequest.setReplicaIndex(replicaIndex);
583601
}
602+
if (storageType != null) {
603+
createRequest.setStorageType(storageType);
604+
}
584605

585606
String id = client.getPipeline().getFirstNode().getUuidString();
586607
ContainerCommandRequestProto.Builder request =

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.Set;
5454
import java.util.concurrent.TimeUnit;
5555
import java.util.concurrent.locks.ReentrantReadWriteLock;
56+
import java.util.stream.Collectors;
5657
import org.apache.commons.io.FileUtils;
5758
import org.apache.hadoop.fs.FileAlreadyExistsException;
5859
import org.apache.hadoop.fs.FileUtil;
@@ -148,6 +149,23 @@ public void setCheckChunksFilePath(boolean bCheckChunksDirFilePath) {
148149
@Override
149150
public void create(VolumeSet volumeSet, VolumeChoosingPolicy
150151
volumeChoosingPolicy, String clusterId) throws StorageContainerException {
152+
create(volumeSet, volumeChoosingPolicy, clusterId, null);
153+
}
154+
155+
/**
156+
* Creates a container, filtering volumes by the requested StorageType
157+
* before choosing a volume. If no volumes match the requested type,
158+
* falls back to all available volumes.
159+
*
160+
* @param volumeSet the set of available volumes
161+
* @param volumeChoosingPolicy policy for choosing among candidate volumes
162+
* @param clusterId the cluster ID
163+
* @param storageType the requested storage type, or null for no filtering
164+
*/
165+
public void create(VolumeSet volumeSet, VolumeChoosingPolicy
166+
volumeChoosingPolicy, String clusterId,
167+
org.apache.hadoop.hdds.protocol.StorageType storageType)
168+
throws StorageContainerException {
151169
Objects.requireNonNull(volumeChoosingPolicy, "VolumeChoosingPolicy == null");
152170
Objects.requireNonNull(volumeSet, "volumeSet == null");
153171
Objects.requireNonNull(clusterId, "clusterId == null");
@@ -159,6 +177,20 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
159177
try {
160178
List<HddsVolume> volumes
161179
= StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
180+
if (storageType != null) {
181+
org.apache.hadoop.fs.StorageType fsStorageType =
182+
org.apache.hadoop.fs.StorageType.valueOf(storageType.name());
183+
List<HddsVolume> filtered = volumes.stream()
184+
.filter(v -> v.getStorageType() == fsStorageType)
185+
.collect(Collectors.toList());
186+
if (!filtered.isEmpty()) {
187+
volumes = filtered;
188+
} else {
189+
LOG.warn("No volumes found with storage type {}, falling back to" +
190+
" all volumes for container {}", storageType,
191+
containerData.getContainerID());
192+
}
193+
}
162194
while (true) {
163195
HddsVolume containerVolume;
164196
String hddsVolumeDir;

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
101101
import org.apache.hadoop.hdds.conf.StorageUnit;
102102
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
103+
import org.apache.hadoop.hdds.protocol.StorageType;
103104
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
104105
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
105106
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -475,12 +476,20 @@ ContainerCommandResponseProto handleCreateContainer(
475476
KeyValueContainer newContainer = new KeyValueContainer(
476477
newContainerData, conf);
477478

479+
// Extract storageType for volume selection on heterogeneous nodes.
480+
StorageType requestedStorageType = null;
481+
if (request.getCreateContainer().hasStorageType()) {
482+
requestedStorageType = StorageType.valueOf(
483+
request.getCreateContainer().getStorageType().name());
484+
}
485+
478486
boolean created = false;
479487
Lock containerIdLock = containerCreationLocks.get(containerID);
480488
containerIdLock.lock();
481489
try {
482490
if (containerSet.getContainer(containerID) == null) {
483-
newContainer.create(volumeSet, volumeChoosingPolicy, clusterId);
491+
newContainer.create(volumeSet, volumeChoosingPolicy, clusterId,
492+
requestedStorageType);
484493
if (RECOVERING == newContainer.getContainerState()) {
485494
created = containerSet.addContainerByOverwriteMissingContainer(newContainer);
486495
} else {

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,202 @@ private void testCreateContainer() throws StorageContainerException {
200200
"DB does not exist");
201201
}
202202

203+
@ContainerTestVersionInfo.ContainerTest
204+
public void testCreateContainerWithStorageTypeFiltering(
205+
ContainerTestVersionInfo versionInfo) throws Exception {
206+
init(versionInfo);
207+
208+
// Create two volumes: one SSD, one DISK
209+
File ssdDir = new File(folder, "ssd");
210+
File diskDir = new File(folder, "disk");
211+
assertTrue(ssdDir.mkdirs());
212+
assertTrue(diskDir.mkdirs());
213+
214+
HddsVolume ssdVolume = new HddsVolume.Builder(ssdDir.toString())
215+
.conf(CONF)
216+
.datanodeUuid(datanodeId.toString())
217+
.storageType(org.apache.hadoop.fs.StorageType.SSD)
218+
.build();
219+
HddsVolume diskVolume = new HddsVolume.Builder(diskDir.toString())
220+
.conf(CONF)
221+
.datanodeUuid(datanodeId.toString())
222+
.storageType(org.apache.hadoop.fs.StorageType.DISK)
223+
.build();
224+
225+
StorageVolumeUtil.checkVolume(ssdVolume, scmId, scmId, CONF, null, null);
226+
StorageVolumeUtil.checkVolume(diskVolume, scmId, scmId, CONF, null, null);
227+
228+
List<HddsVolume> mixedVolumes = new ArrayList<>();
229+
mixedVolumes.add(ssdVolume);
230+
mixedVolumes.add(diskVolume);
231+
232+
VolumeSet mixedVolumeSet = mock(MutableVolumeSet.class);
233+
when(mixedVolumeSet.getVolumesList())
234+
.thenAnswer(i -> mixedVolumes.stream()
235+
.map(v -> (StorageVolume) v)
236+
.collect(Collectors.toList()));
237+
238+
// volumeChoosingPolicy returns the first volume from the filtered list
239+
RoundRobinVolumeChoosingPolicy policy =
240+
mock(RoundRobinVolumeChoosingPolicy.class);
241+
when(policy.chooseVolume(anyList(), anyLong())).thenAnswer(
242+
invocation -> {
243+
List<HddsVolume> volumes = invocation.getArgument(0);
244+
return volumes.get(0);
245+
});
246+
247+
// Request SSD storage type - should only see ssdVolume
248+
KeyValueContainerData ssdContainerData = new KeyValueContainerData(100L,
249+
layout,
250+
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
251+
datanodeId.toString());
252+
KeyValueContainer ssdContainer =
253+
new KeyValueContainer(ssdContainerData, CONF);
254+
ssdContainer.create(mixedVolumeSet, policy, scmId,
255+
org.apache.hadoop.hdds.protocol.StorageType.SSD);
256+
257+
assertEquals(ssdVolume, ssdContainerData.getVolume());
258+
259+
// Request DISK storage type - should only see diskVolume
260+
KeyValueContainerData diskContainerData = new KeyValueContainerData(101L,
261+
layout,
262+
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
263+
datanodeId.toString());
264+
KeyValueContainer diskContainer =
265+
new KeyValueContainer(diskContainerData, CONF);
266+
diskContainer.create(mixedVolumeSet, policy, scmId,
267+
org.apache.hadoop.hdds.protocol.StorageType.DISK);
268+
269+
assertEquals(diskVolume, diskContainerData.getVolume());
270+
}
271+
272+
@ContainerTestVersionInfo.ContainerTest
273+
public void testCreateContainerWithStorageTypeFallback(
274+
ContainerTestVersionInfo versionInfo) throws Exception {
275+
init(versionInfo);
276+
277+
// Create only DISK volumes - no SSD available
278+
File diskDir = new File(folder, "diskonly");
279+
assertTrue(diskDir.mkdirs());
280+
281+
HddsVolume diskVolume = new HddsVolume.Builder(diskDir.toString())
282+
.conf(CONF)
283+
.datanodeUuid(datanodeId.toString())
284+
.storageType(org.apache.hadoop.fs.StorageType.DISK)
285+
.build();
286+
StorageVolumeUtil.checkVolume(diskVolume, scmId, scmId, CONF, null, null);
287+
288+
List<HddsVolume> diskOnlyVolumes = new ArrayList<>();
289+
diskOnlyVolumes.add(diskVolume);
290+
291+
VolumeSet diskOnlyVolumeSet = mock(MutableVolumeSet.class);
292+
when(diskOnlyVolumeSet.getVolumesList())
293+
.thenAnswer(i -> diskOnlyVolumes.stream()
294+
.map(v -> (StorageVolume) v)
295+
.collect(Collectors.toList()));
296+
297+
RoundRobinVolumeChoosingPolicy policy =
298+
mock(RoundRobinVolumeChoosingPolicy.class);
299+
when(policy.chooseVolume(anyList(), anyLong())).thenAnswer(
300+
invocation -> {
301+
List<HddsVolume> volumes = invocation.getArgument(0);
302+
return volumes.get(0);
303+
});
304+
305+
// Request SSD but only DISK is available - should fall back to DISK
306+
KeyValueContainerData fallbackData = new KeyValueContainerData(102L,
307+
layout,
308+
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
309+
datanodeId.toString());
310+
KeyValueContainer fallbackContainer =
311+
new KeyValueContainer(fallbackData, CONF);
312+
fallbackContainer.create(diskOnlyVolumeSet, policy, scmId,
313+
org.apache.hadoop.hdds.protocol.StorageType.SSD);
314+
315+
// Should succeed and use the DISK volume as fallback
316+
assertEquals(diskVolume, fallbackData.getVolume());
317+
}
318+
319+
@ContainerTestVersionInfo.ContainerTest
320+
public void testCreateContainerWithNullStorageType(
321+
ContainerTestVersionInfo versionInfo) throws Exception {
322+
init(versionInfo);
323+
// Null storageType should behave identically to the original create()
324+
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId, null);
325+
keyValueContainerData = keyValueContainer.getContainerData();
326+
327+
assertNotNull(keyValueContainerData.getMetadataPath());
328+
assertNotNull(keyValueContainerData.getChunksPath());
329+
assertTrue(keyValueContainer.getContainerFile().exists());
330+
assertTrue(keyValueContainer.getContainerDBFile().exists());
331+
}
332+
333+
@ContainerTestVersionInfo.ContainerTest
334+
public void testCreateContainerFilteringPassesOnlyMatchingVolumes(
335+
ContainerTestVersionInfo versionInfo) throws Exception {
336+
init(versionInfo);
337+
338+
// Create 2 SSD + 1 DISK volumes
339+
File ssd1Dir = new File(folder, "ssd1");
340+
File ssd2Dir = new File(folder, "ssd2");
341+
File diskDir = new File(folder, "disk2");
342+
assertTrue(ssd1Dir.mkdirs());
343+
assertTrue(ssd2Dir.mkdirs());
344+
assertTrue(diskDir.mkdirs());
345+
346+
HddsVolume ssd1 = new HddsVolume.Builder(ssd1Dir.toString())
347+
.conf(CONF).datanodeUuid(datanodeId.toString())
348+
.storageType(org.apache.hadoop.fs.StorageType.SSD).build();
349+
HddsVolume ssd2 = new HddsVolume.Builder(ssd2Dir.toString())
350+
.conf(CONF).datanodeUuid(datanodeId.toString())
351+
.storageType(org.apache.hadoop.fs.StorageType.SSD).build();
352+
HddsVolume disk = new HddsVolume.Builder(diskDir.toString())
353+
.conf(CONF).datanodeUuid(datanodeId.toString())
354+
.storageType(org.apache.hadoop.fs.StorageType.DISK).build();
355+
356+
StorageVolumeUtil.checkVolume(ssd1, scmId, scmId, CONF, null, null);
357+
StorageVolumeUtil.checkVolume(ssd2, scmId, scmId, CONF, null, null);
358+
StorageVolumeUtil.checkVolume(disk, scmId, scmId, CONF, null, null);
359+
360+
List<HddsVolume> allVolumes = new ArrayList<>();
361+
allVolumes.add(ssd1);
362+
allVolumes.add(ssd2);
363+
allVolumes.add(disk);
364+
365+
VolumeSet vs = mock(MutableVolumeSet.class);
366+
when(vs.getVolumesList())
367+
.thenAnswer(i -> allVolumes.stream()
368+
.map(v -> (StorageVolume) v)
369+
.collect(Collectors.toList()));
370+
371+
// Capture which volumes the policy actually sees
372+
List<List<HddsVolume>> capturedVolumeLists = new ArrayList<>();
373+
RoundRobinVolumeChoosingPolicy policy =
374+
mock(RoundRobinVolumeChoosingPolicy.class);
375+
when(policy.chooseVolume(anyList(), anyLong())).thenAnswer(
376+
invocation -> {
377+
List<HddsVolume> volumes = invocation.getArgument(0);
378+
capturedVolumeLists.add(new ArrayList<>(volumes));
379+
return volumes.get(0);
380+
});
381+
382+
KeyValueContainerData data = new KeyValueContainerData(200L,
383+
layout,
384+
(long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(),
385+
datanodeId.toString());
386+
KeyValueContainer container = new KeyValueContainer(data, CONF);
387+
container.create(vs, policy, scmId,
388+
org.apache.hadoop.hdds.protocol.StorageType.SSD);
389+
390+
// Policy should have received only the 2 SSD volumes, not the DISK one
391+
assertEquals(1, capturedVolumeLists.size());
392+
List<HddsVolume> receivedVolumes = capturedVolumeLists.get(0);
393+
assertEquals(2, receivedVolumes.size());
394+
assertTrue(receivedVolumes.contains(ssd1));
395+
assertTrue(receivedVolumes.contains(ssd2));
396+
assertFalse(receivedVolumes.contains(disk));
397+
}
398+
203399
/**
204400
* Tests repair of containers affected by the bug reported in HDDS-6235.
205401
*/

0 commit comments

Comments
 (0)