Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ArcusKetamaNodeLocator extends SpyObject implements NodeLocator {

private final TreeMap<Long, SortedSet<MemcachedNode>> ketamaNodes;
private final Collection<MemcachedNode> allNodes;
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<MemcachedNode>();

/* ENABLE_MIGRATION if */
private TreeMap<Long, SortedSet<MemcachedNode>> ketamaAlterNodes;
Expand Down Expand Up @@ -226,6 +227,10 @@ public void update(Collection<MemcachedNode> toAttach,
for (MemcachedNode node : toDelete) {
allNodes.remove(node);
removeHash(node);
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand All @@ -244,6 +249,14 @@ public void update(Collection<MemcachedNode> toAttach,
}
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return Collections.unmodifiableCollection(delayedClosingNodes);
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

메서드명을 removeDelayedClosedNodes 로 변경하면 어떤가요?

delayedClosingNodes.removeAll(closedNodes);
}

private Long getKetamaHashPoint(byte[] digest, int h) {
return ((long) (digest[3 + h * 4] & 0xFF) << 24)
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
Expand Down Expand Up @@ -440,6 +453,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
for (MemcachedNode node : toDelete) {
alterNodes.remove(node);
removeHashOfAlter(node);
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ArcusReplKetamaNodeLocator extends SpyObject implements NodeLocator
private final TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaGroups;
private final HashMap<String, MemcachedReplicaGroup> allGroups;
private final Collection<MemcachedNode> allNodes;
private final Collection<MemcachedNode> delayedClosingNodes = new HashSet<MemcachedNode>();

/* ENABLE_MIGRATION if */
private TreeMap<Long, SortedSet<MemcachedReplicaGroup>> ketamaAlterGroups;
Expand Down Expand Up @@ -267,6 +268,10 @@ public void update(Collection<MemcachedNode> toAttach,
for (MemcachedNode node : toDelete) {
allNodes.remove(node);
removeNodeFromGroup(node);
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand Down Expand Up @@ -303,6 +308,14 @@ public void update(Collection<MemcachedNode> toAttach,
}
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return Collections.unmodifiableCollection(delayedClosingNodes);
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
delayedClosingNodes.removeAll(closedNodes);
}

public void switchoverReplGroup(MemcachedReplicaGroup group) {
lock.lock();
group.changeRole();
Expand Down Expand Up @@ -573,6 +586,10 @@ public void updateAlter(Collection<MemcachedNode> toAttach,
removeHashOfAlter(mrg);
}
}
if (node.hasOp() && node.isActive()) {
delayedClosingNodes.add(node);
continue;
}
try {
node.closeChannel();
} catch (IOException e) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/net/spy/memcached/ArrayModNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

Expand Down Expand Up @@ -76,6 +77,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
throw new UnsupportedOperationException("update not supported");
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return new HashSet<MemcachedNode>();
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
// do NOT throw UnsupportedOperationException here for test codes.
}

/* ENABLE_MIGRATION if */
public Collection<MemcachedNode> getAlterAll() {
return new ArrayList<MemcachedNode>();
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/net/spy/memcached/KetamaNodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -148,6 +149,14 @@ public void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode>
throw new UnsupportedOperationException("update not supported");
}

public Collection<MemcachedNode> getDelayedClosingNodes() {
return new HashSet<MemcachedNode>();
}

public void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes) {
// do NOT throw UnsupportedOperationException here for test codes.
}

public SortedMap<Long, MemcachedNode> getKetamaNodes() {
return Collections.unmodifiableSortedMap(ketamaNodes);
}
Expand Down
55 changes: 51 additions & 4 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ public void handleIO() throws IOException {
}
/* ENABLE_REPLICATION end */

// Deal with the memcached nodes that removed from ZK but has operation in queue.
handleDelayedClosingNodes();

// Deal with the memcached server group that's been added by CacheManager.
handleCacheNodesChange();

Expand All @@ -330,12 +333,18 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
}
/* ENABLE_MIGRATION end */

if (node.isActive()) {
// if a memcached node is removed from ZK but can still serve operations, do NOT cancel it.
// operations that remain in operation queue will be processed until connection is lost.
// once all remaining operations are processed, client will close connection.
// if connection is lost before remaining operations are processed,
// all of them will be canceled after connection is lost.
continue;
}

// removing node is not related to failure mode.
// so, cancel operations regardless of failure mode.
String cause = "node removed.";
cancelOperations(node.destroyReadQueue(false), cause);
cancelOperations(node.destroyWriteQueue(false), cause);
cancelOperations(node.destroyInputQueue(), cause);
cancelAllOperations(node, "node removed.");
}
}

