Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -35,7 +38,7 @@ public class RxPartitionKeyRangeCacheTest {
private RxDocumentClientImpl client;
private RxCollectionCache collectionCache;
private RxPartitionKeyRangeCache cache;

@BeforeMethod(groups = "unit")
public void before_test() {
client = Mockito.mock(RxDocumentClientImpl.class);
Expand Down Expand Up @@ -245,4 +248,161 @@ public void tryLookupAsync_RetriesOnceAndConvertsToNotFoundException() {
.expectNextMatches(s -> s.v == null)
.verifyComplete();
}

@Test(groups = "unit")
public void twoCachesForSameEndpointShareRoutingMapStorage() throws Exception {
// Two RxPartitionKeyRangeCache instances pointing at the same endpoint should
// share their underlying AsyncCacheNonBlocking, so a routing map populated by
// one is immediately visible to the other without a second /pkranges call.
URI endpoint = new URI("https://test-shared-pkr-1.documents.azure.com:443/");

RxDocumentClientImpl clientA = Mockito.mock(RxDocumentClientImpl.class);
RxDocumentClientImpl clientB = Mockito.mock(RxDocumentClientImpl.class);
RxCollectionCache collA = Mockito.mock(RxCollectionCache.class);
RxCollectionCache collB = Mockito.mock(RxCollectionCache.class);

String collectionRid = "shared-coll-1";
DocumentCollection collection = new DocumentCollection();
collection.setResourceId(collectionRid);
collection.setSelfLink("dbs/db1/colls/coll1");

PartitionKeyRange range = new PartitionKeyRange();
range.setId("0");
range.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY);
range.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY);

FeedResponse<PartitionKeyRange> response = Mockito.mock(FeedResponse.class);
when(response.getResults()).thenReturn(Arrays.asList(range));
when(response.getContinuationToken()).thenReturn("etag-1");

AtomicInteger clientACalls = new AtomicInteger();
AtomicInteger clientBCalls = new AtomicInteger();

when(collA.resolveCollectionAsync(any(), any()))
.thenReturn(Mono.just(new Utils.ValueHolder<>(collection)));
when(clientA.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class)))
.thenAnswer(invocation -> {
clientACalls.incrementAndGet();
return Flux.just(response);
});

when(collB.resolveCollectionAsync(any(), any()))
.thenReturn(Mono.just(new Utils.ValueHolder<>(collection)));
when(clientB.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class)))
.thenAnswer(invocation -> {
clientBCalls.incrementAndGet();
return Flux.just(response);
});

RxPartitionKeyRangeCache cacheA = new RxPartitionKeyRangeCache(clientA, collA, endpoint);
RxPartitionKeyRangeCache cacheB = new RxPartitionKeyRangeCache(clientB, collB, endpoint);

try {
// First client populates the cache.
StepVerifier.create(cacheA.tryLookupAsync(null, collectionRid, null, new HashMap<>()))
.expectNextMatches(v -> v != null && v.v != null)
.verifyComplete();

// Second client must hit the shared cache without issuing its own /pkranges read.
StepVerifier.create(cacheB.tryLookupAsync(null, collectionRid, null, new HashMap<>()))
.expectNextMatches(v -> v != null && v.v != null)
.verifyComplete();

assertThat(clientACalls.get())
.as("client A populates the shared routing map")
.isEqualTo(1);
assertThat(clientBCalls.get())
.as("client B must hit the shared cache without issuing its own /pkranges call")
.isZero();
} finally {
cacheA.close();
cacheB.close();
}

assertThat(SharedRoutingMapCacheRegistry.getInstance().referenceCount(endpoint.toString()))
.as("close() releases the shared cache reference")
.isZero();
}

@Test(groups = "unit")
public void cachesForDifferentEndpointsDoNotShareStorage() throws Exception {
URI endpointA = new URI("https://test-shared-pkr-2a.documents.azure.com:443/");
URI endpointB = new URI("https://test-shared-pkr-2b.documents.azure.com:443/");

RxDocumentClientImpl clientA = Mockito.mock(RxDocumentClientImpl.class);
RxDocumentClientImpl clientB = Mockito.mock(RxDocumentClientImpl.class);
RxCollectionCache collA = Mockito.mock(RxCollectionCache.class);
RxCollectionCache collB = Mockito.mock(RxCollectionCache.class);

String collectionRid = "shared-coll-2";
DocumentCollection collection = new DocumentCollection();
collection.setResourceId(collectionRid);
collection.setSelfLink("dbs/db1/colls/coll2");

PartitionKeyRange range = new PartitionKeyRange();
range.setId("0");
range.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY);
range.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY);

FeedResponse<PartitionKeyRange> response = Mockito.mock(FeedResponse.class);
when(response.getResults()).thenReturn(Arrays.asList(range));
when(response.getContinuationToken()).thenReturn("etag-2");

AtomicInteger clientACalls = new AtomicInteger();
AtomicInteger clientBCalls = new AtomicInteger();

when(collA.resolveCollectionAsync(any(), any()))
.thenReturn(Mono.just(new Utils.ValueHolder<>(collection)));
when(clientA.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class)))
.thenAnswer(invocation -> {
clientACalls.incrementAndGet();
return Flux.just(response);
});
when(collB.resolveCollectionAsync(any(), any()))
.thenReturn(Mono.just(new Utils.ValueHolder<>(collection)));
when(clientB.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class)))
.thenAnswer(invocation -> {
clientBCalls.incrementAndGet();
return Flux.just(response);
});

RxPartitionKeyRangeCache cacheA = new RxPartitionKeyRangeCache(clientA, collA, endpointA);
RxPartitionKeyRangeCache cacheB = new RxPartitionKeyRangeCache(clientB, collB, endpointB);

try {
StepVerifier.create(cacheA.tryLookupAsync(null, collectionRid, null, new HashMap<>()))
.expectNextMatches(v -> v != null && v.v != null)
.verifyComplete();

StepVerifier.create(cacheB.tryLookupAsync(null, collectionRid, null, new HashMap<>()))
.expectNextMatches(v -> v != null && v.v != null)
.verifyComplete();

// Two different endpoints → no sharing, each client issues its own read.
assertThat(clientACalls.get()).isEqualTo(1);
assertThat(clientBCalls.get()).isEqualTo(1);
} finally {
cacheA.close();
cacheB.close();
}
}

@Test(groups = "unit")
public void closeIsIdempotent() throws Exception {
URI endpoint = new URI("https://test-shared-pkr-3.documents.azure.com:443/");
RxDocumentClientImpl mockClient = Mockito.mock(RxDocumentClientImpl.class);
RxCollectionCache mockColl = Mockito.mock(RxCollectionCache.class);

RxPartitionKeyRangeCache c = new RxPartitionKeyRangeCache(mockClient, mockColl, endpoint);
assertThat(SharedRoutingMapCacheRegistry.getInstance().referenceCount(endpoint.toString()))
.isEqualTo(1);

c.close();
c.close(); // second call must be a no-op
c.close();

assertThat(SharedRoutingMapCacheRegistry.getInstance().referenceCount(endpoint.toString()))
.as("repeated close() must not drive refcount negative")
.isZero();
}
}
Loading
Loading