diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index a7a8a2e3a..2c3eda16b 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -196,12 +196,12 @@ public class ArcusClient extends FrontCacheMemcachedClient implements ArcusClien private final Transcoder collectionTranscoder; - private static final int BOPGET_BULK_CHUNK_SIZE = 200; - private static final int SMGET_CHUNK_SIZE = 500; + public static final int BOPGET_BULK_CHUNK_SIZE = 200; + public static final int SMGET_CHUNK_SIZE = 500; private static final int NON_PIPED_BULK_INSERT_CHUNK_SIZE = 500; - private static final int MAX_GETBULK_ELEMENT_COUNT = 50; - private static final int MAX_SMGET_COUNT = 1000; // server configuration is 2000. + public static final int MAX_GETBULK_ELEMENT_COUNT = 50; + public static final int MAX_SMGET_COUNT = 1000; // server configuration is 2000. private static final int SHUTDOWN_TIMEOUT_MILLISECONDS = 2000; private static final AtomicInteger CLIENT_ID = new AtomicInteger(1); diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index d8b2d17cd..dd70d9731 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -17,6 +17,7 @@ */ package net.spy.memcached.v2; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -28,9 +29,32 @@ import net.spy.memcached.ArcusClient; import net.spy.memcached.CachedData; +import net.spy.memcached.KeyValidator; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedNode; +import net.spy.memcached.collection.BKeyObject; +import net.spy.memcached.collection.BTreeCreate; +import net.spy.memcached.collection.BTreeGet; +import net.spy.memcached.collection.BTreeGetBulk; +import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey; +import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey; +import net.spy.memcached.collection.BTreeUpsert; +import net.spy.memcached.ops.BTreeGetBulkOperation; +import net.spy.memcached.collection.BTreeSMGet; +import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey; +import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey; +import net.spy.memcached.ops.BTreeSortMergeGetOperation; +import net.spy.memcached.collection.BTreeInsert; +import net.spy.memcached.collection.BTreeInsertAndGet; +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionCreate; +import net.spy.memcached.collection.CollectionInsert; +import net.spy.memcached.collection.ElementValueType; import net.spy.memcached.ops.APIType; +import net.spy.memcached.ops.BTreeInsertAndGetOperation; +import net.spy.memcached.ops.CollectionCreateOperation; +import net.spy.memcached.ops.CollectionGetOperation; +import net.spy.memcached.ops.CollectionInsertOperation; import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; @@ -38,18 +62,25 @@ import net.spy.memcached.ops.StatusCode; import net.spy.memcached.ops.StoreType; import net.spy.memcached.transcoders.Transcoder; +import net.spy.memcached.transcoders.TranscoderUtils; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; +import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BopGetArgs; +import net.spy.memcached.v2.vo.SMGetElements; public class AsyncArcusCommands implements AsyncArcusCommandsIF { private final Transcoder tc; private final Transcoder tcForCollection; - + private final KeyValidator keyValidator; private final Supplier arcusClientSupplier; @SuppressWarnings("unchecked") public AsyncArcusCommands(Supplier arcusClientSupplier) { this.tc = (Transcoder) arcusClientSupplier.get().getTranscoder(); this.tcForCollection = (Transcoder) arcusClientSupplier.get().getCollectionTranscoder(); + this.keyValidator = arcusClientSupplier.get().getKeyValidator(); this.arcusClientSupplier = arcusClientSupplier; } @@ -239,7 +270,7 @@ private ArcusFuture> get(ArcusClient client, MemcachedNode node, r -> { Map decodedMap = new HashMap<>(); for (Map.Entry entry - : ((Map) r).entrySet()) { + : ((Map) r).entrySet()) { decodedMap.put(entry.getKey(), tc.decode(entry.getValue())); } return decodedMap; @@ -342,4 +373,591 @@ public void complete() { return future; } + + public ArcusFuture bopCreate(String key, ElementValueType type, + CollectionAttributes attributes) { + if (attributes == null) { + throw new IllegalArgumentException("CollectionAttributes cannot be null"); + } + + CollectionCreate create = new BTreeCreate(TranscoderUtils.examineFlags(type), + attributes.getExpireTime(), attributes.getMaxCount(), + attributes.getOverflowAction(), attributes.getReadable(), false); + + return collectionCreate(key, create); + } + + private ArcusFuture collectionCreate(String key, CollectionCreate collectionCreate) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + switch (status.getStatusCode()) { + case SUCCESS: + result.set(true); + break; + case ERR_EXISTS: + result.set(false); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + // NOT_SUPPORTED or unknown statement + result.addError(key, status); + } + } + + @Override + public void complete() { + future.complete(); + } + }; + CollectionCreateOperation op = client.getOpFact() + .collectionCreate(key, collectionCreate, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + public ArcusFuture bopInsert(String key, BTreeElement element, + CollectionAttributes attributes) { + BTreeInsert insert = new BTreeInsert<>(element.getValue(), element.getEFlag(), + null, attributes); + return collectionInsert(key, element.getBkey().toString(), insert); + } + + public ArcusFuture bopInsert(String key, BTreeElement element) { + return bopInsert(key, element, null); + } + + @Override + public ArcusFuture bopUpsert(String key, BTreeElement element, + CollectionAttributes attributes) { + BTreeUpsert upsert = new BTreeUpsert<>(element.getValue(), element.getEFlag(), + null, attributes); + return collectionInsert(key, element.getBkey().toString(), upsert); + } + + @Override + public ArcusFuture bopUpsert(String key, BTreeElement element) { + return bopUpsert(key, element, null); + } + + private ArcusFuture collectionInsert(String key, + String internalKey, + CollectionInsert collectionInsert) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + CachedData co = tcForCollection.encode(collectionInsert.getValue()); + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_ELEMENT_EXISTS: + case ERR_NOT_FOUND: + break; + case CANCELLED: + future.internalCancel(); + return; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OVERFLOWED / OUT_OF_RANGE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + return; + } + } + result.set(status.isSuccess()); + } + + @Override + public void complete() { + future.complete(); + } + }; + CollectionInsertOperation op = client.getOpFact() + .collectionInsert(key, internalKey, collectionInsert, co.getData(), cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + public ArcusFuture>> bopInsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes) { + return bopInsertOrUpsertAndGetTrimmed(key, element, false, attributes); + } + + public ArcusFuture>> bopInsertAndGetTrimmed( + String key, BTreeElement element) { + return bopInsertOrUpsertAndGetTrimmed(key, element, false, null); + } + + public ArcusFuture>> bopUpsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes) { + return bopInsertOrUpsertAndGetTrimmed(key, element, true, attributes); + } + + public ArcusFuture>> bopUpsertAndGetTrimmed( + String key, BTreeElement element) { + return bopInsertOrUpsertAndGetTrimmed(key, element, true, null); + } + + private ArcusFutureImpl>> bopInsertOrUpsertAndGetTrimmed( + String key, BTreeElement element, boolean isUpsert, CollectionAttributes attributes) { + AbstractArcusResult>> result = + new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl>> future = new ArcusFutureImpl<>(result); + BTreeInsertAndGet insertAndGet = createBTreeInsertAndGet(element, isUpsert, attributes); + CachedData co = tcForCollection.encode(insertAndGet.getValue()); + insertAndGet.setFlags(co.getFlags()); + ArcusClient client = arcusClientSupplier.get(); + + BTreeInsertAndGetOperation.Callback cb = new BTreeInsertAndGetOperation.Callback() { + private BTreeElement trimmedElement = null; + + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_ELEMENT_EXISTS: + case ERR_NOT_FOUND: + break; + case CANCELLED: + future.internalCancel(); + return; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OVERFLOWED / OUT_OF_RANGE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + return; + } + } + result.set(new AbstractMap.SimpleEntry<>(status.isSuccess(), trimmedElement)); + } + + public void complete() { + future.complete(); + } + + @Override + public void gotData(int flags, BKeyObject bKeyObject, byte[] eFlag, byte[] data) { + trimmedElement = new BTreeElement<>(BKey.of(bKeyObject), + tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize())), eFlag); + } + }; + Operation op = client.getOpFact() + .bopInsertAndGet(key, insertAndGet, co.getData(), cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + private static BTreeInsertAndGet createBTreeInsertAndGet(BTreeElement element, + boolean isUpsert, + CollectionAttributes attributes) { + BTreeInsertAndGet insertAndGet; + if (element.getBkey().getType() == BKey.BKeyType.LONG) { + insertAndGet = new BTreeInsertAndGet<>((Long) element.getBkey().getData(), + element.getEFlag(), element.getValue(), isUpsert, attributes); + } else { + insertAndGet = new BTreeInsertAndGet<>((byte[]) element.getBkey().getData(), + element.getEFlag(), element.getValue(), isUpsert, attributes); + } + return insertAndGet; + } + + public ArcusFuture> bopGet(String key, BKey bKey, BopGetArgs args) { + AbstractArcusResult> result = + new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + BTreeGet get = createBTreeGet(bKey, args); + ArcusClient client = arcusClientSupplier.get(); + + CollectionGetOperation.Callback cb = new CollectionGetOperation.Callback() { + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_NOT_FOUND: + result.set(null); + break; + case ERR_NOT_FOUND_ELEMENT: + result.set(new BTreeElement<>(bKey, null, null)); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / UNREADABLE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + } + } + } + + public void complete() { + future.complete(); + } + + public void gotData(String bKey, int flags, byte[] data, byte[] eflag) { + result.set(new BTreeElement<>(BKey.of(bKey), + tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize())), eflag)); + } + }; + Operation op = client.getOpFact().collectionGet(key, get, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + private static BTreeGet createBTreeGet(BKey bKey, BopGetArgs args) { + BTreeGet get; + if (bKey.getType() == BKey.BKeyType.LONG) { + get = new BTreeGet((long) bKey.getData(), args.getElementFlagFilter(), + args.isWithDelete(), args.isDropIfEmpty()); + } else { + get = new BTreeGet((byte[]) bKey.getData(), args.getElementFlagFilter(), + args.isWithDelete(), args.isDropIfEmpty()); + } + return get; + } + + public ArcusFuture> bopGet(String key, BKey from, BKey to, BopGetArgs args) { + verifyBKeyRange(from, to); + + AbstractArcusResult> result = + new AbstractArcusResult<>(new AtomicReference<>(new BTreeElements<>(new ArrayList<>()))); + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + BTreeGet get = createBTreeGet(from, to, args); + ArcusClient client = arcusClientSupplier.get(); + + CollectionGetOperation.Callback cb = new CollectionGetOperation.Callback() { + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.TRIMMED) { + result.get().trimmed(); + } else if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_NOT_FOUND: + result.set(null); + break; + case ERR_NOT_FOUND_ELEMENT: + break; + case CANCELLED: + future.internalCancel(); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / UNREADABLE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + } + } + } + + public void complete() { + future.complete(); + } + + public void gotData(String bKey, int flags, byte[] data, byte[] eflag) { + result.get().addElement(new BTreeElement<>(BKey.of(bKey), tcForCollection.decode( + new CachedData(flags, data, tc.getMaxSize())), eflag)); + } + }; + Operation op = client.getOpFact().collectionGet(key, get, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + private static BTreeGet createBTreeGet(BKey from, BKey to, BopGetArgs args) { + BTreeGet get; + if (from.getType() == BKey.BKeyType.LONG) { + get = new BTreeGet((Long) from.getData(), (Long) to.getData(), + args.getElementFlagFilter(), args.getOffset(), args.getCount(), + args.isWithDelete(), args.isDropIfEmpty()); + } else { + get = new BTreeGet((byte[]) from.getData(), (byte[]) to.getData(), + args.getElementFlagFilter(), args.getOffset(), args.getCount(), + args.isWithDelete(), args.isDropIfEmpty()); + } + return get; + } + + public ArcusFuture>> bopMultiGet(List keys, + BKey from, BKey to, + BopGetArgs args) { + verifyBKeyRange(from, to); + verifyCountArg(args, ArcusClient.MAX_GETBULK_ELEMENT_COUNT); + + ArcusClient client = arcusClientSupplier.get(); + keyValidator.validateKey(keys); + keyValidator.checkDupKey(keys); + Collection>> arrangedKeys = + client.groupingKeys(keys, ArcusClient.BOPGET_BULK_CHUNK_SIZE, APIType.BOP_GET); + + Collection> futures = new ArrayList<>(); + Map>>, List> futureToKeys = + new HashMap<>(); + + for (Map.Entry> entry : arrangedKeys) { + BTreeGetBulk getBulk = createBTreeGetBulk(from, to, args, entry); + CompletableFuture>> future = + bopMultiGetPerNode(client, getBulk).toCompletableFuture(); + futureToKeys.put(future, entry.getValue()); + futures.add(future); + } + + /* + * Combine all futures. If any future fails exceptionally, + * the corresponding keys will have null values in the result map. + * If key not found, the corresponding key will not be present in the result map. + */ + return new ArcusMultiFuture<>(futures, () -> { + Map> results = new HashMap<>(); + for (Map.Entry>>, List> entry + : futureToKeys.entrySet()) { + if (entry.getKey().isCompletedExceptionally()) { + for (String key : entry.getValue()) { + results.put(key, null); + } + } else { + Map> result = entry.getKey().join(); + if (result != null) { + results.putAll(result); + } + } + } + return results; + }); + } + + private void verifyCountArg(BopGetArgs args, int maxCount) { + int count = args.getCount(); + if (count <= 0 || count > maxCount) { + throw new IllegalArgumentException("Count should be between 0 to " + maxCount); + } + } + + /** + * Use only in bopMultiGet method. + * + * @param getBulk get bulk parameters for single node + * @return ArcusFuture with results + */ + private ArcusFuture>> bopMultiGetPerNode(ArcusClient client, + BTreeGetBulk getBulk) { + AbstractArcusResult>> result = + new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>())); + ArcusFutureImpl>> future = new ArcusFutureImpl<>(result); + + BTreeGetBulkOperation.Callback cb = new BTreeGetBulkOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + } else if (!status.isSuccess()) { + /* + * NOT_SUPPORTED or unknown statement + */ + for (String key : getBulk.getKeyList()) { + result.addError(key, status); + } + } + } + + @Override + public void complete() { + future.complete(); + } + + @Override + public void gotKey(String key, int elementCount, OperationStatus status) { + if (status.isSuccess()) { + BTreeElements elements = new BTreeElements<>(new ArrayList<>()); + result.get().put(key, elements); + if (status.getStatusCode() == StatusCode.TRIMMED) { + elements.trimmed(); + } + return; + } + switch (status.getStatusCode()) { + case ERR_NOT_FOUND: + break; + case ERR_NOT_FOUND_ELEMENT: + // Put empty BTreeElements for the BTree item key + result.get().put(key, new BTreeElements<>(new ArrayList<>())); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / UNREADABLE + * or unknown statement + */ + result.addError(key, status); + } + } + + @Override + public void gotElement(String key, int flags, Object bKey, byte[] eFlag, byte[] data) { + BTreeElements elements = result.get().get(key); + elements.addElement(new BTreeElement<>(BKey.of(bKey), tcForCollection.decode( + new CachedData(flags, data, tc.getMaxSize())), eFlag)); + } + }; + Operation op = client.getOpFact().bopGetBulk(getBulk, cb); + future.setOp(op); + client.addOp(getBulk.getMemcachedNode(), op); + + return future; + } + + private static void verifyBKeyRange(BKey from, BKey to) { + if (from.getType() != to.getType()) { + throw new IllegalArgumentException("Two BKey types(from, to) must be the same."); + } + } + + private BTreeGetBulk createBTreeGetBulk(BKey from, BKey to, BopGetArgs args, + Map.Entry> entry) { + if (from.getType() == BKey.BKeyType.LONG) { + return new BTreeGetBulkWithLongTypeBkey<>(entry.getKey(), entry.getValue(), + (Long) from.getData(), (Long) to.getData(), args.getElementFlagFilter(), + args.getOffset(), args.getCount()); + } else { + return new BTreeGetBulkWithByteTypeBkey<>(entry.getKey(), entry.getValue(), + (byte[]) from.getData(), (byte[]) to.getData(), args.getElementFlagFilter(), + args.getOffset(), args.getCount()); + } + } + + public ArcusFuture> bopSortMergeGet(List keys, BKey from, BKey to, + boolean unique, BopGetArgs args) { + verifyBKeyRange(from, to); + verifyCountArg(args, ArcusClient.MAX_SMGET_COUNT); + + ArcusClient client = arcusClientSupplier.get(); + keyValidator.validateKey(keys); + keyValidator.checkDupKey(keys); + + Collection>> arrangedKeys = + client.groupingKeys(keys, ArcusClient.SMGET_CHUNK_SIZE, APIType.BOP_SMGET); + + Collection> futures = new ArrayList<>(); + List>> smGetFutures = new ArrayList<>(); + + for (Map.Entry> entry : arrangedKeys) { + BTreeSMGet smGet = createBTreeSMGet(from, to, args, unique, entry); + CompletableFuture> future = + bopSortMergeGetPerNode(client, smGet).toCompletableFuture(); + smGetFutures.add(future); + futures.add(future); + } + + /* + * Combine all futures and merge results from multiple nodes. + */ + return new ArcusMultiFuture<>(futures, () -> { + List> results = new ArrayList<>(); + for (CompletableFuture> future : smGetFutures) { + if (!future.isCompletedExceptionally()) { + results.add(future.join()); + } + } + return SMGetElements.mergeSMGetElements(results, from.compareTo(to) <= 0, unique, + args.getCount()); + }); + } + + /** + * Use only in bopSortMergeGet method. + * + * @param smGet sort-merge get parameters for single node + * @return ArcusFuture with results + */ + private ArcusFuture> bopSortMergeGetPerNode(ArcusClient client, + BTreeSMGet smGet) { + List> elementList = new ArrayList<>(); + List missedKeys = new ArrayList<>(); + List trimmedKeys = new ArrayList<>(); + SMGetElements smGetElements = new SMGetElements<>(elementList, missedKeys, trimmedKeys); + + AtomicReference> atomicReference = new AtomicReference<>(smGetElements); + AbstractArcusResult> result = + new AbstractArcusResult<>(atomicReference); + + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + + BTreeSortMergeGetOperation.Callback cb = new BTreeSortMergeGetOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + } else if (!status.isSuccess()) { + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / NOT_SUPPORTED or unknown statement + */ + for (String key : smGet.getKeyList()) { + result.addError(key, status); + } + } + } + + @Override + public void complete() { + future.complete(); + } + + @Override + public void gotData(String key, int flags, Object bKey, byte[] eFlag, byte[] data) { + BTreeElement btreeElement = new BTreeElement<>(BKey.of(bKey), + tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize())), eFlag); + elementList.add(new SMGetElements.Element<>(key, btreeElement)); + } + + @Override + public void gotMissedKey(String key, OperationStatus cause) { + missedKeys.add(new SMGetElements.MissedKey(key, cause.getStatusCode())); + } + + @Override + public void gotTrimmedKey(String key, Object bKey) { + trimmedKeys.add(new SMGetElements.TrimmedKey(key, BKey.of(bKey))); + } + }; + Operation op = client.getOpFact().bopsmget(smGet, cb); + future.setOp(op); + client.addOp(smGet.getMemcachedNode(), op); + + return future; + } + + private BTreeSMGet createBTreeSMGet(BKey from, BKey to, BopGetArgs args, + boolean unique, + Map.Entry> entry) { + + if (from.getType() == BKey.BKeyType.LONG) { + return new BTreeSMGetWithLongTypeBkey<>(entry.getKey(), entry.getValue(), + (Long) from.getData(), (Long) to.getData(), args.getElementFlagFilter(), + args.getCount(), unique); + } else { + return new BTreeSMGetWithByteTypeBkey<>(entry.getKey(), entry.getValue(), + (byte[]) from.getData(), (byte[]) to.getData(), args.getElementFlagFilter(), + args.getCount(), unique); + } + } } diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index 085da0267..44893a102 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -20,6 +20,14 @@ import java.util.List; import java.util.Map; +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.ElementValueType; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; +import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BopGetArgs; +import net.spy.memcached.v2.vo.SMGetElements; + public interface AsyncArcusCommandsIF { /** @@ -106,4 +114,151 @@ public interface AsyncArcusCommandsIF { */ ArcusFuture flush(int delay); + /** + * Create a btree item. + * + * @param key key to create + * @param type btree element value type + * @param attributes collection attributes (must not be null) + * @return {@code Boolean.True} if created, otherwise {@code Boolean.False} + */ + ArcusFuture bopCreate(String key, ElementValueType type, + CollectionAttributes attributes); + + /** + * Insert an element into a btree item. + * + * @param key key to insert + * @param element btree element to insert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Boolean.True} if inserted, {@code Boolean.False} if element exists, + * {@code null} if key is not found + */ + ArcusFuture bopInsert(String key, BTreeElement element, + CollectionAttributes attributes); + + /** + * Insert an element into a btree item. + * + * @param key key to insert + * @param element btree element to insert + * @return {@code Boolean.True} if inserted, {@code Boolean.False} if element exists, + * {@code null} if key is not found + */ + ArcusFuture bopInsert(String key, BTreeElement element); + + /** + * Upsert an element into a btree item. + * + * @param key key to upsert + * @param element btree element to upsert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Boolean.True} if upserted, {@code Boolean.False} otherwise + */ + ArcusFuture bopUpsert(String key, BTreeElement element, + CollectionAttributes attributes); + + /** + * Upsert an element into a btree item. + * + * @param key key to upsert + * @param element btree element to upsert + * @return {@code Boolean.True} if upserted, {@code Boolean.False} otherwise + */ + ArcusFuture bopUpsert(String key, BTreeElement element); + + /** + * Insert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to insert + * @param element btree element to insert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Map.Entry} with insertion result and trimmed element + */ + ArcusFuture>> bopInsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes); + + /** + * Insert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to insert + * @param element btree element to insert + * @return {@code Map.Entry} with insertion result and trimmed element + */ + ArcusFuture>> bopInsertAndGetTrimmed(String key, + BTreeElement element); + + /** + * Upsert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to upsert + * @param element btree element to upsert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Map.Entry} with upsertion result and trimmed element + */ + ArcusFuture>> bopUpsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes); + + /** + * Upsert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to upsert + * @param element btree element to upsert + * @return {@code Map.Entry} with upsertion result and trimmed element + */ + ArcusFuture>> bopUpsertAndGetTrimmed(String key, + BTreeElement element); + + /** + * Get an element from a btree item. + * + * @param key key to get + * @param bKey BKey of the element to get + * @param args arguments for get operation + * @return {@code BTreeElement} if found, {@code BTreeElement} with null value and eFlag + * if element is not found but key exists, {@code null} if key is not found + */ + ArcusFuture> bopGet(String key, BKey bKey, BopGetArgs args); + + /** + * Get elements from a btree item. + * + * @param key key to get + * @param from BKey range start + * @param to BKey range end + * @param args arguments for get operation + * @return {@code BTreeElements} that contains trimmed or not and elements. + * If element is not found but key exists, {@code BTreeElements} with empty map will be returned. + * If key is not found, {@code null} will be returned. + */ + ArcusFuture> bopGet(String key, BKey from, BKey to, BopGetArgs args); + + /** + * Get elements from multiple btree items. + * + * @param keys list of keys to get + * @param from BKey range start + * @param to BKey range end + * @param args arguments for get operation + * @return Map of key to BTreeElements. If element is not found but key exists, + * empty {@code BTreeElements} will be set for entry value. If key is not found, + * the corresponding entry will not be present in the map. + */ + ArcusFuture>> bopMultiGet(List keys, + BKey from, BKey to, + BopGetArgs args); + + /** + * Get sort-merged elements from multiple btree items. + * + * @param keys list of keys to get + * @param from BKey range start + * @param to BKey range end + * @param unique whether to return unique elements only + * @param args arguments for get operation + * @return {@code SMGetElements} containing sort-merged elements. Never return {@code null}. + * If matching elements not exist, the elements list in the {@code SMGetElements} will be empty. + */ + ArcusFuture> bopSortMergeGet(List keys, BKey from, BKey to, + boolean unique, BopGetArgs args); } diff --git a/src/main/java/net/spy/memcached/v2/vo/BKey.java b/src/main/java/net/spy/memcached/v2/vo/BKey.java new file mode 100644 index 000000000..7b841d790 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BKey.java @@ -0,0 +1,135 @@ +package net.spy.memcached.v2.vo; + +import java.util.Arrays; +import java.util.Objects; + +import net.spy.memcached.collection.BKeyObject; +import net.spy.memcached.util.BTreeUtil; + +public final class BKey implements Comparable { + private final BKeyType type; + private final Object data; + + private BKey(long data) { + if (data < 0) { + throw new IllegalArgumentException("BKey long value cannot be negative."); + } + this.type = BKeyType.LONG; + this.data = data; + } + + private BKey(byte[] data) { + if (data == null) { + throw new IllegalArgumentException("BKey byte array cannot be null."); + } + + if (data.length > 31) { + throw new IllegalArgumentException( + "BKey byte array size must be between 0 and 31. Given size: " + data.length); + } + + this.type = BKeyType.BYTE_ARRAY; + this.data = Arrays.copyOf(data, data.length); + } + + public static BKey of(Object bKey) { + if (bKey == null) { + throw new IllegalArgumentException("BKey cannot be null"); + } + if (bKey instanceof Long) { + return new BKey((Long) bKey); + } else if (bKey instanceof byte[]) { + return new BKey((byte[]) bKey); + } else if (bKey instanceof String) { + String bkeyString = (String) bKey; + try { + return new BKey(Long.parseLong(bkeyString)); + } catch (NumberFormatException e) { + return new BKey(BTreeUtil.hexStringToByteArrays(bkeyString)); + } + } else { + throw new IllegalArgumentException("Unsupported BKey type: " + bKey.getClass()); + } + } + + public static BKey of(BKeyObject bkeyObject) { + if (bkeyObject == null) { + throw new IllegalArgumentException("BKeyObject cannot be null"); + } + + if (bkeyObject.isByteArray()) { + return new BKey(bkeyObject.getByteArrayBKeyRaw()); + } else { + return new BKey(bkeyObject.getLongBKey()); + } + } + + public enum BKeyType { + BYTE_ARRAY, + LONG; + } + + public Object getData() { + if (type == BKeyType.BYTE_ARRAY) { + byte[] bytes = (byte[]) data; + return Arrays.copyOf(bytes, bytes.length); + } + return data; + } + + public BKeyType getType() { + return type; + } + + @Override + public int compareTo(BKey o) { + if (this.type != o.type) { + throw new IllegalArgumentException("Cannot compare different BKey types."); + } + + if (this.type == BKeyType.LONG) { + return ((Long) this.data).compareTo((Long) o.data); + } else { + return BTreeUtil.compareByteArraysInLexOrder((byte[]) this.data, (byte[]) o.data); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || this.getClass() != o.getClass()) { + return false; + } + BKey bKey = (BKey) o; + if (this.type != bKey.type) { + return false; + } + + if (this.type == BKeyType.LONG) { + return this.data.equals(bKey.data); + } else { + return Arrays.equals((byte[]) this.data, (byte[]) bKey.data); + } + } + + @Override + public int hashCode() { + if (this.type == BKeyType.LONG) { + return Objects.hash(this.type, this.data); + } else { + return Objects.hash(this.type, Arrays.hashCode((byte[]) this.data)); + } + } + + @Override + public String toString() { + if (this.type == BKeyType.LONG) { + return String.valueOf(this.data); + } else { + return BTreeUtil.toHex((byte[]) this.data); + } + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BTreeElement.java b/src/main/java/net/spy/memcached/v2/vo/BTreeElement.java new file mode 100644 index 000000000..abfabdace --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BTreeElement.java @@ -0,0 +1,55 @@ +package net.spy.memcached.v2.vo; + +import java.util.Arrays; +import java.util.Objects; + +public final class BTreeElement implements Comparable> { + private final BKey bkey; + private final V value; + private final byte[] eFlag; + + public BTreeElement(BKey bkey, V value, byte[] eFlag) { + if (bkey == null) { + throw new IllegalArgumentException("BKey cannot be null"); + } + this.bkey = bkey; + this.value = value; + this.eFlag = eFlag; + } + + public BKey getBkey() { + return bkey; + } + + public V getValue() { + return value; + } + + public byte[] getEFlag() { + return eFlag; + } + + @Override + public int compareTo(BTreeElement o) { + return this.bkey.compareTo(o.bkey); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + BTreeElement that = (BTreeElement) o; + return Objects.equals(bkey, that.bkey) && + Objects.equals(value, that.value) && Objects.deepEquals(eFlag, that.eFlag); + } + + @Override + public int hashCode() { + return Objects.hash(bkey, value, Arrays.hashCode(eFlag)); + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BTreeElements.java b/src/main/java/net/spy/memcached/v2/vo/BTreeElements.java new file mode 100644 index 000000000..ae23d771e --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BTreeElements.java @@ -0,0 +1,32 @@ +package net.spy.memcached.v2.vo; + +import java.util.Collections; +import java.util.List; + +public final class BTreeElements { + private boolean isTrimmed; + private final List> elements; + + public BTreeElements(List> elements) { + if (elements == null) { + throw new IllegalArgumentException("Elements map cannot be null"); + } + this.elements = elements; + } + + public boolean isTrimmed() { + return isTrimmed; + } + + public List> getElements() { + return Collections.unmodifiableList(elements); + } + + public void trimmed() { + this.isTrimmed = true; + } + + public void addElement(BTreeElement element) { + this.elements.add(element); + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BopGetArgs.java b/src/main/java/net/spy/memcached/v2/vo/BopGetArgs.java new file mode 100644 index 000000000..0f12faf98 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BopGetArgs.java @@ -0,0 +1,100 @@ +package net.spy.memcached.v2.vo; + +import net.spy.memcached.collection.ElementFlagFilter; + +public final class BopGetArgs { + + public static final BopGetArgs DEFAULT = new BopGetArgs.Builder().build(); + + private final ElementFlagFilter eFlagFilter; + private final int offset; + private final int count; + private final boolean withDelete; + private final boolean dropIfEmpty; + + private BopGetArgs(ElementFlagFilter eFlagFilter, int offset, int count, + boolean withDelete, boolean dropIfEmpty) { + this.eFlagFilter = eFlagFilter; + this.offset = offset; + this.count = count; + this.withDelete = withDelete; + this.dropIfEmpty = dropIfEmpty; + } + + public ElementFlagFilter getElementFlagFilter() { + return eFlagFilter; + } + + public int getOffset() { + return offset; + } + + public int getCount() { + return count; + } + + public boolean isWithDelete() { + return withDelete; + } + + public boolean isDropIfEmpty() { + return dropIfEmpty; + } + + public static final class Builder { + private ElementFlagFilter eFlagFilter = null; + private int offset = 0; + private int count = 50; + private boolean withDelete = false; + private boolean dropIfEmpty = false; + + public Builder eFlagFilter(ElementFlagFilter eFlagFilter) { + this.eFlagFilter = eFlagFilter; + return this; + } + + /** + * Set the offset only for {@code AsyncArcusCommands#bopGet} + * or {@code AsyncArcusCommands#bopMultiGet} + * + * @param offset to skip elements that match condition from the 'from' BKey + */ + public Builder offset(int offset) { + if (offset < 0) { + throw new IllegalArgumentException("offset cannot be negative"); + } + this.offset = offset; + return this; + } + + /** + * Set the count of elements to retrieve. + * + * @param count For bopGet or bopMultiGet method, + * set the number of elements to retrieve from each BTree item. + * For bopSortMergeGet method, + * set the total number of elements to retrieve across all BTree items. + */ + public Builder count(int count) { + if (count < 0) { + throw new IllegalArgumentException("count cannot be negative"); + } + this.count = count; + return this; + } + + public Builder withDelete() { + this.withDelete = true; + return this; + } + + public Builder dropIfEmpty() { + this.dropIfEmpty = true; + return this; + } + + public BopGetArgs build() { + return new BopGetArgs(eFlagFilter, offset, count, withDelete, dropIfEmpty); + } + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/SMGetElements.java b/src/main/java/net/spy/memcached/v2/vo/SMGetElements.java new file mode 100644 index 000000000..71624e6f4 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/SMGetElements.java @@ -0,0 +1,265 @@ +package net.spy.memcached.v2.vo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import net.spy.memcached.ops.StatusCode; + +public final class SMGetElements { + private final List> elements; + private final List missedKeys; + private final List trimmedKeys; + + public SMGetElements(List> elements, + List missedKeys, + List trimmedKeys) { + if (elements == null || missedKeys == null || trimmedKeys == null) { + throw new IllegalArgumentException("Arguments cannot be null"); + } + this.elements = elements; + this.missedKeys = missedKeys; + this.trimmedKeys = trimmedKeys; + } + + public static SMGetElements mergeSMGetElements(List> smGetElementsList, + boolean ascending, + boolean unique, int count) { + List> allElements = new ArrayList<>(); + List allMissedKeys = new ArrayList<>(); + List allTrimmedKeys = new ArrayList<>(); + + // 1) Collect elements (deduplicate while collecting if unique is true) + if (unique) { + collectUniqueElements(smGetElementsList, allElements, allMissedKeys, allTrimmedKeys, + ascending); + } else { + collectDuplicatedElements(smGetElementsList, allElements, allMissedKeys, allTrimmedKeys); + } + + // 2) Sort elements, missed keys, and trimmed keys + if (ascending) { + allElements.sort(Comparator.naturalOrder()); + } else { + allElements.sort(Comparator.reverseOrder()); + } + Collections.sort(allMissedKeys); + Collections.sort(allTrimmedKeys); + + // 3) Trim elements to the requested count + if (allElements.size() > count) { + allElements = new ArrayList<>(allElements.subList(0, count)); + } + + // 4) Remove trimmed keys outside the final element range + if (!allElements.isEmpty()) { + BKey lastBKey = allElements.get(allElements.size() - 1).getbTreeElement().getBkey(); + allTrimmedKeys.removeIf(trimmedKey -> { + int comp = trimmedKey.getBKey().compareTo(lastBKey); + return ascending ? comp >= 0 : comp <= 0; + }); + } + + return new SMGetElements<>(allElements, allMissedKeys, allTrimmedKeys); + } + + private static void collectUniqueElements( + List> smGetElementsList, + List> allElements, + List allMissedKeys, + List allTrimmedKeys, + boolean ascending) { + Map> uniqueMap = new HashMap<>(); + + for (SMGetElements smGetElements : smGetElementsList) { + for (Element element : smGetElements.getElements()) { + BKey bkey = element.getbTreeElement().getBkey(); + + uniqueMap.compute(bkey, (k, existing) -> { + if (existing == null) { + return element; + } + + // Remain only one element when bkey is duplicated: + // - smaller key if ascending bkey range + // - larger key if descending bkey range + int keyComparison = existing.getKey().compareTo(element.getKey()); + if (ascending) { + return keyComparison <= 0 ? existing : element; + } else { + return keyComparison >= 0 ? existing : element; + } + }); + } + allMissedKeys.addAll(smGetElements.getMissedKeys()); + allTrimmedKeys.addAll(smGetElements.getTrimmedKeys()); + } + + allElements.addAll(uniqueMap.values()); + } + + private static void collectDuplicatedElements( + List> smGetElementsList, + List> allElements, + List allMissedKeys, + List allTrimmedKeys) { + for (SMGetElements smgetElements : smGetElementsList) { + allElements.addAll(smgetElements.getElements()); + allMissedKeys.addAll(smgetElements.getMissedKeys()); + allTrimmedKeys.addAll(smgetElements.getTrimmedKeys()); + } + } + + public List> getElements() { + return Collections.unmodifiableList(elements); + } + + public List getMissedKeys() { + return Collections.unmodifiableList(missedKeys); + } + + public List getTrimmedKeys() { + return Collections.unmodifiableList(trimmedKeys); + } + + public static final class Element implements Comparable> { + + private final String key; + private final BTreeElement bTreeElement; + + public Element(String key, BTreeElement element) { + if (key == null || element == null) { + throw new IllegalArgumentException("key or element cannot be null"); + } + this.key = key; + this.bTreeElement = element; + } + + @Override + public int compareTo(Element o) { + int elementComparison = bTreeElement.compareTo(o.getbTreeElement()); + if (elementComparison == 0) { + return this.key.compareTo(o.key); + } + return elementComparison; + } + + public String getKey() { + return key; + } + + public BTreeElement getbTreeElement() { + return bTreeElement; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + Element that = (Element) o; + return Objects.equals(key, that.key) && Objects.equals(bTreeElement, that.bTreeElement); + } + + @Override + public int hashCode() { + return Objects.hash(key, bTreeElement); + } + } + + public static final class MissedKey implements Comparable { + private final String key; + private final StatusCode statusCode; + + public MissedKey(String key, StatusCode statusCode) { + if (key == null || statusCode == null) { + throw new IllegalArgumentException("key or statusCode cannot be null"); + } + this.key = key; + this.statusCode = statusCode; + } + + public String getKey() { + return key; + } + + public StatusCode getStatusCode() { + return statusCode; + } + + @Override + public int compareTo(MissedKey o) { + return this.key.compareTo(o.key); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + MissedKey missedKey = (MissedKey) o; + return Objects.equals(key, missedKey.key) && statusCode == missedKey.statusCode; + } + + @Override + public int hashCode() { + return Objects.hash(key, statusCode); + } + } + + public static final class TrimmedKey implements Comparable { + private final String key; + private final BKey bKey; + + public TrimmedKey(String key, BKey bKey) { + if (key == null || bKey == null) { + throw new IllegalArgumentException("key or bKey cannot be null"); + } + this.key = key; + this.bKey = bKey; + } + + public String getKey() { + return key; + } + + public BKey getBKey() { + return bKey; + } + + @Override + public int compareTo(TrimmedKey o) { + return this.key.compareTo(o.key); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + TrimmedKey that = (TrimmedKey) o; + return Objects.equals(key, that.key) && Objects.equals(bKey, that.bKey); + } + + @Override + public int hashCode() { + return Objects.hash(key, bKey); + } + } +} diff --git a/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java new file mode 100644 index 000000000..6728b4906 --- /dev/null +++ b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java @@ -0,0 +1,830 @@ +package net.spy.memcached.v2; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionOverflowAction; +import net.spy.memcached.collection.ElementValueType; +import net.spy.memcached.ops.StatusCode; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; +import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BopGetArgs; +import net.spy.memcached.v2.vo.SMGetElements; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class BTreeAsyncArcusCommandsTest extends AsyncArcusCommandsTest { + + private static final List> ELEMENTS = Arrays.asList( + new BTreeElement<>(BKey.of(1L), "value1", null), + new BTreeElement<>(BKey.of(2L), "value2", null), + new BTreeElement<>(BKey.of(3L), "value3", null), + new BTreeElement<>(BKey.of(4L), "value4", null), + new BTreeElement<>(BKey.of(5L), "value5", null) + ); + + @Test + void bopInsert() throws Exception { + // given + String key = keys.get(0); + + // when + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenCompose(result -> { + assertTrue(result); + return async.bopInsert(key, ELEMENTS.get(0)); + }) + // then + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertDifferentTypeAndGetDifferentElement() throws Exception { + // given + String key = keys.get(0); + BTreeElement element = ELEMENTS.get(0); + + // when + async.bopCreate(key, ElementValueType.LONG, new CollectionAttributes()) + .thenCompose(result -> { + assertTrue(result); + return async.bopInsert(key, element); + }) + .thenCompose(result -> { + assertTrue(result); + return async.bopGet(key, element.getBkey(), BopGetArgs.DEFAULT); + }) + // then + .thenAccept(result -> { + assertNotEquals(element, result); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertNotFound() throws Exception { + // given & when + async.bopInsert(keys.get(0), ELEMENTS.get(0)) + // then + .thenAccept(Assertions::assertFalse) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertTypeMisMatch() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.set(key, 0, VALUE) + .thenCompose(result -> { + assertTrue(result); + return async.bopInsert(key, ELEMENTS.get(0), attrs); + }) + // then + .exceptionally(throwable -> { + assertNotNull(throwable); + assertTrue(throwable.getCause().getMessage().contains("TYPE_MISMATCH")); + return null; + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertAndGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopInsertAndGetTrimmed(key, ELEMENTS.get(3), attrs)) + // then + .thenAccept(result -> { + assertTrue(result.getKey()); + BTreeElement trimmedElement = result.getValue(); + assertNotNull(trimmedElement); + assertEquals(ELEMENTS.get(0), trimmedElement); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopFailToInsertAndGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsertAndGetTrimmed(key, ELEMENTS.get(0))) + // then + .thenAccept(result -> { + assertFalse(result.getKey()); + assertNull(result.getValue()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpsertAndGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopUpsertAndGetTrimmed(key, + ELEMENTS.get(3))) + // then + .thenAccept(result -> { + assertTrue(result.getKey()); + assertEquals(ELEMENTS.get(0), result.getValue()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpsertAndNotGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + String newValue = "new_value"; + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopUpsertAndGetTrimmed(key, + new BTreeElement<>(BKey.of(1L), newValue, null))) + // then + .thenAccept(result -> { + assertTrue(result.getKey()); + assertNull(result.getValue()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGet() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopGet(key, BKey.of(1L), BKey.of(3L), BopGetArgs.DEFAULT)) + // then + .thenAccept(elements -> { + assertEquals(3, elements.getElements().size()); + + int i = 0; + for (BTreeElement element : elements.getElements()) { + assertEquals(ELEMENTS.get(i++), element); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetWithDelete() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + BopGetArgs getArgsWithDelete = new BopGetArgs.Builder() + .withDelete() + .count(10) + .build(); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopGet(key, BKey.of(2L), getArgsWithDelete)) + // then + .thenAccept(element -> assertEquals(ELEMENTS.get(1), element)) + .thenCompose(v -> async.bopGet(key, BKey.of(2L), BopGetArgs.DEFAULT)) + .thenAccept(element -> { + // ELEMENT NOT FOUND + assertNotNull(element); + assertEquals(BKey.of(2L), element.getBkey()); + assertNull(element.getValue()); + assertNull(element.getEFlag()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetWithDeleteAndDropIfEmpty() throws Exception { + // given + String key = keys.get(0); + BopGetArgs getArgsWithDeleteAndDrop = new BopGetArgs.Builder() + .withDelete() + .dropIfEmpty() + .count(10) + .build(); + + // when + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> async.bopGet(key, BKey.of(1L), getArgsWithDeleteAndDrop)) + // then + .thenAccept(element -> assertEquals(ELEMENTS.get(0), element)) + .thenCompose(v -> async.bopInsert(key, ELEMENTS.get(1))) + .thenAccept(Assertions::assertFalse) // NOT FOUND (key miss) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetRangeNotExistKey() { + // given + String key = keys.get(0); + + // when + async.bopGet(key, BKey.of(1L), BKey.of(10L), BopGetArgs.DEFAULT) + // then + .thenAccept(Assertions::assertNull) // NOT FOUND (key miss) + .toCompletableFuture() + .join(); + } + + @Test + void bopGetRangeNotExistElements() throws Exception { + // given + String key = keys.get(0); + + // when + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopGet(key, + BKey.of(10L), BKey.of(20L), BopGetArgs.DEFAULT)) + // then + .thenAccept(elements -> { + // NOT FOUND ELEMENT + assertNotNull(elements); + assertEquals(0, elements.getElements().size()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetRangeWithDelete() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + BopGetArgs getArgsWithDelete = new BopGetArgs.Builder() + .withDelete() + .count(10) + .build(); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(3), attrs)) + .thenCompose(result -> async.bopGet(key, BKey.of(2L), BKey.of(3L), getArgsWithDelete)) + // then + .thenAccept(elements -> { + assertNotNull(elements); + assertEquals(2, elements.getElements().size()); + assertEquals(ELEMENTS.get(1), elements.getElements().get(0)); + assertEquals(ELEMENTS.get(2), elements.getElements().get(1)); + }) + .thenCompose(v -> async.bopGet(key, BKey.of(2L), BKey.of(3L), BopGetArgs.DEFAULT)) + .thenAccept(elements -> { + assertNotNull(elements); + assertTrue(elements.getElements().isEmpty()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetRangeDescending() throws Exception { + // given + String key = keys.get(0); + + // when + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(3))) + .thenCompose(result -> async.bopGet(key, BKey.of(10L), BKey.of(1L), BopGetArgs.DEFAULT)) + // then + .thenAccept(elements -> { + assertNotNull(elements); + int i = 3; + for (BTreeElement element : elements.getElements()) { + assertEquals(ELEMENTS.get(i--), element); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + + @Test + void bopMultiGet() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attr = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attr) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attr)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(4), attr)) + // then + .thenCompose(result -> async + .bopMultiGet(testKeys, BKey.of(1L), BKey.of(10L), BopGetArgs.DEFAULT)) + .thenAccept(map -> { + assertEquals(3, map.size()); + + BTreeElements elements0 = map.get(testKeys.get(0)); + assertEquals(2, elements0.getElements().size()); + assertEquals(ELEMENTS.get(0), elements0.getElements().get(0)); + assertEquals(ELEMENTS.get(1), elements0.getElements().get(1)); + + BTreeElements elements1 = map.get(testKeys.get(1)); + assertEquals(2, elements1.getElements().size()); + assertEquals(ELEMENTS.get(2), elements1.getElements().get(0)); + assertEquals(ELEMENTS.get(3), elements1.getElements().get(1)); + + BTreeElements elements2 = map.get(testKeys.get(2)); + assertEquals(1, elements2.getElements().size()); + assertEquals(ELEMENTS.get(4), elements2.getElements().get(0)); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopMultiGetDescending() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attr = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attr) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attr)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(4), attr)) + // then + .thenCompose(result -> async + .bopMultiGet(testKeys, BKey.of(10L), BKey.of(1L), BopGetArgs.DEFAULT)) + .thenAccept(map -> { + assertEquals(3, map.size()); + + BTreeElements elements0 = map.get(testKeys.get(0)); + assertEquals(2, elements0.getElements().size()); + BTreeElements elements1 = map.get(testKeys.get(1)); + assertEquals(2, elements1.getElements().size()); + BTreeElements elements2 = map.get(testKeys.get(2)); + assertEquals(1, elements2.getElements().size()); + + // Make sure that the order is descending + int i = 1; + for (BTreeElement element : elements0.getElements()) { + assertEquals(ELEMENTS.get(i--), element); + } + i = 3; + for (BTreeElement element : elements1.getElements()) { + assertEquals(ELEMENTS.get(i--), element); + } + + assertEquals(ELEMENTS.get(4), elements2.getElements().get(0)); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopMultiGetNotFoundElement() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async + .bopMultiGet(testKeys, BKey.of(10L), BKey.of(20L), BopGetArgs.DEFAULT)) + // then + .thenAccept(map -> { + assertEquals(1, map.size()); + + // NOT FOUND ELEMENT + BTreeElements elements = map.get(testKeys.get(0)); + assertNotNull(elements); + assertEquals(0, elements.getElements().size()); + + // NOT FOUND + assertFalse(map.containsKey(testKeys.get(1))); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetAscendingUnique() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(3))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(10L), true, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(2), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetDescendingUnique() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(3))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(1L), true, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(2), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(2), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetAscendingDuplicated() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(2L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + SMGetElements.Element element1 = smGetElements.getElements().get(0); + assertEquals(ELEMENTS.get(0), element1.getbTreeElement()); + assertEquals(testKeys.get(0), element1.getKey()); + + SMGetElements.Element element2 = smGetElements.getElements().get(1); + assertEquals(ELEMENTS.get(0), element2.getbTreeElement()); + assertEquals(testKeys.get(1), element2.getKey()); + + SMGetElements.Element element3 = smGetElements.getElements().get(2); + assertEquals(ELEMENTS.get(1), element3.getbTreeElement()); + assertEquals(testKeys.get(0), element3.getKey()); + + SMGetElements.Element element4 = smGetElements.getElements().get(3); + assertEquals(ELEMENTS.get(1), element4.getbTreeElement()); + assertEquals(testKeys.get(1), element4.getKey()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetDescendingDuplicated() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(2L), BKey.of(1L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + SMGetElements.Element element1 = smGetElements.getElements().get(0); + assertEquals(ELEMENTS.get(1), element1.getbTreeElement()); + assertEquals(testKeys.get(1), element1.getKey()); + + SMGetElements.Element element2 = smGetElements.getElements().get(1); + assertEquals(ELEMENTS.get(1), element2.getbTreeElement()); + assertEquals(testKeys.get(0), element2.getKey()); + + SMGetElements.Element element3 = smGetElements.getElements().get(2); + assertEquals(ELEMENTS.get(0), element3.getbTreeElement()); + assertEquals(testKeys.get(1), element3.getKey()); + + SMGetElements.Element element4 = smGetElements.getElements().get(3); + assertEquals(ELEMENTS.get(0), element4.getbTreeElement()); + assertEquals(testKeys.get(0), element4.getKey()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetUnique() throws Exception { + // given + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(keys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(keys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(keys.get(2), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(keys.get(3), ELEMENTS.get(0), attrs)) + // when + .thenCompose(result -> async.bopSortMergeGet(keys, + BKey.of(1L), BKey.of(3L), true, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(1, smGetElements.getElements().size()); + + SMGetElements.Element element = smGetElements.getElements().get(0); + assertEquals(ELEMENTS.get(0), element.getbTreeElement()); + assertEquals(keys.get(0), element.getKey()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetWithMissedKeys() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(10L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + + assertEquals(1, smGetElements.getElements().size()); + List> elements = smGetElements.getElements(); + assertEquals(ELEMENTS.get(0), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + + assertEquals(1, smGetElements.getMissedKeys().size()); + SMGetElements.MissedKey missedKey = smGetElements.getMissedKeys().get(0); + assertEquals(testKeys.get(1), missedKey.getKey()); + assertEquals(StatusCode.ERR_NOT_FOUND, missedKey.getStatusCode()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotFound() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + + // when + async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(20L), false, BopGetArgs.DEFAULT) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(0, smGetElements.getElements().size()); + assertEquals(3, smGetElements.getMissedKeys().size()); + assertEquals(0, smGetElements.getTrimmedKeys().size()); + assertIterableEquals(testKeys, smGetElements.getMissedKeys().stream() + .map(SMGetElements.MissedKey::getKey).collect(Collectors.toList())); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotFoundElement() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(20L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(0, smGetElements.getElements().size()); + assertEquals(1, smGetElements.getMissedKeys().size()); + assertEquals(0, smGetElements.getTrimmedKeys().size()); + + SMGetElements.MissedKey missedKey = smGetElements.getMissedKeys().get(0); + assertEquals(testKeys.get(2), missedKey.getKey()); + assertEquals(StatusCode.ERR_NOT_FOUND, missedKey.getStatusCode()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetWithTrimmedKeys() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(2); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2))) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(1L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + assertEquals(1, smGetElements.getMissedKeys().size()); + assertEquals(1, smGetElements.getTrimmedKeys().size()); + + SMGetElements.TrimmedKey trimmedKey = smGetElements.getTrimmedKeys().get(0); + assertEquals(keys.get(0), trimmedKey.getKey()); + assertEquals(ELEMENTS.get(2).getBkey(), trimmedKey.getBKey()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotHaveTrimmedKeysOutOfElementsRangeDescending() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(2); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(3))) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(1L), false, + new BopGetArgs.Builder().build())) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + assertTrue(smGetElements.getMissedKeys().isEmpty()); + assertTrue(smGetElements.getTrimmedKeys().isEmpty()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(1), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotHaveTrimmedKeysOutOfElementsRangeAscending() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(2); + attrs.setOverflowAction(CollectionOverflowAction.largest_trim); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(3), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(0))) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0))) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(10L), false, + new BopGetArgs.Builder().build())) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + assertTrue(smGetElements.getMissedKeys().isEmpty()); + assertTrue(smGetElements.getTrimmedKeys().isEmpty()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } +}