diff --git a/src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java b/src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java index e0b586786..1c0bd54cf 100644 --- a/src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java +++ b/src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java @@ -42,6 +42,7 @@ public final class ArcusKetamaNodeLocator extends SpyObject implements NodeLocat private final TreeMap> ketamaNodes; private final Collection allNodes; + private final Collection delayedClosingNodes = new HashSet<>(); /* ENABLE_MIGRATION if */ private TreeMap> ketamaAlterNodes; @@ -238,6 +239,10 @@ public void update(Collection toAttach, for (MemcachedNode node : toDelete) { allNodes.remove(node); removeHash(node); + if (node.hasOp() && node.isActive()) { + delayedClosingNodes.add(node); + continue; + } try { node.closeChannel(); } catch (IOException e) { @@ -256,6 +261,14 @@ public void update(Collection toAttach, } } + public Collection getDelayedClosingNodes() { + return Collections.unmodifiableCollection(delayedClosingNodes); + } + + public void updateDelayedClosingNodes(Collection closedNodes) { + delayedClosingNodes.removeAll(closedNodes); + } + private Long getKetamaHashPoint(byte[] digest, int h) { return ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16) @@ -452,6 +465,10 @@ public void updateAlter(Collection toAttach, for (MemcachedNode node : toDelete) { alterNodes.remove(node); removeHashOfAlter(node); + if (node.hasOp() && node.isActive()) { + delayedClosingNodes.add(node); + continue; + } try { node.closeChannel(); } catch (IOException e) { diff --git a/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java b/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java index 8307d7299..4447489d8 100644 --- a/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java +++ b/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java @@ -44,6 +44,7 @@ public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeL private final TreeMap> ketamaGroups; private final ConcurrentHashMap allGroups; private final Collection allNodes; + private final Collection delayedClosingNodes = new HashSet<>(); /* ENABLE_MIGRATION if */ private TreeMap> ketamaAlterGroups; @@ -263,6 +264,10 @@ public void update(Collection toAttach, for (MemcachedNode node : toDelete) { allNodes.remove(node); removeNodeFromGroup(node); + if (node.hasOp() && node.isActive()) { + delayedClosingNodes.add(node); + continue; + } try { node.closeChannel(); } catch (IOException e) { @@ -299,6 +304,14 @@ public void update(Collection toAttach, } } + public Collection getDelayedClosingNodes() { + return Collections.unmodifiableCollection(delayedClosingNodes); + } + + public void updateDelayedClosingNodes(Collection closedNodes) { + delayedClosingNodes.removeAll(closedNodes); + } + public void switchoverReplGroup(MemcachedReplicaGroup group) { lock.lock(); group.changeRole(); @@ -590,6 +603,10 @@ public void updateAlter(Collection toAttach, removeHashOfAlter(mrg); } } + if (node.hasOp() && node.isActive()) { + delayedClosingNodes.add(node); + continue; + } try { node.closeChannel(); } catch (IOException e) { diff --git a/src/main/java/net/spy/memcached/ArrayModNodeLocator.java b/src/main/java/net/spy/memcached/ArrayModNodeLocator.java index 1f21e201d..85b4d94da 100644 --- a/src/main/java/net/spy/memcached/ArrayModNodeLocator.java +++ b/src/main/java/net/spy/memcached/ArrayModNodeLocator.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -76,6 +77,14 @@ public void update(Collection toAttach, Collection throw new UnsupportedOperationException("update not supported"); } + public Collection getDelayedClosingNodes() { + return new HashSet(); + } + + public void updateDelayedClosingNodes(Collection closedNodes) { + // do NOT throw UnsupportedOperationException here for test codes. + } + /* ENABLE_MIGRATION if */ public Collection getAlterAll() { return new ArrayList<>(); diff --git a/src/main/java/net/spy/memcached/KetamaNodeLocator.java b/src/main/java/net/spy/memcached/KetamaNodeLocator.java index d0428e70d..524352e3b 100644 --- a/src/main/java/net/spy/memcached/KetamaNodeLocator.java +++ b/src/main/java/net/spy/memcached/KetamaNodeLocator.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -144,6 +145,14 @@ public void update(Collection toAttach, Collection throw new UnsupportedOperationException("update not supported"); } + public Collection getDelayedClosingNodes() { + return new HashSet(); + } + + public void updateDelayedClosingNodes(Collection closedNodes) { + // do NOT throw UnsupportedOperationException here for test codes. + } + public SortedMap getKetamaNodes() { return Collections.unmodifiableSortedMap(ketamaNodes); } diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index e7bc9e544..17cb43757 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -297,6 +297,9 @@ public void handleIO() throws IOException { } } + // Deal with the memcached nodes that removed from ZK but has operation in queue. + handleDelayedClosingNodes(); + // Deal with the memcached server group that's been added by CacheManager. handleCacheNodesChange(); @@ -323,12 +326,18 @@ private void handleNodesToRemove(final List nodesToRemove) { } /* ENABLE_MIGRATION end */ + if (node.isActive()) { + // if a memcached node is removed from ZK but can still serve operations, do NOT cancel it. + // operations that remain in operation queue will be processed until connection is lost. + // once all remaining operations are processed, client will close connection. + // if connection is lost before remaining operations are processed, + // all of them will be canceled after connection is lost. + continue; + } + // removing node is not related to failure mode. // so, cancel operations regardless of failure mode. - String cause = "node removed."; - cancelOperations(node.destroyReadQueue(false), cause); - cancelOperations(node.destroyWriteQueue(false), cause); - cancelOperations(node.destroyInputQueue(), cause); + cancelAllOperations(node, "node removed."); } } @@ -706,6 +715,39 @@ public void complete() { getLogger().debug("Added %s to writeQ of %s", op, node); } + // Handle the memcached nodes that removed from ZK but has operation in queue. + void handleDelayedClosingNodes() { + Collection closingNodes = locator.getDelayedClosingNodes(); + if (closingNodes.isEmpty()) { + return; + } + + Collection closedNodes = new HashSet<>(); + for (MemcachedNode node : closingNodes) { + boolean isConnected = node.isConnected(); + boolean hasOp = node.hasOp(); + + if (isConnected && !hasOp) { + try { + node.closeChannel(); + } catch (IOException e) { + getLogger().error("Failed to closeChannel the node : " + node); + } + } else if (!isConnected && hasOp) { + cancelAllOperations(node, "connection lost after node removed."); + } else { + addedQueue.offer(node); + continue; + } + + closedNodes.add(node); + } + + if (!closedNodes.isEmpty()) { + locator.updateDelayedClosingNodes(closedNodes); + } + } + // Handle the memcached server group that's been added by CacheManager. void handleCacheNodesChange() throws IOException { /* ENABLE_MIGRATION if */ @@ -1279,6 +1321,12 @@ private void cancelOperations(Collection ops, String cause) { } } + private void cancelAllOperations(MemcachedNode node, String cause) { + cancelOperations(node.destroyReadQueue(false), cause); + cancelOperations(node.destroyWriteQueue(false), cause); + cancelOperations(node.destroyInputQueue(), cause); + } + private void redistributeOperations(Collection ops, String cause) { for (Operation op : ops) { if (op instanceof KeyedOperation) { diff --git a/src/main/java/net/spy/memcached/MemcachedNode.java b/src/main/java/net/spy/memcached/MemcachedNode.java index 97d39634e..4a0ad45fb 100644 --- a/src/main/java/net/spy/memcached/MemcachedNode.java +++ b/src/main/java/net/spy/memcached/MemcachedNode.java @@ -105,6 +105,11 @@ public interface MemcachedNode { */ boolean hasWriteOp(); + /** + * True if any operation is in operation queue. + */ + boolean hasOp(); + /** * Add an operation to the queue. Authentication operations should * never be added to the queue, but this is not checked. diff --git a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java index 528308036..d16546fc7 100644 --- a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java +++ b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java @@ -130,6 +130,10 @@ public boolean isConnected() { return root.isConnected(); } + public boolean hasOp() { + return root.hasOp(); + } + public boolean isActive() { return root.isActive(); } diff --git a/src/main/java/net/spy/memcached/NodeLocator.java b/src/main/java/net/spy/memcached/NodeLocator.java index 6cd4f97de..33ad67919 100644 --- a/src/main/java/net/spy/memcached/NodeLocator.java +++ b/src/main/java/net/spy/memcached/NodeLocator.java @@ -58,6 +58,18 @@ public interface NodeLocator { */ void update(Collection toAttach, Collection toDelete); + /** + * Get all memcached nodes that removed from ZK but has operation in queue. + * Note that this feature is only available in ArcusKetamaNodeLocator. + */ + Collection getDelayedClosingNodes(); + + /** + * Update all memcached nodes that removed from ZK but has operation in queue. + * Note that this feature is only available in ArcusKetamaNodeLocator. + */ + void updateDelayedClosingNodes(Collection closedNodes); + /* ENABLE_MIGRATION if */ /** * Get all alter memcached nodes. diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 681507fec..632e2cdfe 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -292,6 +292,10 @@ public final boolean hasWriteOp() { return !(optimizedOp == null && writeQ.isEmpty()); } + public final boolean hasOp() { + return hasReadOp() || hasWriteOp() || !inputQueue.isEmpty(); + } + public final void addOpToInputQ(Operation op) { op.setHandlingNode(this); op.initialize(); diff --git a/src/test/java/net/spy/memcached/MemcachedNodeROImplTest.java b/src/test/java/net/spy/memcached/MemcachedNodeROImplTest.java index fc84c541b..45e7a1c72 100644 --- a/src/test/java/net/spy/memcached/MemcachedNodeROImplTest.java +++ b/src/test/java/net/spy/memcached/MemcachedNodeROImplTest.java @@ -56,7 +56,7 @@ void testReadOnliness() throws Exception { Set acceptable = new HashSet<>(Arrays.asList( "toString", "getSocketAddress", "getBytesRemainingToWrite", "getReconnectCount", "getSelectionOps", "getNodeName", "hasReadOp", - "hasWriteOp", "isActive", "isConnected")); + "hasWriteOp", "hasOp", "isActive", "isConnected")); for (Method meth : MemcachedNode.class.getMethods()) { if (acceptable.contains(meth.getName())) { diff --git a/src/test/java/net/spy/memcached/MockMemcachedNode.java b/src/test/java/net/spy/memcached/MockMemcachedNode.java index afda4fdff..6f83c8afc 100644 --- a/src/test/java/net/spy/memcached/MockMemcachedNode.java +++ b/src/test/java/net/spy/memcached/MockMemcachedNode.java @@ -100,6 +100,10 @@ public boolean hasWriteOp() { return false; } + public boolean hasOp() { + return false; + } + public void addOpToInputQ(Operation op) { // noop }