diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index a7a8a2e3a..2c3eda16b 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -196,12 +196,12 @@ public class ArcusClient extends FrontCacheMemcachedClient implements ArcusClien private final Transcoder collectionTranscoder; - private static final int BOPGET_BULK_CHUNK_SIZE = 200; - private static final int SMGET_CHUNK_SIZE = 500; + public static final int BOPGET_BULK_CHUNK_SIZE = 200; + public static final int SMGET_CHUNK_SIZE = 500; private static final int NON_PIPED_BULK_INSERT_CHUNK_SIZE = 500; - private static final int MAX_GETBULK_ELEMENT_COUNT = 50; - private static final int MAX_SMGET_COUNT = 1000; // server configuration is 2000. + public static final int MAX_GETBULK_ELEMENT_COUNT = 50; + public static final int MAX_SMGET_COUNT = 1000; // server configuration is 2000. private static final int SHUTDOWN_TIMEOUT_MILLISECONDS = 2000; private static final AtomicInteger CLIENT_ID = new AtomicInteger(1); diff --git a/src/main/java/net/spy/memcached/OperationFactory.java b/src/main/java/net/spy/memcached/OperationFactory.java index 3abbd4032..38439c281 100644 --- a/src/main/java/net/spy/memcached/OperationFactory.java +++ b/src/main/java/net/spy/memcached/OperationFactory.java @@ -531,4 +531,15 @@ BTreeInsertAndGetOperation bopInsertAndGet(String key, BTreeInsertAndGet get, byte[] dataToInsert, OperationCallback cb); + /** + * Create a pipeline operation for executing multiple commands in batch. + * + * @param ops the list of operations to execute + * @param keys the unique keys related to the pipe operation + * @param cb the status callback + * @return a new pipeline operation + */ + Operation pipeline(List ops, List keys, + OperationCallback cb); + } diff --git a/src/main/java/net/spy/memcached/ops/APIType.java b/src/main/java/net/spy/memcached/ops/APIType.java index 9651a1eec..85e47f419 100644 --- a/src/main/java/net/spy/memcached/ops/APIType.java +++ b/src/main/java/net/spy/memcached/ops/APIType.java @@ -66,6 +66,7 @@ public enum APIType { SETATTR(OperationType.WRITE), GETATTR(OperationType.READ), // Other API + PIPE(OperationType.WRITE), FLUSH(OperationType.WRITE), STATS(OperationType.ETC), VERSION(OperationType.ETC), diff --git a/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java b/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java index 3fc20215f..d143c9dc1 100644 --- a/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java +++ b/src/main/java/net/spy/memcached/ops/BaseOperationFactory.java @@ -25,6 +25,7 @@ import net.spy.memcached.collection.BTreeGetBulk; import net.spy.memcached.collection.BTreeSMGet; import net.spy.memcached.collection.CollectionBulkInsert; +import net.spy.memcached.protocol.ascii.PipelineOperationImpl; /** * Base class for operation factories. 
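For context, a minimal sketch of how the new OperationFactory#pipeline method is meant to be driven (a sketch only: the class and variable names are hypothetical, and the generic parameters List&lt;KeyedOperation&gt; / List&lt;String&gt;, which this diff's rendering collapses, are inferred from the ascii implementation below):

```java
import java.util.List;

import net.spy.memcached.OperationFactory;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.PipelineOperation;

class PipelineFactorySketch {
  // Wraps per-key operations (all targeting the same node) into one piped operation.
  static Operation buildPiped(OperationFactory opFact,
                              List<KeyedOperation> ops, List<String> keys) {
    return opFact.pipeline(ops, keys, new PipelineOperation.Callback() {
      public void gotStatus(Operation op, OperationStatus status) {
        // called once per command in the piped request
      }
      public void receivedStatus(OperationStatus status) {
        // END when every command succeeded, FAILED_END otherwise
      }
      public void complete() {
        // piped request finished
      }
    });
  }
}
```

In this change, the ascii factory answers this call with a PipelineOperationImpl, while the binary factory rejects it as unsupported.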
@@ -151,6 +152,16 @@ public Operation cloneMultiOperation(KeyedOperation op, MemcachedNode node, } else if (op instanceof BTreeSortMergeGetOperation) { final BTreeSMGet smGet = ((BTreeSortMergeGetOperation) op).getSMGet(); return bopsmget(smGet.clone(node, redirectKeys), (BTreeSortMergeGetOperation.Callback) mcb); + } else if (op instanceof PipelineOperation) { + PipelineOperationImpl pipeOp = (PipelineOperationImpl) op; + List originalOps = pipeOp.getOps(); + List newOps = new ArrayList<>(); + for (KeyedOperation originalOp : originalOps) { + if (redirectKeys.contains(originalOp.getKeys().iterator().next())) { + newOps.add(originalOp); + } + } + return pipeline(newOps, redirectKeys, mcb); } else { assert false : "Unhandled operation type: " + op.getClass(); } diff --git a/src/main/java/net/spy/memcached/ops/PipelineOperation.java b/src/main/java/net/spy/memcached/ops/PipelineOperation.java new file mode 100644 index 000000000..8d5feb989 --- /dev/null +++ b/src/main/java/net/spy/memcached/ops/PipelineOperation.java @@ -0,0 +1,8 @@ +package net.spy.memcached.ops; + +public interface PipelineOperation extends KeyedOperation { + + interface Callback extends OperationCallback { + void gotStatus(Operation op, OperationStatus status); + } +} diff --git a/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java b/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java index 1f4da20ae..849030c78 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java @@ -17,6 +17,7 @@ package net.spy.memcached.protocol.ascii; import java.util.Collection; +import java.util.List; import javax.security.sasl.SaslClient; @@ -66,9 +67,11 @@ import net.spy.memcached.ops.GetAttrOperation; import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.GetsOperation; +import net.spy.memcached.ops.KeyedOperation; import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.MutatorOperation; import net.spy.memcached.ops.NoopOperation; +import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.SASLAuthOperation; import net.spy.memcached.ops.SASLMechsOperation; @@ -299,4 +302,10 @@ public BTreeInsertAndGetOperation bopInsertAndGet(String key, return new BTreeInsertAndGetOperationImpl(key, get, dataToInsert, cb); } + @Override + public Operation pipeline(List ops, List keys, + OperationCallback cb) { + return new PipelineOperationImpl(ops, keys, cb); + } + } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/PipelineOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/PipelineOperationImpl.java new file mode 100644 index 000000000..7c89f3f3c --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/PipelineOperationImpl.java @@ -0,0 +1,260 @@ +/* + * arcus-java-client : Arcus Java client + * Copyright 2010-2014 NAVER Corp. + * Copyright 2014-present JaM2in Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.spy.memcached.protocol.ascii; + +import java.nio.ByteBuffer; +import java.util.List; + +import net.spy.memcached.ops.APIType; +import net.spy.memcached.ops.KeyedOperation; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.OperationType; +import net.spy.memcached.ops.PipelineOperation; +import net.spy.memcached.ops.StatusCode; + +/** + * Operation for executing multiple commands as pipeline. + */ +public final class PipelineOperationImpl extends OperationImpl implements PipelineOperation { + + // PIPE RESPONSES + private static final OperationStatus END = + new OperationStatus(true, "END", StatusCode.SUCCESS); + private static final OperationStatus FAILED_END = + new OperationStatus(false, "FAILED_END", StatusCode.ERR_FAILED_END); + + // EACH COMMANDS' SUCCEED RESPONSES + private static final OperationStatus CREATED_STORED = + new OperationStatus(true, "CREATED_STORED", StatusCode.SUCCESS); + private static final OperationStatus STORED = + new OperationStatus(true, "STORED", StatusCode.SUCCESS); + private static final OperationStatus REPLACED = + new OperationStatus(true, "REPLACED", StatusCode.SUCCESS); + private static final OperationStatus UPDATED = + new OperationStatus(true, "UPDATED", StatusCode.SUCCESS); + private static final OperationStatus EXIST = + new OperationStatus(true, "EXIST", StatusCode.EXIST); + private static final OperationStatus NOT_EXIST = + new OperationStatus(true, "NOT_EXIST", StatusCode.NOT_EXIST); + private static final OperationStatus DELETED = + new OperationStatus(true, "DELETED", StatusCode.SUCCESS); + private static final OperationStatus DELETED_DROPPED = + new OperationStatus(true, "DELETED_DROPPED", StatusCode.SUCCESS); + + // EACH COMMANDS' FAILED RESPONSES + private static final OperationStatus NOT_FOUND = + new OperationStatus(false, "NOT_FOUND", StatusCode.ERR_NOT_FOUND); + private static final OperationStatus NOT_FOUND_ELEMENT = + new OperationStatus(false, "NOT_FOUND_ELEMENT", StatusCode.ERR_NOT_FOUND_ELEMENT); + private static final OperationStatus NOTHING_TO_UPDATE = + new OperationStatus(false, "NOTHING_TO_UPDATE", StatusCode.ERR_NOTHING_TO_UPDATE); + private static final OperationStatus ELEMENT_EXISTS = + new OperationStatus(false, "ELEMENT_EXISTS", StatusCode.ERR_ELEMENT_EXISTS); + private static final OperationStatus OVERFLOWED = + new OperationStatus(false, "OVERFLOWED", StatusCode.ERR_OVERFLOWED); + private static final OperationStatus OUT_OF_RANGE = + new OperationStatus(false, "OUT_OF_RANGE", StatusCode.ERR_OUT_OF_RANGE); + private static final OperationStatus TYPE_MISMATCH = + new OperationStatus(false, "TYPE_MISMATCH", StatusCode.ERR_TYPE_MISMATCH); + private static final OperationStatus BKEY_MISMATCH = + new OperationStatus(false, "BKEY_MISMATCH", StatusCode.ERR_BKEY_MISMATCH); + private static final OperationStatus EFLAG_MISMATCH = + new OperationStatus(false, "EFLAG_MISMATCH", StatusCode.ERR_EFLAG_MISMATCH); + private static final OperationStatus UNREADABLE = + new OperationStatus(false, "UNREADABLE", StatusCode.ERR_UNREADABLE); + + private final List ops; + private final List keys; + private final PipelineOperation.Callback cb; + + private int responseIndex = 0; + private boolean expectingResponse = false; + private boolean successAll = true; + + 
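+  /*
+   * Wire format handled by this operation (illustrative):
+   *
+   *   <command 1> ... pipe\r\n
+   *   <command 2> ... pipe\r\n
+   *   <command 3> ...\r\n          (the trailing "pipe" is stripped from the last command)
+   *
+   *   RESPONSE 3\r\n
+   *   STORED\r\n
+   *   OUT_OF_RANGE\r\n
+   *   STORED\r\n
+   *   END\r\n                      (or "PIPE_ERROR <error>" on a pipe failure)
+   *
+   * Each per-command status line is forwarded to Callback.gotStatus(); a single
+   * command (no "pipe" suffix at all) is answered with its bare status line only.
+   */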
public PipelineOperationImpl(List ops, List keys, + OperationCallback cb) { + super(cb); + if (ops == null || ops.isEmpty()) { + throw new IllegalArgumentException("Ops cannot be null or empty"); + } + this.ops = ops; + this.keys = keys; + this.cb = (PipelineOperation.Callback) cb; + setAPIType(APIType.PIPE); + setOperationType(OperationType.WRITE); + } + + @Override + public void initialize() { + // 1) Initialize operations and collect each buffers + // to handle switchover/redirect single key situations, + // make buffer from responseIndex Operation + int opCount = ops.size() - responseIndex; + ByteBuffer[] buffers = new ByteBuffer[opCount]; + int bufferCount = 0; + for (int i = responseIndex; i < ops.size(); i++) { + Operation op = ops.get(i); + op.initialize(); + ByteBuffer buffer = op.getBuffer(); + if (buffer != null && buffer.hasRemaining()) { + buffers[bufferCount++] = buffer; + } + } + + // 2) Remove "pipe" from the last command buffer + if (bufferCount > 0) { + buffers[bufferCount - 1] = removePipeFromLastBuffer(buffers[bufferCount - 1]); + } + + // 3) Create a concatenated pipedBuffer + int totalSize = 0; + for (int i = 0; i < bufferCount; i++) { + totalSize += buffers[i].remaining(); + } + + ByteBuffer pipedBuffer = ByteBuffer.allocate(totalSize); + for (int i = 0; i < bufferCount; i++) { + pipedBuffer.put(buffers[i]); + } + + pipedBuffer.flip(); + setBuffer(pipedBuffer); + } + + private static ByteBuffer removePipeFromLastBuffer(ByteBuffer buffer) { + byte[] bufferBytes = new byte[buffer.remaining()]; + buffer.mark(); + buffer.get(bufferBytes); + buffer.reset(); + + String command = new String(bufferBytes); + String modifiedCommand = command.replaceAll("\\s+pipe\\r\\n", "\r\n"); + byte[] modifiedBytes = modifiedCommand.getBytes(); + ByteBuffer newBuffer = ByteBuffer.allocate(modifiedBytes.length); + newBuffer.put(modifiedBytes); + newBuffer.flip(); + return newBuffer; + } + + @Override + public void handleLine(String line) { + + /* ENABLE_REPLICATION if */ + if (hasSwitchedOver(line)) { + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + String key = ops.get(responseIndex).getKeys().iterator().next(); + if (isBulkOperation()) { + addRedirectMultiKeyOperation(line, key); + responseIndex++; + } else { + // Only one NOT_MY_KEY is provided in response of + // single key piped operation when redirection. + addRedirectSingleKeyOperation(line, key); + transitionState(OperationState.REDIRECT); + } + return; + } + /* ENABLE_MIGRATION end */ + + /* + RESPONSE \r\n + \r\n + [ ... ] + \r\n + END|PIPE_ERROR \r\n + */ + if (line.startsWith("END")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + + OperationStatus status = successAll ? 
END : FAILED_END; + complete(status); + } else if (line.startsWith("PIPE_ERROR")) { + String errorMessage = line.substring(11); + OperationStatus status = + new OperationStatus(false, errorMessage, StatusCode.ERR_INTERNAL); + complete(status); + } else if (line.startsWith("RESPONSE ")) { + expectingResponse = true; + responseIndex = 0; + } else if (expectingResponse) { + // Handle status line for each command + OperationStatus status = parseStatusLine(line); + if (!status.isSuccess()) { + successAll = false; + } + + // Notify callback with current response index + cb.gotStatus(ops.get(responseIndex), status); + responseIndex++; + } else { + // Handle single command response (non-pipe case) + // When only one command or last command without "pipe", server sends direct status + OperationStatus status = parseStatusLine(line); + if (!status.isSuccess()) { + successAll = false; + } + + // Notify callback for single command + cb.gotStatus(ops.get(0), status); + + // Complete the operation immediately for single command + complete(successAll ? END : FAILED_END); + } + } + + private OperationStatus parseStatusLine(String line) { + return matchStatus(line, + END, FAILED_END, CREATED_STORED, STORED, REPLACED, UPDATED, + EXIST, NOT_EXIST, DELETED, DELETED_DROPPED, NOT_FOUND, NOT_FOUND_ELEMENT, + NOTHING_TO_UPDATE, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, + TYPE_MISMATCH, BKEY_MISMATCH, EFLAG_MISMATCH, UNREADABLE); + } + + @Override + public boolean isPipeOperation() { + return ops.size() > 1; + } + + @Override + public boolean isBulkOperation() { + return keys.size() > 1; + } + + @Override + public List getKeys() { + return keys; + } + + public List getOps() { + return ops; + } +} diff --git a/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java b/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java index 6e8f159f7..70da09647 100644 --- a/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java +++ b/src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java @@ -17,6 +17,7 @@ package net.spy.memcached.protocol.binary; import java.util.Collection; +import java.util.List; import javax.security.sasl.SaslClient; @@ -67,9 +68,11 @@ import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.GetOperation.Callback; import net.spy.memcached.ops.GetsOperation; +import net.spy.memcached.ops.KeyedOperation; import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.MutatorOperation; import net.spy.memcached.ops.NoopOperation; +import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.SASLAuthOperation; import net.spy.memcached.ops.SASLMechsOperation; @@ -329,4 +332,11 @@ public BTreeInsertAndGetOperation bopInsertAndGet(String key, "BTree insert and get operation is not supported in binary protocol yet."); } + @Override + public Operation pipeline(List ops, List keys, + OperationCallback cb) { + throw new RuntimeException( + "Pipeline operation is not supported in binary protocol yet."); + } + } diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java index d8b2d17cd..7e21feab8 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java @@ -17,39 +17,76 @@ */ package net.spy.memcached.v2; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; +import 
java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import net.spy.memcached.ArcusClient; import net.spy.memcached.CachedData; +import net.spy.memcached.KeyValidator; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedNode; +import net.spy.memcached.collection.BKeyObject; +import net.spy.memcached.collection.BTreeCreate; +import net.spy.memcached.collection.BTreeGet; +import net.spy.memcached.collection.BTreeGetBulk; +import net.spy.memcached.collection.BTreeGetBulkWithLongTypeBkey; +import net.spy.memcached.collection.BTreeGetBulkWithByteTypeBkey; +import net.spy.memcached.collection.BTreeUpsert; +import net.spy.memcached.ops.BTreeGetBulkOperation; +import net.spy.memcached.collection.BTreeSMGet; +import net.spy.memcached.collection.BTreeSMGetWithLongTypeBkey; +import net.spy.memcached.collection.BTreeSMGetWithByteTypeBkey; +import net.spy.memcached.ops.BTreeSortMergeGetOperation; +import net.spy.memcached.collection.BTreeInsert; +import net.spy.memcached.collection.BTreeInsertAndGet; +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionCreate; +import net.spy.memcached.collection.CollectionInsert; +import net.spy.memcached.collection.ElementValueType; import net.spy.memcached.ops.APIType; +import net.spy.memcached.ops.BTreeInsertAndGetOperation; +import net.spy.memcached.ops.CollectionCreateOperation; +import net.spy.memcached.ops.CollectionGetOperation; +import net.spy.memcached.ops.CollectionInsertOperation; import net.spy.memcached.ops.GetOperation; +import net.spy.memcached.ops.KeyedOperation; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.PipelineOperation; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.ops.StoreType; import net.spy.memcached.transcoders.Transcoder; +import net.spy.memcached.transcoders.TranscoderUtils; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; +import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BopGetArgs; +import net.spy.memcached.v2.vo.SMGetElements; +import net.spy.memcached.v2.pipe.Pipeline; public class AsyncArcusCommands implements AsyncArcusCommandsIF { private final Transcoder tc; private final Transcoder tcForCollection; - + private final KeyValidator keyValidator; private final Supplier arcusClientSupplier; @SuppressWarnings("unchecked") public AsyncArcusCommands(Supplier arcusClientSupplier) { this.tc = (Transcoder) arcusClientSupplier.get().getTranscoder(); this.tcForCollection = (Transcoder) arcusClientSupplier.get().getCollectionTranscoder(); + this.keyValidator = arcusClientSupplier.get().getKeyValidator(); this.arcusClientSupplier = arcusClientSupplier; } @@ -239,7 +276,7 @@ private ArcusFuture> get(ArcusClient client, MemcachedNode node, r -> { Map decodedMap = new HashMap<>(); for (Map.Entry entry - : ((Map) r).entrySet()) { + : ((Map) r).entrySet()) { decodedMap.put(entry.getKey(), tc.decode(entry.getValue())); } return decodedMap; @@ -342,4 +379,770 @@ public void complete() { return future; } + + public ArcusFuture bopCreate(String key, ElementValueType type, + CollectionAttributes attributes) { + if (attributes == null) { + throw new 
IllegalArgumentException("CollectionAttributes cannot be null"); + } + + CollectionCreate create = new BTreeCreate(TranscoderUtils.examineFlags(type), + attributes.getExpireTime(), attributes.getMaxCount(), + attributes.getOverflowAction(), attributes.getReadable(), false); + + return collectionCreate(key, create); + } + + private ArcusFuture collectionCreate(String key, CollectionCreate collectionCreate) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + switch (status.getStatusCode()) { + case SUCCESS: + result.set(true); + break; + case ERR_EXISTS: + result.set(false); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + // NOT_SUPPORTED or unknown statement + result.addError(key, status); + } + } + + @Override + public void complete() { + future.complete(); + } + }; + CollectionCreateOperation op = client.getOpFact() + .collectionCreate(key, collectionCreate, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + public ArcusFuture bopInsert(String key, BTreeElement element, + CollectionAttributes attributes) { + BTreeInsert insert = new BTreeInsert<>(element.getValue(), element.getEFlag(), + null, attributes); + return collectionInsert(key, element.getBkey().toString(), insert); + } + + public ArcusFuture bopInsert(String key, BTreeElement element) { + return bopInsert(key, element, null); + } + + @Override + public ArcusFuture bopUpsert(String key, BTreeElement element, + CollectionAttributes attributes) { + BTreeUpsert upsert = new BTreeUpsert<>(element.getValue(), element.getEFlag(), + null, attributes); + return collectionInsert(key, element.getBkey().toString(), upsert); + } + + @Override + public ArcusFuture bopUpsert(String key, BTreeElement element) { + return bopUpsert(key, element, null); + } + + private ArcusFuture collectionInsert(String key, + String internalKey, + CollectionInsert collectionInsert) { + AbstractArcusResult result = new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl future = new ArcusFutureImpl<>(result); + CachedData co = tcForCollection.encode(collectionInsert.getValue()); + ArcusClient client = arcusClientSupplier.get(); + + OperationCallback cb = new OperationCallback() { + @Override + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_ELEMENT_EXISTS: + case ERR_NOT_FOUND: + break; + case CANCELLED: + future.internalCancel(); + return; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OVERFLOWED / OUT_OF_RANGE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + return; + } + } + result.set(status.isSuccess()); + } + + @Override + public void complete() { + future.complete(); + } + }; + CollectionInsertOperation op = client.getOpFact() + .collectionInsert(key, internalKey, collectionInsert, co.getData(), cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + public ArcusFuture>> bopInsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes) { + return bopInsertOrUpsertAndGetTrimmed(key, element, false, attributes); + } + + public ArcusFuture>> bopInsertAndGetTrimmed( + String key, BTreeElement element) { + return bopInsertOrUpsertAndGetTrimmed(key, element, false, null); + } 
+ + public ArcusFuture>> bopUpsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes) { + return bopInsertOrUpsertAndGetTrimmed(key, element, true, attributes); + } + + public ArcusFuture>> bopUpsertAndGetTrimmed( + String key, BTreeElement element) { + return bopInsertOrUpsertAndGetTrimmed(key, element, true, null); + } + + private ArcusFutureImpl>> bopInsertOrUpsertAndGetTrimmed( + String key, BTreeElement element, boolean isUpsert, CollectionAttributes attributes) { + AbstractArcusResult>> result = + new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl>> future = new ArcusFutureImpl<>(result); + BTreeInsertAndGet insertAndGet = createBTreeInsertAndGet(element, isUpsert, attributes); + CachedData co = tcForCollection.encode(insertAndGet.getValue()); + insertAndGet.setFlags(co.getFlags()); + ArcusClient client = arcusClientSupplier.get(); + + BTreeInsertAndGetOperation.Callback cb = new BTreeInsertAndGetOperation.Callback() { + private boolean isInsertedOrUpserted = false; + private BTreeElement trimmedElement = null; + + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_ELEMENT_EXISTS: + case ERR_NOT_FOUND: + break; + case CANCELLED: + future.internalCancel(); + return; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OVERFLOWED / OUT_OF_RANGE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + return; + } + } + isInsertedOrUpserted = status.isSuccess(); + } + + public void complete() { + result.set(new AbstractMap.SimpleEntry<>(isInsertedOrUpserted, trimmedElement)); + future.complete(); + } + + @Override + public void gotData(int flags, BKeyObject bKeyObject, byte[] eFlag, byte[] data) { + trimmedElement = new BTreeElement<>(BKey.of(bKeyObject), + tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize())), eFlag); + } + }; + Operation op = client.getOpFact() + .bopInsertAndGet(key, insertAndGet, co.getData(), cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + private static BTreeInsertAndGet createBTreeInsertAndGet(BTreeElement element, + boolean isUpsert, + CollectionAttributes attributes) { + BTreeInsertAndGet insertAndGet; + if (element.getBkey().getType() == BKey.BKeyType.LONG) { + insertAndGet = new BTreeInsertAndGet<>((Long) element.getBkey().getData(), + element.getEFlag(), element.getValue(), isUpsert, attributes); + } else { + insertAndGet = new BTreeInsertAndGet<>((byte[]) element.getBkey().getData(), + element.getEFlag(), element.getValue(), isUpsert, attributes); + } + return insertAndGet; + } + + public ArcusFuture> bopGet(String key, BKey bKey, BopGetArgs args) { + AbstractArcusResult> result = + new AbstractArcusResult<>(new AtomicReference<>()); + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + BTreeGet get = createBTreeGet(bKey, args); + ArcusClient client = arcusClientSupplier.get(); + + CollectionGetOperation.Callback cb = new CollectionGetOperation.Callback() { + public void receivedStatus(OperationStatus status) { + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_NOT_FOUND: + result.set(null); + break; + case ERR_NOT_FOUND_ELEMENT: + result.set(new BTreeElement<>(bKey, null, null)); + break; + case CANCELLED: + future.internalCancel(); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / UNREADABLE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + } + } + } + + public void complete() { 
+ future.complete(); + } + + public void gotData(String bKey, int flags, byte[] data, byte[] eflag) { + result.set(new BTreeElement<>(BKey.of(bKey), + tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize())), eflag)); + } + }; + Operation op = client.getOpFact().collectionGet(key, get, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + private static BTreeGet createBTreeGet(BKey bKey, BopGetArgs args) { + BTreeGet get; + if (bKey.getType() == BKey.BKeyType.LONG) { + get = new BTreeGet((long) bKey.getData(), args.getElementFlagFilter(), + args.isWithDelete(), args.isDropIfEmpty()); + } else { + get = new BTreeGet((byte[]) bKey.getData(), args.getElementFlagFilter(), + args.isWithDelete(), args.isDropIfEmpty()); + } + return get; + } + + public ArcusFuture> bopGet(String key, BKey from, BKey to, BopGetArgs args) { + verifyBKeyRange(from, to); + boolean reverse = from.compareTo(to) > 0; + + AbstractArcusResult> result = + new AbstractArcusResult<>(new AtomicReference<>( + new BTreeElements<>(new TreeMap<>(reverse ? Collections.reverseOrder() : null)))); + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + BTreeGet get = createBTreeGet(from, to, args); + ArcusClient client = arcusClientSupplier.get(); + + CollectionGetOperation.Callback cb = new CollectionGetOperation.Callback() { + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.TRIMMED) { + result.get().trimmed(); + } else if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_NOT_FOUND: + result.set(null); + break; + case ERR_NOT_FOUND_ELEMENT: + break; + case CANCELLED: + future.internalCancel(); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / UNREADABLE / NOT_SUPPORTED + * or unknown statement + */ + result.addError(key, status); + } + } + } + + public void complete() { + future.complete(); + } + + public void gotData(String bKey, int flags, byte[] data, byte[] eflag) { + result.get().addElement(new BTreeElement<>(BKey.of(bKey), tcForCollection.decode( + new CachedData(flags, data, tc.getMaxSize())), eflag)); + } + }; + Operation op = client.getOpFact().collectionGet(key, get, cb); + future.setOp(op); + client.addOp(key, op); + + return future; + } + + private static BTreeGet createBTreeGet(BKey from, BKey to, BopGetArgs args) { + BTreeGet get; + if (from.getType() == BKey.BKeyType.LONG) { + get = new BTreeGet((Long) from.getData(), (Long) to.getData(), + args.getElementFlagFilter(), args.getOffset(), args.getCount(), + args.isWithDelete(), args.isDropIfEmpty()); + } else { + get = new BTreeGet((byte[]) from.getData(), (byte[]) to.getData(), + args.getElementFlagFilter(), args.getOffset(), args.getCount(), + args.isWithDelete(), args.isDropIfEmpty()); + } + return get; + } + + public ArcusFuture>> bopMultiGet(List keys, + BKey from, BKey to, + BopGetArgs args) { + verifyBKeyRange(from, to); + verifyCountArg(args, ArcusClient.MAX_GETBULK_ELEMENT_COUNT); + boolean reverse = from.compareTo(to) > 0; + + ArcusClient client = arcusClientSupplier.get(); + keyValidator.validateKey(keys); + keyValidator.checkDupKey(keys); + Collection>> arrangedKeys = + client.groupingKeys(keys, ArcusClient.BOPGET_BULK_CHUNK_SIZE, APIType.BOP_GET); + + Collection> futures = new ArrayList<>(); + Map>>, List> futureToKeys = + new HashMap<>(); + + for (Map.Entry> entry : arrangedKeys) { + BTreeGetBulk getBulk = createBTreeGetBulk(from, to, args, entry); + CompletableFuture>> future = + bopMultiGetPerNode(client, reverse, 
getBulk).toCompletableFuture(); + futureToKeys.put(future, entry.getValue()); + futures.add(future); + } + + /* + * Combine all futures. If any future fails exceptionally, + * the corresponding keys will have null values in the result map. + * If key not found, the corresponding key will not be present in the result map. + */ + return new ArcusMultiFuture<>(futures, () -> { + Map> results = new HashMap<>(); + for (Map.Entry>>, List> entry + : futureToKeys.entrySet()) { + if (entry.getKey().isCompletedExceptionally()) { + for (String key : entry.getValue()) { + results.put(key, null); + } + } else { + Map> result = entry.getKey().join(); + if (result != null) { + results.putAll(result); + } + } + } + return results; + }); + } + + private void verifyCountArg(BopGetArgs args, int maxCount) { + int count = args.getCount(); + if (count <= 0 || count > maxCount) { + throw new IllegalArgumentException("Count should be between 0 to " + maxCount); + } + } + + /** + * Use only in bopMultiGet method. + * + * @param getBulk get bulk parameters for single node + * @return ArcusFuture with results + */ + private ArcusFuture>> bopMultiGetPerNode(ArcusClient client, + boolean reverse, + BTreeGetBulk getBulk) { + AbstractArcusResult>> result = + new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>())); + ArcusFutureImpl>> future = new ArcusFutureImpl<>(result); + + BTreeGetBulkOperation.Callback cb = new BTreeGetBulkOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + } else if (!status.isSuccess()) { + /* + * NOT_SUPPORTED or unknown statement + */ + for (String key : getBulk.getKeyList()) { + result.addError(key, status); + } + } + } + + @Override + public void complete() { + future.complete(); + } + + @Override + public void gotKey(String key, int elementCount, OperationStatus status) { + if (status.isSuccess()) { + BTreeElements elements = new BTreeElements<>( + new TreeMap<>(reverse ? 
Collections.reverseOrder() : null)); + result.get().put(key, elements); + if (status.getStatusCode() == StatusCode.TRIMMED) { + elements.trimmed(); + } + return; + } + switch (status.getStatusCode()) { + case ERR_NOT_FOUND: + break; + case ERR_NOT_FOUND_ELEMENT: + // Put empty BTreeElements for the BTree item key + result.get().put(key, new BTreeElements<>(new TreeMap<>())); + break; + default: + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / UNREADABLE + * or unknown statement + */ + result.addError(key, status); + } + } + + @Override + public void gotElement(String key, int flags, Object bKey, byte[] eFlag, byte[] data) { + BTreeElements elements = result.get().get(key); + elements.addElement(new BTreeElement<>(BKey.of(bKey), tcForCollection.decode( + new CachedData(flags, data, tc.getMaxSize())), eFlag)); + } + }; + Operation op = client.getOpFact().bopGetBulk(getBulk, cb); + future.setOp(op); + client.addOp(getBulk.getMemcachedNode(), op); + + return future; + } + + private static void verifyBKeyRange(BKey from, BKey to) { + if (from.getType() != to.getType()) { + throw new IllegalArgumentException("Two BKey types(from, to) must be the same."); + } + } + + private BTreeGetBulk createBTreeGetBulk(BKey from, BKey to, BopGetArgs args, + Map.Entry> entry) { + if (from.getType() == BKey.BKeyType.LONG) { + return new BTreeGetBulkWithLongTypeBkey<>(entry.getKey(), entry.getValue(), + (Long) from.getData(), (Long) to.getData(), args.getElementFlagFilter(), + args.getOffset(), args.getCount()); + } else { + return new BTreeGetBulkWithByteTypeBkey<>(entry.getKey(), entry.getValue(), + (byte[]) from.getData(), (byte[]) to.getData(), args.getElementFlagFilter(), + args.getOffset(), args.getCount()); + } + } + + public ArcusFuture> bopSortMergeGet(List keys, BKey from, BKey to, + boolean unique, BopGetArgs args) { + verifyBKeyRange(from, to); + verifyCountArg(args, ArcusClient.MAX_SMGET_COUNT); + + ArcusClient client = arcusClientSupplier.get(); + keyValidator.validateKey(keys); + keyValidator.checkDupKey(keys); + + Collection>> arrangedKeys = + client.groupingKeys(keys, ArcusClient.SMGET_CHUNK_SIZE, APIType.BOP_SMGET); + + Collection> futures = new ArrayList<>(); + List>> smGetFutures = new ArrayList<>(); + + for (Map.Entry> entry : arrangedKeys) { + BTreeSMGet smGet = createBTreeSMGet(from, to, args, unique, entry); + CompletableFuture> future = + bopSortMergeGetPerNode(client, smGet).toCompletableFuture(); + smGetFutures.add(future); + futures.add(future); + } + + /* + * Combine all futures and merge results from multiple nodes. + */ + return new ArcusMultiFuture<>(futures, () -> { + List> results = new ArrayList<>(); + for (CompletableFuture> future : smGetFutures) { + if (!future.isCompletedExceptionally()) { + results.add(future.join()); + } + } + return SMGetElements.mergeSMGetElements(results, from.compareTo(to) <= 0, unique, + args.getCount()); + }); + } + + /** + * Use only in bopSortMergeGet method. 
+ * + * @param smGet sort-merge get parameters for single node + * @return ArcusFuture with results + */ + private ArcusFuture> bopSortMergeGetPerNode(ArcusClient client, + BTreeSMGet smGet) { + List> elementList = new ArrayList<>(); + List missedKeys = new ArrayList<>(); + List trimmedKeys = new ArrayList<>(); + SMGetElements smGetElements = new SMGetElements<>(elementList, missedKeys, trimmedKeys); + + AtomicReference> atomicReference = new AtomicReference<>(smGetElements); + AbstractArcusResult> result = + new AbstractArcusResult<>(atomicReference); + + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + + BTreeSortMergeGetOperation.Callback cb = new BTreeSortMergeGetOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + } else if (!status.isSuccess()) { + /* + * TYPE_MISMATCH / BKEY_MISMATCH / OUT_OF_RANGE / NOT_SUPPORTED or unknown statement + */ + for (String key : smGet.getKeyList()) { + result.addError(key, status); + } + } + } + + @Override + public void complete() { + future.complete(); + } + + @Override + public void gotData(String key, int flags, Object bKey, byte[] eFlag, byte[] data) { + BTreeElement btreeElement = new BTreeElement<>(BKey.of(bKey), + tcForCollection.decode(new CachedData(flags, data, tc.getMaxSize())), eFlag); + elementList.add(new SMGetElements.Element<>(key, btreeElement)); + } + + @Override + public void gotMissedKey(String key, OperationStatus cause) { + missedKeys.add(new SMGetElements.MissedKey(key, cause.getStatusCode())); + } + + @Override + public void gotTrimmedKey(String key, Object bKey) { + trimmedKeys.add(new SMGetElements.TrimmedKey(key, BKey.of(bKey))); + } + }; + Operation op = client.getOpFact().bopsmget(smGet, cb); + future.setOp(op); + client.addOp(smGet.getMemcachedNode(), op); + + return future; + } + + private BTreeSMGet createBTreeSMGet(BKey from, BKey to, BopGetArgs args, + boolean unique, + Map.Entry> entry) { + + if (from.getType() == BKey.BKeyType.LONG) { + return new BTreeSMGetWithLongTypeBkey<>(entry.getKey(), entry.getValue(), + (Long) from.getData(), (Long) to.getData(), args.getElementFlagFilter(), + args.getCount(), unique); + } else { + return new BTreeSMGetWithByteTypeBkey<>(entry.getKey(), entry.getValue(), + (byte[]) from.getData(), (byte[]) to.getData(), args.getElementFlagFilter(), + args.getCount(), unique); + } + } + + public Pipeline pipeline() { + ArcusClient arcusClient = arcusClientSupplier.get(); + return new Pipeline<>(arcusClient.getOpFact(), tc); + } + + public ArcusFuture> execute(Pipeline pipeline) { + Set keys = pipeline.getKeys(); + List ops = pipeline.getOps(); + + ArcusClient client = arcusClientSupplier.get(); + validatePipeline(keys, ops); + + Collection>> arrangedKeys = + client.groupingKeys(keys, ArcusClient.MAX_PIPED_ITEM_COUNT, APIType.PIPE); + + // If Pipeline has single key, then execute directly for single node. + if (arrangedKeys.size() == 1) { + Map.Entry> entry = arrangedKeys.iterator().next(); + return executePerNode(client, entry.getKey(), entry.getValue(), ops); + } + + // Pipeline will be split up and executed in multiple nodes. + // 1) Build key-to-node mapping for O(1) lookup. 
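+    // e.g. with keys {k1, k2, k3} where k1 and k2 hash to nodeA and k3 to nodeB,
+    // two piped requests are built ([ops on k1, k2] -> nodeA, [ops on k3] -> nodeB)
+    // and each per-op result is written back to its original index when the
+    // node futures are combined below.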
+ Map keyToNode = new HashMap<>(keys.size()); + Map> nodeToKeys = new HashMap<>(); + for (Map.Entry> entry : arrangedKeys) { + for (String key : entry.getValue()) { + keyToNode.put(key, entry.getKey()); + } + nodeToKeys.put(entry.getKey(), entry.getValue()); + } + + // 2) Group operations per node with their original indexes. + Map, List>> nodeToOpsAndIndexes + = getOpsAndIndexesByNode(ops, keyToNode); + + // 3) Execute operations per node. + Map>, List> futureToIndexes = + getFutureToOriginIndexes(nodeToOpsAndIndexes, client, nodeToKeys); + + /* + * Combine all futures. If any future fails exceptionally, + * the corresponding indexes will have null values in the result list. + */ + return new ArcusMultiFuture<>(new ArrayList<>(futureToIndexes.keySet()), () -> { + List results = new ArrayList<>(Collections.nCopies(ops.size(), null)); + for (Map.Entry>, List> entry + : futureToIndexes.entrySet()) { + if (!entry.getKey().isCompletedExceptionally()) { + List resultPerNode = entry.getKey().join(); + List indexes = entry.getValue(); + for (int i = 0; i < indexes.size(); i++) { + results.set(indexes.get(i), resultPerNode.get(i)); + } + } + } + return results; + }); + } + + private Map>, List> getFutureToOriginIndexes( + Map, List>> nodeToOpsAndIndexes, + ArcusClient client, Map> nodeToKeys) { + Map>, List> futureToIndexes = new HashMap<>(); + for (Map.Entry, List>> entry + : nodeToOpsAndIndexes.entrySet()) { + MemcachedNode node = entry.getKey(); + + CompletableFuture> future = executePerNode(client, node, + nodeToKeys.get(node), entry.getValue().getKey()) + .toCompletableFuture(); + futureToIndexes.put(future, entry.getValue().getValue()); + } + return futureToIndexes; + } + + private Map, List>> getOpsAndIndexesByNode( + List ops, Map keyToNode) { + Map, List>> nodeToOpsAndIndexes + = new HashMap<>(); + for (int i = 0; i < ops.size(); i++) { + KeyedOperation op = ops.get(i); + String key = op.getKeys().iterator().next(); + MemcachedNode node = keyToNode.get(key); + Map.Entry, List> opsAndIndexes = nodeToOpsAndIndexes.get(node); + if (opsAndIndexes == null) { + List opsForNode = new ArrayList<>(); + List indexesForNode = new ArrayList<>(); + opsForNode.add(op); + indexesForNode.add(i); + nodeToOpsAndIndexes.put(node, + new AbstractMap.SimpleEntry<>(opsForNode, indexesForNode)); + } else { + opsAndIndexes.getKey().add(op); + opsAndIndexes.getValue().add(i); + } + } + return nodeToOpsAndIndexes; + } + + private void validatePipeline(Collection keys, List ops) { + keyValidator.validateKey(keys); + + if (ops.isEmpty() || ops.size() > 500) { + throw new IllegalArgumentException("Pipeline must have 1 to 500 operations."); + } + } + + /** + * Use only in execute method. 
+ * + * @param client the ArcusClient instance to use + * @param node the MemcachedNode to execute operations on + * @param keys keys to add error when operation fails + * @param opsForNode operations to execute on the node + * @return ArcusFuture with each pipe results + */ + private ArcusFuture> executePerNode(ArcusClient client, + MemcachedNode node, + List keys, + List opsForNode) { + AtomicReference> atomicReference = new AtomicReference<>( + new ArrayList<>(Collections.nCopies(opsForNode.size(), null))); + AbstractArcusResult> result = new AbstractArcusResult<>(atomicReference); + ArcusFutureImpl> future = new ArcusFutureImpl<>(result); + + OperationCallback cb = new PipelineOperation.Callback() { + @Override + public void gotStatus(Operation op, OperationStatus status) { + int index = opsForNode.indexOf(op); + if (!status.isSuccess()) { + switch (status.getStatusCode()) { + case ERR_NOT_FOUND_ELEMENT: + case ERR_ELEMENT_EXISTS: + break; + default: + /* + * NOT_FOUND, TYPE_MISMATCH, BKEY_MISMATCH, EFLAG_MISMATCH, OVERFLOWED, OUT_OF_RANGE, + * NOTHING_TO_UPDATE, NOT_SUPPORTED + * or unknown statement + */ + atomicReference.get().set(index, status); + return; + } + } + atomicReference.get().set(index, status.isSuccess()); + } + + @Override + public void receivedStatus(OperationStatus status) { + if (status.getStatusCode() == StatusCode.CANCELLED) { + future.internalCancel(); + } else if (!status.isSuccess() && status.getStatusCode() != StatusCode.ERR_FAILED_END) { + for (String key : keys) { + result.addError(key, status); + } + } + } + + @Override + public void complete() { + future.complete(); + } + }; + + Operation pipelineOp = client.getOpFact().pipeline(opsForNode, keys, cb); + future.setOp(pipelineOp); + client.addOp(node, pipelineOp); + + return future; + } } diff --git a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java index 085da0267..27f0c063d 100644 --- a/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java +++ b/src/main/java/net/spy/memcached/v2/AsyncArcusCommandsIF.java @@ -20,6 +20,16 @@ import java.util.List; import java.util.Map; +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.ElementValueType; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; +import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BopGetArgs; +import net.spy.memcached.v2.vo.SMGetElements; + +import net.spy.memcached.v2.pipe.Pipeline; + public interface AsyncArcusCommandsIF { /** @@ -106,4 +116,168 @@ public interface AsyncArcusCommandsIF { */ ArcusFuture flush(int delay); + /** + * Create a btree item. + * + * @param key key to create + * @param type btree element value type + * @param attributes collection attributes (must not be null) + * @return {@code Boolean.True} if created, otherwise {@code Boolean.False} + */ + ArcusFuture bopCreate(String key, ElementValueType type, + CollectionAttributes attributes); + + /** + * Insert an element into a btree item. + * + * @param key key to insert + * @param element btree element to insert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Boolean.True} if inserted, {@code Boolean.False} if element exists, + * {@code null} if key is not found + */ + ArcusFuture bopInsert(String key, BTreeElement element, + CollectionAttributes attributes); + + /** + * Insert an element into a btree item. 
+ * + * @param key key to insert + * @param element btree element to insert + * @return {@code Boolean.True} if inserted, {@code Boolean.False} if element exists, + * {@code null} if key is not found + */ + ArcusFuture bopInsert(String key, BTreeElement element); + + /** + * Upsert an element into a btree item. + * + * @param key key to upsert + * @param element btree element to upsert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Boolean.True} if upserted, {@code Boolean.False} otherwise + */ + ArcusFuture bopUpsert(String key, BTreeElement element, + CollectionAttributes attributes); + + /** + * Upsert an element into a btree item. + * + * @param key key to upsert + * @param element btree element to upsert + * @return {@code Boolean.True} if upserted, {@code Boolean.False} otherwise + */ + ArcusFuture bopUpsert(String key, BTreeElement element); + + /** + * Insert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to insert + * @param element btree element to insert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Map.Entry} with insertion result and trimmed element + */ + ArcusFuture>> bopInsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes); + + /** + * Insert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to insert + * @param element btree element to insert + * @return {@code Map.Entry} with insertion result and trimmed element + */ + ArcusFuture>> bopInsertAndGetTrimmed(String key, + BTreeElement element); + + /** + * Upsert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to upsert + * @param element btree element to upsert + * @param attributes collection attributes for creation when the btree does not exist + * @return {@code Map.Entry} with upsertion result and trimmed element + */ + ArcusFuture>> bopUpsertAndGetTrimmed( + String key, BTreeElement element, CollectionAttributes attributes); + + /** + * Upsert an element into a btree item and get trimmed element if overflow trim occurs. + * + * @param key key to upsert + * @param element btree element to upsert + * @return {@code Map.Entry} with upsertion result and trimmed element + */ + ArcusFuture>> bopUpsertAndGetTrimmed(String key, + BTreeElement element); + + /** + * Get an element from a btree item. + * + * @param key key to get + * @param bKey BKey of the element to get + * @param args arguments for get operation + * @return {@code BTreeElement} if found, {@code BTreeElement} with null value and eFlag + * if element is not found but key exists, {@code null} if key is not found + */ + ArcusFuture> bopGet(String key, BKey bKey, BopGetArgs args); + + /** + * Get elements from a btree item. + * + * @param key key to get + * @param from BKey range start + * @param to BKey range end + * @param args arguments for get operation + * @return {@code BTreeElements} that contains trimmed or not and elements. + * If element is not found but key exists, {@code BTreeElements} with empty map will be returned. + * If key is not found, {@code null} will be returned. + */ + ArcusFuture> bopGet(String key, BKey from, BKey to, BopGetArgs args); + + /** + * Get elements from multiple btree items. 
+ * + * @param keys list of keys to get + * @param from BKey range start + * @param to BKey range end + * @param args arguments for get operation + * @return Map of key to BTreeElements. If element is not found but key exists, + * empty {@code BTreeElements} will be set for entry value. If key is not found, + * the corresponding entry will not be present in the map. + */ + ArcusFuture>> bopMultiGet(List keys, + BKey from, BKey to, + BopGetArgs args); + + /** + * Get sort-merged elements from multiple btree items. + * + * @param keys list of keys to get + * @param from BKey range start + * @param to BKey range end + * @param unique whether to return unique elements only + * @param args arguments for get operation + * @return {@code SMGetElements} containing sort-merged elements. Never return {@code null}. + * If matching elements not exist, the elements list in the {@code SMGetElements} will be empty. + */ + ArcusFuture> bopSortMergeGet(List keys, BKey from, BKey to, + boolean unique, BopGetArgs args); + + /** + * Create a pipeline for batch operations. + * @return pipeline instance. Operations can be added by chaining methods. + */ + Pipeline pipeline(); + + /** + * Execute multiple operations in a pipeline using Arcus Command Pipelining feature. + * Does not guarantee atomicity. + * Does not guarantee the order of results when multiple keys are used in the pipeline. + * + * @param pipeline which contains multiple operations + * @return list of results for each operation in the pipeline + */ + ArcusFuture> execute(Pipeline pipeline); + } diff --git a/src/main/java/net/spy/memcached/v2/pipe/Pipeline.java b/src/main/java/net/spy/memcached/v2/pipe/Pipeline.java new file mode 100644 index 000000000..5a42c641e --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/pipe/Pipeline.java @@ -0,0 +1,273 @@ +/* + * arcus-java-client : Arcus Java client + * Copyright 2010-2014 NAVER Corp. + * Copyright 2014-present JaM2in Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package net.spy.memcached.v2.pipe; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import net.spy.memcached.OperationFactory; +import net.spy.memcached.collection.BTreeDelete; +import net.spy.memcached.collection.BTreeInsert; +import net.spy.memcached.collection.BTreeMutate; +import net.spy.memcached.collection.BTreeUpdate; +import net.spy.memcached.collection.BTreeUpsert; +import net.spy.memcached.collection.ListDelete; +import net.spy.memcached.collection.MapDelete; +import net.spy.memcached.collection.MapInsert; +import net.spy.memcached.collection.MapUpdate; +import net.spy.memcached.collection.SetDelete; +import net.spy.memcached.collection.SetExist; +import net.spy.memcached.ops.KeyedOperation; +import net.spy.memcached.ops.Mutator; +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.ListInsert; +import net.spy.memcached.collection.RequestMode; +import net.spy.memcached.collection.SetInsert; +import net.spy.memcached.transcoders.Transcoder; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; + +/** + * Data class for pipelined commands. + * If multiple keys are mapped to the same node, commands for those keys are sent in one batch. + * The atomicity for commands is not guaranteed but the order of commands + * for each key is guaranteed. + * Max number of commands is limited as 500. + * + * @param + */ +public class Pipeline { + private final Set keys; + private final List ops; + private final OperationFactory opFact; + private final Transcoder tc; + + public Pipeline(OperationFactory opFact, Transcoder tc) { + this.opFact = opFact; + this.tc = tc; + this.keys = new HashSet<>(); + this.ops = new ArrayList<>(); + } + + public Pipeline lopInsert(String key, int index, V value) { + return lopInsert(key, index, value, null); + } + + public Pipeline lopInsert(String key, int index, V value, CollectionAttributes attributes) { + checkNumOfCommands(); + + ListInsert listInsert = new ListInsert<>(value, RequestMode.PIPE, attributes); + keys.add(key); + ops.add(opFact.collectionInsert(key, String.valueOf(index), + listInsert, tc.encode(value).getData(), null)); + return this; + } + + public Pipeline lopDelete(String key, int index, boolean dropIfEmpty) { + checkNumOfCommands(); + + ListDelete listDelete = new ListDelete(index, dropIfEmpty, false); + keys.add(key); + ops.add(opFact.collectionDelete(key, listDelete, null)); + return this; + } + + public Pipeline lopDelete(String key, int from, int to, boolean dropIfEmpty) { + checkNumOfCommands(); + + ListDelete listDelete = new ListDelete(from, to, dropIfEmpty, false); + keys.add(key); + ops.add(opFact.collectionDelete(key, listDelete, null)); + return this; + } + + public Pipeline sopInsert(String key, V value) { + return sopInsert(key, value, null); + } + + public Pipeline sopInsert(String key, V value, CollectionAttributes attributes) { + checkNumOfCommands(); + + SetInsert setInsert = new SetInsert<>(value, RequestMode.PIPE, attributes); + keys.add(key); + ops.add(opFact.collectionInsert(key, "", + setInsert, tc.encode(value).getData(), null)); + return this; + } + + public Pipeline sopExist(String key, V value) { + checkNumOfCommands(); + + SetExist setExist = new SetExist<>(value, tc); + keys.add(key); + ops.add(opFact.collectionExist(key, "", setExist, null)); + return this; + } + + public Pipeline sopDelete(String key, V value) { + checkNumOfCommands(); + + SetDelete 
setDelete = new SetDelete<>(value, false, false, tc); + keys.add(key); + ops.add(opFact.collectionDelete(key, setDelete, null)); + return this; + } + + public Pipeline mopInsert(String key, String mkey, V value) { + return this.mopInsert(key, mkey, value, null); + } + + public Pipeline mopInsert(String key, String mkey, V value, CollectionAttributes attributes) { + checkNumOfCommands(); + + MapInsert mapInsert = new MapInsert<>(value, RequestMode.PIPE, attributes); + keys.add(key); + ops.add(opFact.collectionInsert(key, mkey, + mapInsert, tc.encode(value).getData(), null)); + return this; + } + + public Pipeline mopDelete(String key, String mkey) { + checkNumOfCommands(); + + MapDelete mapDelete = new MapDelete(Collections.singletonList(mkey), false, false); + keys.add(key); + ops.add(opFact.collectionDelete(key, mapDelete, null)); + return this; + } + + public Pipeline mopUpdate(String key, String mkey, V value) { + checkNumOfCommands(); + + MapUpdate mapUpdate = new MapUpdate<>(value, false); + keys.add(key); + ops.add(opFact.collectionUpdate(key, mkey, mapUpdate, tc.encode(value).getData(), null)); + return this; + } + + public Pipeline bopInsert(String key, BTreeElement element) { + return this.bopInsert(key, element, null); + } + + public Pipeline bopInsert(String key, BTreeElement element, + CollectionAttributes attributes) { + checkNumOfCommands(); + + BTreeInsert mapInsert = new BTreeInsert<>( + element.getValue(), element.getEFlag(), RequestMode.PIPE, attributes); + keys.add(key); + ops.add(opFact.collectionInsert(key, element.getBkey().toString(), + mapInsert, tc.encode(element.getValue()).getData(), null)); + return this; + } + + public Pipeline bopUpsert(String key, BTreeElement element) { + return this.bopUpsert(key, element, null); + } + + public Pipeline bopUpsert(String key, BTreeElement element, + CollectionAttributes attributes) { + checkNumOfCommands(); + + BTreeUpsert mapInsert = new BTreeUpsert<>( + element.getValue(), element.getEFlag(), RequestMode.PIPE, attributes); + keys.add(key); + ops.add(opFact.collectionInsert(key, element.getBkey().toString(), + mapInsert, tc.encode(element.getValue()).getData(), null)); + return this; + } + + public Pipeline bopDelete(String key, BKey bkey) { + checkNumOfCommands(); + + BTreeDelete bTreeDelete; + if (bkey.getType() == BKey.BKeyType.LONG) { + bTreeDelete = new BTreeDelete((Long) bkey.getData(), null, false, false); + } else { + bTreeDelete = new BTreeDelete((byte[]) bkey.getData(), null, false, false); + } + keys.add(key); + ops.add(opFact.collectionDelete(key, bTreeDelete, null)); + return this; + } + + public Pipeline bopDelete(String key, BKey from, BKey to) { + checkNumOfCommands(); + + if (from.getType() != to.getType()) { + throw new IllegalArgumentException("BKey types must match for range delete"); + } + + BTreeDelete bTreeDelete; + if (from.getType() == BKey.BKeyType.LONG) { + bTreeDelete = new BTreeDelete((Long) from.getData(), (Long) to.getData(), + -1, null, false, false); + } else { + bTreeDelete = new BTreeDelete((byte[]) from.getData(), (byte[]) to.getData(), + -1, null, false, false); + } + keys.add(key); + ops.add(opFact.collectionDelete(key, bTreeDelete, null)); + return this; + } + + public Pipeline bopUpdate(String key, BKey bkey, V value) { + checkNumOfCommands(); + + BTreeUpdate bTreeUpdate = new BTreeUpdate<>(value, null, false); + keys.add(key); + ops.add(opFact.collectionUpdate(key, bkey.toString(), bTreeUpdate, + tc.encode(value).getData(), null)); + return this; + } + + public Pipeline 
bopIncr(String key, BKey bkey, int delta) { + checkNumOfCommands(); + + BTreeMutate bTreeMutate = new BTreeMutate(Mutator.incr, delta); + keys.add(key); + ops.add(opFact.collectionMutate(key, bkey.toString(), bTreeMutate, null)); + return this; + } + + public Pipeline bopDecr(String key, BKey bkey, int delta) { + checkNumOfCommands(); + + BTreeMutate bTreeMutate = new BTreeMutate(Mutator.decr, delta); + keys.add(key); + ops.add(opFact.collectionMutate(key, bkey.toString(), bTreeMutate, null)); + return this; + } + + public Set getKeys() { + return Collections.unmodifiableSet(keys); + } + + public List getOps() { + return Collections.unmodifiableList(ops); + } + + private void checkNumOfCommands() { + if (ops.size() >= 500) { + throw new IllegalStateException("The number of commands in a pipeline cannot exceed 500"); + } + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BKey.java b/src/main/java/net/spy/memcached/v2/vo/BKey.java new file mode 100644 index 000000000..7b841d790 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BKey.java @@ -0,0 +1,135 @@ +package net.spy.memcached.v2.vo; + +import java.util.Arrays; +import java.util.Objects; + +import net.spy.memcached.collection.BKeyObject; +import net.spy.memcached.util.BTreeUtil; + +public final class BKey implements Comparable { + private final BKeyType type; + private final Object data; + + private BKey(long data) { + if (data < 0) { + throw new IllegalArgumentException("BKey long value cannot be negative."); + } + this.type = BKeyType.LONG; + this.data = data; + } + + private BKey(byte[] data) { + if (data == null) { + throw new IllegalArgumentException("BKey byte array cannot be null."); + } + + if (data.length > 31) { + throw new IllegalArgumentException( + "BKey byte array size must be between 0 and 31. 
Given size: " + data.length); + } + + this.type = BKeyType.BYTE_ARRAY; + this.data = Arrays.copyOf(data, data.length); + } + + public static BKey of(Object bKey) { + if (bKey == null) { + throw new IllegalArgumentException("BKey cannot be null"); + } + if (bKey instanceof Long) { + return new BKey((Long) bKey); + } else if (bKey instanceof byte[]) { + return new BKey((byte[]) bKey); + } else if (bKey instanceof String) { + String bkeyString = (String) bKey; + try { + return new BKey(Long.parseLong(bkeyString)); + } catch (NumberFormatException e) { + return new BKey(BTreeUtil.hexStringToByteArrays(bkeyString)); + } + } else { + throw new IllegalArgumentException("Unsupported BKey type: " + bKey.getClass()); + } + } + + public static BKey of(BKeyObject bkeyObject) { + if (bkeyObject == null) { + throw new IllegalArgumentException("BKeyObject cannot be null"); + } + + if (bkeyObject.isByteArray()) { + return new BKey(bkeyObject.getByteArrayBKeyRaw()); + } else { + return new BKey(bkeyObject.getLongBKey()); + } + } + + public enum BKeyType { + BYTE_ARRAY, + LONG; + } + + public Object getData() { + if (type == BKeyType.BYTE_ARRAY) { + byte[] bytes = (byte[]) data; + return Arrays.copyOf(bytes, bytes.length); + } + return data; + } + + public BKeyType getType() { + return type; + } + + @Override + public int compareTo(BKey o) { + if (this.type != o.type) { + throw new IllegalArgumentException("Cannot compare different BKey types."); + } + + if (this.type == BKeyType.LONG) { + return ((Long) this.data).compareTo((Long) o.data); + } else { + return BTreeUtil.compareByteArraysInLexOrder((byte[]) this.data, (byte[]) o.data); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || this.getClass() != o.getClass()) { + return false; + } + BKey bKey = (BKey) o; + if (this.type != bKey.type) { + return false; + } + + if (this.type == BKeyType.LONG) { + return this.data.equals(bKey.data); + } else { + return Arrays.equals((byte[]) this.data, (byte[]) bKey.data); + } + } + + @Override + public int hashCode() { + if (this.type == BKeyType.LONG) { + return Objects.hash(this.type, this.data); + } else { + return Objects.hash(this.type, Arrays.hashCode((byte[]) this.data)); + } + } + + @Override + public String toString() { + if (this.type == BKeyType.LONG) { + return String.valueOf(this.data); + } else { + return BTreeUtil.toHex((byte[]) this.data); + } + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BTreeElement.java b/src/main/java/net/spy/memcached/v2/vo/BTreeElement.java new file mode 100644 index 000000000..abfabdace --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BTreeElement.java @@ -0,0 +1,55 @@ +package net.spy.memcached.v2.vo; + +import java.util.Arrays; +import java.util.Objects; + +public final class BTreeElement implements Comparable> { + private final BKey bkey; + private final V value; + private final byte[] eFlag; + + public BTreeElement(BKey bkey, V value, byte[] eFlag) { + if (bkey == null) { + throw new IllegalArgumentException("BKey cannot be null"); + } + this.bkey = bkey; + this.value = value; + this.eFlag = eFlag; + } + + public BKey getBkey() { + return bkey; + } + + public V getValue() { + return value; + } + + public byte[] getEFlag() { + return eFlag; + } + + @Override + public int compareTo(BTreeElement o) { + return this.bkey.compareTo(o.bkey); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return 
false; + } + BTreeElement that = (BTreeElement) o; + return Objects.equals(bkey, that.bkey) && + Objects.equals(value, that.value) && Objects.deepEquals(eFlag, that.eFlag); + } + + @Override + public int hashCode() { + return Objects.hash(bkey, value, Arrays.hashCode(eFlag)); + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BTreeElements.java b/src/main/java/net/spy/memcached/v2/vo/BTreeElements.java new file mode 100644 index 000000000..7badd987b --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BTreeElements.java @@ -0,0 +1,33 @@ +package net.spy.memcached.v2.vo; + +import java.util.Collections; +import java.util.Map; +import java.util.SortedMap; + +public final class BTreeElements { + private boolean isTrimmed; + private final SortedMap> elements; + + public BTreeElements(SortedMap> elements) { + if (elements == null) { + throw new IllegalArgumentException("Elements map cannot be null"); + } + this.elements = elements; + } + + public boolean isTrimmed() { + return isTrimmed; + } + + public Map> getElements() { + return Collections.unmodifiableMap(elements); + } + + public void trimmed() { + this.isTrimmed = true; + } + + public void addElement(BTreeElement element) { + this.elements.put(element.getBkey(), element); + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/BopGetArgs.java b/src/main/java/net/spy/memcached/v2/vo/BopGetArgs.java new file mode 100644 index 000000000..0f12faf98 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/BopGetArgs.java @@ -0,0 +1,100 @@ +package net.spy.memcached.v2.vo; + +import net.spy.memcached.collection.ElementFlagFilter; + +public final class BopGetArgs { + + public static final BopGetArgs DEFAULT = new BopGetArgs.Builder().build(); + + private final ElementFlagFilter eFlagFilter; + private final int offset; + private final int count; + private final boolean withDelete; + private final boolean dropIfEmpty; + + private BopGetArgs(ElementFlagFilter eFlagFilter, int offset, int count, + boolean withDelete, boolean dropIfEmpty) { + this.eFlagFilter = eFlagFilter; + this.offset = offset; + this.count = count; + this.withDelete = withDelete; + this.dropIfEmpty = dropIfEmpty; + } + + public ElementFlagFilter getElementFlagFilter() { + return eFlagFilter; + } + + public int getOffset() { + return offset; + } + + public int getCount() { + return count; + } + + public boolean isWithDelete() { + return withDelete; + } + + public boolean isDropIfEmpty() { + return dropIfEmpty; + } + + public static final class Builder { + private ElementFlagFilter eFlagFilter = null; + private int offset = 0; + private int count = 50; + private boolean withDelete = false; + private boolean dropIfEmpty = false; + + public Builder eFlagFilter(ElementFlagFilter eFlagFilter) { + this.eFlagFilter = eFlagFilter; + return this; + } + + /** + * Set the offset only for {@code AsyncArcusCommands#bopGet} + * or {@code AsyncArcusCommands#bopMultiGet} + * + * @param offset to skip elements that match condition from the 'from' BKey + */ + public Builder offset(int offset) { + if (offset < 0) { + throw new IllegalArgumentException("offset cannot be negative"); + } + this.offset = offset; + return this; + } + + /** + * Set the count of elements to retrieve. + * + * @param count For bopGet or bopMultiGet method, + * set the number of elements to retrieve from each BTree item. + * For bopSortMergeGet method, + * set the total number of elements to retrieve across all BTree items. 
+ */ + public Builder count(int count) { + if (count < 0) { + throw new IllegalArgumentException("count cannot be negative"); + } + this.count = count; + return this; + } + + public Builder withDelete() { + this.withDelete = true; + return this; + } + + public Builder dropIfEmpty() { + this.dropIfEmpty = true; + return this; + } + + public BopGetArgs build() { + return new BopGetArgs(eFlagFilter, offset, count, withDelete, dropIfEmpty); + } + } +} diff --git a/src/main/java/net/spy/memcached/v2/vo/SMGetElements.java b/src/main/java/net/spy/memcached/v2/vo/SMGetElements.java new file mode 100644 index 000000000..71624e6f4 --- /dev/null +++ b/src/main/java/net/spy/memcached/v2/vo/SMGetElements.java @@ -0,0 +1,265 @@ +package net.spy.memcached.v2.vo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import net.spy.memcached.ops.StatusCode; + +public final class SMGetElements { + private final List> elements; + private final List missedKeys; + private final List trimmedKeys; + + public SMGetElements(List> elements, + List missedKeys, + List trimmedKeys) { + if (elements == null || missedKeys == null || trimmedKeys == null) { + throw new IllegalArgumentException("Arguments cannot be null"); + } + this.elements = elements; + this.missedKeys = missedKeys; + this.trimmedKeys = trimmedKeys; + } + + public static SMGetElements mergeSMGetElements(List> smGetElementsList, + boolean ascending, + boolean unique, int count) { + List> allElements = new ArrayList<>(); + List allMissedKeys = new ArrayList<>(); + List allTrimmedKeys = new ArrayList<>(); + + // 1) Collect elements (deduplicate while collecting if unique is true) + if (unique) { + collectUniqueElements(smGetElementsList, allElements, allMissedKeys, allTrimmedKeys, + ascending); + } else { + collectDuplicatedElements(smGetElementsList, allElements, allMissedKeys, allTrimmedKeys); + } + + // 2) Sort elements, missed keys, and trimmed keys + if (ascending) { + allElements.sort(Comparator.naturalOrder()); + } else { + allElements.sort(Comparator.reverseOrder()); + } + Collections.sort(allMissedKeys); + Collections.sort(allTrimmedKeys); + + // 3) Trim elements to the requested count + if (allElements.size() > count) { + allElements = new ArrayList<>(allElements.subList(0, count)); + } + + // 4) Remove trimmed keys outside the final element range + if (!allElements.isEmpty()) { + BKey lastBKey = allElements.get(allElements.size() - 1).getbTreeElement().getBkey(); + allTrimmedKeys.removeIf(trimmedKey -> { + int comp = trimmedKey.getBKey().compareTo(lastBKey); + return ascending ? comp >= 0 : comp <= 0; + }); + } + + return new SMGetElements<>(allElements, allMissedKeys, allTrimmedKeys); + } + + private static void collectUniqueElements( + List> smGetElementsList, + List> allElements, + List allMissedKeys, + List allTrimmedKeys, + boolean ascending) { + Map> uniqueMap = new HashMap<>(); + + for (SMGetElements smGetElements : smGetElementsList) { + for (Element element : smGetElements.getElements()) { + BKey bkey = element.getbTreeElement().getBkey(); + + uniqueMap.compute(bkey, (k, existing) -> { + if (existing == null) { + return element; + } + + // Remain only one element when bkey is duplicated: + // - smaller key if ascending bkey range + // - larger key if descending bkey range + int keyComparison = existing.getKey().compareTo(element.getKey()); + if (ascending) { + return keyComparison <= 0 ? 
existing : element; + } else { + return keyComparison >= 0 ? existing : element; + } + }); + } + allMissedKeys.addAll(smGetElements.getMissedKeys()); + allTrimmedKeys.addAll(smGetElements.getTrimmedKeys()); + } + + allElements.addAll(uniqueMap.values()); + } + + private static void collectDuplicatedElements( + List> smGetElementsList, + List> allElements, + List allMissedKeys, + List allTrimmedKeys) { + for (SMGetElements smgetElements : smGetElementsList) { + allElements.addAll(smgetElements.getElements()); + allMissedKeys.addAll(smgetElements.getMissedKeys()); + allTrimmedKeys.addAll(smgetElements.getTrimmedKeys()); + } + } + + public List> getElements() { + return Collections.unmodifiableList(elements); + } + + public List getMissedKeys() { + return Collections.unmodifiableList(missedKeys); + } + + public List getTrimmedKeys() { + return Collections.unmodifiableList(trimmedKeys); + } + + public static final class Element implements Comparable> { + + private final String key; + private final BTreeElement bTreeElement; + + public Element(String key, BTreeElement element) { + if (key == null || element == null) { + throw new IllegalArgumentException("key or element cannot be null"); + } + this.key = key; + this.bTreeElement = element; + } + + @Override + public int compareTo(Element o) { + int elementComparison = bTreeElement.compareTo(o.getbTreeElement()); + if (elementComparison == 0) { + return this.key.compareTo(o.key); + } + return elementComparison; + } + + public String getKey() { + return key; + } + + public BTreeElement getbTreeElement() { + return bTreeElement; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + Element that = (Element) o; + return Objects.equals(key, that.key) && Objects.equals(bTreeElement, that.bTreeElement); + } + + @Override + public int hashCode() { + return Objects.hash(key, bTreeElement); + } + } + + public static final class MissedKey implements Comparable { + private final String key; + private final StatusCode statusCode; + + public MissedKey(String key, StatusCode statusCode) { + if (key == null || statusCode == null) { + throw new IllegalArgumentException("key or statusCode cannot be null"); + } + this.key = key; + this.statusCode = statusCode; + } + + public String getKey() { + return key; + } + + public StatusCode getStatusCode() { + return statusCode; + } + + @Override + public int compareTo(MissedKey o) { + return this.key.compareTo(o.key); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + MissedKey missedKey = (MissedKey) o; + return Objects.equals(key, missedKey.key) && statusCode == missedKey.statusCode; + } + + @Override + public int hashCode() { + return Objects.hash(key, statusCode); + } + } + + public static final class TrimmedKey implements Comparable { + private final String key; + private final BKey bKey; + + public TrimmedKey(String key, BKey bKey) { + if (key == null || bKey == null) { + throw new IllegalArgumentException("key or bKey cannot be null"); + } + this.key = key; + this.bKey = bKey; + } + + public String getKey() { + return key; + } + + public BKey getBKey() { + return bKey; + } + + @Override + public int compareTo(TrimmedKey o) { + return this.key.compareTo(o.key); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != 
o.getClass()) { + return false; + } + TrimmedKey that = (TrimmedKey) o; + return Objects.equals(key, that.key) && Objects.equals(bKey, that.bKey); + } + + @Override + public int hashCode() { + return Objects.hash(key, bKey); + } + } +} diff --git a/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java new file mode 100644 index 000000000..65547cb87 --- /dev/null +++ b/src/test/java/net/spy/memcached/v2/BTreeAsyncArcusCommandsTest.java @@ -0,0 +1,833 @@ +package net.spy.memcached.v2; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionOverflowAction; +import net.spy.memcached.collection.ElementValueType; +import net.spy.memcached.ops.StatusCode; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; +import net.spy.memcached.v2.vo.BTreeElements; +import net.spy.memcached.v2.vo.BopGetArgs; +import net.spy.memcached.v2.vo.SMGetElements; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class BTreeAsyncArcusCommandsTest extends AsyncArcusCommandsTest { + + private static final List> ELEMENTS = Arrays.asList( + new BTreeElement<>(BKey.of(1L), "value1", null), + new BTreeElement<>(BKey.of(2L), "value2", null), + new BTreeElement<>(BKey.of(3L), "value3", null), + new BTreeElement<>(BKey.of(4L), "value4", null), + new BTreeElement<>(BKey.of(5L), "value5", null) + ); + + @Test + void bopInsert() throws Exception { + // given + String key = keys.get(0); + + // when + async.bopCreate(key, ElementValueType.STRING, new CollectionAttributes()) + .thenCompose(result -> { + assertTrue(result); + return async.bopInsert(key, ELEMENTS.get(0)); + }) + // then + .thenAccept(Assertions::assertTrue) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertDifferentTypeAndGetDifferentElement() throws Exception { + // given + String key = keys.get(0); + BTreeElement element = ELEMENTS.get(0); + + // when + async.bopCreate(key, ElementValueType.LONG, new CollectionAttributes()) + .thenCompose(result -> { + assertTrue(result); + return async.bopInsert(key, element); + }) + .thenCompose(result -> { + assertTrue(result); + return async.bopGet(key, element.getBkey(), BopGetArgs.DEFAULT); + }) + // then + .thenAccept(result -> { + assertNotEquals(element, result); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertNotFound() throws Exception { + // given & when + async.bopInsert(keys.get(0), ELEMENTS.get(0)) + // then + .thenAccept(Assertions::assertFalse) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertTypeMisMatch() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.set(key, 0, VALUE) + .thenCompose(result -> { + assertTrue(result); 
+ return async.bopInsert(key, ELEMENTS.get(0), attrs); + }) + // then + .exceptionally(throwable -> { + assertNotNull(throwable); + assertTrue(throwable.getCause().getMessage().contains("TYPE_MISMATCH")); + return null; + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopInsertAndGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopInsertAndGetTrimmed(key, ELEMENTS.get(3), attrs)) + // then + .thenAccept(result -> { + assertTrue(result.getKey()); + BTreeElement trimmedElement = result.getValue(); + assertNotNull(trimmedElement); + assertEquals(ELEMENTS.get(0), trimmedElement); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopFailToInsertAndGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsertAndGetTrimmed(key, ELEMENTS.get(0))) + // then + .thenAccept(result -> { + assertFalse(result.getKey()); + assertNull(result.getValue()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpsertAndGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopUpsertAndGetTrimmed(key, + ELEMENTS.get(3))) + // then + .thenAccept(result -> { + assertTrue(result.getKey()); + assertEquals(ELEMENTS.get(0), result.getValue()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopUpsertAndNotGetTrimmed() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(3); + String newValue = "new_value"; + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopUpsertAndGetTrimmed(key, + new BTreeElement<>(BKey.of(1L), newValue, null))) + // then + .thenAccept(result -> { + assertTrue(result.getKey()); + assertNull(result.getValue()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGet() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopGet(key, BKey.of(1L), BKey.of(3L), BopGetArgs.DEFAULT)) + // then + .thenAccept(elements -> { + assertEquals(3, elements.getElements().size()); + + int i = 0; + for (BTreeElement element : 
elements.getElements().values()) { + assertEquals(ELEMENTS.get(i++), element); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetWithDelete() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + BopGetArgs getArgsWithDelete = new BopGetArgs.Builder() + .withDelete() + .count(10) + .build(); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopGet(key, BKey.of(2L), getArgsWithDelete)) + // then + .thenAccept(element -> assertEquals(ELEMENTS.get(1), element)) + .thenCompose(v -> async.bopGet(key, BKey.of(2L), BopGetArgs.DEFAULT)) + .thenAccept(element -> { + // ELEMENT NOT FOUND + assertNotNull(element); + assertEquals(BKey.of(2L), element.getBkey()); + assertNull(element.getValue()); + assertNull(element.getEFlag()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetWithDeleteAndDropIfEmpty() throws Exception { + // given + String key = keys.get(0); + BopGetArgs getArgsWithDeleteAndDrop = new BopGetArgs.Builder() + .withDelete() + .dropIfEmpty() + .count(10) + .build(); + + // when + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> async.bopGet(key, BKey.of(1L), getArgsWithDeleteAndDrop)) + // then + .thenAccept(element -> assertEquals(ELEMENTS.get(0), element)) + .thenCompose(v -> async.bopInsert(key, ELEMENTS.get(1))) + .thenAccept(Assertions::assertFalse) // NOT FOUND (key miss) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetRangeNotExistKey() { + // given + String key = keys.get(0); + + // when + async.bopGet(key, BKey.of(1L), BKey.of(10L), BopGetArgs.DEFAULT) + // then + .thenAccept(Assertions::assertNull) // NOT FOUND (key miss) + .toCompletableFuture() + .join(); + } + + @Test + void bopGetRangeNotExistElements() throws Exception { + // given + String key = keys.get(0); + + // when + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopGet(key, + BKey.of(10L), BKey.of(20L), BopGetArgs.DEFAULT)) + // then + .thenAccept(elements -> { + // NOT FOUND ELEMENT + assertNotNull(elements); + assertEquals(0, elements.getElements().size()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetRangeWithDelete() throws Exception { + // given + String key = keys.get(0); + CollectionAttributes attrs = new CollectionAttributes(); + BopGetArgs getArgsWithDelete = new BopGetArgs.Builder() + .withDelete() + .count(10) + .build(); + + // when + async.bopInsert(key, ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(3), attrs)) + .thenCompose(result -> async.bopGet(key, BKey.of(2L), BKey.of(3L), getArgsWithDelete)) + // then + .thenAccept(elements -> { + assertNotNull(elements); + assertEquals(2, elements.getElements().size()); + assertTrue(elements.getElements().containsValue(ELEMENTS.get(1))); + assertTrue(elements.getElements().containsValue(ELEMENTS.get(2))); + }) + .thenCompose(v -> async.bopGet(key, BKey.of(2L), 
BKey.of(3L), BopGetArgs.DEFAULT)) + .thenAccept(elements -> { + assertNotNull(elements); + assertTrue(elements.getElements().isEmpty()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopGetRangeDescending() throws Exception { + // given + String key = keys.get(0); + + // when + async.bopInsert(key, ELEMENTS.get(0), new CollectionAttributes()) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(key, ELEMENTS.get(3))) + .thenCompose(result -> async.bopGet(key, BKey.of(10L), BKey.of(1L), BopGetArgs.DEFAULT)) + // then + .thenAccept(elements -> { + assertNotNull(elements); + int i = 3; + for (BTreeElement element : elements.getElements().values()) { + assertEquals(ELEMENTS.get(i--), element); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + + @Test + void bopMultiGet() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attr = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attr) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attr)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(4), attr)) + // then + .thenCompose(result -> async + .bopMultiGet(testKeys, BKey.of(1L), BKey.of(10L), BopGetArgs.DEFAULT)) + .thenAccept(map -> { + assertEquals(3, map.size()); + + BTreeElements elements0 = map.get(testKeys.get(0)); + assertEquals(2, elements0.getElements().size()); + assertTrue(elements0.getElements().containsValue(ELEMENTS.get(0))); + assertTrue(elements0.getElements().containsValue(ELEMENTS.get(1))); + + BTreeElements elements1 = map.get(testKeys.get(1)); + assertEquals(2, elements1.getElements().size()); + assertTrue(elements1.getElements().containsValue(ELEMENTS.get(2))); + assertTrue(elements1.getElements().containsValue(ELEMENTS.get(3))); + + BTreeElements elements2 = map.get(testKeys.get(2)); + assertEquals(1, elements2.getElements().size()); + assertTrue(elements2.getElements().containsValue(ELEMENTS.get(4))); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopMultiGetDescending() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attr = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attr) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attr)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(4), attr)) + // then + .thenCompose(result -> async + .bopMultiGet(testKeys, BKey.of(10L), BKey.of(1L), BopGetArgs.DEFAULT)) + .thenAccept(map -> { + assertEquals(3, map.size()); + + BTreeElements elements0 = map.get(testKeys.get(0)); + assertEquals(2, elements0.getElements().size()); + BTreeElements elements1 = map.get(testKeys.get(1)); + assertEquals(2, elements1.getElements().size()); + BTreeElements elements2 = map.get(testKeys.get(2)); + assertEquals(1, elements2.getElements().size()); + + // Make sure that the order is descending + int i = 1; + for 
(Map.Entry> entry : elements0.getElements() + .entrySet()) { + assertEquals(ELEMENTS.get(i--), entry.getValue()); + } + i = 3; + for (Map.Entry> entry : elements1.getElements() + .entrySet()) { + assertEquals(ELEMENTS.get(i--), entry.getValue()); + } + + assertTrue(elements2.getElements().containsValue(ELEMENTS.get(4))); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopMultiGetNotFoundElement() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async + .bopMultiGet(testKeys, BKey.of(10L), BKey.of(20L), BopGetArgs.DEFAULT)) + // then + .thenAccept(map -> { + assertEquals(1, map.size()); + + // NOT FOUND ELEMENT + BTreeElements elements = map.get(testKeys.get(0)); + assertNotNull(elements); + assertEquals(0, elements.getElements().size()); + + // NOT FOUND + assertFalse(map.containsKey(testKeys.get(1))); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetAscendingUnique() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(3))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(10L), true, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(2), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetDescendingUnique() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(2), ELEMENTS.get(3))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(1L), true, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, 
smGetElements.getElements().size()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(2), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(2), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetAscendingDuplicated() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(2L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + SMGetElements.Element element1 = smGetElements.getElements().get(0); + assertEquals(ELEMENTS.get(0), element1.getbTreeElement()); + assertEquals(testKeys.get(0), element1.getKey()); + + SMGetElements.Element element2 = smGetElements.getElements().get(1); + assertEquals(ELEMENTS.get(0), element2.getbTreeElement()); + assertEquals(testKeys.get(1), element2.getKey()); + + SMGetElements.Element element3 = smGetElements.getElements().get(2); + assertEquals(ELEMENTS.get(1), element3.getbTreeElement()); + assertEquals(testKeys.get(0), element3.getKey()); + + SMGetElements.Element element4 = smGetElements.getElements().get(3); + assertEquals(ELEMENTS.get(1), element4.getbTreeElement()); + assertEquals(testKeys.get(1), element4.getKey()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetDescendingDuplicated() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1))) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(2L), BKey.of(1L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + + SMGetElements.Element element1 = smGetElements.getElements().get(0); + assertEquals(ELEMENTS.get(1), element1.getbTreeElement()); + assertEquals(testKeys.get(1), element1.getKey()); + + SMGetElements.Element element2 = smGetElements.getElements().get(1); + assertEquals(ELEMENTS.get(1), element2.getbTreeElement()); + assertEquals(testKeys.get(0), element2.getKey()); + + SMGetElements.Element element3 = smGetElements.getElements().get(2); + assertEquals(ELEMENTS.get(0), element3.getbTreeElement()); + assertEquals(testKeys.get(1), element3.getKey()); + + 
SMGetElements.Element element4 = smGetElements.getElements().get(3); + assertEquals(ELEMENTS.get(0), element4.getbTreeElement()); + assertEquals(testKeys.get(0), element4.getKey()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetUnique() throws Exception { + // given + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(keys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(keys.get(1), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(keys.get(2), ELEMENTS.get(0), attrs)) + .thenCompose(result -> async.bopInsert(keys.get(3), ELEMENTS.get(0), attrs)) + // when + .thenCompose(result -> async.bopSortMergeGet(keys, + BKey.of(1L), BKey.of(3L), true, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(1, smGetElements.getElements().size()); + + SMGetElements.Element element = smGetElements.getElements().get(0); + assertEquals(ELEMENTS.get(0), element.getbTreeElement()); + assertEquals(keys.get(0), element.getKey()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetWithMissedKeys() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + // when + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(10L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + + assertEquals(1, smGetElements.getElements().size()); + List> elements = smGetElements.getElements(); + assertEquals(ELEMENTS.get(0), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + + assertEquals(1, smGetElements.getMissedKeys().size()); + SMGetElements.MissedKey missedKey = smGetElements.getMissedKeys().get(0); + assertEquals(testKeys.get(1), missedKey.getKey()); + assertEquals(StatusCode.ERR_NOT_FOUND, missedKey.getStatusCode()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotFound() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + + // when + async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(20L), false, BopGetArgs.DEFAULT) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(0, smGetElements.getElements().size()); + assertEquals(3, smGetElements.getMissedKeys().size()); + assertEquals(0, smGetElements.getTrimmedKeys().size()); + assertIterableEquals(testKeys, smGetElements.getMissedKeys().stream() + .map(SMGetElements.MissedKey::getKey).collect(Collectors.toList())); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotFoundElement() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(20L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(0, 
smGetElements.getElements().size()); + assertEquals(1, smGetElements.getMissedKeys().size()); + assertEquals(0, smGetElements.getTrimmedKeys().size()); + + SMGetElements.MissedKey missedKey = smGetElements.getMissedKeys().get(0); + assertEquals(testKeys.get(2), missedKey.getKey()); + assertEquals(StatusCode.ERR_NOT_FOUND, missedKey.getStatusCode()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetWithTrimmedKeys() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1), keys.get(2)); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(2); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2))) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(1L), false, BopGetArgs.DEFAULT)) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + assertEquals(1, smGetElements.getMissedKeys().size()); + assertEquals(1, smGetElements.getTrimmedKeys().size()); + + SMGetElements.TrimmedKey trimmedKey = smGetElements.getTrimmedKeys().get(0); + assertEquals(keys.get(0), trimmedKey.getKey()); + assertEquals(ELEMENTS.get(2).getBkey(), trimmedKey.getBKey()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotHaveTrimmedKeysOutOfElementsRangeDescending() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(2); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(0), attrs) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(3))) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(2), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(3))) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(10L), BKey.of(1L), false, + new BopGetArgs.Builder().build())) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + assertTrue(smGetElements.getMissedKeys().isEmpty()); + assertTrue(smGetElements.getTrimmedKeys().isEmpty()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(1), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(3), 
elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(3), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(2), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void bopSortMergeGetNotHaveTrimmedKeysOutOfElementsRangeAscending() throws Exception { + // given + List testKeys = Arrays.asList(keys.get(0), keys.get(1)); + CollectionAttributes attrs = new CollectionAttributes(); + attrs.setMaxCount(2); + attrs.setOverflowAction(CollectionOverflowAction.largest_trim); + + // when + async.bopInsert(testKeys.get(0), ELEMENTS.get(3), attrs) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(2))) + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(1))) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(0), ELEMENTS.get(0))) // to be trimmed + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(1), attrs)) + .thenCompose(result -> async.bopInsert(testKeys.get(1), ELEMENTS.get(0))) + .thenCompose(result -> async.bopSortMergeGet(testKeys, + BKey.of(1L), BKey.of(10L), false, + new BopGetArgs.Builder().build())) + // then + .thenAccept(smGetElements -> { + assertNotNull(smGetElements); + assertEquals(4, smGetElements.getElements().size()); + assertTrue(smGetElements.getMissedKeys().isEmpty()); + assertTrue(smGetElements.getTrimmedKeys().isEmpty()); + + List> elements = smGetElements.getElements(); + assertEquals(testKeys.get(0), elements.get(0).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(0).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(1).getKey()); + assertEquals(ELEMENTS.get(0), elements.get(1).getbTreeElement()); + assertEquals(testKeys.get(0), elements.get(2).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(2).getbTreeElement()); + assertEquals(testKeys.get(1), elements.get(3).getKey()); + assertEquals(ELEMENTS.get(1), elements.get(3).getbTreeElement()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } +} diff --git a/src/test/java/net/spy/memcached/v2/PipelineAsyncArcusCommandsTest.java b/src/test/java/net/spy/memcached/v2/PipelineAsyncArcusCommandsTest.java new file mode 100644 index 000000000..a893239f2 --- /dev/null +++ b/src/test/java/net/spy/memcached/v2/PipelineAsyncArcusCommandsTest.java @@ -0,0 +1,217 @@ +package net.spy.memcached.v2; + +import java.util.concurrent.TimeUnit; + +import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionOverflowAction; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.StatusCode; +import net.spy.memcached.v2.pipe.Pipeline; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class PipelineAsyncArcusCommandsTest extends AsyncArcusCommandsTest { + + @Test + void pipelineMixedInsert() throws Exception { + // given + CollectionAttributes attr = new CollectionAttributes(); + + // when + Pipeline pipeline = async.pipeline() + .lopInsert("list1", 0, "value1", attr) + .sopInsert("set1", "value1", attr) + .lopInsert("list1", 0, "value2") 
+ .sopInsert("set2", "value2") + .bopInsert("btree1", new BTreeElement<>(BKey.of(1L), "value1", null), attr) + .mopInsert("map1", "field1", "value1", attr) + .mopInsert("map1", "field1", "value2", attr); + + async.execute(pipeline) + // then + .thenAccept(list -> { + assertEquals(7, list.size()); + assertEquals(true, list.get(0)); + assertEquals(true, list.get(1)); + assertEquals(true, list.get(2)); + assertEquals(StatusCode.ERR_NOT_FOUND, ((OperationStatus) list.get(3)).getStatusCode()); + assertEquals(true, list.get(4)); + assertEquals(true, list.get(5)); + assertEquals(false, list.get(6)); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void pipelineInsertTypeMismatch() throws Exception { + // given + CollectionAttributes attr = new CollectionAttributes(); + + // when + Pipeline pipeline = async.pipeline() + .lopInsert("list1", 0, "value1", attr) + .sopInsert("list1", "value1"); + + async.execute(pipeline) + // then + .thenAccept(list -> { + assertEquals(2, list.size()); + assertEquals(true, list.get(0)); + assertEquals(StatusCode.ERR_TYPE_MISMATCH, + ((OperationStatus) list.get(1)).getStatusCode()); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void pipeline500OpsOnSingleKey() throws Exception { + // given + CollectionAttributes attr = new CollectionAttributes(); + String key = "btree_max"; + + Pipeline pipeline = async.pipeline(); + for (long i = 0; i < 500; i++) { + if (i == 0) { + pipeline.bopInsert(key, new BTreeElement<>(BKey.of(i), "value" + i, null), attr); + } else { + pipeline.bopInsert(key, new BTreeElement<>(BKey.of(i), "value" + i, null)); + } + } + + // when + async.execute(pipeline) + // then + .thenAccept(list -> { + assertEquals(500, list.size()); + for (int i = 0; i < 500; i++) { + assertEquals(true, list.get(i)); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void pipeline500OpsOnMultiKeys() throws Exception { + // given + CollectionAttributes attr = new CollectionAttributes(); + + Pipeline pipeline = async.pipeline(); + for (int keyIdx = 0; keyIdx < 10; keyIdx++) { + String key = "btree_multi_" + keyIdx; + for (long i = 0; i < 50; i++) { + if (i == 0) { + pipeline.bopInsert(key, new BTreeElement<>(BKey.of(i), "value" + i, null), attr); + } else { + pipeline.bopInsert(key, new BTreeElement<>(BKey.of(i), "value" + i, null)); + } + } + } + + // when + async.execute(pipeline) + // then + .thenAccept(list -> { + assertEquals(500, list.size()); + for (int i = 0; i < 500; i++) { + assertEquals(true, list.get(i)); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void pipelineExceed500OpsThrowsException() { + // given + CollectionAttributes attr = new CollectionAttributes(); + String key = "btree_exceed"; + + Pipeline pipeline = async.pipeline(); + for (long i = 0; i < 500; i++) { + if (i == 0) { + pipeline.bopInsert(key, new BTreeElement<>(BKey.of(i), "value" + i, null), attr); + } else { + pipeline.bopInsert(key, new BTreeElement<>(BKey.of(i), "value" + i, null)); + } + } + + // when & then + BTreeElement element = new BTreeElement<>(BKey.of(501L), "value501", null); + assertThrows(IllegalStateException.class, () -> pipeline.bopInsert(key, element)); + } + + @Test + void pipelineSingleKeyWithMixedOperations() throws Exception { + // given + CollectionAttributes attr = new CollectionAttributes(); + String key = "btree_mixed"; + + Pipeline pipeline = async.pipeline() + .bopInsert(key, new BTreeElement<>(BKey.of(1L), "value1", null), 
attr) + .bopInsert(key, new BTreeElement<>(BKey.of(2L), "value2", null)) + .bopUpsert(key, new BTreeElement<>(BKey.of(1L), "updated1", null)) + .bopDelete(key, BKey.of(3L)); + + // when + async.execute(pipeline) + // then + .thenAccept(list -> { + assertEquals(4, list.size()); + assertEquals(true, list.get(0)); + assertEquals(true, list.get(1)); + assertEquals(true, list.get(2)); + assertEquals(false, list.get(3)); + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void pipelineMultipleKeysOrderPreserved() throws Exception { + // given + CollectionAttributes attr = new CollectionAttributes(); + attr.setMaxCount(2); + attr.setOverflowAction(CollectionOverflowAction.error); + + Pipeline pipeline = async.pipeline() + .bopInsert("btree_order1", new BTreeElement<>(BKey.of(1L), "A", null), attr) + .bopInsert("btree_order2", new BTreeElement<>(BKey.of(1L), "B", null), attr) + .bopInsert("btree_order1", new BTreeElement<>(BKey.of(2L), "C", null)) + .bopInsert("btree_order2", new BTreeElement<>(BKey.of(2L), "D", null)) + .bopInsert("btree_order1", new BTreeElement<>(BKey.of(3L), "E", null)) + .bopInsert("btree_order2", new BTreeElement<>(BKey.of(3L), "F", null)); + + // when + async.execute(pipeline) + // then + .thenAccept(list -> { + assertEquals(6, list.size()); + for (int i = 0; i < 4; i++) { + assertEquals(true, list.get(i)); + } + for (int i = 4; i < 6; i++) { + assertEquals(StatusCode.ERR_OVERFLOWED, + ((OperationStatus) list.get(i)).getStatusCode()); + } + }) + .toCompletableFuture() + .get(300, TimeUnit.MILLISECONDS); + } + + @Test + void pipelineEmptyThrowsException() { + // given + Pipeline pipeline = async.pipeline(); + + // when & then: Should throw IllegalArgumentException + assertThrows(IllegalArgumentException.class, () -> async.execute(pipeline)); + } +} diff --git a/src/test/java/net/spy/memcached/v2/migration/PipelineOperationMigrationTest.java b/src/test/java/net/spy/memcached/v2/migration/PipelineOperationMigrationTest.java new file mode 100644 index 000000000..834e124f3 --- /dev/null +++ b/src/test/java/net/spy/memcached/v2/migration/PipelineOperationMigrationTest.java @@ -0,0 +1,201 @@ +package net.spy.memcached.v2.migration; + +import java.net.InetSocketAddress; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import net.spy.memcached.ArcusClient; +import net.spy.memcached.RedirectHandler; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.PipelineOperation; +import net.spy.memcached.protocol.ascii.AsciiMemcachedNodeImpl; +import net.spy.memcached.protocol.ascii.AsciiOperationFactory; +import net.spy.memcached.protocol.ascii.PipelineOperationImpl; +import net.spy.memcached.transcoders.SerializingTranscoder; +import net.spy.memcached.v2.AsyncArcusCommands; +import net.spy.memcached.v2.pipe.Pipeline; +import net.spy.memcached.v2.vo.BKey; +import net.spy.memcached.v2.vo.BTreeElement; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PipelineOperationMigrationTest { + + private static ArcusClient arcusClient; + private static AsyncArcusCommands async; + private static final List KEYS = Arrays.asList("migration_key1", "migration_key2", + "migration_key3", "migration_key4"); + private static final String VALUE = "migration_value"; + + @BeforeAll + static void setUp() { + arcusClient = ArcusClient.createArcusClient("localhost:2181", "test"); + async = arcusClient.asyncCommands(); + } + + @BeforeEach + void cleanUp() throws ExecutionException, InterruptedException, TimeoutException { + async.flush(-1).get(300, TimeUnit.MILLISECONDS); + } + + @AfterAll + static void tearDown() { + arcusClient.shutdown(); + } + + @Test + void redirectMultiKeysFromPipelineOperation() { + // given + SerializingTranscoder tc = SerializingTranscoder.forCollection().build(); + Pipeline pipeline = new Pipeline<>(new AsciiOperationFactory(), tc); + + pipeline.lopInsert(KEYS.get(0), 0, VALUE); + pipeline.sopInsert(KEYS.get(1), VALUE); + pipeline.mopInsert(KEYS.get(2), "mkey", VALUE); + pipeline.bopInsert(KEYS.get(3), new BTreeElement<>(BKey.of(0L), VALUE, null)); + + PipelineOperationImpl pipelineOp = getPipelineOp(pipeline); + + // when + ByteBuffer b = ByteBuffer.allocate(200); + + String line1 = "RESPONSE 3\r\n"; + b.put(line1.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line2 = "STORED\r\n"; + b.put(line2.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line3 = "NOT_MY_KEY 1000 2000\r\n"; + b.put(line3.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line4 = "STORED\r\n"; + b.put(line4.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line5 = "NOT_MY_KEY 4000 5000\r\n"; + b.put(line5.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line6 = "END\r\n"; + b.put(line6.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + // then + RedirectHandler redirectHandler = pipelineOp.getAndClearRedirectHandler(); + assertEquals(OperationState.REDIRECT, pipelineOp.getState()); + assertInstanceOf(RedirectHandler.RedirectHandlerMultiKey.class, redirectHandler); + List keys = ((RedirectHandler.RedirectHandlerMultiKey) redirectHandler) + .groupRedirectKeys(arcusClient.getMemcachedConnection(), pipelineOp) + .values().stream().flatMap(List::stream).collect(Collectors.toList()); + assertTrue(keys.contains(KEYS.get(1))); + assertTrue(keys.contains(KEYS.get(3))); + } + + private PipelineOperationImpl getPipelineOp(Pipeline pipeline) { + PipelineOperation.Callback cb = new PipelineOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + // Do nothing + } + + @Override + public void complete() { + // Do nothing + } + + @Override + public void gotStatus(Operation op, OperationStatus status) { + // Do nothing + } + }; + + PipelineOperationImpl pipelineOp = + new PipelineOperationImpl(pipeline.getOps(), new ArrayList<>(pipeline.getKeys()), cb); + + LinkedBlockingQueue queue = new 
LinkedBlockingQueue<>(); + pipelineOp.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211), + 60, queue, queue, queue, 0L, false)); + pipelineOp.writeComplete(); + return pipelineOp; + } + + @Test + void redirectSingleKeyFromPipelineOperation() { + SerializingTranscoder tc = SerializingTranscoder.forCollection().build(); + Pipeline pipeline = new Pipeline<>(new AsciiOperationFactory(), tc); + + pipeline.lopInsert(KEYS.get(0), 0, VALUE); + pipeline.lopInsert(KEYS.get(0), 1, VALUE); + pipeline.lopInsert(KEYS.get(0), 2, VALUE); + pipeline.lopInsert(KEYS.get(0), 3, VALUE); + + PipelineOperationImpl pipelineOp = getPipelineOp(pipeline); + + // when + ByteBuffer b = ByteBuffer.allocate(200); + + String line1 = "RESPONSE 2\r\n"; + b.put(line1.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line2 = "STORED\r\n"; + b.put(line2.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line3 = "NOT_MY_KEY 1000 2000\r\n"; + b.put(line3.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + String line4 = "END\r\n"; + b.put(line4.getBytes()); + ((Buffer) b).flip(); + assertDoesNotThrow(() -> pipelineOp.readFromBuffer(b)); + ((Buffer) b).clear(); + + // then + RedirectHandler redirectHandler = pipelineOp.getAndClearRedirectHandler(); + assertEquals(OperationState.REDIRECT, pipelineOp.getState()); + assertInstanceOf(RedirectHandler.RedirectHandlerSingleKey.class, redirectHandler); + assertEquals(KEYS.get(0), + ((RedirectHandler.RedirectHandlerSingleKey) redirectHandler).getKey()); + } +}
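The diff above does not include a standalone usage example for the new pipeline API, so the following is a minimal, non-authoritative sketch pieced together from PipelineAsyncArcusCommandsTest and PipelineOperationMigrationTest. The ZooKeeper address, service code, key names, timeout, and the exact generic types returned by pipeline() and execute() are illustrative assumptions, not confirmed signatures; per the Pipeline javadoc, commands for keys on the same node are sent in one batch, per-key order is preserved, atomicity is not guaranteed, and at most 500 commands may be queued.

import java.util.List;
import java.util.concurrent.TimeUnit;

import net.spy.memcached.ArcusClient;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.v2.AsyncArcusCommands;
import net.spy.memcached.v2.vo.BKey;
import net.spy.memcached.v2.vo.BTreeElement;

public class PipelineUsageSketch {
  public static void main(String[] args) throws Exception {
    // Address and service code are illustrative, mirroring the test setup in this diff.
    ArcusClient client = ArcusClient.createArcusClient("localhost:2181", "test");
    AsyncArcusCommands async = client.asyncCommands();

    CollectionAttributes attrs = new CollectionAttributes();

    // Queue a few collection commands and execute them as one pipeline.
    // The result list mirrors command order: Boolean for applied commands,
    // OperationStatus for commands that completed with an error status (e.g. NOT_FOUND).
    List<?> results = async.execute(
            async.pipeline()
                .lopInsert("list1", 0, "value1", attrs)
                .sopInsert("set1", "value1", attrs)
                .bopInsert("btree1", new BTreeElement<>(BKey.of(1L), "value1", null), attrs))
        .toCompletableFuture()
        .get(300, TimeUnit.MILLISECONDS);

    for (Object result : results) {
      if (result instanceof OperationStatus) {
        System.out.println("status: " + ((OperationStatus) result).getStatusCode());
      } else {
        System.out.println("applied: " + result);
      }
    }

    client.shutdown();
  }
}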