From 12bf94045a4d2c5b687674b5e9598debc4eb7cd8 Mon Sep 17 00:00:00 2001 From: Arunachalam Thirupathi Date: Fri, 7 Feb 2014 16:08:42 -0800 Subject: [PATCH] Add a new option for hash based comparison 1) Refactored the code and introduced two types of comparison version based and hash of the value based 2) The code was dependent on node 0 for boot strapping. fixed. 3) Enable the test testDetermineConsistencyHashValue and fixed it. 4) Now the consistency check tool takes a new option --comparison-type which defaults to version and can be forced to hash based comparison. --- src/java/voldemort/utils/ConsistencyCheck.java | 509 +++++++++++++-------- .../unit/voldemort/utils/ConsistencyCheckTest.java | 153 ++++--- 2 files changed, 399 insertions(+), 263 deletions(-) diff --git a/src/java/voldemort/utils/ConsistencyCheck.java b/src/java/voldemort/utils/ConsistencyCheck.java index 88d17deb8..3a77698ec 100644 --- a/src/java/voldemort/utils/ConsistencyCheck.java +++ b/src/java/voldemort/utils/ConsistencyCheck.java @@ -17,6 +17,7 @@ package voldemort.utils; import java.io.BufferedWriter; import java.io.FileWriter; +import java.io.Writer; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -49,36 +50,38 @@ import voldemort.versioning.Versioned; public class ConsistencyCheck { + private static final String ComparisonTypeArgument = "comparison-type"; private static Logger logger = Logger.getLogger(ConsistencyCheck.class); private final List urls; private final String storeName; private final Integer partitionId; + private final ComparisonType comparisonType; private final Reporter reporter; private Integer retentionDays = null; private Integer replicationFactor = 0; private Integer requiredWrites = 0; - private List adminClients; private List clusterNodeList = new ArrayList(); - private final Map>> keyVersionNodeSetMap = new HashMap>>(); + private final Map>> keyValueNodeSetMap = + new HashMap>>(); private RetentionChecker retentionChecker; private KeyFetchTracker keyFetchTracker; public ConsistencyCheck(List urls, - String storeName, - int partitionId, - BufferedWriter badKeyWriter) { + String storeName, + int partitionId, Writer badKeyWriter, ComparisonType comparisonType) { this.urls = urls; this.storeName = storeName; this.partitionId = partitionId; this.reporter = new Reporter(badKeyWriter); + this.comparisonType = comparisonType; } /** * Connect to the clusters using given urls and start fetching process on * correct nodes - * + * * @throws Exception When no such store is found */ @@ -94,16 +97,18 @@ public class ConsistencyCheck { logger.info("Connecting to bootstrap server: " + url); } AdminClient adminClient = new AdminClient(url, - new AdminClientConfig(), - new ClientConfig()); + new AdminClientConfig(), + new ClientConfig()); adminClients.add(adminClient); /* get Cluster */ Cluster cluster = adminClient.getAdminClientCluster(); clusterMap.put(url, cluster); + + Integer nodeId = cluster.getNodeIds().iterator().next(); /* get StoreDefinition */ - Versioned> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0); + Versioned> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId); StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefinitions.getValue(), - storeName); + storeName); storeDefinitionMap.put(url, storeDefinition); } @@ -116,7 +121,7 @@ public class ConsistencyCheck { } if(partitionCount != currentPartitionCount) { logger.error("Partition count of different clusters is not the same: " - + partitionCount + " vs " + currentPartitionCount); + + partitionCount + " vs " + currentPartitionCount); throw new VoldemortException("Will not connect because partition counts differ among clusters."); } } @@ -129,8 +134,8 @@ public class ConsistencyCheck { /* find list of nodeId hosting partition */ List partitionList = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition, - cluster) - .getReplicatingPartitionList(partitionId); + cluster) + .getReplicatingPartitionList(partitionId); for(int partition: partitionList) { Integer nodeId = partitionToNodeMap.get(partition); Node node = cluster.getNodeById(nodeId); @@ -188,7 +193,7 @@ public class ConsistencyCheck { /** * Run consistency check on connected key-value iterators - * + * * @return Results in form of ConsistencyCheckStats */ public Reporter execute() throws IOException { @@ -201,15 +206,15 @@ public class ConsistencyCheck { singlePartition.add(partitionId); if(logger.isDebugEnabled()) { logger.debug("Start fetch request to Node[" + clusterNode.toString() - + "] for partition[" + partitionId + "] of store[" + storeName + "]"); + + "] for partition[" + partitionId + "] of store[" + storeName + "]"); } Iterator>> fetchIterator; fetchIterator = adminClient.bulkFetchOps.fetchEntries(clusterNode.getNode().getId(), - storeName, - singlePartition, - null, - false); + storeName, + singlePartition, + null, + false); nodeFetchIteratorMap.put(clusterNode, fetchIterator); } keyFetchTracker = new KeyFetchTracker(clusterNodeList.size()); @@ -236,11 +241,11 @@ public class ConsistencyCheck { keyFetchTracker.recordFetch(clusterNode, key); if(logger.isTraceEnabled()) { logger.trace("fetched " + new String(key.get())); - logger.trace("map has keys: " + keyVersionNodeSetMap.size()); + logger.trace("map has keys: " + keyValueNodeSetMap.size()); } trySweepAll(); if(logger.isTraceEnabled()) { - logger.trace("sweeped; keys left: " + keyVersionNodeSetMap.size()); + logger.trace("sweeped; keys left: " + keyValueNodeSetMap.size()); } } } @@ -264,23 +269,23 @@ public class ConsistencyCheck { } // clean keys not sufficient for write - cleanIneligibleKeys(keyVersionNodeSetMap, requiredWrites); + cleanIneligibleKeys(keyValueNodeSetMap, requiredWrites); keyFetchTracker.finishAll(); trySweepAll(); - reporter.processInconsistentKeys(storeName, partitionId, keyVersionNodeSetMap); + reporter.processInconsistentKeys(storeName, partitionId, keyValueNodeSetMap); return reporter; } public void trySweepAll() { for(ByteArray finishedKey = keyFetchTracker.nextFinished(); finishedKey != null; finishedKey = keyFetchTracker.nextFinished()) { - if(keyVersionNodeSetMap.containsKey(finishedKey)) { - ConsistencyLevel level = determineConsistency(keyVersionNodeSetMap.get(finishedKey), - replicationFactor); + if (keyValueNodeSetMap.containsKey(finishedKey)) { + ConsistencyLevel level = determineConsistency(keyValueNodeSetMap.get(finishedKey), + replicationFactor); if(level == ConsistencyLevel.FULL || level == ConsistencyLevel.LATEST_CONSISTENT) { - keyVersionNodeSetMap.remove(finishedKey); + keyValueNodeSetMap.remove(finishedKey); reporter.recordGoodKey(1); } } @@ -288,44 +293,45 @@ public class ConsistencyCheck { } public void recordFetch(ClusterNode clusterNode, ByteArray key, Versioned versioned) { - Version version; - if(urls.size() == 1) { - version = versioned.getVersion(); - } else { - version = new HashedValue(versioned); - } + + Value value = new ValueFactory().Create(versioned, comparisonType); // skip version if expired - if(retentionChecker.isExpired(version)) { + if (retentionChecker.isExpired(value)) { reporter.recordExpired(1); return; } // initialize key -> Map> - if(!keyVersionNodeSetMap.containsKey(key)) { - keyVersionNodeSetMap.put(key, new HashMap>()); - } - Map> versionNodeSetMap = keyVersionNodeSetMap.get(key); - - // check existing version - if(!versionNodeSetMap.containsKey(version) && versionNodeSetMap.size() != 0) { - // if this version is new, sweep old version - // if this version is old, ignore this version - Version oneExistingVersion = versionNodeSetMap.keySet().iterator().next(); - if(version.compare(oneExistingVersion) == Occurred.AFTER) { - versionNodeSetMap.clear(); - } else if(oneExistingVersion.compare(version) == Occurred.AFTER) { + if (!keyValueNodeSetMap.containsKey(key)) { + keyValueNodeSetMap.put(key, new HashMap>()); + } + Map> versionNodeSetMap = keyValueNodeSetMap.get(key); + + List valuesToDiscard = new ArrayList(); + for(Map.Entry> versionNode : versionNodeSetMap.entrySet()) { + Value existingValue = versionNode.getKey(); + // Check for each existing value, if the existing value happened after current. If so the current value can be discarded + if(existingValue.compare(value) == Occurred.AFTER) { return; + } else if ( value.compare(existingValue) == Occurred.AFTER) { + // Check for each existing value, if the current value happened after existing. If so the existing value can be discarded + valuesToDiscard.add(existingValue); } } - if(!versionNodeSetMap.containsKey(version)) { + for(Value valueToDiscard: valuesToDiscard) + { + versionNodeSetMap.remove(valueToDiscard); + } + + if (!versionNodeSetMap.containsKey(value)) { // insert nodeSet into the map - versionNodeSetMap.put(version, new HashSet()); + versionNodeSetMap.put(value, new HashSet()); } // add node to set - versionNodeSetMap.get(version).add(clusterNode); + versionNodeSetMap.get(value).add(clusterNode); } /** @@ -346,7 +352,7 @@ public class ConsistencyCheck { /** * Record a fetched result - * + * * @param clusterNode The clusterNode from which the key has been * fetched * @param key The key itself @@ -386,7 +392,7 @@ public class ConsistencyCheck { /** * Get a key that are completed in fetching - * + * * @return key considered finished; otherwise null */ public ByteArray nextFinished() { @@ -406,9 +412,14 @@ public class ConsistencyCheck { INSUFFICIENT_WRITE } + public static enum ComparisonType { + VERSION, + HASH, + } + /** * Used to track nodes that may share the same nodeId in different clusters - * + * */ protected static class ClusterNode { @@ -450,61 +461,12 @@ public class ConsistencyCheck { } - /** - * A class to save version and value hash It is used to compare versions by - * the value hash - * - */ - protected static class HashedValue implements Version { - - final private Version innerVersion; - final private Integer valueHash; - - /** - * @param versioned Versioned value with version information and value - * itself - */ - public HashedValue(Versioned versioned) { - innerVersion = versioned.getVersion(); - valueHash = new FnvHashFunction().hash(versioned.getValue()); - } - - public int getValueHash() { - return valueHash; - } - - public Version getInner() { - return innerVersion; - } - - @Override - public boolean equals(Object object) { - if(this == object) - return true; - if(object == null) - return false; - if(!object.getClass().equals(HashedValue.class)) - return false; - HashedValue hash = (HashedValue) object; - boolean result = valueHash.equals(hash.getValueHash()); - return result; - } - - @Override - public int hashCode() { - return valueHash; - } - @Override - public Occurred compare(Version v) { - return Occurred.CONCURRENTLY; // always regard as conflict - } - } /** * A checker to determine if a key is to be cleaned according to retention * policy - * + * */ protected static class RetentionChecker { @@ -526,32 +488,23 @@ public class ConsistencyCheck { /** * Determine if a version is expired - * + * * @param v version to be checked * @return if the version is expired according to retention policy */ - public boolean isExpired(Version v) { - if(v instanceof VectorClock) { - return ((VectorClock) v).getTimestamp() < expiredTimeMs; - } else if(v instanceof HashedValue) { - return false; - } else { - logger.error("Version type is not supported for checking expiration"); - throw new VoldemortException("Version type is not supported for checking expiration" - + v.getClass().getCanonicalName()); - } + public boolean isExpired(Value v) { + return v.isExpired(expiredTimeMs); } } /** * Used to report bad keys, progress, and statistics - * + * */ protected static class Reporter { - final BufferedWriter badKeyWriter; + final Writer badKeyWriter; final long reportPeriodMs; - long lastReportTimeMs = 0; long numRecordsScanned = 0; long numRecordsScannedLast = 0; @@ -561,10 +514,10 @@ public class ConsistencyCheck { /** * Will output progress reports every 5 seconds. - * + * * @param badKeyWriter Writer to which to output bad keys. Null is OK. */ - public Reporter(BufferedWriter badKeyWriter) { + public Reporter(Writer badKeyWriter) { this(badKeyWriter, 5000); } @@ -572,7 +525,7 @@ public class ConsistencyCheck { * @param badKeyWriter Writer to which to output bad keys. Null is OK. * @param intervalMs Milliseconds between progress reports. */ - public Reporter(BufferedWriter badKeyWriter, long intervalMs) { + public Reporter(Writer badKeyWriter, long intervalMs) { this.badKeyWriter = badKeyWriter; this.reportPeriodMs = intervalMs; } @@ -593,7 +546,7 @@ public class ConsistencyCheck { s.append("Records Scanned: " + numRecordsScanned + "\n"); s.append("Records Ignored: " + numExpiredRecords + " (Out of Retention)\n"); s.append("Last Fetch Rate: " + (numRecordsScanned - numRecordsScannedLast) - / ((currentTimeMs - lastReportTimeMs) / 1000) + " (records/s)\n"); + / ((currentTimeMs - lastReportTimeMs) / 1000) + " (records/s)\n"); lastReportTimeMs = currentTimeMs; numRecordsScannedLast = numRecordsScanned; return s.toString(); @@ -603,19 +556,19 @@ public class ConsistencyCheck { } public void processInconsistentKeys(String storeName, - Integer partitionId, - Map>> keyVersionNodeSetMap) + Integer partitionId, + Map>> keyVersionNodeSetMap) throws IOException { if(logger.isDebugEnabled()) { logger.debug("TYPE,Store,ParId,Key,ServerSet,VersionTS,VectorClock[,ValueHash]"); } - for(Map.Entry>> entry: keyVersionNodeSetMap.entrySet()) { + for (Map.Entry>> entry : keyVersionNodeSetMap.entrySet()) { ByteArray key = entry.getKey(); if(badKeyWriter != null) { badKeyWriter.write(ByteUtils.toHexString(key.get()) + "\n"); } if(logger.isDebugEnabled()) { - Map> versionMap = entry.getValue(); + Map> versionMap = entry.getValue(); logger.debug(keyVersionToString(key, versionMap, storeName, partitionId)); } } @@ -635,7 +588,7 @@ public class ConsistencyCheck { /** * Return args parser - * + * * @return program parser * */ private static OptionParser getParser() { @@ -643,23 +596,27 @@ public class ConsistencyCheck { OptionParser parser = new OptionParser(); parser.accepts("help", "print help information"); parser.accepts("urls", "[REQUIRED] bootstrap URLs") - .withRequiredArg() - .describedAs("bootstrap-url") - .withValuesSeparatedBy(',') - .ofType(String.class); + .withRequiredArg() + .describedAs("bootstrap-url") + .withValuesSeparatedBy(',') + .ofType(String.class); parser.accepts("partitions", "partition-id") - .withRequiredArg() - .describedAs("partition-id") - .withValuesSeparatedBy(',') - .ofType(Integer.class); + .withRequiredArg() + .describedAs("partition-id") + .withValuesSeparatedBy(',') + .ofType(Integer.class); parser.accepts("store", "store name") - .withRequiredArg() - .describedAs("store-name") - .ofType(String.class); + .withRequiredArg() + .describedAs("store-name") + .ofType(String.class); parser.accepts("bad-key-file", "File name to which inconsistent keys are to be written.") - .withRequiredArg() - .describedAs("badKeyFileOut") - .ofType(String.class); + .withRequiredArg() + .describedAs("badKeyFileOut") + .ofType(String.class); + parser.accepts(ComparisonTypeArgument, "type of comparison to compare the values for the same key") + .withRequiredArg() + .describedAs("comparisonType") + .ofType(ComparisonType.class); return parser; } @@ -678,6 +635,7 @@ public class ConsistencyCheck { help.append(" --store \n"); help.append(" --bad-key-file \n"); help.append(" Optional:\n"); + help.append(" --comparison-type [version | hash ]\n"); help.append(" --help\n"); help.append(" Note:\n"); help.append(" If you have two or more clusters to scan for consistency across them,\n"); @@ -692,32 +650,30 @@ public class ConsistencyCheck { /** * Determine the consistency level of a key - * + * * @param versionNodeSetMap A map that maps version to set of PrefixNodes * @param replicationFactor Total replication factor for the set of clusters * @return ConsistencyLevel Enum */ - public static ConsistencyLevel determineConsistency(Map> versionNodeSetMap, - int replicationFactor) { + public static ConsistencyLevel determineConsistency(Map> versionNodeSetMap, + int replicationFactor) { boolean fullyConsistent = true; - Version latestVersion = null; - for(Map.Entry> versionNodeSetEntry: versionNodeSetMap.entrySet()) { - Version version = versionNodeSetEntry.getKey(); - if(version instanceof VectorClock) { - if(latestVersion == null - || ((VectorClock) latestVersion).getTimestamp() < ((VectorClock) version).getTimestamp()) { - latestVersion = version; - } + Value latestVersion = null; + for (Map.Entry> versionNodeSetEntry : versionNodeSetMap.entrySet()) { + Value value = versionNodeSetEntry.getKey(); + if (latestVersion == null) { + latestVersion = value; + } else if (value.isTimeStampLaterThan(latestVersion)) { + latestVersion = value; } Set nodeSet = versionNodeSetEntry.getValue(); fullyConsistent = fullyConsistent && (nodeSet.size() == replicationFactor); } - if(fullyConsistent) { + if (fullyConsistent) { return ConsistencyLevel.FULL; } else { // latest write consistent, effectively consistent - if(latestVersion != null - && versionNodeSetMap.get(latestVersion).size() == replicationFactor) { + if (latestVersion != null && versionNodeSetMap.get(latestVersion).size() == replicationFactor) { return ConsistencyLevel.LATEST_CONSISTENT; } // all other states inconsistent @@ -725,55 +681,57 @@ public class ConsistencyCheck { } } + /** * Determine if a key version is invalid by comparing the version's * existance and required writes configuration - * + * * @param keyVersionNodeSetMap A map that contains keys mapping to a map * that maps versions to set of PrefixNodes * @param requiredWrite Required Write configuration */ - public static void cleanIneligibleKeys(Map>> keyVersionNodeSetMap, - int requiredWrite) { + public static void cleanIneligibleKeys(Map>> keyVersionNodeSetMap, + int requiredWrite) { Set keysToDelete = new HashSet(); - for(Map.Entry>> entry: keyVersionNodeSetMap.entrySet()) { - Set versionsToDelete = new HashSet(); + for (Map.Entry>> entry : keyVersionNodeSetMap.entrySet()) { + Set valuesToDelete = new HashSet(); ByteArray key = entry.getKey(); - Map> versionNodeSetMap = entry.getValue(); + Map> valueNodeSetMap = entry.getValue(); // mark version for deletion if not enough writes - for(Map.Entry> versionNodeSetEntry: versionNodeSetMap.entrySet()) { + for (Map.Entry> versionNodeSetEntry : valueNodeSetMap.entrySet()) { Set nodeSet = versionNodeSetEntry.getValue(); - if(nodeSet.size() < requiredWrite) { - versionsToDelete.add(versionNodeSetEntry.getKey()); + if (nodeSet.size() < requiredWrite) { + valuesToDelete.add(versionNodeSetEntry.getKey()); } } // delete versions - for(Version v: versionsToDelete) { - versionNodeSetMap.remove(v); + for (Value v : valuesToDelete) { + valueNodeSetMap.remove(v); } // mark key for deletion if no versions left - if(versionNodeSetMap.size() == 0) { + if (valueNodeSetMap.size() == 0) { keysToDelete.add(key); } } // delete keys - for(ByteArray k: keysToDelete) { + for (ByteArray k : keysToDelete) { keyVersionNodeSetMap.remove(k); } } + @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { OptionSet options = getParser().parse(args); /* validate options */ - if(options.hasArgument("help")) { + if (options.hasArgument("help")) { printUsage(); return; } - if(!options.hasArgument("urls") || !options.hasArgument("partitions") - || !options.hasArgument("store") || !options.hasArgument("bad-key-file")) { + if (!options.hasArgument("urls") || !options.hasArgument("partitions") + || !options.hasArgument("store") || !options.hasArgument("bad-key-file")) { printUsage(); return; } @@ -783,26 +741,32 @@ public class ConsistencyCheck { List partitionIds = (List) options.valuesOf("partitions"); String badKeyFile = (String) options.valueOf("bad-key-file"); + ComparisonType comparisonType = ComparisonType.VERSION; + if (options.hasArgument(ComparisonTypeArgument)) { + comparisonType = (ComparisonType) options.valueOf(ComparisonTypeArgument); + } + BufferedWriter badKeyWriter = null; try { badKeyWriter = new BufferedWriter(new FileWriter(badKeyFile)); - } catch(IOException e) { + } catch (IOException e) { Utils.croak("Failure to open output file : " + e.getMessage()); } Map partitionStatsMap = new HashMap(); /* scan each partitions */ try { - for(Integer partitionId: partitionIds) { + for (Integer partitionId : partitionIds) { ConsistencyCheck checker = new ConsistencyCheck(urls, - storeName, - partitionId, - badKeyWriter); + storeName, + partitionId, + badKeyWriter, + comparisonType); checker.connect(); Reporter reporter = checker.execute(); partitionStatsMap.put(partitionId, reporter); } - } catch(Exception e) { + } catch (Exception e) { Utils.croak("Exception during consistency checking : " + e.getMessage()); } finally { badKeyWriter.close(); @@ -814,7 +778,7 @@ public class ConsistencyCheck { long totalTotalKeys = 0; // each partition statsString.append("TYPE,Store,ParitionId,KeysConsistent,KeysTotal,Consistency\n"); - for(Map.Entry entry: partitionStatsMap.entrySet()) { + for (Map.Entry entry : partitionStatsMap.entrySet()) { Integer partitionId = entry.getKey(); Reporter reporter = entry.getValue(); totalGoodKeys += reporter.numGoodKeys; @@ -836,14 +800,14 @@ public class ConsistencyCheck { statsString.append((double) (totalGoodKeys) / (double) totalTotalKeys); statsString.append("\n"); - for(String line: statsString.toString().split("\n")) { + for (String line : statsString.toString().split("\n")) { logger.info(line); } } /** * Convert a key-version-nodeSet information to string - * + * * @param key The key * @param versionMap mapping versions to set of PrefixNodes * @param storeName store's name @@ -851,12 +815,12 @@ public class ConsistencyCheck { * @return a string that describe the information passed in */ public static String keyVersionToString(ByteArray key, - Map> versionMap, - String storeName, - Integer partitionId) { + Map> versionMap, + String storeName, + Integer partitionId) { StringBuilder record = new StringBuilder(); - for(Map.Entry> versionSet: versionMap.entrySet()) { - Version version = versionSet.getKey(); + for (Map.Entry> versionSet : versionMap.entrySet()) { + Value value = versionSet.getKey(); Set nodeSet = versionSet.getValue(); record.append("BAD_KEY,"); @@ -864,25 +828,168 @@ public class ConsistencyCheck { record.append(partitionId + ","); record.append(ByteUtils.toHexString(key.get()) + ","); record.append(nodeSet.toString().replace(", ", ";") + ","); - if(version instanceof VectorClock) { - record.append(((VectorClock) version).getTimestamp() + ","); - record.append(version.toString() - .replaceAll(", ", ";") - .replaceAll(" ts:[0-9]*", "") - .replaceAll("version\\((.*)\\)", "[$1]")); - } - if(version instanceof HashedValue) { - Integer hashValue = ((HashedValue) version).getValueHash(); - Version realVersion = ((HashedValue) version).getInner(); - record.append(((VectorClock) realVersion).getTimestamp() + ","); - record.append(realVersion.toString() - .replaceAll(", ", ";") - .replaceAll(" ts:[0-9]*", "") - .replaceAll("version\\((.*)\\)", "[$1],")); - record.append(hashValue); - } + record.append(value.toString()); } return record.toString(); } + public static abstract class Value { + abstract Occurred compare(Value v); + + public abstract boolean equals(Object obj); + + public abstract int hashCode(); + + public abstract String toString(); + + public abstract boolean isExpired(long expiredTimeMs); + + public abstract boolean isTimeStampLaterThan(Value v); + } + + public static class VersionValue extends Value { + + protected final Version version; + + protected VersionValue(Versioned versioned) { + this.version = versioned.getVersion(); + } + + public Occurred compare(Value v) { + if (!(v instanceof VersionValue)) { + throw new VoldemortException(" Expected type VersionValue found type " + v.getClass().getCanonicalName()); + } + + VersionValue other = (VersionValue) v; + return version.compare(other.version); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } else if (!(o instanceof VersionValue)) { + return false; + } + + VersionValue other = (VersionValue) o; + return version.equals(other.version); + } + + @Override + public int hashCode() { + return version.hashCode(); + } + + @Override + public String toString() { + StringBuilder record = new StringBuilder(); + record.append(((VectorClock) version).getTimestamp() + ","); + record.append(version.toString().replaceAll(", ", ";").replaceAll(" ts:[0-9]*", "") + .replaceAll("version\\((.*)\\)", "[$1]")); + return record.toString(); + } + + public boolean isExpired(long expiredTimeMs) { + return ((VectorClock) version).getTimestamp() < expiredTimeMs; + } + + public boolean isTimeStampLaterThan(Value currentLatest) { + if (!(currentLatest instanceof VersionValue)) { + throw new VoldemortException( + " Expected type VersionValue found type " + currentLatest.getClass().getCanonicalName()); + } + + VersionValue latestVersion = (VersionValue) currentLatest; + long latestTimeStamp = ((VectorClock) latestVersion.version).getTimestamp(); + long myTimeStamp = ((VectorClock) version).getTimestamp(); + return myTimeStamp > latestTimeStamp; + } + } + + /** + * A class to save version and value hash It is used to compare versions by + * the value hash + * + */ + public static class HashedValue extends Value { + + final private Version innerVersion; + final private Integer valueHash; + + /** + * @param versioned Versioned value with version information and value + * itself + */ + public HashedValue(Versioned versioned) { + innerVersion = versioned.getVersion(); + valueHash = new FnvHashFunction().hash(versioned.getValue()); + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + if (object == null) { + return false; + } + if (!object.getClass().equals(HashedValue.class)) { + return false; + } + HashedValue hash = (HashedValue) object; + boolean result = valueHash.equals(hash.hashCode()); + return result; + } + + public Occurred compare(Value v) { + // TODO: Return before if they are equal. + return Occurred.CONCURRENTLY; // always regard as conflict + } + + @Override + public int hashCode() { + return valueHash; + } + + public boolean isExpired(long expiredTimeMs) { + return ((VectorClock) innerVersion).getTimestamp() < expiredTimeMs; + } + + @Override + public String toString() { + StringBuilder record = new StringBuilder(); + record.append(((VectorClock) innerVersion).getTimestamp() + ","); + record.append(innerVersion.toString().replaceAll(", ", ";").replaceAll(" ts:[0-9]*", "") + .replaceAll("version\\((.*)\\)", "[$1],")); + record.append(valueHash); + return record.toString(); + } + + + public boolean isTimeStampLaterThan(Value currentLatest) { + if (!(currentLatest instanceof HashedValue)) { + throw new VoldemortException( + " Expected type HashedValue found type " + currentLatest.getClass().getCanonicalName()); + } + + HashedValue latestVersion = (HashedValue) currentLatest; + long latestTimeStamp = ((VectorClock) latestVersion.innerVersion).getTimestamp(); + long myTimeStamp = ((VectorClock) innerVersion).getTimestamp(); + return myTimeStamp > latestTimeStamp; + } + } + + public class ValueFactory { + public Value Create(Versioned versioned, ComparisonType type) { + if (type == ComparisonType.HASH) { + return new HashedValue(versioned); + } else if (type == ComparisonType.VERSION) { + return new VersionValue(versioned); + } else { + throw new VoldemortException("ComparisonType:" + type.name() + " is not handled by ValueFactory"); + } + } + } } + diff --git a/test/unit/voldemort/utils/ConsistencyCheckTest.java b/test/unit/voldemort/utils/ConsistencyCheckTest.java index 379dd5089..65d942b2f 100644 --- a/test/unit/voldemort/utils/ConsistencyCheckTest.java +++ b/test/unit/voldemort/utils/ConsistencyCheckTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -71,21 +72,23 @@ public class ConsistencyCheckTest { ClusterNode cn0_4 = new ClusterNode(0, n4); ClusterNode cn1_2 = new ClusterNode(1, n2); // 1.1 + long now = System.currentTimeMillis(); + byte[] value1 = { 0, 1, 2, 3, 4 }; byte[] value2 = { 0, 1, 2, 3, 5 }; byte[] value3 = { 0, 1, 2, 3, 6 }; byte[] value4 = { 0, 1, 2, 3, 7 }; - Versioned versioned1 = new Versioned(value1); - Versioned versioned2 = new Versioned(value2); - Version hv1 = new ConsistencyCheck.HashedValue(versioned1); - Version hv1_dup = new ConsistencyCheck.HashedValue(versioned1); - Version hv2 = new ConsistencyCheck.HashedValue(versioned2); - - long now = System.currentTimeMillis(); - Version vc1 = new VectorClock(now - Time.MS_PER_DAY); - Version vc2 = new VectorClock(now); - Version hv3 = new ConsistencyCheck.HashedValue(new Versioned(value1)); - Version vc3 = new VectorClock(now - Time.MS_PER_HOUR * 24 + 500 * Time.MS_PER_SECOND); + VectorClock vc1 = new VectorClock(now - Time.MS_PER_DAY); + VectorClock vc2 = new VectorClock(now); + VectorClock vc3 = new VectorClock(now - Time.MS_PER_HOUR * 24 + 500 * Time.MS_PER_SECOND); + Versioned versioned1 = new Versioned(value1, vc1); + Versioned versioned2 = new Versioned(value2, vc2); + Versioned versioned3 = new Versioned(value3, vc3); + ConsistencyCheck.Value hv1 = new ConsistencyCheck.HashedValue(versioned1); + ConsistencyCheck.Value hv1_dup = new ConsistencyCheck.HashedValue(versioned1); + ConsistencyCheck.Value hv2 = new ConsistencyCheck.HashedValue(versioned2); + + ConsistencyCheck.Value hv3 = new ConsistencyCheck.HashedValue(new Versioned(value1,vc3)); // make set Set setFourNodes = new HashSet(); @@ -135,9 +138,6 @@ public class ConsistencyCheckTest { assertFalse(hv1.equals(null)); assertFalse(hv1.equals(new Versioned(null))); assertFalse(hv1.equals(new Integer(0))); - - assertEquals(versioned1.getVersion(), ((ConsistencyCheck.HashedValue) hv1).getInner()); - assertEquals(((ConsistencyCheck.HashedValue) hv1).getValueHash(), hv1.hashCode()); } @Test @@ -145,31 +145,39 @@ public class ConsistencyCheckTest { ConsistencyCheck.RetentionChecker rc1 = new ConsistencyCheck.RetentionChecker(0); ConsistencyCheck.RetentionChecker rc2 = new ConsistencyCheck.RetentionChecker(1); - assertFalse(rc1.isExpired(vc1)); - assertFalse(rc1.isExpired(vc2)); + // Test HashedValue timestamp assertFalse(rc1.isExpired(hv3)); - assertFalse(rc1.isExpired(vc3)); - assertTrue(rc2.isExpired(vc1)); - assertFalse(rc2.isExpired(vc2)); - assertFalse(rc2.isExpired(hv3)); - assertTrue(rc2.isExpired(vc3)); + assertTrue(rc2.isExpired(hv3)); + + // Test VersionValue time stamp + assertFalse(rc1.isExpired(new ConsistencyCheck.VersionValue(versioned1))); + assertFalse(rc1.isExpired(new ConsistencyCheck.VersionValue(versioned2))); + assertFalse(rc1.isExpired(new ConsistencyCheck.VersionValue(versioned3))); + assertTrue (rc2.isExpired(new ConsistencyCheck.VersionValue(versioned1))); + assertFalse(rc2.isExpired(new ConsistencyCheck.VersionValue(versioned2))); + assertTrue (rc2.isExpired(new ConsistencyCheck.VersionValue(versioned3))); } @Test public void testDetermineConsistencyVectorClock() { - Map> versionNodeSetMap = new HashMap>(); + Map> versionNodeSetMap = new HashMap>(); int replicationFactor = 4; // Version is vector clock - Version v1 = new VectorClock(); - ((VectorClock) v1).incrementVersion(1, 100000001); - ((VectorClock) v1).incrementVersion(2, 100000003); - Version v2 = new VectorClock(); - ((VectorClock) v2).incrementVersion(1, 100000001); - ((VectorClock) v2).incrementVersion(3, 100000002); - Version v3 = new VectorClock(); - ((VectorClock) v3).incrementVersion(1, 100000001); - ((VectorClock) v3).incrementVersion(4, 100000001); + VectorClock vc1 = new VectorClock(); + vc1.incrementVersion(1, 100000001); + vc1.incrementVersion(2, 100000003); + + VectorClock vc2= new VectorClock(); + vc2.incrementVersion(1, 100000001); + vc2.incrementVersion(3, 100000002); + VectorClock vc3 = new VectorClock(); + vc3.incrementVersion(1, 100000001); + vc3.incrementVersion(4, 100000001); + + ConsistencyCheck.Value v1 = new ConsistencyCheck.VersionValue(new Versioned(value1, vc1)); + ConsistencyCheck.Value v2 = new ConsistencyCheck.VersionValue(new Versioned(value2, vc2)); + ConsistencyCheck.Value v3 = new ConsistencyCheck.VersionValue(new Versioned(value3, vc3)); // FULL: simple versionNodeSetMap.put(v1, setFourNodes); @@ -213,8 +221,9 @@ public class ConsistencyCheckTest { ConsistencyCheck.determineConsistency(versionNodeSetMap, replicationFactor)); } + @Test public void testDetermineConsistencyHashValue() { - Map> versionNodeSetMap = new HashMap>(); + Map> versionNodeSetMap = new HashMap>(); int replicationFactor = 4; // vector clocks @@ -232,9 +241,9 @@ public class ConsistencyCheckTest { Versioned versioned1 = new Versioned(value1, v1); Versioned versioned2 = new Versioned(value2, v2); Versioned versioned3 = new Versioned(value3, v3); - Version hv1 = new ConsistencyCheck.HashedValue(versioned1); - Version hv2 = new ConsistencyCheck.HashedValue(versioned2); - Version hv3 = new ConsistencyCheck.HashedValue(versioned3); + ConsistencyCheck.Value hv1 = new ConsistencyCheck.HashedValue(versioned1); + ConsistencyCheck.Value hv2 = new ConsistencyCheck.HashedValue(versioned2); + ConsistencyCheck.Value hv3 = new ConsistencyCheck.HashedValue(versioned3); // FULL // one version @@ -262,22 +271,26 @@ public class ConsistencyCheckTest { versionNodeSetMap.clear(); versionNodeSetMap.put(hv1, setFourNodes); versionNodeSetMap.put(hv2, setThreeNodes); - assertEquals(ConsistencyCheck.ConsistencyLevel.INCONSISTENT, + assertEquals(ConsistencyCheck.ConsistencyLevel.LATEST_CONSISTENT, ConsistencyCheck.determineConsistency(versionNodeSetMap, replicationFactor)); } @Test public void testCleanInlegibleKeys() { // versions - Version v1 = new VectorClock(); - ((VectorClock) v1).incrementVersion(1, 100000001); - ((VectorClock) v1).incrementVersion(2, 100000003); - Version v2 = new VectorClock(); - ((VectorClock) v2).incrementVersion(1, 100000002); + VectorClock vc1 = new VectorClock(); + vc1.incrementVersion(1, 100000001); + vc1.incrementVersion(2, 100000003); + VectorClock vc2 = new VectorClock(); + vc2.incrementVersion(1, 100000002); + + ConsistencyCheck.Value v1 = new ConsistencyCheck.VersionValue(new Versioned(value1, vc1)); + ConsistencyCheck.Value v2 = new ConsistencyCheck.VersionValue(new Versioned(value2, vc2)); + // setup - Map>> map = new HashMap>>(); - Map> nodeSetMap = new HashMap>(); + Map>> map = new HashMap>>(); + Map> nodeSetMap = new HashMap>(); Set oneNodeSet = new HashSet(); oneNodeSet.add(cn0_1); Set twoNodeSet = new HashSet(); @@ -315,9 +328,13 @@ public class ConsistencyCheckTest { byte[] keyBytes = { 0, 1, 2, 17, 4 }; ByteArray key = new ByteArray(keyBytes); long now = System.currentTimeMillis(); - Version v1 = new VectorClock(now); - Version v2 = new VectorClock(now + 1); - Versioned versioned = new Versioned(value1, v1); + VectorClock vc1 = new VectorClock(now); + VectorClock vc2 = new VectorClock(now + 1); + Versioned versioned = new Versioned(value1, vc1); + + ConsistencyCheck.Value v1 = new ConsistencyCheck.VersionValue(new Versioned(value1, vc1)); + ConsistencyCheck.Value v2 = new ConsistencyCheck.VersionValue(new Versioned(value2, vc2)); + // make Prefix Nodes Set set = new HashSet(); @@ -326,16 +343,19 @@ public class ConsistencyCheckTest { set.add(cn0_3); // test vector clock - Map> mapVector = new HashMap>(); + Map> mapVector = new HashMap>(); mapVector.put(v1, set); - ((VectorClock) v1).incrementVersion(1, now); + vc1.incrementVersion(1, now); + v1 = new ConsistencyCheck.VersionValue(new Versioned(value1, vc1)); String sVector = ConsistencyCheck.keyVersionToString(key, mapVector, "testStore", 99); assertEquals("BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + "," + now + ",[1:1]", sVector); // test two lines - ((VectorClock) v2).incrementVersion(1, now); - ((VectorClock) v2).incrementVersion(1, now + 1); + vc2.incrementVersion(1, now); + vc2.incrementVersion(1, now + 1); + v2 = new ConsistencyCheck.VersionValue(new Versioned(value2, vc2)); + mapVector.put(v2, set); String sVector2 = ConsistencyCheck.keyVersionToString(key, mapVector, "testStore", 99); String s1 = "BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + "," @@ -346,8 +366,8 @@ public class ConsistencyCheckTest { assertTrue(sVector2.equals(s1 + s2) || sVector2.equals(s2 + s1)); // test value hash - Version v3 = new HashedValue(versioned); - Map> mapHashed = new HashMap>(); + ConsistencyCheck.Value v3 = new HashedValue(versioned); + Map> mapHashed = new HashMap>(); mapHashed.put(v3, set); assertEquals("BAD_KEY,testStore,99,0001021104," + set.toString().replace(", ", ";") + "," + now + ",[1:1],-1172398097", @@ -391,7 +411,7 @@ public class ConsistencyCheckTest { } @Test - public void testOnePartitionEndToEnd() throws Exception { + public void testOnePartitionEndToEndBasedOnVersion() throws Exception { long now = System.currentTimeMillis(); // setup four nodes with one store and one partition @@ -417,6 +437,7 @@ public class ConsistencyCheckTest { new ClientConfig()); byte[] value = { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + byte[] value2 = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // make versions VectorClock vc1 = new VectorClock(); @@ -472,8 +493,8 @@ public class ConsistencyCheckTest { n1store.add(Pair.create(k6, v6)); n2store.add(Pair.create(k6, v6)); - // insert K6(conflicting but not latest version) into node 0,1,2,3 - Versioned v6ConflictEarly = new Versioned(value, vc2); + // insert K6(conflicting value and version) into node 0,1,2,3 + Versioned v6ConflictEarly = new Versioned(value2, vc2); n0store.add(Pair.create(k6, v6ConflictEarly)); n1store.add(Pair.create(k6, v6ConflictEarly)); n2store.add(Pair.create(k6, v6ConflictEarly)); @@ -516,6 +537,8 @@ public class ConsistencyCheckTest { // insert K0(out of retention) into node 0,1,2 Versioned v0 = new Versioned(value, vc3); n0store.add(Pair.create(k0, v0)); + n1store.add(Pair.create(k0, v0)); + n2store.add(Pair.create(k0, v0)); // stream to store adminClient.streamingOps.updateEntries(0, STORE_NAME, n0store.iterator(), null); @@ -527,12 +550,18 @@ public class ConsistencyCheckTest { // INCONSISTENT:2(K6,K2), ignored(K1,K0) List urls = new ArrayList(); urls.add(bootstrapUrl); - ConsistencyCheck checker = new ConsistencyCheck(urls, STORE_NAME, 0, null); - Reporter reporter = null; - checker.connect(); - reporter = checker.execute(); - - assertEquals(7 - 2, reporter.numTotalKeys); - assertEquals(3, reporter.numGoodKeys); + ConsistencyCheck.ComparisonType[] comparisonTypes = ConsistencyCheck.ComparisonType.values(); + + for(ConsistencyCheck.ComparisonType type : comparisonTypes) + { + StringWriter sw = new StringWriter(); + ConsistencyCheck checker = new ConsistencyCheck(urls, STORE_NAME, 0, sw, type); + Reporter reporter = null; + checker.connect(); + reporter = checker.execute(); + + assertEquals(7 - 2, reporter.numTotalKeys); + assertEquals(3, reporter.numGoodKeys); + } } } -- 2.11.4.GIT