Expand Down Expand Up @@ -680,6 +689,38 @@ public void complete() {
addOperation(node, op);
}

// Handle the memcached nodes that removed from ZK but has operation in queue.
void handleDelayedClosingNodes() {
Collection<MemcachedNode> closingNodes = locator.getDelayedClosingNodes();
if (closingNodes.isEmpty()) {
return;
}

Collection<MemcachedNode> closedNodes = new HashSet<MemcachedNode>();
for (MemcachedNode node : closingNodes) {
boolean isActive = node.isActive();
boolean hasOp = node.hasOp();

if (isActive && !hasOp) {
try {
node.closeChannel();
} catch (IOException e) {
getLogger().error("Failed to closeChannel the node : " + node);
}
} else if (!isActive && hasOp) {
cancelAllOperations(node, "connection lost after node removed.");
} else {
continue;
}

closedNodes.add(node);
}

if (!closedNodes.isEmpty()) {
locator.updateDelayedClosingNodes(closedNodes);
}
}

// Handle the memcached server group that's been added by CacheManager.
void handleCacheNodesChange() throws IOException {
/* ENABLE_MIGRATION if */
Expand Down Expand Up @@ -1225,6 +1266,12 @@ private void cancelOperations(Collection<Operation> ops, String cause) {
}
}

private void cancelAllOperations(MemcachedNode node, String cause) {
cancelOperations(node.destroyReadQueue(false), cause);
cancelOperations(node.destroyWriteQueue(false), cause);
cancelOperations(node.destroyInputQueue(), cause);
}

private void redistributeOperations(Collection<Operation> ops, String cause) {
for (Operation op : ops) {
if (op instanceof KeyedOperation) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public interface MemcachedNode {
*/
boolean hasWriteOp();

/**
* True if any operation is in operation queue.
*/
boolean hasOp();

/**
* Add an operation to the queue. Authentication operations should
* never be added to the queue, but this is not checked.
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedNodeROImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public boolean hasWriteOp() {
return root.hasReadOp();
}

public boolean hasOp() {
return root.hasOp();
}

public boolean isActive() {
return root.isActive();
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/net/spy/memcached/NodeLocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ public interface NodeLocator {
*/
void update(Collection<MemcachedNode> toAttach, Collection<MemcachedNode> toDelete);

/**
* Get all memcached nodes that removed from ZK but has operation in queue.
* Note that this feature is only available in ArcusKetamaNodeLocator.
*/
Collection<MemcachedNode> getDelayedClosingNodes();

/**
* Update all memcached nodes that removed from ZK but has operation in queue.
* Note that this feature is only available in ArcusKetamaNodeLocator.
*/
void updateDelayedClosingNodes(Collection<MemcachedNode> closedNodes);

/* ENABLE_MIGRATION if */
/**
* Get all alter memcached nodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ public final boolean hasWriteOp() {
return !(optimizedOp == null && writeQ.isEmpty());
}

public final boolean hasOp() {
return hasReadOp() || hasWriteOp() || !inputQueue.isEmpty();
}

public final void addOpToInputQ(Operation op) {
op.setHandlingNode(this);
op.initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void testReadOnliness() throws Exception {
Set<String> acceptable = new HashSet<String>(Arrays.asList(
"toString", "getSocketAddress", "getBytesRemainingToWrite",
"getReconnectCount", "getSelectionOps", "getNodeName", "hasReadOp",
"hasWriteOp", "isActive", "isFirstConnecting"));
"hasWriteOp", "hasOp", "isActive", "isFirstConnecting"));

for (Method meth : MemcachedNode.class.getMethods()) {
if (acceptable.contains(meth.getName())) {
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/net/spy/memcached/MockMemcachedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ public boolean hasWriteOp() {
return false;
}

public boolean hasOp() {
return false;
}

public void addOpToInputQ(Operation op) {
// noop
}
Expand Down