Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class BlockOutputStream extends OutputStream {
= new AtomicReference<>();

private final BlockData.Builder containerBlockData;
private final ContainerProtos.StorageTypeProto storageType;
private volatile XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private OzoneClientConfig config;
Expand Down Expand Up @@ -177,7 +178,8 @@ public BlockOutputStream(
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
Supplier<ExecutorService> blockOutputStreamResourceProvider,
ContainerProtos.StorageTypeProto storageType
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
Expand All @@ -195,6 +197,10 @@ public BlockOutputStream(
if (replicationIndex > 0) {
blkIDBuilder.setReplicaIndex(replicationIndex);
}
if (storageType != null) {
blkIDBuilder.setStorageType(storageType);
}
this.storageType = storageType;
this.containerBlockData = BlockData.newBuilder().setBlockID(
blkIDBuilder.build()).addMetadata(keyValue);
this.pipeline = pipeline;
Expand Down Expand Up @@ -964,7 +970,8 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
}

asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, tokenString, replicationIndex, blockData, close);
blockID.get(), data, tokenString, replicationIndex, blockData, close,
storageType);
CompletableFuture<ContainerCommandResponseProto>
respFuture = asyncReply.getResponse();
validateFuture = respFuture.thenApplyAsync(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ public ECBlockOutputStream(
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
Supplier<ExecutorService> executorServiceSupplier,
ContainerProtos.StorageTypeProto storageType
) throws IOException {
super(blockID, -1, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier,
storageType);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ public RatisBlockOutputStream(
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
Supplier<ExecutorService> blockOutputStreamResourceProvider,
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.StorageTypeProto storageType
) throws IOException {
super(blockID, blockSize, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider,
storageType);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
null,
ContainerClientMetrics.acquire(),
streamBufferArgs,
() -> newFixedThreadPool(10));
() -> newFixedThreadPool(10),
null);
}

private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientConfig,
Expand All @@ -193,7 +194,7 @@ private ECBlockOutputStream createECBlockOutputStream(OzoneClientConfig clientCo
StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, clientConfig);

return new ECBlockOutputStream(blockID, xcm, pipeline, BufferPool.empty(), clientConfig, null,
clientMetrics, streamBufferArgs, () -> newFixedThreadPool(2));
clientMetrics, streamBufferArgs, () -> newFixedThreadPool(2), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.protocol.StorageType;

