From 5708c0cf9cf10a88194399542781bffc164ff27b Mon Sep 17 00:00:00 2001
From: Arunachalam Thirupathi
Date: Tue, 29 Jul 2014 13:59:53 -0700
Subject: [PATCH] Threshold failure detector issues

1) The threshold failure detector marks a node as available when it receives a
non-catastrophic error after the configured number of catastrophic errors.
When a node goes down, there are generally lots of connect exceptions
(catastrophic) first, followed by timeouts on already established connections.
This causes the failure detector to treat a node that is going down as up,
which hurts client latency: every get waits for the connection timeout to
expire and then falls back to serial requests.

2) The threshold failure detector marks a node as unavailable right after the
window rolls over. The detector tracks successes/failures within a window
(default 5 minutes); if the success rate drops below the configured percentage
(default 95) and there are more errors than the configured minimum, it marks
the node as down. When the window rolls over and the first request succeeds,
or when a node comes back up, the failure count is not reset. This causes an
available node to be marked as down.

3) Enhance the unit tests to cover the above two cases.

4) Dump the node statistics when a node goes down, so the reason can be found
in the logs.

The new code follows this reasoning: do the bookkeeping first, make the
decision next. It intentionally does not reset the catastrophic error count on
a window rollover, as the first success resets it anyway. The node can still
flap when the failure count is above the minimum and the success percentage
oscillates around the configured threshold, but I don't see a good workaround
and it was not an actual issue in the 20+ repetitions of an internal repro, so
that is left as is. The previous code mixed the bookkeeping and the decision,
which led to many issues, and it left many loose ends (a boolean represented
as an int, generic set methods where only a reset was required, setters at the
individual variable level, copy-pasted code).

PS: There is a more serious third issue: the Selector drains the parallel
queue, and when a request requires connection establishment the Selector
handles the connection too. If the node is dead, the Selector waits the
configured connection timeout (default 5 seconds), and during that time it is
not pumping reads/writes, so all of those requests eventually time out. We are
discussing the potential issues with that fix and will address it in a
follow-up. There are many other minor issues I uncovered as well.
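
For reference, here is a minimal sketch of the intended "bookkeeping first,
decision next" flow. It is a simplified, self-contained illustration with
hypothetical names (ThresholdSketch, WindowStats, and the constants below are
example values only); it is not the actual Voldemort API, which lives in
ThresholdFailureDetector.update() in the diff that follows.

    public class ThresholdSketch {

        static class WindowStats {
            long windowStartMs;
            long success, failure, total;
            int consecutiveCatastrophicErrors;
        }

        // Example values only; the real defaults come from FailureDetectorConfig.
        private final long windowMs = 5 * 60 * 1000;      // threshold interval
        private final int thresholdPercent = 95;          // required success percentage
        private final int thresholdCountMinimum = 10;     // failures needed before judging
        private final int maxTolerableFatalFailures = 10; // consecutive catastrophic errors allowed

        /** Returns true if the node should be considered available after this operation. */
        boolean update(WindowStats stats, boolean isSuccess, boolean isCatastrophic, long nowMs) {
            // 1) Bookkeeping first.
            if (nowMs >= stats.windowStartMs + windowMs) {
                // Window rolled over: reset the window counters, but NOT the
                // catastrophic error count; the next success resets that anyway.
                stats.windowStartMs = nowMs;
                stats.success = stats.failure = stats.total = 0;
            }
            if (isSuccess) {
                stats.success++;
                stats.consecutiveCatastrophicErrors = 0;
            } else {
                stats.failure++;
                if (isCatastrophic)
                    stats.consecutiveCatastrophicErrors++;
            }
            stats.total++;

            // 2) Decision next.
            if (stats.consecutiveCatastrophicErrors >= maxTolerableFatalFailures)
                return false;
            if (stats.failure >= thresholdCountMinimum) {
                long successPercentage = (stats.success * 100) / stats.total;
                return successPercentage >= thresholdPercent;
            }
            return true; // not enough failures in this window to mark the node down
        }
    }

Keeping the decision out of the bookkeeping step is what prevents a single
non-catastrophic error, or the first request after a window rollover, from
flipping the node's state based on stale counts.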
--- .../failuredetector/AbstractFailureDetector.java | 16 ++- .../failuredetector/FailureDetectorConfig.java | 9 +- .../cluster/failuredetector/NodeStatus.java | 48 ++++----- .../failuredetector/ThresholdFailureDetector.java | 108 +++++++++++---------- .../common/voldemort/FailureDetectorTestUtils.java | 2 +- .../ThresholdFailureDetectorTest.java | 80 ++++++++++----- 6 files changed, 152 insertions(+), 111 deletions(-) diff --git a/src/java/voldemort/cluster/failuredetector/AbstractFailureDetector.java b/src/java/voldemort/cluster/failuredetector/AbstractFailureDetector.java index a942c70ed..572e92303 100644 --- a/src/java/voldemort/cluster/failuredetector/AbstractFailureDetector.java +++ b/src/java/voldemort/cluster/failuredetector/AbstractFailureDetector.java @@ -169,7 +169,7 @@ public abstract class AbstractFailureDetector implements FailureDetector { logger.info("Node " + node.getId() + " now available"); synchronized(nodeStatus) { - nodeStatus.setNumConsecutiveCatastrophicErrors(0); + nodeStatus.resetNumConsecutiveCatastrophicErrors(); if(logger.isTraceEnabled()) { logger.trace("Resetting # consecutive connect errors for node : " + node); } @@ -205,8 +205,18 @@ public abstract class AbstractFailureDetector implements FailureDetector { // If we were previously available, we've just switched state from // available to unavailable, so notify any listeners. if(previouslyAvailable) { - if(logger.isInfoEnabled()) - logger.info("Node " + node.getId() + " now unavailable"); + if(logger.isEnabledFor(Level.WARN)) { + long elapsedMs = System.currentTimeMillis() - nodeStatus.getStartMillis(); + logger.warn("Node " + node.getId() + " now unavailable . Node metrics. " + + " Catastrophic errors " + + nodeStatus.getNumConsecutiveCatastrophicErrors() + " Successes " + + nodeStatus.getSuccess() + " Failure " + nodeStatus.getFailure() + + " Total " + nodeStatus.getTotal() + " Threshold success percentage " + + failureDetectorConfig.getThreshold() + " Threshold Minimum errors " + + failureDetectorConfig.getThresholdCountMinimum() + + " Threshold Interval " + failureDetectorConfig.getThresholdInterval() + + " Interval elapsed ms " + elapsedMs); + } for(FailureDetectorListener fdl: listeners.keySet()) { try { diff --git a/src/java/voldemort/cluster/failuredetector/FailureDetectorConfig.java b/src/java/voldemort/cluster/failuredetector/FailureDetectorConfig.java index f280bccb6..e9cc58b9a 100644 --- a/src/java/voldemort/cluster/failuredetector/FailureDetectorConfig.java +++ b/src/java/voldemort/cluster/failuredetector/FailureDetectorConfig.java @@ -86,7 +86,7 @@ public class FailureDetectorConfig { protected Time time = SystemTime.INSTANCE; - protected int maximumTolerableFatalFailures; + protected int maximumTolerableFatalFailures = DEFAULT_MAX_TOLERABLE_FATAL_FAILURES; private Cluster cluster = null; @@ -633,8 +633,13 @@ public class FailureDetectorConfig { * @param maximumTolerableFatalFailures #fatal failures acceptable before * node is marked as unavailable */ - public void setMaximumTolerableFatalFailures(int maximumTolerableFatalFailures) { + public FailureDetectorConfig setMaximumTolerableFatalFailures(int maximumTolerableFatalFailures) { + if(maximumTolerableFatalFailures <= 0) { + throw new IllegalArgumentException(" Catastrophic error limit should be greater than zero. 
Current value " + + maximumTolerableFatalFailures); + } this.maximumTolerableFatalFailures = maximumTolerableFatalFailures; + return this; } /** diff --git a/src/java/voldemort/cluster/failuredetector/NodeStatus.java b/src/java/voldemort/cluster/failuredetector/NodeStatus.java index 359098256..d62520095 100644 --- a/src/java/voldemort/cluster/failuredetector/NodeStatus.java +++ b/src/java/voldemort/cluster/failuredetector/NodeStatus.java @@ -40,7 +40,7 @@ class NodeStatus { private long total; - private AtomicInteger numConsecutiveCatastrophicErrors; + private final AtomicInteger numConsecutiveCatastrophicErrors; public NodeStatus() { numConsecutiveCatastrophicErrors = new AtomicInteger(0); @@ -74,48 +74,42 @@ class NodeStatus { return success; } - public void setSuccess(long success) { - this.success = success; - } - - public void incrementSuccess(long delta) { - this.success += delta; - } - public long getFailure() { return failure; } - public void setFailure(long failure) { - this.failure = failure; - } - - public void incrementFailure(long delta) { - this.failure += delta; - } - public long getTotal() { return total; } - public void setTotal(long total) { - this.total = total; + public int getNumConsecutiveCatastrophicErrors() { + return numConsecutiveCatastrophicErrors.get(); } - public void incrementTotal(long delta) { - this.total += delta; + public void resetNumConsecutiveCatastrophicErrors() { + this.numConsecutiveCatastrophicErrors.set(0); } - public int getNumConsecutiveCatastrophicErrors() { - return numConsecutiveCatastrophicErrors.get(); + public int incrementConsecutiveCatastrophicErrors() { + return this.numConsecutiveCatastrophicErrors.incrementAndGet(); } - public void setNumConsecutiveCatastrophicErrors(int numConsecutiveCatastrophicErrors) { - this.numConsecutiveCatastrophicErrors.set(numConsecutiveCatastrophicErrors); + public void resetCounters(long currentTime) { + setStartMillis(currentTime); + // Not resetting the catastrophic errors, as they will be reset + // whenever a success is received. 
+ success = 0; + failure = 0; + total = 0; } - public void incrementConsecutiveCatastrophicErrors() { - this.numConsecutiveCatastrophicErrors.incrementAndGet(); + public void recordOperation(boolean isSuccess) { + if(isSuccess) { + success++; + } else { + failure++; + } + total++; } } diff --git a/src/java/voldemort/cluster/failuredetector/ThresholdFailureDetector.java b/src/java/voldemort/cluster/failuredetector/ThresholdFailureDetector.java index 8789ebc14..bd5b8fa68 100644 --- a/src/java/voldemort/cluster/failuredetector/ThresholdFailureDetector.java +++ b/src/java/voldemort/cluster/failuredetector/ThresholdFailureDetector.java @@ -68,14 +68,14 @@ public class ThresholdFailureDetector extends AsyncRecoveryFailureDetector { @Override public void recordException(Node node, long requestTime, UnreachableStoreException e) { checkArgs(node, requestTime); - update(node, 0, e); + update(node, false, e); } @Override public void recordSuccess(Node node, long requestTime) { checkArgs(node, requestTime); - int successDelta = 1; + boolean isSuccess = true; UnreachableStoreException e = null; if(requestTime > getConfig().getRequestLengthThreshold()) { @@ -86,13 +86,15 @@ public class ThresholdFailureDetector extends AsyncRecoveryFailureDetector { + requestTime + ") exceeded threshold (" + getConfig().getRequestLengthThreshold() + ")"); - successDelta = 0; + isSuccess = false; } - update(node, successDelta, e); + update(node, isSuccess, e); } - @JmxGetter(name = "nodeThresholdStats", description = "Each node is listed with its status (available/unavailable) and success percentage") + @JmxGetter( + name = "nodeThresholdStats", + description = "Each node is listed with its status (available/unavailable) and success percentage") public String getNodeThresholdStats() { List list = new ArrayList(); @@ -126,23 +128,24 @@ public class ThresholdFailureDetector extends AsyncRecoveryFailureDetector { NodeStatus nodeStatus = getNodeStatus(node); synchronized(nodeStatus) { - nodeStatus.setStartMillis(getConfig().getTime().getMilliseconds()); - nodeStatus.setSuccess(0); - nodeStatus.setTotal(0); + nodeStatus.resetCounters(getConfig().getTime().getMilliseconds()); super.nodeRecovered(node); } } - protected void update(Node node, int successDelta, UnreachableStoreException e) { + protected void update(Node node, boolean isSuccess, UnreachableStoreException e) { if(logger.isTraceEnabled()) { if(e != null) - logger.trace("Node " + node.getId() + " updated, successDelta: " + successDelta, e); + logger.trace("Node " + node.getId() + " updated, isSuccess: " + isSuccess, e); else - logger.trace("Node " + node.getId() + " updated, successDelta: " + successDelta); + logger.trace("Node " + node.getId() + " updated, isSuccess: " + isSuccess); } final long currentTime = getConfig().getTime().getMilliseconds(); - String catastrophicError = getCatastrophicError(e); + String catastrophicError = null; + if(!isSuccess) { + catastrophicError = getCatastrophicError(e); + } NodeStatus nodeStatus = getNodeStatus(node); boolean invokeSetAvailable = false; @@ -151,58 +154,59 @@ public class ThresholdFailureDetector extends AsyncRecoveryFailureDetector { // synchronized section synchronized(nodeStatus) { - if(nodeStatus.getNumConsecutiveCatastrophicErrors() != 0 && successDelta > 0) { - nodeStatus.setNumConsecutiveCatastrophicErrors(0); - if(logger.isTraceEnabled()) { - logger.trace("Resetting # consecutive connect errors for node : " + node); - } - } - if(currentTime >= nodeStatus.getStartMillis() + getConfig().getThresholdInterval()) { 
// We've passed into a new interval, so reset our counts // appropriately. - nodeStatus.setStartMillis(currentTime); - nodeStatus.setSuccess(successDelta); - if(successDelta < 1) - nodeStatus.setFailure(1); - nodeStatus.setTotal(1); + nodeStatus.resetCounters(currentTime); + } + + nodeStatus.recordOperation(isSuccess); + + int numCatastrophicErrors; + + if(catastrophicError != null) { + numCatastrophicErrors = nodeStatus.incrementConsecutiveCatastrophicErrors(); + + if(logger.isTraceEnabled()) { + logger.trace("Node " + node.getId() + " experienced catastrophic error: " + + catastrophicError + " on node : " + node + + " # accumulated errors = " + numCatastrophicErrors); + } } else { - nodeStatus.incrementSuccess(successDelta); - if(successDelta < 1) - nodeStatus.incrementFailure(1); - nodeStatus.incrementTotal(1); - - if(catastrophicError != null) { - if(logger.isTraceEnabled()) - logger.trace("Node " + node.getId() + " experienced catastrophic error: " - + catastrophicError + " on node : " + node - + " # accumulated errors = " - + nodeStatus.getNumConsecutiveCatastrophicErrors()); - - nodeStatus.incrementConsecutiveCatastrophicErrors(); - if(nodeStatus.getNumConsecutiveCatastrophicErrors() >= this.failureDetectorConfig.getMaximumTolerableFatalFailures()) { - invokeSetUnavailable = true; - logger.info("Node " + node.getId() + " experienced consecutive " - + this.failureDetectorConfig.getMaximumTolerableFatalFailures() - + " catastrophic errors. Marking it unavailable."); + numCatastrophicErrors = nodeStatus.getNumConsecutiveCatastrophicErrors(); + + if(numCatastrophicErrors > 0 && isSuccess) { + numCatastrophicErrors = 0; + nodeStatus.resetNumConsecutiveCatastrophicErrors(); + if(logger.isTraceEnabled()) { + logger.trace("Resetting # consecutive connect errors for node : " + node); } - } else if(nodeStatus.getFailure() >= getConfig().getThresholdCountMinimum()) { - long percentage = (nodeStatus.getSuccess() * 100) / nodeStatus.getTotal(); + } + } - if(logger.isTraceEnabled()) - logger.trace("Node " + node.getId() + " percentage: " + percentage + "%"); + if(numCatastrophicErrors > 0 + && numCatastrophicErrors >= this.failureDetectorConfig.getMaximumTolerableFatalFailures()) { + invokeSetUnavailable = true; + } else if(nodeStatus.getFailure() >= getConfig().getThresholdCountMinimum()) { + long successPercentage = (nodeStatus.getSuccess() * 100) / nodeStatus.getTotal(); - if(percentage >= getConfig().getThreshold()) - invokeSetAvailable = true; - else - invokeSetUnavailable = true; + if(logger.isTraceEnabled()) { + logger.trace("Node " + node.getId() + " success percentage: " + + successPercentage + "%"); + } + + if(successPercentage >= getConfig().getThreshold()) { + invokeSetAvailable = true; + } else { + invokeSetUnavailable = true; } } } - // Actually call set(Un)Available outside of synchronized section. This + + // Call set(Un)Available outside of synchronized section. This // ensures that side effects are not w/in a sync section (e.g., alerting // all the failure detector listeners). 
- if(invokeSetAvailable) { + if(isSuccess && invokeSetAvailable) { setAvailable(node); } else if(invokeSetUnavailable) { setUnavailable(node, e); diff --git a/test/common/voldemort/FailureDetectorTestUtils.java b/test/common/voldemort/FailureDetectorTestUtils.java index 268e9ecf4..9325cb00e 100644 --- a/test/common/voldemort/FailureDetectorTestUtils.java +++ b/test/common/voldemort/FailureDetectorTestUtils.java @@ -32,7 +32,7 @@ public class FailureDetectorTestUtils { long requestTime, UnreachableStoreException e) { ((MutableStoreVerifier) failureDetector.getConfig().getStoreVerifier()).setErrorStore(node, - new UnreachableStoreException("test error")); + new UnreachableStoreException("junit injected test error")); failureDetector.recordException(node, requestTime, e); } diff --git a/test/unit/voldemort/cluster/failuredetector/ThresholdFailureDetectorTest.java b/test/unit/voldemort/cluster/failuredetector/ThresholdFailureDetectorTest.java index 7a3461f41..79ac051f1 100644 --- a/test/unit/voldemort/cluster/failuredetector/ThresholdFailureDetectorTest.java +++ b/test/unit/voldemort/cluster/failuredetector/ThresholdFailureDetectorTest.java @@ -24,7 +24,6 @@ import static voldemort.VoldemortTestConstants.getTenNodeCluster; import static voldemort.cluster.failuredetector.FailureDetectorUtils.create; import static voldemort.cluster.failuredetector.MutableStoreVerifier.create; -import java.net.NoRouteToHostException; import java.net.UnknownHostException; import org.junit.Test; @@ -42,6 +41,9 @@ public class ThresholdFailureDetectorTest extends AbstractFailureDetectorTest { @Override public FailureDetector createFailureDetector() throws Exception { + // This test does not create any VoldemortServer. It uses the test store + // verifier which can be controlled via the functions recordException + // and recordSuccess to set and clear error states for a node. 
storeVerifier = create(cluster.getNodes()); FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig().setImplementationClassName(ThresholdFailureDetector.class.getName()) .setBannagePeriod(BANNAGE_MILLIS) @@ -62,18 +64,37 @@ public class ThresholdFailureDetectorTest extends AbstractFailureDetectorTest { public void testCatastrophicErrors() throws Exception { Node node = Iterables.get(cluster.getNodes(), 8); - failureDetector.recordException(node, - 0, - new UnreachableStoreException("intentionalerror", - new UnknownHostException("intentionalerror"))); + for(int i = 0; i < 1000; i++) { + failureDetector.recordSuccess(node, 1); + } + + UnreachableStoreException normalError = new UnreachableStoreException("intentionalerror"); + for(int i = 0; i < 10; i++) { + failureDetector.recordException(node, 0, normalError); + } + assertAvailable(node); + + UnreachableStoreException catastrophicError = new UnreachableStoreException("intentionalerror", + new UnknownHostException("intentionalerror")); + for(int i = 0; i < 10; i++) { + failureDetector.recordException(node, 0, catastrophicError); + } + assertEquals(false, failureDetector.isAvailable(node)); failureDetector.waitForAvailability(node); - failureDetector.recordException(node, - 0, - new UnreachableStoreException("intentionalerror", - new NoRouteToHostException("intentionalerror"))); + for(int i = 0; i < 10; i++) { + failureDetector.recordException(node, 0, catastrophicError); + } + assertEquals(false, failureDetector.isAvailable(node)); + + failureDetector.recordSuccess(node, + failureDetector.getConfig().getRequestLengthThreshold() + 1); + assertEquals(false, failureDetector.isAvailable(node)); + + failureDetector.recordException(node, 0, normalError); assertEquals(false, failureDetector.isAvailable(node)); + failureDetector.waitForAvailability(node); } @@ -99,6 +120,7 @@ public class ThresholdFailureDetectorTest extends AbstractFailureDetectorTest { assertEquals(false, failureDetector.isAvailable(node)); failureDetector.waitForAvailability(node); + assertTrue(failureDetector.isAvailable(node)); } @@ -108,26 +130,31 @@ public class ThresholdFailureDetectorTest extends AbstractFailureDetectorTest { Node node = Iterables.get(cluster.getNodes(), 8); - for(int i = 0; i < minimum - 1; i++) - recordException(failureDetector, node); + for(int iter = 0; iter < 2; iter++) { + for(int i = 0; i < minimum - 1; i++) + recordException(failureDetector, node); - assertAvailable(node); + assertAvailable(node); - recordException(failureDetector, node); + recordException(failureDetector, node); - assertUnavailable(node); - assertJmxEquals("availableNodes", "0,1,2,3,4,5,6,7"); - assertJmxEquals("unavailableNodes", "8"); - assertJmxEquals("availableNodeCount", 8); - assertJmxEquals("nodeCount", 9); + assertUnavailable(node); + assertJmxEquals("availableNodes", "0,1,2,3,4,5,6,7"); + assertJmxEquals("unavailableNodes", "8"); + assertJmxEquals("availableNodeCount", 8); + assertJmxEquals("nodeCount", 9); - recordSuccess(failureDetector, node); + recordSuccess(failureDetector, node); + failureDetector.waitForAvailability(node); + } } @Test public void testStartOffDownComeBackOnline() throws Exception { - failureDetector.getConfig().setThreshold(80); - failureDetector.getConfig().setThresholdCountMinimum(10); + final int SUCCESS_PERCENTAGE = 80; + final int ERROR_COUNT_MINIMUM = 10; + failureDetector.getConfig().setThreshold(SUCCESS_PERCENTAGE); + failureDetector.getConfig().setThresholdCountMinimum(ERROR_COUNT_MINIMUM); int failureCount = 20; @@ -144,7 
+171,6 @@ public class ThresholdFailureDetectorTest extends AbstractFailureDetectorTest { recordSuccess(failureDetector, node, 0, false); failureDetector.waitForAvailability(node); - assertAvailable(node); } @@ -166,10 +192,12 @@ public class ThresholdFailureDetectorTest extends AbstractFailureDetectorTest { cluster = getTenNodeCluster(); Node node = cluster.getNodeById(9); storeVerifier.addStore(node); - failureDetector.recordException(node, - 0, - new UnreachableStoreException("intentionalerror", - new UnknownHostException("intentionalerror"))); + for(int i = 0; i < 10; i++) { + failureDetector.recordException(node, + 0, + new UnreachableStoreException("intentionalerror", + new UnknownHostException("intentionalerror"))); + } /** * Update the failure detector state with the new cluster -- 2.11.4.GIT