Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/main/java/net/spy/memcached/ArcusClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,26 @@ public GetFuture<CASValue<Object>> asyncGets(String key) {
return this.getClient().asyncGets(key);
}

@Override
public <T> GetFuture<T> asyncGetAndTouch(String key, int exp, Transcoder<T> tc) {
return this.getClient().asyncGetAndTouch(key, exp, tc);
}

@Override
public GetFuture<Object> asyncGetAndTouch(String key, int exp) {
return this.getClient().asyncGetAndTouch(key, exp);
}

@Override
public <T> GetFuture<CASValue<T>> asyncGetsAndTouch(String key, int exp, Transcoder<T> tc) {
return this.getClient().asyncGetsAndTouch(key, exp, tc);
}

@Override
public GetFuture<CASValue<Object>> asyncGetsAndTouch(String key, int exp) {
return this.getClient().asyncGetsAndTouch(key, exp);
}

@Override
public <T> CASValue<T> gets(String key, Transcoder<T> tc)
throws OperationTimeoutException {
Expand Down
100 changes: 100 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,106 @@ public GetFuture<CASValue<Object>> asyncGets(final String key) {
return asyncGets(key, transcoder);
}

/**
* Get the given key to reset its expiration time asynchronously.
*
* @param key the key to fetch
* @param exp the new expiration to set for the given key
* @param tc the transcoder to serialize and unserialize value
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public <T> GetFuture<T> asyncGetAndTouch(final String key, final int exp,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final GetFuture<T> future = new GetFuture<>(latch, operationTimeout);

Operation op = opFact.getAndTouch(key, exp,
new GetOperation.Callback() {
private GetResult<T> result = null;

public void receivedStatus(OperationStatus status) {
future.set(result, status);
}

public void gotData(String k, int flags, byte[] data) {
assert k.equals(key) : "Wrong key returned";
result = new GetResultImpl<>(new CachedData(flags, data, tc.getMaxSize()), tc);
}

public void complete() {
latch.countDown();
}
});
future.setOperation(op);
addOp(key, op);
return future;
}

/**
* Get the given key to reset its expiration time asynchronously.
*
* @param key the key to fetch
* @param exp the new expiration to set for the given key
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public GetFuture<Object> asyncGetAndTouch(final String key, final int exp) {
return asyncGetAndTouch(key, exp, transcoder);
}

/**
* Gets (with CAS support) the given key to reset its expiration time asynchronously.
*
* @param key the key to fetch
* @param exp the new expiration to set for the given key
* @param tc the transcoder to serialize and unserialize value
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public <T> GetFuture<CASValue<T>> asyncGetsAndTouch(final String key, final int exp,
final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final GetFuture<CASValue<T>> rv = new GetFuture<>(latch, operationTimeout);

Operation op = opFact.getsAndTouch(key, exp, new GetsOperation.Callback() {
private GetResult<CASValue<T>> val = null;

public void receivedStatus(OperationStatus status) {
rv.set(val, status);
}

public void gotData(String k, int flags, long cas, byte[] data) {
assert key.equals(k) : "Wrong key returned";
assert cas > 0 : "CAS was less than zero: " + cas;
val = new GetsResultImpl<>(cas, new CachedData(flags, data, tc.getMaxSize()), tc);
}

public void complete() {
latch.countDown();
}
});
rv.setOperation(op);
addOp(key, op);
return rv;
}

/**
* Gets (with CAS support) the given key to reset its expiration time asynchronously.
*
* @param key the key to fetch
* @param exp the new expiration to set for the given key
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public GetFuture<CASValue<Object>> asyncGetsAndTouch(final String key, final int exp) {
return asyncGetsAndTouch(key, exp, transcoder);
}

/**
* Gets (with CAS support) with a single key.
*
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedClientIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ <T> Future<CASValue<T>> asyncGets(String key,

Future<CASValue<Object>> asyncGets(String key);

<T> Future<T> asyncGetAndTouch(final String key, final int exp, final Transcoder<T> tc);

Future<Object> asyncGetAndTouch(final String key, final int exp);

<T> Future<CASValue<T>> asyncGetsAndTouch(final String key, final int exp,
final Transcoder<T> tc);

Future<CASValue<Object>> asyncGetsAndTouch(final String key, final int exp);

<T> CASValue<T> gets(String key, Transcoder<T> tc)
throws OperationTimeoutException;

Expand Down
20 changes: 20 additions & 0 deletions src/main/java/net/spy/memcached/OperationFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ public interface OperationFactory {
*/
GetsOperation gets(Collection<String> keys, GetsOperation.Callback cb, boolean isMGet);

/**
* Get the key and resets its timeout.
*
* @param key the key to get a value for and reset its timeout
* @param expiration the new expiration for the key
* @param cb the callback that will contain the result
* @return a new GetAndTouchOperation
*/
GetOperation getAndTouch(String key, int expiration, GetOperation.Callback cb);

/**
* Gets (with CAS support) the key and resets its timeout.
*
* @param key the key to get a value for and reset its timeout
* @param expiration the new expiration for the key
* @param cb the callback that will contain the result
* @return a new GetsAndTouchOperation
*/
GetsOperation getsAndTouch(String key, int expiration, GetsOperation.Callback cb);

/**
* Create a mutator operation.
*
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/spy/memcached/ops/APIType.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum APIType {
INCR(OperationType.WRITE), DECR(OperationType.WRITE),
DELETE(OperationType.WRITE),
GET(OperationType.READ), GETS(OperationType.READ),
GAT(OperationType.WRITE), GATS(OperationType.WRITE),

// List API Type
LOP_CREATE(OperationType.WRITE),
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/spy/memcached/ops/OperationType.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum OperationType {
* BTreeInsertAndGetOperationImpl (asyncBopInsertAndGetTrimmed)
* FlushOperationImpl / FlushByPrefixOperationImpl (flush)
* SetAttrOperationImpl (asyncSetAttr)
* GetAndTouchOperationImpl / GetsAndTouchOperationImpl (asyncGetAndTouch / asyncGetsAndTouch)
*/
WRITE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public GetsOperation gets(Collection<String> keys, GetsOperation.Callback cb, bo
return new GetsOperationImpl(keys, cb, isMGet);
}

public GetOperation getAndTouch(String key, int expiration, GetOperation.Callback cb) {
return new GetAndTouchOperationImpl(key, expiration, cb);
}

public GetsOperation getsAndTouch(String key, int expiration, GetsOperation.Callback cb) {
return new GetsAndTouchOperationImpl(key, expiration, cb);
}

public MutatorOperation mutate(Mutator m, String key, int by,
long def, int exp, OperationCallback cb) {
return new MutatorOperationImpl(m, key, by, def, exp, cb);
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/net/spy/memcached/protocol/ascii/BaseGetOpImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ abstract class BaseGetOpImpl extends OperationImpl {
private final String cmd;
private final Collection<String> keys;
private String currentKey = null;
private final int exp;
private long casValue = 0;
private int currentFlags = 0;
private byte[] data = null;
Expand All @@ -55,9 +56,22 @@ public BaseGetOpImpl(String c,
super(cb);
cmd = c;
keys = k;
exp = 0;
setOperationType(OperationType.READ);
}

/**
* For GetAndTouchOperationImpl, GetsAndTouchOperationImpl Only
*/
public BaseGetOpImpl(String c, int e,
OperationCallback cb, Collection<String> k) {
super(cb);
cmd = c;
keys = k;
exp = e;
setOperationType(OperationType.WRITE);
}

/**
* Get the keys this GetOperation is looking for.
*/
Expand All @@ -73,6 +87,12 @@ public final void handleLine(String line) {
...
END\r\n
*/
/* ENABLE_REPLICATION if */
if (hasSwitchedOver(line)) {
prepareSwitchover(line);
return;
}
/* ENABLE_REPLICATION end */
if (line.equals("END")) {
getLogger().debug("Get complete!");
/* ENABLE_MIGRATION if */
Expand Down Expand Up @@ -184,6 +204,14 @@ public final void initialize() {
commandBuilder.append(' ');
commandBuilder.append(keysString);
commandBuilder.append(RN_STRING);
} else if (cmd.equals("gat") || cmd.equals("gats")) {
// syntax: gat || gats <exp> <key>\r\n
commandBuilder.append(cmd);
commandBuilder.append(' ');
commandBuilder.append(exp);
commandBuilder.append(' ');
commandBuilder.append(keysString);
commandBuilder.append(RN_STRING);
} else {
assert (cmd.equals("mget") || cmd.equals("mgets"))
: "Unknown Command " + cmd;
Expand Down Expand Up @@ -231,4 +259,7 @@ public boolean isBulkOperation() {
return keys.size() > 1;
}

public int getExpiration() {
return exp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* arcus-java-client : Arcus Java client
* Copyright 2010-2014 NAVER Corp.
* Copyright 2014-present JaM2in Co., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.spy.memcached.protocol.ascii;

import java.util.Collections;

import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.GetOperation;

/**
* Implementation of the get and touch operation.
*/
class GetAndTouchOperationImpl extends BaseGetOpImpl implements GetOperation {
private static final String CMD = "gat";

public GetAndTouchOperationImpl(String k, int e, GetOperation.Callback cb) {
super(CMD, e, cb, Collections.singleton(k));
setAPIType(APIType.GAT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* arcus-java-client : Arcus Java client
* Copyright 2010-2014 NAVER Corp.
* Copyright 2014-present JaM2in Co., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.spy.memcached.protocol.ascii;

import java.util.Collections;

import net.spy.memcached.ops.APIType;
import net.spy.memcached.ops.GetsOperation;

/**
* Implementation of the gets and touch operation.
*/
class GetsAndTouchOperationImpl extends BaseGetOpImpl implements GetsOperation {
private static final String CMD = "gats";

public GetsAndTouchOperationImpl(String k, int e, GetsOperation.Callback cb) {
super(CMD, e, cb, Collections.singleton(k));
setAPIType(APIType.GATS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ public GetsOperation gets(Collection<String> keys, GetsOperation.Callback cb, bo
"multiple key gets is not supported in binary protocol yet.");
}

public GetOperation getAndTouch(String key, int expiration, GetOperation.Callback cb) {
throw new RuntimeException(
"GetAndTouchOperation is not supported in binary protocol yet.");
}

public GetsOperation getsAndTouch(String key, int expiration, GetsOperation.Callback cb) {
throw new RuntimeException(
"GetsAndTouchOperation is not supported in binary protocol yet.");
}

public MutatorOperation mutate(Mutator m, String key, int by,
long def, int exp, OperationCallback cb) {
return new MutatorOperationImpl(m, key, by, def, exp, cb);
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/net/spy/memcached/ProtocolBaseCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ void testGetStatsCacheDump() throws Exception {
assertTrue(val.matches("\\[acctime=\\d+, exptime=\\d+\\]"), val + "doesn't match");
}

// @Test
// void testGetAndTouch() throws Exception {
// assertNull(client.get("testgat"));
// assertTrue(client.set("testgat", 3, "testgatvalue").get());
// GetFuture<Object> future = client.asyncGetAndTouch("testgat", 5);
// assertNotNull(future.get());
// assertEquals("testgatvalue", future.get());
// }
//
// @Test
// void testGetsAndTouch() throws Exception {
// assertNull(client.get("testgats"));
// assertTrue(client.set("testgats", 3, "testgatsvalue").get());
// GetFuture<CASValue<Object>> future = client.asyncGetsAndTouch("testgats", 5);
// assertNotNull(future.get());
// assertEquals("testgatsvalue", future.get().getValue());
// assertTrue(future.get().getCas() > 0);
// }

@Test
void testDelayedFlush() throws Exception {
assertNull(client.get("test1"));
Expand Down
Loading