Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public enum OzoneManagerVersion implements ComponentVersion {

ATOMIC_CREATE_IF_NOT_EXISTS(12,
"OzoneManager version that supports explicit create-if-not-exists key semantics"),

S3_BYTE_RANGE_GET(13,
"OzoneManager version that supports S3 byte range GET key location filtering"),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client.protocol;

import jakarta.annotation.Nonnull;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -178,6 +179,49 @@ OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
int partNumber)
throws IOException;

/**
* Get OzoneKey in S3 context with content positioned at the requested
* byte range start.
* @param bucketName Name of the Bucket
* @param keyName Key name
* @param partNumber Multipart-upload part number, or zero for the whole key
* @param startOffset inclusive byte range start offset
* @param endOffset inclusive byte range end offset
* @return {@link OzoneKey}
* @throws IOException
*/
default OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why there is a implementation?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should sit in ClientProtocolStub.

int partNumber, long startOffset, long endOffset)
throws IOException {
OzoneKeyDetails keyDetails = partNumber == 0
? getS3KeyDetails(bucketName, keyName)
: getS3KeyDetails(bucketName, keyName, partNumber);
return new OzoneKeyDetails(
keyDetails.getVolumeName(), keyDetails.getBucketName(),
keyDetails.getName(), keyDetails.getDataSize(),
keyDetails.getCreationTime().toEpochMilli(),
keyDetails.getModificationTime().toEpochMilli(),
keyDetails.getOzoneKeyLocations(),
keyDetails.getReplicationConfig(), keyDetails.getMetadata(),
keyDetails.getFileEncryptionInfo(), () -> {
OzoneInputStream stream = keyDetails.getContent();
long remaining = startOffset;
while (remaining > 0) {
long skipped = stream.skip(remaining);
if (skipped <= 0) {
if (stream.read() == -1) {
throw new EOFException("Unable to skip to S3 byte range "
+ "start offset " + startOffset + " for key " + keyName);
}
skipped = 1;
}
remaining -= skipped;
}
return stream;
}, keyDetails.isFile(), keyDetails.getOwner(), keyDetails.getTags(),
keyDetails.getGeneration());
}

OzoneVolume buildOzoneVolume(OmVolumeArgs volume);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,36 @@ public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
return getOzoneKeyDetails(keyInfo);
}

@Override
public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
int partNumber, long startOffset, long endOffset) throws IOException {
OmKeyInfo keyInfo;
if (omVersion.compareTo(OzoneManagerVersion.S3_BYTE_RANGE_GET) >= 0) {
keyInfo = getS3RangedKeyInfo(bucketName, keyName, partNumber,
startOffset, endOffset);
} else if (partNumber != 0) {
if (omVersion.compareTo(OzoneManagerVersion.S3_PART_AWARE_GET) >= 0) {
keyInfo = getS3PartKeyInfo(bucketName, keyName, partNumber);
} else {
keyInfo = getS3KeyInfo(bucketName, keyName, false);
List<OmKeyLocationInfo> filteredKeyLocationInfo = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly().stream()
.filter(omKeyLocationInfo -> omKeyLocationInfo.getPartNumber() ==
partNumber)
.collect(Collectors.toList());
keyInfo.updateLocationInfoList(filteredKeyLocationInfo, true, true);
keyInfo.setDataSize(filteredKeyLocationInfo.stream()
.mapToLong(OmKeyLocationInfo::getLength)
.sum());
}
keyInfo.setByteRangeStartOffset(startOffset);
} else {
keyInfo = getS3KeyInfo(bucketName, keyName, false);
keyInfo.setByteRangeStartOffset(startOffset);
}
return getOzoneKeyDetails(keyInfo);
}