/**
* The information of the request of pipeline.
*/
public final class PipelineRequestInformation {
private final long size;
private final StorageType storageType;

/**
* Builder for PipelineRequestInformation.
*/
public static class Builder {
private long size;
private StorageType storageType;

public static Builder getBuilder() {
return new Builder();
Expand All @@ -43,16 +47,26 @@ public Builder setSize(long sz) {
return this;
}

public Builder setStorageType(StorageType st) {
this.storageType = st;
return this;
}

public PipelineRequestInformation build() {
return new PipelineRequestInformation(size);
return new PipelineRequestInformation(size, storageType);
}
}

private PipelineRequestInformation(long size) {
private PipelineRequestInformation(long size, StorageType storageType) {
this.size = size;
this.storageType = storageType;
}

public long getSize() {
return size;
}

public StorageType getStorageType() {
return storageType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
"150s";

public static final String OZONE_SCM_PIPELINE_CREATION_STORAGE_TYPE_AWARE =
"ozone.scm.pipeline.creation.storage-type-aware.enabled";
public static final boolean
OZONE_SCM_PIPELINE_CREATION_STORAGE_TYPE_AWARE_DEFAULT = false;

// Allow SCM to auto create factor ONE ratis pipeline.
public static final String OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE =
"ozone.scm.pipeline.creation.auto.factor.one";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.Codec;
Expand Down Expand Up @@ -87,6 +88,7 @@ public final class ContainerInfo implements Comparable<ContainerInfo> {
private long sequenceId;
// Health state of the container (determined by ReplicationManager)
private ContainerHealthState healthState;
private final StorageType storageType;

private ContainerInfo(Builder b) {
containerID = ContainerID.valueOf(b.containerID);
Expand All @@ -102,6 +104,7 @@ private ContainerInfo(Builder b) {
replicationConfig = b.replicationConfig;
clock = b.clock;
healthState = b.healthState != null ? b.healthState : ContainerHealthState.HEALTHY;
storageType = b.storageType;
}

public static Codec<ContainerInfo> getCodec() {
Expand All @@ -126,6 +129,9 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
if (info.hasPipelineID()) {
builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID()));
}
if (info.hasStorageType()) {
builder.setStorageType(StorageType.valueOf(info.getStorageType()));
}
return builder.build();

}
Expand Down Expand Up @@ -288,9 +294,17 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
builder.setPipelineID(getPipelineID().getProtobuf());
}

if (storageType != null) {
builder.setStorageType(storageType.toProto());
}

return builder.build();
}

public StorageType getStorageType() {
return storageType;
}

public String getOwner() {
return owner;
}
Expand Down Expand Up @@ -390,6 +404,7 @@ public static class Builder {
private PipelineID pipelineID;
private ReplicationConfig replicationConfig;
private ContainerHealthState healthState;
private StorageType storageType;

public Builder setPipelineID(PipelineID pipelineId) {
this.pipelineID = pipelineId;
Expand Down Expand Up @@ -447,6 +462,11 @@ public Builder setHealthState(ContainerHealthState healthState) {
return this;
}

public Builder setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}

/**
* Also resets {@code stateEnterTime}, so make sure to set clock first.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Objects;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
Expand All @@ -43,6 +44,8 @@ public class BlockLocationInfo {
// The block is under construction. Apply to hsynced file last block.
private boolean underConstruction;

private StorageType storageType;

protected BlockLocationInfo(Builder builder) {
this.blockID = builder.blockID;
this.pipeline = builder.pipeline;
Expand All @@ -51,6 +54,7 @@ protected BlockLocationInfo(Builder builder) {
this.token = builder.token;
this.partNumber = builder.partNumber;
this.createVersion = builder.createVersion;
this.storageType = builder.storageType;
}

public void setCreateVersion(long version) {
Expand Down Expand Up @@ -121,6 +125,14 @@ public boolean isUnderConstruction() {
return this.underConstruction;
}

public StorageType getStorageType() {
return storageType;
}

public void setStorageType(StorageType storageType) {
this.storageType = storageType;
}

/**
* Builder of BlockLocationInfo.
*/
Expand All @@ -132,6 +144,7 @@ public static class Builder {
private Pipeline pipeline;
private int partNumber;
private long createVersion;
private StorageType storageType;

public Builder setBlockID(BlockID blockId) {
this.blockID = blockId;
Expand Down Expand Up @@ -168,6 +181,11 @@ public Builder setCreateVersion(long version) {
return this;
}

public Builder setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}

public BlockLocationInfo build() {
return new BlockLocationInfo(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,17 +436,21 @@ static long getLen(ReadChunkResponseProto response) {
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data, String tokenString,
int replicationIndex, BlockData blockData, boolean close)
int replicationIndex, BlockData blockData, boolean close,
ContainerProtos.StorageTypeProto storageType)
throws IOException, ExecutionException, InterruptedException {

DatanodeBlockID.Builder blkIDBuilder = DatanodeBlockID.newBuilder()
.setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId())
.setReplicaIndex(replicationIndex);
if (storageType != null) {
blkIDBuilder.setStorageType(storageType);
}
WriteChunkRequestProto.Builder writeChunkRequest =
WriteChunkRequestProto.newBuilder()
.setBlockID(DatanodeBlockID.newBuilder()
.setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId())
.setReplicaIndex(replicationIndex)
.build())
.setBlockID(blkIDBuilder.build())
.setChunkData(chunk)
.setData(data);
if (blockData != null) {
Expand Down Expand Up @@ -548,6 +552,15 @@ public static void createRecoveringContainer(XceiverClientSpi client,
ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
}

@InterfaceStability.Evolving
public static void createRecoveringContainer(XceiverClientSpi client,
long containerID, String encodedToken, int replicaIndex,
ContainerProtos.StorageTypeProto storageType) throws IOException {
createContainer(client, containerID, encodedToken,
ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex,
storageType);
}

/**
* createContainer call that creates a container on the datanode.
* @param client - client
Expand All @@ -556,7 +569,7 @@ public static void createRecoveringContainer(XceiverClientSpi client,
*/
public static void createContainer(XceiverClientSpi client, long containerID,
String encodedToken) throws IOException {
createContainer(client, containerID, encodedToken, null, 0);
createContainer(client, containerID, encodedToken, null, 0, null);
}

/**
Expand All @@ -571,6 +584,24 @@ public static void createContainer(XceiverClientSpi client,
long containerID, String encodedToken,
ContainerProtos.ContainerDataProto.State state, int replicaIndex)
throws IOException {
createContainer(client, containerID, encodedToken, state, replicaIndex,
null);
}

/**
* createContainer call that creates a container on the datanode.
* @param client - client
* @param containerID - ID of container
* @param encodedToken - encodedToken if security is enabled
* @param state - state of the container
* @param replicaIndex - index position of the container replica
* @param storageType - storage type for volume selection on the datanode
*/
public static void createContainer(XceiverClientSpi client,
long containerID, String encodedToken,
ContainerProtos.ContainerDataProto.State state, int replicaIndex,
ContainerProtos.StorageTypeProto storageType)
throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto.newBuilder();
createRequest
Expand All @@ -581,6 +612,9 @@ public static void createContainer(XceiverClientSpi client,
if (replicaIndex > 0) {
createRequest.setReplicaIndex(replicaIndex);
}
if (storageType != null) {
createRequest.setStorageType(storageType);
}

String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public final class OzoneConsts {
public static final String MAX_NUM_OF_BUCKETS = "maxNumOfBuckets";
public static final String HAS_SNAPSHOT = "hasSnapshot";
public static final String STORAGE_TYPE = "storageType";
public static final String STORAGE_POLICY = "storagePolicy";
public static final String RESOURCE_TYPE = "resourceType";
public static final String IS_VERSION_ENABLED = "isVersionEnabled";
public static final String CREATION_TIME = "creationTime";
Expand Down
Loading