Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ public class Configs {
private static final String THINCLIENT_ENABLED = "COSMOS.THINCLIENT_ENABLED";
private static final String THINCLIENT_ENABLED_VARIABLE = "COSMOS_THINCLIENT_ENABLED";

// Tri-state opt-in / kill-switch for cold-start metadata cache hedging. When unset (null),
// cold-start metadata hedging follows the account's PPAF state. "true" forces it on even when
// PPAF is disabled; "false" suppresses it regardless of PPAF.
private static final String METADATA_HEDGING_FOR_COLD_START_ENABLED = "COSMOS.METADATA_HEDGING_FOR_COLD_START_ENABLED";
private static final String METADATA_HEDGING_FOR_COLD_START_ENABLED_VARIABLE = "COSMOS_METADATA_HEDGING_FOR_COLD_START_ENABLED";

private static final boolean DEFAULT_NETTY_HTTP_CLIENT_METRICS_ENABLED = false;
private static final String NETTY_HTTP_CLIENT_METRICS_ENABLED = "COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED";
private static final String NETTY_HTTP_CLIENT_METRICS_ENABLED_VARIABLE = "COSMOS_NETTY_HTTP_CLIENT_METRICS_ENABLED";
Expand Down Expand Up @@ -585,6 +591,25 @@ public static boolean isThinClientEnabled() {
return DEFAULT_THINCLIENT_ENABLED;
}

/**
* Resolves the tri-state cold-start metadata hedging opt-in.
*
* @return {@code null} when unset (follow PPAF), otherwise the explicit {@link Boolean} value.
*/
public static Boolean getMetadataHedgingForColdStartEnabled() {
String valueFromSystemProperty = System.getProperty(METADATA_HEDGING_FOR_COLD_START_ENABLED);
if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
return Boolean.parseBoolean(valueFromSystemProperty);
}

String valueFromEnvVariable = System.getenv(METADATA_HEDGING_FOR_COLD_START_ENABLED_VARIABLE);
if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
return Boolean.parseBoolean(valueFromEnvVariable);
}

return null;
}

