Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class ArcusKetamaNodeLocator extends SpyObject implements NodeLocat
private final ArcusKetamaNodeLocatorConfiguration config;

private final Lock lock = new ReentrantLock();
private final boolean enableShardKey;

public ArcusKetamaNodeLocator(List<MemcachedNode> nodes) {
this(nodes, new ArcusKetamaNodeLocatorConfiguration());
Expand All @@ -69,6 +70,7 @@ public ArcusKetamaNodeLocator(List<MemcachedNode> nodes,
allNodes = nodes;
ketamaNodes = new TreeMap<>();
config = conf;
enableShardKey = conf.isShardKeyEnabled();

int numReps = config.getNodeRepetitions();
// Ketama does some special work with md5 where it reuses chunks.
Expand All @@ -94,6 +96,7 @@ private ArcusKetamaNodeLocator(TreeMap<Long, SortedSet<MemcachedNode>> smn,
ketamaNodes = smn;
allNodes = an;
config = conf;
enableShardKey = conf.isShardKeyEnabled();

/* ENABLE_MIGRATION if */
existNodes = new HashSet<>();
Expand Down Expand Up @@ -151,7 +154,29 @@ public SortedMap<Long, SortedSet<MemcachedNode>> getKetamaNodes() {
}

public MemcachedNode getPrimary(final String k) {
return getNodeForKey(hashAlg.hash(k));
String shardKey = getShardKey(k);
return getNodeForKey(hashAlg.hash(shardKey));
}

String getShardKey(String key) {
if (!enableShardKey) {
return key;
}

if (key == null) {
return null;
}

int left = key.indexOf('{');
if (left == -1) {
return key;
}
int right = key.indexOf('}', left + 1);
if (right == -1 || right == left + 1) {
return key;
}

return key.substring(left + 1, right);
}

MemcachedNode getNodeForKey(long hash) {
Expand Down Expand Up @@ -565,9 +590,10 @@ class KetamaIterator implements Iterator<MemcachedNode> {

public KetamaIterator(final String k, final int t) {
super();
hashVal = hashAlg.hash(k);
String shardKey = getShardKey(k);
hashVal = hashAlg.hash(shardKey);
remainingTries = t;
key = k;
key = shardKey;
}

private void nextHash() {
Expand Down
49 changes: 40 additions & 9 deletions src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,19 @@ public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeL

private final Collection<MemcachedReplicaGroup> toDeleteGroups;
private final HashAlgorithm hashAlg = HashAlgorithm.KETAMA_HASH;
private final ArcusReplKetamaNodeLocatorConfiguration config
= new ArcusReplKetamaNodeLocatorConfiguration();
private final ArcusReplKetamaNodeLocatorConfiguration config;

private final Lock lock = new ReentrantLock();
private final boolean enableShardKey;

public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes,
ArcusReplKetamaNodeLocatorConfiguration conf) {
super();
allNodes = nodes;
ketamaGroups = new TreeMap<>();
allGroups = new ConcurrentHashMap<>();
config = conf;
enableShardKey = conf.isShardKeyEnabled();

// create all memcached replica group
for (MemcachedNode node : nodes) {
Expand Down Expand Up @@ -105,12 +108,15 @@ public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {

private ArcusReplKetamaNodeLocator(TreeMap<Long, SortedSet<MemcachedReplicaGroup>> kg,
ConcurrentHashMap<String, MemcachedReplicaGroup> ag,
Collection<MemcachedNode> an) {
Collection<MemcachedNode> an,
ArcusReplKetamaNodeLocatorConfiguration conf) {
super();
ketamaGroups = kg;
allGroups = ag;
allNodes = an;
toDeleteGroups = new HashSet<>();
config = conf;
enableShardKey = conf.isShardKeyEnabled();

/* ENABLE_MIGRATION if */
alterNodes = new HashSet<>();
Expand Down Expand Up @@ -172,11 +178,13 @@ public Collection<MemcachedNode> getMasterNodes() {
}

public MemcachedNode getPrimary(final String k) {
return getNodeForKey(hashAlg.hash(k), ReplicaPick.MASTER);
String shardKey = getShardKey(k);
return getNodeForKey(hashAlg.hash(shardKey), ReplicaPick.MASTER);
}

public MemcachedNode getPrimary(final String k, ReplicaPick pick) {
return getNodeForKey(hashAlg.hash(k), pick);
String shardKey = getShardKey(k);
return getNodeForKey(hashAlg.hash(shardKey), pick);
}

private MemcachedNode getNodeForKey(long hash, ReplicaPick pick) {
Expand Down Expand Up @@ -231,7 +239,7 @@ public NodeLocator getReadonlyCopy() {
nodesCopy.add(new MemcachedNodeROImpl(node));
}

return new ArcusReplKetamaNodeLocator(ketamaCopy, groupsCopy, nodesCopy);
return new ArcusReplKetamaNodeLocator(ketamaCopy, groupsCopy, nodesCopy, config);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -297,6 +305,27 @@ public void switchoverReplGroup(MemcachedReplicaGroup group) {
lock.unlock();
}

String getShardKey(String key) {
if (!enableShardKey) {
return key;
}

if (key == null) {
return null;
}

int left = key.indexOf('{');
if (left == -1) {
return key;
}
int right = key.indexOf('}', left + 1);
if (right == -1 || right == left + 1) {
return key;
}

return key.substring(left + 1, right);
}

private void insertNodeIntoGroup(MemcachedNode node) {
/* ENABLE_MIGRATION if */
if (migrationInProgress) {
Expand Down Expand Up @@ -716,9 +745,11 @@ private class ReplKetamaIterator implements Iterator<MemcachedNode> {

public ReplKetamaIterator(final String k, ReplicaPick p, final int t) {
super();
hashVal = hashAlg.hash(k);

String shardKey = getShardKey(k);
hashVal = hashAlg.hash(shardKey);
remainingTries = t;
key = k;
key = shardKey;
pick = p;
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/net/spy/memcached/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ MemcachedNode createMemcachedNode(String name,
*/
boolean getDnsCacheTtlCheck();

/**
* If true, the shard key logic will be used for hashing.
*/
boolean isShardKeyEnabled();

/**
* Observers that should be established at the time of connection
* instantiation.
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import net.spy.memcached.protocol.ascii.AsciiOperationFactory;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.util.ArcusKetamaNodeLocatorConfiguration;
import net.spy.memcached.util.ArcusReplKetamaNodeLocatorConfiguration;

/**
* Builder for more easily configuring a ConnectionFactory.
Expand Down Expand Up @@ -61,6 +63,7 @@ public class ConnectionFactoryBuilder {
private boolean useNagle = false;
private boolean keepAlive = false;
private boolean dnsCacheTtlCheck = true;
private boolean enableShardKey = false;
private long maxReconnectDelay = 1;

private int readBufSize = -1;
Expand Down Expand Up @@ -493,6 +496,11 @@ public ConnectionFactoryBuilder setDnsCacheTtlCheck(boolean dnsCacheTtlCheck) {
return this;
}

public ConnectionFactoryBuilder enableShardKey(boolean shardKey) {
this.enableShardKey = shardKey;
return this;
}

/**
* Get the ConnectionFactory set up with the provided parameters.
*/
Expand Down Expand Up @@ -547,10 +555,16 @@ public NodeLocator createLocator(List<MemcachedNode> nodes) {
// This locator uses ArcusReplKetamaNodeLocatorConfiguration
// which builds keys off the server's group name, not
// its ip:port.
return new ArcusReplKetamaNodeLocator(nodes);
ArcusReplKetamaNodeLocatorConfiguration conf =
new ArcusReplKetamaNodeLocatorConfiguration();
conf.enableShardKey(enableShardKey);
return new ArcusReplKetamaNodeLocator(nodes, conf);
}
/* ENABLE_REPLICATION end */
return new ArcusKetamaNodeLocator(nodes);
ArcusKetamaNodeLocatorConfiguration conf =
new ArcusKetamaNodeLocatorConfiguration();
conf.enableShardKey(enableShardKey);
return new ArcusKetamaNodeLocator(nodes, conf);
default:
throw new IllegalStateException(
"Unhandled locator type: " + locator);
Expand Down Expand Up @@ -627,6 +641,11 @@ public boolean getDnsCacheTtlCheck() {
return dnsCacheTtlCheck;
}

@Override
public boolean isShardKeyEnabled() {
return enableShardKey;
}

@Override
public long getMaxReconnectDelay() {
return maxReconnectDelay;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/net/spy/memcached/DefaultConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ public boolean getKeepAlive() {
return false;
}

@Override
public boolean isShardKeyEnabled() {
return false;
}

@Override
public boolean getDnsCacheTtlCheck() {
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/spy/memcached/KeyValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void validateKey(String key) {
}
if (!(('a' <= b && b <= 'z') || ('A' <= b && b <= 'Z') ||
('0' <= b && b <= '9') ||
(b == '_') || (b == '-') || (b == '+') || (b == '.'))) {
(b == '_') || (b == '-') || (b == '+') || (b == '.') ||
(b == '{') || (b == '}'))) {
throw new IllegalArgumentException(
"Key contains invalid prefix: ``" + key + "''");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
public class ArcusKetamaNodeLocatorConfiguration extends
DefaultKetamaNodeLocatorConfiguration {

private boolean enableShardKey = false;

/**
* insert a node from the internal node-address map.
*
Expand All @@ -41,6 +43,23 @@ public void insertNode(MemcachedNode node) {
public void removeNode(MemcachedNode node) {
super.socketAddresses.remove(node);
}
/**
* Returns whether the shard key feature is enabled.
*
* @return true if the shard key feature is enabled; false otherwise.
*/
public boolean isShardKeyEnabled() {
return enableShardKey;
}

/**
* Sets the enable status of the shard key feature.
*
* @param useShardKey true to enable the shard key feature; false to disable.
*/
public void enableShardKey(boolean useShardKey) {
enableShardKey = useShardKey;
}

public class NodeNameComparator implements Comparator<MemcachedNode> {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ public class ArcusReplKetamaNodeLocatorConfiguration implements
KetamaNodeLocatorConfiguration {

private static final int NUM_REPS = 160;
private boolean enableShardKey = false;

/**
* Returns whether the shard key feature is enabled.
*
* @return true if the shard key feature is enabled; false otherwise.
*/
public boolean isShardKeyEnabled() {
return enableShardKey;
}

/**
* Sets the enable status of the shard key feature.
*
* @param useShardKey true to enable the shard key feature; false to disable.
*/
public void enableShardKey(boolean useShardKey) {
enableShardKey = useShardKey;
}

public String getKeyForNode(MemcachedNode node, int repetition) {
ArcusReplNodeAddress addr = (ArcusReplNodeAddress) node.getSocketAddress();
Expand Down
8 changes: 6 additions & 2 deletions src/test/java/net/spy/memcached/ArcusKetamaHashingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.SortedMap;
import java.util.SortedSet;

import net.spy.memcached.util.ArcusKetamaNodeLocatorConfiguration;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -75,8 +77,10 @@ private void runThisManyNodes(List<String> stringNode1, List<String> stringNode2
MemcachedNode oddManOut = larger.get(larger.size() - 1);
assertFalse(smaller.contains(oddManOut));

ArcusKetamaNodeLocator lgLocator = new ArcusKetamaNodeLocator(larger);
ArcusKetamaNodeLocator smLocator = new ArcusKetamaNodeLocator(smaller);
ArcusKetamaNodeLocator lgLocator =
new ArcusKetamaNodeLocator(larger, new ArcusKetamaNodeLocatorConfiguration());
ArcusKetamaNodeLocator smLocator =
new ArcusKetamaNodeLocator(smaller, new ArcusKetamaNodeLocatorConfiguration());

SortedMap<Long, SortedSet<MemcachedNode>> lgMap = lgLocator.getKetamaNodes();
SortedMap<Long, SortedSet<MemcachedNode>> smMap = smLocator.getKetamaNodes();
Expand Down
Loading