@Nonnull
private OmKeyInfo getS3KeyInfo(
String bucketName, String keyName, boolean isHeadOp) throws IOException {
Expand Down Expand Up @@ -1926,6 +1956,32 @@ private OmKeyInfo getS3PartKeyInfo(
return keyInfoWithS3Context.getKeyInfo();
}

@Nonnull
private OmKeyInfo getS3RangedKeyInfo(
String bucketName, String keyName, int partNumber, long startOffset,
long endOffset) throws IOException {
verifyBucketName(bucketName);
Objects.requireNonNull(keyName, "keyName == null");

OmKeyArgs.Builder keyArgs = new OmKeyArgs.Builder()
// Volume name is not important, as we call GetKeyInfo with
// assumeS3Context = true, OM will infer the correct s3 volume.
.setVolumeName(OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT)
.setBucketName(bucketName)
.setKeyName(keyName)
.setSortDatanodesInPipeline(topologyAwareReadEnabled)
.setLatestVersionLocation(getLatestVersionLocation)
.setForceUpdateContainerCacheFromSCM(false)
.setByteRange(startOffset, endOffset);
if (partNumber != 0) {
keyArgs.setMultipartUploadPartNumber(partNumber);
}
KeyInfoWithVolumeContext keyInfoWithS3Context =
ozoneManagerClient.getKeyInfo(keyArgs.build(), true);
keyInfoWithS3Context.getUserPrincipal().ifPresent(this::updateS3Principal);
return keyInfoWithS3Context.getKeyInfo();
}

@Override
public OmKeyInfo getKeyInfo(
String volumeName, String bucketName, String keyName,
Expand Down Expand Up @@ -2299,7 +2355,7 @@ public OzoneOutputStream createFile(String volumeName, String bucketName,
*/
private OzoneInputStream getInputStreamWithRetryFunction(
OmKeyInfo keyInfo) throws IOException {
return createInputStream(keyInfo, omKeyInfo -> {
OzoneInputStream inputStream = createInputStream(keyInfo, omKeyInfo -> {
try {
return getKeyInfo(omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(),
omKeyInfo.getKeyName(), true);
Expand All @@ -2308,6 +2364,10 @@ private OzoneInputStream getInputStreamWithRetryFunction(
return null;
}
});
if (keyInfo.getByteRangeStartOffset() > 0) {
inputStream.seek(keyInfo.getByteRangeStartOffset());
}
return inputStream;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public final class OmKeyArgs extends WithMetadata implements Auditable {
// been modified.
private Long expectedDataGeneration = null;
private final String expectedETag;
private final Long byteRangeStart;
private final Long byteRangeEnd;

private OmKeyArgs(Builder b) {
super(b);
Expand All @@ -84,6 +86,8 @@ private OmKeyArgs(Builder b) {
this.tags = b.tags.build();
this.expectedDataGeneration = b.expectedDataGeneration;
this.expectedETag = b.expectedETag;
this.byteRangeStart = b.byteRangeStart;
this.byteRangeEnd = b.byteRangeEnd;
}

public boolean getIsMultipartKey() {
Expand Down Expand Up @@ -170,6 +174,18 @@ public String getExpectedETag() {
return expectedETag;
}

public boolean hasByteRange() {
return byteRangeStart != null && byteRangeEnd != null;
}

public long getByteRangeStart() {
return byteRangeStart;
}

public long getByteRangeEnd() {
return byteRangeEnd;
}

@Override
public Map<String, String> toAuditMap() {
Map<String, String> auditMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -218,6 +234,10 @@ public KeyArgs toProtobuf() {
if (expectedETag != null) {
builder.setExpectedETag(expectedETag);
}
if (hasByteRange()) {
builder.setByteRangeStart(byteRangeStart);
builder.setByteRangeEnd(byteRangeEnd);
}
return builder.build();
}

Expand All @@ -244,6 +264,8 @@ public static class Builder extends WithMetadata.Builder {
private final MapBuilder<String, String> tags;
private Long expectedDataGeneration = null;
private String expectedETag;
private Long byteRangeStart;
private Long byteRangeEnd;

public Builder() {
this(AclListBuilder.empty());
Expand Down Expand Up @@ -290,6 +312,8 @@ public Builder(OmKeyArgs obj) {
obj.forceUpdateContainerCacheFromSCM;
this.expectedDataGeneration = obj.expectedDataGeneration;
this.expectedETag = obj.expectedETag;
this.byteRangeStart = obj.byteRangeStart;
this.byteRangeEnd = obj.byteRangeEnd;
this.tags = MapBuilder.of(obj.tags);
this.acls = AclListBuilder.of(obj.acls);
}
Expand Down Expand Up @@ -365,6 +389,12 @@ public Builder setMultipartUploadPartNumber(int multipartUploadPartNumber) {
return this;
}

public Builder setByteRange(long start, long end) {
this.byteRangeStart = start;
this.byteRangeEnd = end;
return this;
}

@Override
public Builder addMetadata(String key, String value) {
super.addMetadata(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public final class OmKeyInfo extends WithParentObjectId
// been modified.
private Long expectedDataGeneration = null;
private String expectedETag;
private long byteRangeStartOffset;

private OmKeyInfo(Builder b) {
super(b);
Expand All @@ -131,6 +132,7 @@ private OmKeyInfo(Builder b) {
this.tags = b.tags.build();
this.expectedDataGeneration = b.expectedDataGeneration;
this.expectedETag = b.expectedETag;
this.byteRangeStartOffset = b.byteRangeStartOffset;
}

private static Codec<OmKeyInfo> newCodec(boolean ignorePipeline) {
Expand Down Expand Up @@ -199,6 +201,14 @@ public String getExpectedETag() {
return expectedETag;
}

public long getByteRangeStartOffset() {
return byteRangeStartOffset;
}

public void setByteRangeStartOffset(long offset) {
this.byteRangeStartOffset = offset;
}

public String getOwnerName() {
return ownerName;
}
Expand Down Expand Up @@ -503,6 +513,7 @@ public static class Builder extends WithParentObjectId.Builder<OmKeyInfo> {
private final MapBuilder<String, String> tags;
private Long expectedDataGeneration = null;
private String expectedETag;
private long byteRangeStartOffset;

public Builder() {
this.acls = AclListBuilder.empty();
Expand All @@ -526,6 +537,7 @@ public Builder(OmKeyInfo obj) {
this.isFile = obj.isFile;
this.expectedDataGeneration = obj.expectedDataGeneration;
this.expectedETag = obj.expectedETag;
this.byteRangeStartOffset = obj.byteRangeStartOffset;
this.tags = MapBuilder.of(obj.tags);
obj.keyLocationVersions.forEach(keyLocationVersion ->
this.omKeyLocationInfoGroups.add(
Expand Down Expand Up @@ -702,6 +714,11 @@ public Builder setExpectedETag(String eTag) {
return this;
}

public Builder setByteRangeStartOffset(long offset) {
this.byteRangeStartOffset = offset;
return this;
}

@Override
protected void validate() {
super.validate();
Expand Down Expand Up @@ -824,6 +841,9 @@ private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName,
if (expectedETag != null) {
kb.setExpectedETag(expectedETag);
}
if (byteRangeStartOffset > 0) {
kb.setByteRangeStartOffset(byteRangeStartOffset);
}
if (ownerName != null) {
kb.setOwnerName(ownerName);
}
Expand Down Expand Up @@ -880,6 +900,9 @@ public static Builder builderFromProtobuf(KeyInfo keyInfo) {
if (keyInfo.hasExpectedETag()) {
builder.setExpectedETag(keyInfo.getExpectedETag());
}
if (keyInfo.hasByteRangeStartOffset()) {
builder.setByteRangeStartOffset(keyInfo.getByteRangeStartOffset());
}

if (keyInfo.hasOwnerName()) {
builder.setOwnerName(keyInfo.getOwnerName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ Test Multipart Upload Complete
${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 2 /tmp/${PREFIX}-multipartKey1-part2.result
Compare files /tmp/part2 /tmp/${PREFIX}-multipartKey1-part2.result

${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 2 --range bytes=2-4 /tmp/${PREFIX}-multipartKey1-part2-range.result
Should contain ${result} ContentRange
Should contain ${result} bytes 2-4/1047552
Should contain ${result} AcceptRanges
Execute dd if=/tmp/part2 of=/tmp/${PREFIX}-multipartKey1-part2-range.expected skip=2 bs=1 count=3 2>/dev/null
Compare files /tmp/${PREFIX}-multipartKey1-part2-range.expected /tmp/${PREFIX}-multipartKey1-part2-range.result

Test Multipart Upload with user defined metadata size larger than 2 KB
${custom_metadata_value} = Generate Random String 3000
${result} = Initiate MPU ${BUCKET} ${PREFIX}/mpuWithLargeMetadata 255 --metadata="custom-key1=${custom_metadata_value}"
Expand Down Expand Up @@ -456,4 +463,3 @@ Test Multipart Upload Part with wrong Content-MD5 header

# Abort the multipart upload (cleanup)
Abort MPU ${BUCKET} ${PREFIX}/mpu/md5test/key2 ${uploadID}

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.HttpMethod;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.Bucket;
Expand Down Expand Up @@ -1290,6 +1291,41 @@ public void testGetParticularPart(@TempDir Path tempDir) throws Exception {
assertEquals(allPartContentLength, (partOneContentLength + partTwoContentLength + partThreeContentLength));
}

@Test
public void testGetParticularPartWithRange(@TempDir Path tempDir) throws Exception {
final String bucketName = getBucketName();
final String keyName = getKeyName();
final int partSize = (int) (5 * MB);
final int rangeStart = 2;
final int rangeEnd = 4;

s3Client.createBucket(bucketName);

File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile();
createFile(multipartUploadFile, (int) (15 * MB));
multipartUpload(bucketName, keyName, multipartUploadFile, partSize, null, null, null);

GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, keyName)
.withPartNumber(2)
.withRange(rangeStart, rangeEnd);
S3Object s3Object = s3Client.getObject(getObjectRequest);

ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
assertEquals(rangeEnd - rangeStart + 1, objectMetadata.getContentLength());
assertThat(objectMetadata.getContentRange())
.containsExactly((long) rangeStart, (long) rangeEnd);
assertEquals(partSize, objectMetadata.getInstanceLength());
assertEquals("bytes " + rangeStart + "-" + rangeEnd + "/" + partSize,
objectMetadata.getRawMetadataValue(Headers.CONTENT_RANGE));

byte[] uploadedBytes = Files.readAllBytes(multipartUploadFile.toPath());
byte[] expectedBytes = Arrays.copyOfRange(uploadedBytes, partSize + rangeStart,
partSize + rangeEnd + 1);
try (S3ObjectInputStream objectContent = s3Object.getObjectContent()) {
assertThat(IOUtils.toByteArray(objectContent)).containsExactly(expectedBytes);
}
}

@Test
public void testGetNotExistedPart(@TempDir Path tempDir) throws Exception {
final String bucketName = getBucketName();
Expand Down
Loading