public static boolean isNettyHttpClientMetricsEnabled() {
return Boolean.parseBoolean(
System.getProperty(NETTY_HTTP_CLIENT_METRICS_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import com.azure.cosmos.implementation.metadatahedging.MetadataHedgingStrategy;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
Expand Down Expand Up @@ -282,6 +283,7 @@ private static CosmosItemSerializer internalDefaultSerializer() {
private final GlobalEndpointManager globalEndpointManager;
private final GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker globalPartitionEndpointManagerForPerPartitionCircuitBreaker;
private final GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover;
private MetadataHedgingStrategy metadataHedgingStrategy;
private final RetryPolicy retryPolicy;
private HttpClient reactorHttpClient;
private Function<HttpClient, HttpClient> httpClientInterceptor;
Expand Down Expand Up @@ -895,6 +897,12 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
DatabaseAccount databaseAccountSnapshot = this.initializeGatewayConfigurationReader();
this.resetSessionContainerIfNeeded(databaseAccountSnapshot);

this.metadataHedgingStrategy = MetadataHedgingStrategy.createIfEnabled(
Configs.getMetadataHedgingForColdStartEnabled(),
this.globalEndpointManager,
() -> this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover != null
&& this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled());

if (metadataCachesSnapshot != null) {
AsyncCache<String, DocumentCollection> nameCache = metadataCachesSnapshot.getCollectionInfoByNameCache();
AsyncCache<String, DocumentCollection> idCache = metadataCachesSnapshot.getCollectionInfoByIdCache();
Expand All @@ -905,22 +913,28 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
this,
this.retryPolicy,
nameCache,
idCache
idCache,
this.globalEndpointManager,
this.metadataHedgingStrategy
);
} else {
// Cache data could not be deserialized (e.g., old format); fall back to fresh fetch
this.collectionCache = new RxClientCollectionCache(this,
this.sessionContainer,
this.gatewayProxy,
this,
this.retryPolicy);
this.retryPolicy,
this.globalEndpointManager,
this.metadataHedgingStrategy);
}
} else {
this.collectionCache = new RxClientCollectionCache(this,
this.sessionContainer,
this.gatewayProxy,
this,
this.retryPolicy);
this.retryPolicy,
this.globalEndpointManager,
this.metadataHedgingStrategy);
}
this.resetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.IRetryPolicyFactory;
Expand All @@ -22,13 +23,16 @@
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.RxStoreModel;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.metadatahedging.MetadataHedgingContext;
import com.azure.cosmos.implementation.metadatahedging.MetadataHedgingStrategy;
import reactor.core.publisher.Mono;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
* Caches collection information.
Expand All @@ -42,51 +46,82 @@ public class RxClientCollectionCache extends RxCollectionCache {
private final IAuthorizationTokenProvider tokenProvider;
private final IRetryPolicyFactory retryPolicy;
private final ISessionContainer sessionContainer;
private final GlobalEndpointManager globalEndpointManager;
private final MetadataHedgingStrategy metadataHedgingStrategy;

public RxClientCollectionCache(DiagnosticsClientContext diagnosticsClientContext,
ISessionContainer sessionContainer,
RxStoreModel storeModel,
IAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy,
AsyncCache<String, DocumentCollection> collectionInfoByNameCache, AsyncCache<String, DocumentCollection> collectionInfoByIdCache) {
this(diagnosticsClientContext, sessionContainer, storeModel, tokenProvider, retryPolicy,
collectionInfoByNameCache, collectionInfoByIdCache, null, null);
}

public RxClientCollectionCache(DiagnosticsClientContext diagnosticsClientContext,
ISessionContainer sessionContainer,
RxStoreModel storeModel,
IAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy,
AsyncCache<String, DocumentCollection> collectionInfoByNameCache,
AsyncCache<String, DocumentCollection> collectionInfoByIdCache,
GlobalEndpointManager globalEndpointManager,
MetadataHedgingStrategy metadataHedgingStrategy) {
super(collectionInfoByNameCache, collectionInfoByIdCache);
this.diagnosticsClientContext = diagnosticsClientContext;
this.storeModel = storeModel;
this.tokenProvider = tokenProvider;
this.retryPolicy = retryPolicy;
this.sessionContainer = sessionContainer;
this.globalEndpointManager = globalEndpointManager;
this.metadataHedgingStrategy = metadataHedgingStrategy;
}

public RxClientCollectionCache(DiagnosticsClientContext diagnosticsClientContext,
ISessionContainer sessionContainer,
RxStoreModel storeModel,
IAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy) {
this(diagnosticsClientContext, sessionContainer, storeModel, tokenProvider, retryPolicy,
(GlobalEndpointManager) null, (MetadataHedgingStrategy) null);
}

public RxClientCollectionCache(DiagnosticsClientContext diagnosticsClientContext,
ISessionContainer sessionContainer,
RxStoreModel storeModel,
IAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy,
GlobalEndpointManager globalEndpointManager,
MetadataHedgingStrategy metadataHedgingStrategy) {
this.diagnosticsClientContext = diagnosticsClientContext;
this.storeModel = storeModel;
this.tokenProvider = tokenProvider;
this.retryPolicy = retryPolicy;
this.sessionContainer = sessionContainer;
this.globalEndpointManager = globalEndpointManager;
this.metadataHedgingStrategy = metadataHedgingStrategy;
}

protected Mono<DocumentCollection> getByRidAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, Map<String, Object> properties) {
protected Mono<DocumentCollection> getByRidAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, Map<String, Object> properties, boolean isColdStart) {
DocumentClientRetryPolicy retryPolicyInstance = new ClearingSessionContainerClientRetryPolicy(this.sessionContainer, this.retryPolicy.getRequestPolicy(null));
return ObservableHelper.inlineIfPossible(
() -> this.readCollectionAsync(metaDataDiagnosticsContext, PathsHelper.generatePath(ResourceType.DocumentCollection, collectionRid, false), retryPolicyInstance, properties)
() -> this.readCollectionAsync(metaDataDiagnosticsContext, PathsHelper.generatePath(ResourceType.DocumentCollection, collectionRid, false), retryPolicyInstance, properties, isColdStart)
, retryPolicyInstance);
}

protected Mono<DocumentCollection> getByNameAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties) {
protected Mono<DocumentCollection> getByNameAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties, boolean isColdStart) {
DocumentClientRetryPolicy retryPolicyInstance = new ClearingSessionContainerClientRetryPolicy(this.sessionContainer, this.retryPolicy.getRequestPolicy(null));
return ObservableHelper.inlineIfPossible(
() -> this.readCollectionAsync(metaDataDiagnosticsContext, resourceAddress, retryPolicyInstance, properties),
() -> this.readCollectionAsync(metaDataDiagnosticsContext, resourceAddress, retryPolicyInstance, properties, isColdStart),
retryPolicyInstance);
}

