Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/storage/azure-storage-blob",
"Tag": "java/storage/azure-storage-blob_80c07fe827"
"Tag": "java/storage/azure-storage-blob_c976afa88e"
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.azure.storage.common.policy.ResponseValidationPolicyBuilder;
import com.azure.storage.common.policy.ScrubEtagPolicy;
import com.azure.storage.common.policy.StorageBearerTokenChallengeAuthorizationPolicy;
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;

import java.net.MalformedURLException;
Expand Down Expand Up @@ -140,6 +141,8 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare

HttpPolicyProviders.addAfterRetryPolicies(policies);

policies.add(new StorageContentValidationDecoderPolicy());

policies.add(getResponseValidationPolicy());

policies.add(new HttpLoggingPolicy(logOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@
import com.azure.storage.blob.options.BlobSetAccessTierOptions;
import com.azure.storage.blob.options.BlobSetTagsOptions;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.common.DownloadContentValidationOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.SasImplUtils;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
Expand Down Expand Up @@ -111,6 +114,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -1173,6 +1177,52 @@ public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange rang
}
}

/**
* Reads a range of bytes from a blob with content validation options. Uploading data must be done from the {@link BlockBlobClient}, {@link
* PageBlobClient}, or {@link AppendBlobClient}.
*
* <p><strong>Code Samples</strong></p>
*
* <pre>{@code
* BlobRange range = new BlobRange(1024, 2048L);
* DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5);
* DownloadContentValidationOptions validationOptions = new DownloadContentValidationOptions()
* .setStructuredMessageValidationEnabled(true);
*
* client.downloadStreamWithResponse(range, options, null, false, validationOptions).subscribe(response -> {
* ByteArrayOutputStream downloadData = new ByteArrayOutputStream();
* response.getValue().subscribe(piece -> {
* try {
* downloadData.write(piece.array());
* } catch (IOException ex) {
* throw new UncheckedIOException(ex);
* }
* });
* });
* }</pre>
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a></p>
*
* @param range {@link BlobRange}
* @param options {@link DownloadRetryOptions}
* @param requestConditions {@link BlobRequestConditions}
* @param getRangeContentMd5 Whether the contentMD5 for the specified blob range should be returned.
* @param contentValidationOptions {@link DownloadContentValidationOptions} options for content validation
* @return A reactive response containing the blob data.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
DownloadContentValidationOptions contentValidationOptions) {
try {
return withContext(context -> downloadStreamWithResponse(range, options, requestConditions,
getRangeContentMd5, contentValidationOptions, context));
} catch (RuntimeException ex) {
return monoError(LOGGER, ex);
}
}

/**
* Reads a range of bytes from a blob. Uploading data must be done from the {@link BlockBlobClient}, {@link
* PageBlobClient}, or {@link AppendBlobClient}.
Expand Down Expand Up @@ -1215,19 +1265,41 @@ public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(Downlo
}

Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
DownloadContentValidationOptions contentValidationOptions, Context context) {
BlobRange finalRange = range == null ? new BlobRange(0) : range;
Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null;

// Determine MD5 validation: properly consider both getRangeContentMd5 parameter and validation options
// MD5 validation is enabled if:
// 1. getRangeContentMd5 is explicitly true, OR
// 2. contentValidationOptions.isMd5ValidationEnabled() is true
final Boolean finalGetMD5;
if (getRangeContentMd5
|| (contentValidationOptions != null && contentValidationOptions.isMd5ValidationEnabled())) {
finalGetMD5 = true;
} else {
finalGetMD5 = null;
}

BlobRequestConditions finalRequestConditions
= requestConditions == null ? new BlobRequestConditions() : requestConditions;
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;

// The first range should eagerly convert headers as they'll be used to create response types.
Context firstRangeContext = context == null
Context initialContext = context == null
? new Context("azure-eagerly-convert-headers", true)
: context.addData("azure-eagerly-convert-headers", true);

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5,
// Add structured message decoding context if enabled
final Context firstRangeContext;
if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) {
firstRangeContext = initialContext.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true)
.addData(Constants.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY, contentValidationOptions);
} else {
firstRangeContext = initialContext;
}

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), finalGetMD5,
firstRangeContext).map(response -> {
BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders());
String eTag = blobsDownloadHeaders.getETag();
Expand All @@ -1247,6 +1319,18 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
finalCount = finalRange.getCount();
}

AtomicReference<StorageContentValidationDecoderPolicy.DecoderState> decoderStateRef
= new AtomicReference<>();
if (contentValidationOptions != null
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
Object decoderStateObj
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
.orElse(null);
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
decoderStateRef.set((StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj);
}
}

// The resume function takes throwable and offset at the destination.
// I.e. offset is relative to the starting point.
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
Expand All @@ -1255,30 +1339,91 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
}

long newCount = finalCount - offset;

/*
* It's possible that the network stream will throw an error after emitting all data but before
* completing. Issuing a retry at this stage would leave the download in a bad state with
* incorrect count and offset values. Because we have read the intended amount of data, we can
* ignore the error at the end of the stream.
*/
if (newCount == 0) {
LOGGER.warning("Exception encountered in ReliableDownload after all data read from the network "
+ "but before stream signaled completion. Returning success as all data was downloaded. "
+ "Exception message: " + throwable.getMessage());
return Mono.empty();
StorageContentValidationDecoderPolicy.DecoderState decoderState = null;
long expectedEncodedLength = finalCount;
long encodedProgress = offset;

if (contentValidationOptions != null
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
decoderState = decoderStateRef.get();

if (decoderState == null) {
Object decoderStateObj
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
.orElse(null);

if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
decoderState = (StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj;
}
}

if (decoderState != null) {
expectedEncodedLength = decoderState.getExpectedContentLength();
encodedProgress = decoderState.getTotalEncodedBytesProcessed();
}
}

try {
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
eTag, getMD5, context);
// For retry context, preserve decoder state if structured message validation is enabled
Context retryContext = firstRangeContext;
BlobRange retryRange;

// If structured message decoding is enabled, we need to calculate the retry offset
// based on the encoded bytes processed, not the decoded bytes
if (contentValidationOptions != null
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
long retryStartOffset = -1;

// First try to use decoder state (authoritative)
if (decoderState != null) {
// Always rewind decoder to last validated boundary before retrying.
retryStartOffset = decoderState.resetForRetry();

retryContext = retryContext
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
decoderStateRef.set(decoderState);
}

// If no decoder state or no retry offset from state, fall back to parsed token or offset.
if (retryStartOffset < 0) {
retryStartOffset = StorageContentValidationDecoderPolicy
.parseRetryStartOffset(throwable.getMessage());
}
if (retryStartOffset < 0) {
retryStartOffset = offset;
}

long remainingCount = expectedEncodedLength - retryStartOffset;
if (remainingCount < 0) {
remainingCount = expectedEncodedLength - offset;
retryStartOffset = offset;
}

retryRange = new BlobRange(initialOffset + retryStartOffset, remainingCount);

LOGGER.info(
"Structured message smart retry: resuming from offset {} (initial={}, encoded={}, remaining={})",
initialOffset + retryStartOffset, initialOffset, retryStartOffset, remainingCount);
} else {
// For non-structured downloads, use smart retry from the interrupted offset
retryRange = new BlobRange(initialOffset + offset, newCount);
}

return downloadRange(retryRange, finalRequestConditions, eTag, finalGetMD5, retryContext);
} catch (Exception e) {
return Mono.error(e);
}
};

// Structured message decoding is now handled by StructuredMessageDecoderPolicy
return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions);
});

}

Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
return downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, null, context);
}

private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions, String eTag,
Expand Down
Loading
Loading