private Mono<DocumentCollection> readCollectionAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext,
String collectionLink,
DocumentClientRetryPolicy retryPolicyInstance,
Map<String, Object> properties) {
Map<String, Object> properties,
boolean isColdStart) {

String path = Utils.joinPath(collectionLink, null);
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this.diagnosticsClientContext,
Expand Down Expand Up @@ -119,14 +154,30 @@ private Mono<DocumentCollection> readCollectionAsync(MetadataDiagnosticsContext
retryPolicyInstance.onBeforeSendRequest(request);
}

// Region-targeted sender shared by the primary-only path and the metadata hedge branches.
// The hedging strategy routes each cloned request to its target region before this runs.
final boolean isAadToken = tokenProvider.getAuthorizationTokenType() == AuthorizationTokenType.AadToken;
Function<RxDocumentServiceRequest, Mono<RxDocumentServiceResponse>> sender = serviceRequest -> {
if (isAadToken) {
return tokenProvider
.populateAuthorizationHeader(serviceRequest)
.flatMap(this.storeModel::processMessage);
}
return this.storeModel.processMessage(serviceRequest);
};

Instant addressCallStartTime = Instant.now();
Mono<RxDocumentServiceResponse> responseObs;
if (tokenProvider.getAuthorizationTokenType() != AuthorizationTokenType.AadToken) {
responseObs = this.storeModel.processMessage(request);
if (this.metadataHedgingStrategy != null && isColdStart) {
// Only cold-start metadata cache population (first read on a cache miss) hedges.
// Forced refresh paths pass isColdStart=false and fall through to the direct send.
MetadataHedgingContext hedgeContext = new MetadataHedgingContext();
hedgeContext.setColdStart(true);
responseObs = this.metadataHedgingStrategy
.executeAsync(request, sender, hedgeContext)
.map(hedgeResult -> hedgeResult.getResponse());
} else {
responseObs = tokenProvider
.populateAuthorizationHeader(request)
.flatMap(serviceRequest -> this.storeModel.processMessage(serviceRequest));
responseObs = sender.apply(request);
}

return responseObs.map(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ public void refresh(MetadataDiagnosticsContext metaDataDiagnosticsContext, Strin
this.collectionInfoByNameCache.refresh(
resourceFullName,
() -> {
Mono<DocumentCollection> collectionObs = this.getByNameAsync(metaDataDiagnosticsContext, resourceFullName, properties);
Mono<DocumentCollection> collectionObs = this.getByNameAsync(metaDataDiagnosticsContext, resourceFullName, properties, false);
return collectionObs.doOnSuccess(collection -> this.collectionInfoByIdCache.set(collection.getResourceId(), collection));
});
}
}

protected abstract Mono<DocumentCollection> getByRidAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, Map<String, Object> properties);
protected abstract Mono<DocumentCollection> getByRidAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, Map<String, Object> properties, boolean isColdStart);

protected abstract Mono<DocumentCollection> getByNameAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties);
protected abstract Mono<DocumentCollection> getByNameAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties, boolean isColdStart);

private Mono<Utils.ValueHolder<DocumentCollection>> resolveByPartitionKeyRangeIdentityAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
Expand Down Expand Up @@ -166,7 +166,7 @@ public Mono<Utils.ValueHolder<DocumentCollection>> resolveByRidAsync(
Mono<DocumentCollection> async = this.collectionInfoByIdCache.getAsync(
collectionResourceId,
null,
() -> this.getByRidAsync(metaDataDiagnosticsContext, collectionResourceId, properties));
() -> this.getByRidAsync(metaDataDiagnosticsContext, collectionResourceId, properties, true));
return async.map(Utils.ValueHolder::new);
}

Expand All @@ -190,7 +190,7 @@ public Mono<DocumentCollection> resolveByNameAsync(
obsoleteValue,
() -> {
Mono<DocumentCollection> collectionObs = this.getByNameAsync(
metaDataDiagnosticsContext, resourceFullName, properties)
metaDataDiagnosticsContext, resourceFullName, properties, obsoleteValue == null)
.onErrorMap(throwable -> {

if (throwable instanceof CosmosException) {
Expand Down Expand Up @@ -242,7 +242,7 @@ public Mono<Void> refreshAsync(MetadataDiagnosticsContext metaDataDiagnosticsCon
resourceFullName,
obsoleteValue,
() -> {
Mono<DocumentCollection> collectionObs = this.getByNameAsync(metaDataDiagnosticsContext, resourceFullName, request.properties);
Mono<DocumentCollection> collectionObs = this.getByNameAsync(metaDataDiagnosticsContext, resourceFullName, request.properties, false);
return collectionObs.doOnSuccess(collection -> {
this.collectionInfoByIdCache.set(collection.getResourceId(), collection);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.metadatahedging;

/**
* Identifies the branch (primary or hedge) that produced a candidate metadata-hedge winner.
* Used to compose the per-branch overlay in
* {@link MetadataHedgingStrategy#isAcceptableWinner}.
* <p>
* Java port of the .NET {@code MetadataHedgingStrategy.HedgeBranch} enum
* (Azure/azure-cosmos-dotnet-v3#5923).
*/
public enum HedgeBranch {
PRIMARY,
HEDGE
}
Loading