From 90c4a9489045805f1ffe87bada2ba09bf00a2aa5 Mon Sep 17 00:00:00 2001
From: Arunachalam Thirupathi
Date: Wed, 21 May 2014 11:40:19 -0700
Subject: [PATCH] Add overwrite option to fork lift tool

By default, if a key already exists in the destination cluster, the fork
lift tool will not overwrite it. To support fork lifting read-only
cluster data into a read/write cluster, an overwrite option is added.

If the newly fork lifted data does not contain the entire key set of the
old data, the destination will hold a mix of old and new; the retention
policy should make the old data expire sooner than the new.
---
 .../client/protocol/admin/AdminClient.java         |   8 +-
 src/java/voldemort/utils/ClusterForkLiftTool.java  | 173 +++++++++++++--
 .../voldemort/versioning/VectorClockUtils.java     |  25 ++-
 .../voldemort/utils/ClusterForkLiftToolTest.java   |  74 ++++++++-
 4 files changed, 201 insertions(+), 79 deletions(-)

diff --git a/src/java/voldemort/client/protocol/admin/AdminClient.java b/src/java/voldemort/client/protocol/admin/AdminClient.java
index 27ac5f8cf..77c556a3d 100644
--- a/src/java/voldemort/client/protocol/admin/AdminClient.java
+++ b/src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -2678,7 +2678,7 @@ public class AdminClient {
          *
          * <pre>
          * | swapRO | changeClusterMetadata | changeRebalanceState | Order                        |
-         * |   f    |         t             |          t           | cluster -> rebalance         | 
+         * |   f    |         t             |          t           | cluster -> rebalance         |
          * |   f    |         f             |          t           | rebalance                    |
          * |   t    |         t             |          f           | cluster -> swap              |
          * |   t    |         t             |          t           | cluster -> swap -> rebalance |
@@ -2694,7 +2694,7 @@ public class AdminClient {
          *
          * <pre>
          * | swapRO | changeClusterMetadata | changeRebalanceState | Order                                    |
-         * |   f    |         t             |          t           | remove from rebalance -> cluster         | 
+         * |   f    |         t             |          t           | remove from rebalance -> cluster         |
          * |   f    |         f             |          t           | remove from rebalance                    |
          * |   t    |         t             |          f           | cluster -> swap                          |
          * |   t    |         t             |          t           | remove from rebalance -> cluster -> swap |
@@ -3663,9 +3663,7 @@ public class AdminClient {
             // FIXME This is a temporary workaround for System store client not
             // being able to do a second insert. We simply generate a super
             // clock that will trump what is on storage
-            VectorClock denseClock = VectorClockUtils.makeClock(currentCluster.getNodeIds(),
-                                                                System.currentTimeMillis(),
-                                                                System.currentTimeMillis());
+            VectorClock denseClock = VectorClockUtils.makeClockWithCurrentTime(currentCluster.getNodeIds());
             quotaSysStoreClient.putSysStore(QuotaUtils.makeQuotaKey(storeName,
                                                                     QuotaType.valueOf(quotaType)),
                                             new Versioned<String>(quotaValue, denseClock));
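
Note: the refactor above is effectively equivalent to the removed three-argument
call; the helper itself is introduced in VectorClockUtils later in this patch. A
minimal sketch of the equivalence, assuming nodeIds is the Set<Integer> returned
by Cluster.getNodeIds() (the original even called System.currentTimeMillis()
twice, so its two values could differ by a millisecond):

    // before: dense clock built inline with two "current time" values
    long now = System.currentTimeMillis();
    VectorClock oldStyle = VectorClockUtils.makeClock(nodeIds, now, now);

    // after: the same clock via the new convenience method
    VectorClock newStyle = VectorClockUtils.makeClockWithCurrentTime(nodeIds);
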
diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java
index a4a343020..b3c131fe9 100644
--- a/src/java/voldemort/utils/ClusterForkLiftTool.java
+++ b/src/java/voldemort/utils/ClusterForkLiftTool.java
@@ -23,8 +23,8 @@ import voldemort.VoldemortException;
 import voldemort.client.ClientConfig;
 import voldemort.client.protocol.admin.AdminClient;
 import voldemort.client.protocol.admin.AdminClientConfig;
-import voldemort.client.protocol.admin.QueryKeyResult;
 import voldemort.client.protocol.admin.BaseStreamingClient;
+import voldemort.client.protocol.admin.QueryKeyResult;
 import voldemort.client.protocol.admin.StreamingClientConfig;
 import voldemort.cluster.Cluster;
 import voldemort.cluster.Node;
@@ -36,6 +36,7 @@ import voldemort.versioning.ObsoleteVersionException;
 import voldemort.versioning.TimeBasedInconsistencyResolver;
 import voldemort.versioning.VectorClock;
 import voldemort.versioning.VectorClockInconsistencyResolver;
+import voldemort.versioning.VectorClockUtils;
 import voldemort.versioning.Versioned;
 
 import com.google.common.collect.Lists;
@@ -45,17 +46,19 @@ import com.google.common.collect.Lists;
  * When used in conjunction with a client that "double writes" to both the
 * clusters, this can be used as a feasible store migration tool to move an
  * existing store to a new cluster.
- * 
+ *
  * There are two modes around how the divergent versions of a key are
 * consolidated from the source cluster:
- * 
- * 1) Primary only Resolution ({@link ClusterForkLiftTool#SinglePartitionForkLiftTask}: The entries
- * on the primary partition are moved over to the destination cluster with empty
- * vector clocks. if any key has multiple versions on the primary, they are
- * resolved. This approach is fast and is best suited if you deem the replicas
- * being very much in sync with each other. This is the DEFAULT mode
- * 
- * 2) Global Resolution ({@link ClusterForkLiftTool#SinglePartitionGloballyResolvingForkLiftTask} :
+ *
+ * 1) Primary only Resolution (
+ * {@link ClusterForkLiftTool#SinglePartitionForkLiftTask}): The entries on the
+ * primary partition are moved over to the destination cluster with empty vector
+ * clocks. If any key has multiple versions on the primary, they are resolved.
+ * This approach is fast and is best suited if you deem the replicas to be very
+ * much in sync with each other. This is the DEFAULT mode.
+ *
+ * 2) Global Resolution (
+ * {@link ClusterForkLiftTool#SinglePartitionGloballyResolvingForkLiftTask}):
  * The keys belonging to a partition are fetched out of the primary replica, and
  * for each such key, the corresponding values are obtained from all other
  * replicas, using get(..) operations. These versions are then resolved and
@@ -64,40 +67,40 @@ import com.google.common.collect.Lists;
  * potentially cross colo) and hence should be used when thorough version
 * resolution is necessary or the admin deems the replicas to be fairly
  * out-of-sync
- * 
- * 
+ *
+ *
 * In both modes, the default chained resolver (
 * {@link VectorClockInconsistencyResolver} +
 * {@link TimeBasedInconsistencyResolver}) is used to determine a final resolved
  * version.
- * 
+ *
  * NOTES:
- * 
+ *
  * 1) If the tool fails for some reason in the middle, the admin can restart the
  * tool for the failed partitions alone. The keys that were already written in
 * the failed partitions will all experience {@link ObsoleteVersionException}
  * and the un-inserted keys will be inserted.
- * 
+ *
  * 2) Since the forklift writes are issued with empty vector clocks, they will
  * always yield to online writes happening on the same key, before or during the
  * forklift window. Of course, after the forklift window, the destination
  * cluster resumes normal operation.
- * 
+ *
 * 3) For now, we will fall back to fetching the key from the primary replica,
 * fetch the values out manually, resolve and write it back. Pitfall: the
 * primary somehow does not have the key.
- * 
+ *
  * Two scenarios.
- * 
+ *
  * 1) Key active after double writes: the situation is the result of slop not
 * propagating to the primary. But double writes would write the key back to
 * the destination cluster anyway. We are good.
- * 
+ *
  * 2) Key inactive after double writes: This indicates a problem elsewhere. This
  * is a base guarantee voldemort should offer.
- * 
+ *
  * 4) Zoned <-> Non Zoned forklift implications.
- * 
+ *
  * When forklifting data from a non-zoned to zoned cluster, both destination
  * zones will be populated with data, by simply running the tool once with the
  * respective bootstrap urls. If you need to forklift data from zoned to
@@ -105,7 +108,7 @@ import com.google.common.collect.Lists;
  * by Voldemort), then you need to run the tool twice for each destination
  * non-zoned cluster. Zoned -> Zoned and Non-Zoned -> Non-Zoned forklifts are
  * trivial.
- * 
+ *
  */
 public class ClusterForkLiftTool implements Runnable {
 
@@ -127,6 +130,12 @@ public class ClusterForkLiftTool implements Runnable {
     private static final int DEFAULT_PARTITION_PARALLELISM = 8;
     private static final int DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS = 5;
 
+    private static final String OVERWRITE_WARNING_MESSAGE = "**WARNING** If the source and destination have overlapping keys, the destination values will be "
+                                                            + "overwritten using the source values. This option is irreversible: any old value that exists in "
+                                                            + "the destination cluster will be permanently lost. It can be useful when copying the latest "
+                                                            + "read-only data, but is otherwise very dangerous. Think twice before using this option. Keys that "
+                                                            + "exist only in the destination are left unmodified. ";
+
     private final AdminClient srcAdminClient;
     private final BaseStreamingClient dstStreamingClient;
     private final List<String> storesList;
@@ -135,9 +144,17 @@ public class ClusterForkLiftTool implements Runnable {
     private final HashMap<String, StoreDefinition> srcStoreDefMap;
     private final List<Integer> partitionList;
     private final ForkLiftTaskMode mode;
+    private final Boolean overwrite;
+
+    private static List<StoreDefinition> getStoreDefinitions(AdminClient adminClient) {
+        Cluster cluster = adminClient.getAdminClientCluster();
+        Integer nodeId = cluster.getNodeIds().iterator().next();
+        return adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId).getValue();
+    }
 
     public ClusterForkLiftTool(String srcBootstrapUrl,
                                String dstBootstrapUrl,
+                               Boolean overwrite,
                                int maxPutsPerSecond,
                                int partitionParallelism,
                                int progressOps,
@@ -156,13 +173,13 @@ public class ClusterForkLiftTool implements Runnable {
         StreamingClientConfig config = new StreamingClientConfig(props);
         this.dstStreamingClient = new BaseStreamingClient(config);
         this.mode = mode;
+        this.overwrite = overwrite;
 
         // determine and verify final list of stores to be forklifted over
         if(storesList != null) {
             this.storesList = storesList;
         } else {
-            this.storesList = StoreUtils.getStoreNames(srcAdminClient.metadataMgmtOps.getRemoteStoreDefList(0)
-                                                                                     .getValue(),
+            this.storesList = StoreUtils.getStoreNames(getStoreDefinitions(srcAdminClient),
                                                        true);
         }
         this.srcStoreDefMap = checkStoresOnBothSides();
@@ -191,11 +208,9 @@ public class ClusterForkLiftTool implements Runnable {
     }
 
     private HashMap<String, StoreDefinition> checkStoresOnBothSides() {
-        List<StoreDefinition> srcStoreDefs = srcAdminClient.metadataMgmtOps.getRemoteStoreDefList(0)
-                                                                           .getValue();
+        List<StoreDefinition> srcStoreDefs = getStoreDefinitions(srcAdminClient);
         HashMap<String, StoreDefinition> srcStoreDefMap = StoreUtils.getStoreDefsAsMap(srcStoreDefs);
-        List<StoreDefinition> dstStoreDefs = dstStreamingClient.getAdminClient().metadataMgmtOps.getRemoteStoreDefList(0)
-                                                                                                .getValue();
+        List<StoreDefinition> dstStoreDefs = getStoreDefinitions(dstStreamingClient.getAdminClient());
         HashMap<String, StoreDefinition> dstStoreDefMap = StoreUtils.getStoreDefsAsMap(dstStoreDefs);
 
         Set<String> storesToSkip = new HashSet<String>();
@@ -209,8 +224,11 @@ public class ClusterForkLiftTool implements Runnable {
                 storesToSkip.add(store);
             }
         }
-        logger.warn("List of stores that will be skipped :" + storesToSkip);
-        storesList.removeAll(storesToSkip);
+
+        if(storesToSkip.size() > 0) {
+            logger.warn("List of stores that will be skipped :" + storesToSkip);
+            storesList.removeAll(storesToSkip);
+        }
         return srcStoreDefMap;
     }
 
@@ -218,7 +236,7 @@ public class ClusterForkLiftTool implements Runnable {
      * TODO this base class can potentially provide some framework of execution
     * for the subclasses, to yield a better object-oriented design (progress
      * tracking etc)
-     * 
+     *
      */
     abstract class SinglePartitionForkLiftTask {
 
@@ -226,6 +244,8 @@ public class ClusterForkLiftTool implements Runnable {
         protected CountDownLatch latch;
         protected StoreRoutingPlan storeInstance;
         protected String workName;
+        private Set<Integer> dstServerIds;
+        private long entriesForkLifted = 0;
 
         SinglePartitionForkLiftTask(StoreRoutingPlan storeInstance,
                                     int partitionId,
@@ -235,6 +255,27 @@ public class ClusterForkLiftTool implements Runnable {
             this.storeInstance = storeInstance;
             workName = "[Store: " + storeInstance.getStoreDefinition().getName() + ", Partition: "
                        + this.partitionId + "] ";
+            dstServerIds = dstStreamingClient.getAdminClient().getAdminClientCluster().getNodeIds();
+        }
+
+        void streamingPut(ByteArray key, Versioned<byte[]> value) {
+            if(!overwrite) {
+                dstStreamingClient.streamingPut(key, value);
+            } else {
+                VectorClock denseClock = VectorClockUtils.makeClockWithCurrentTime(dstServerIds);
+                Versioned<byte[]> updatedValue = new Versioned<byte[]>(value.getValue(), denseClock);
+                dstStreamingClient.streamingPut(key, updatedValue);
+            }
+
+            entriesForkLifted++;
+            if(entriesForkLifted % progressOps == 0) {
+                logger.info(workName + " fork lifted " + entriesForkLifted
+                            + " entries successfully");
+            }
+        }
+
+        void printSummary() {
+            logger.info(workName + "Completed processing " + entriesForkLifted + " records");
         }
     }
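
Note on the overwrite branch above: it relies on clock dominance. A clock
carrying an entry for every destination server, each stamped with the current
epoch-millis, compares as AFTER any clock the destination built from earlier
millisecond timestamps, so the put supersedes the stored version instead of
colliding with it. A hedged sketch of that comparison (compare(..) semantics as
described in the VectorClockUtils javadoc; the literal timestamp is
illustrative):

    Set<Integer> nodes = Sets.newHashSet(0, 1);
    // clock left behind by an earlier put (values are epoch millis)
    VectorClock existing = VectorClockUtils.makeClock(nodes, 1400000000000L, 1400000000000L);
    // dense clock generated by the overwrite path "now"
    VectorClock dense = VectorClockUtils.makeClockWithCurrentTime(nodes);
    // dense dominates every entry of existing, so the streaming put is
    // accepted rather than rejected with an ObsoleteVersionException
    assert VectorClockUtils.compare(dense, existing) == Occurred.AFTER;
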
 
@@ -242,7 +283,7 @@ public class ClusterForkLiftTool implements Runnable {
     * Fetches keys belonging to the primary partition, and then fetches values for
      * that key from all replicas in a non-streaming fashion, applies the
      * default resolver and writes it back to the destination cluster
-     * 
+     *
      * TODO a streaming N way merge is the more efficient & correct solution.
      * Without this, the resolving can be very slow due to cross data center
      * get(..)
@@ -258,7 +299,6 @@ public class ClusterForkLiftTool implements Runnable {
 
         public void run() {
             String storeName = this.storeInstance.getStoreDefinition().getName();
-            long entriesForkLifted = 0;
             try {
                 logger.info(workName + "Starting processing");
                 ChainedResolver<Versioned<byte[]>> resolver = new ChainedResolver<Versioned<byte[]>>(new VectorClockInconsistencyResolver<byte[]>(),
@@ -296,17 +336,10 @@ public class ClusterForkLiftTool implements Runnable {
                                                      + ByteUtils.toHexString(keyToResolve.get())
                                                      + " vals:" + resolvedVersions);
                     }
-                    dstStreamingClient.streamingPut(keyToResolve,
-                                                    new Versioned<byte[]>(resolvedVersions.get(0)
-                                                                                          .getValue()));
-
-                    entriesForkLifted++;
-                    if(entriesForkLifted % progressOps == 0) {
-                        logger.info(workName + " fork lifted " + entriesForkLifted
-                                    + " entries successfully");
-                    }
+                    Versioned<byte[]> value = new Versioned<byte[]>(resolvedVersions.get(0).getValue());
+                    streamingPut(keyToResolve, value);
                 }
-                logger.info(workName + "Completed processing " + entriesForkLifted + " records");
+                printSummary();
             } catch(Exception e) {
                 // all work should stop if we get here
                 logger.error(workName + "Error forklifting data ", e);
@@ -316,7 +349,7 @@ public class ClusterForkLiftTool implements Runnable {
         }
 
         /**
-         * 
+         *
          * @param nodeIdList
          * @param keyInBytes
          * @return
@@ -346,7 +379,7 @@ public class ClusterForkLiftTool implements Runnable {
      * Simply fetches the data for the partition from the primary replica and
      * writes it into the destination cluster. Works well when the replicas are
      * fairly consistent.
-     * 
+     *
      */
     class SinglePartitionPrimaryResolvingForkLiftTask extends SinglePartitionForkLiftTask implements
             Runnable {
@@ -360,7 +393,6 @@ public class ClusterForkLiftTool implements Runnable {
         @Override
         public void run() {
             String storeName = this.storeInstance.getStoreDefinition().getName();
-            long entriesForkLifted = 0;
             ChainedResolver<Versioned<byte[]>> resolver = new ChainedResolver<Versioned<byte[]>>(new VectorClockInconsistencyResolver<byte[]>(),
                                                                                                  new TimeBasedInconsistencyResolver<byte[]>());
             try {
@@ -392,12 +424,7 @@ public class ClusterForkLiftTool implements Runnable {
                         Versioned<byte[]> newEntry = new Versioned<byte[]>(resolvedVersioned.getValue(),
                                                                            new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp()));
 
-                        dstStreamingClient.streamingPut(prevKey, newEntry);
-                        entriesForkLifted++;
-                        if(entriesForkLifted % progressOps == 0) {
-                            logger.info(workName + " fork lifted " + entriesForkLifted
-                                        + " entries successfully");
-                        }
+                        streamingPut(prevKey, newEntry);
                         vals = new ArrayList<Versioned<byte[]>>();
                     }
                     vals.add(versioned);
@@ -411,11 +438,10 @@ public class ClusterForkLiftTool implements Runnable {
                     Versioned<byte[]> resolvedVersioned = resolvedVals.get(0);
                     Versioned<byte[]> newEntry = new Versioned<byte[]>(resolvedVersioned.getValue(),
                                                                        new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp()));
-                    dstStreamingClient.streamingPut(prevKey, newEntry);
-                    entriesForkLifted++;
+                    streamingPut(prevKey, newEntry);
                 }
 
-                logger.info(workName + "Completed processing " + entriesForkLifted + " records");
+                printSummary();
             } catch(Exception e) {
                 // if for some reason this partition fails, we will have to
                 // retry again for those partitions alone.
@@ -430,7 +456,7 @@ public class ClusterForkLiftTool implements Runnable {
      * Simply fetches the data for the partition from the primary replica and
      * writes it into the destination cluster without resolving any of the
      * conflicting values
-     * 
+     *
      */
     class SinglePartitionNoResolutionForkLiftTask extends SinglePartitionForkLiftTask implements
             Runnable {
@@ -444,7 +470,6 @@ public class ClusterForkLiftTool implements Runnable {
         @Override
         public void run() {
             String storeName = this.storeInstance.getStoreDefinition().getName();
-            long entriesForkLifted = 0;
             try {
                 logger.info(workName + "Starting processing");
                 Iterator<Pair<ByteArray, Versioned<byte[]>>> entryItr = srcAdminClient.bulkFetchOps.fetchEntries(storeInstance.getNodeIdForPartitionId(this.partitionId),
@@ -457,15 +482,9 @@ public class ClusterForkLiftTool implements Runnable {
                     Pair<ByteArray, Versioned<byte[]>> record = entryItr.next();
                     ByteArray key = record.getFirst();
                     Versioned<byte[]> versioned = record.getSecond();
-                    dstStreamingClient.streamingPut(key, versioned);
-                    entriesForkLifted++;
-                    if(entriesForkLifted % progressOps == 0) {
-                        logger.info(workName + " fork lifted " + entriesForkLifted
-                                    + " entries successfully");
-                    }
-
+                    streamingPut(key, versioned);
                 }
-                logger.info(workName + "Completed processing " + entriesForkLifted + " records");
+                printSummary();
 
             } catch(Exception e) {
                 // if for some reason this partition fails, we will have to retry
@@ -550,7 +569,7 @@ public class ClusterForkLiftTool implements Runnable {
 
     /**
      * Return args parser
-     * 
+     *
      * @return program parser
      * */
     private static OptionParser getParser() {
@@ -598,6 +617,14 @@ public class ClusterForkLiftTool implements Runnable {
                        "Determines if a thorough global resolution needs to be done, by comparing all replicas. [Default: "
                                + ForkLiftTaskMode.primary_resolution.toString()
                                + " Fetch from primary alone ]");
+
+
+        parser.accepts("overwrite", OVERWRITE_WARNING_MESSAGE)
+              .withOptionalArg()
+              .describedAs("overwriteExistingValue")
+              .ofType(Boolean.class)
+              .defaultsTo(false);
+
         return parser;
     }
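
Note: because "overwrite" is declared with an optional boolean argument, the
flag may be passed bare or with an explicit value. A small, self-contained
sketch of the resulting semantics (jopt-simple is the parser library
getParser() already uses; the demo class name is illustrative):

    import joptsimple.OptionParser;
    import joptsimple.OptionSet;

    public class OverwriteFlagDemo {
        public static void main(String[] args) {
            OptionParser parser = new OptionParser();
            parser.accepts("overwrite").withOptionalArg().ofType(Boolean.class).defaultsTo(false);
            String[][] cases = { {}, { "--overwrite" }, { "--overwrite", "true" }, { "--overwrite", "false" } };
            for(String[] argv: cases) {
                OptionSet options = parser.parse(argv);
                boolean overwrite = false;
                if(options.has("overwrite")) {
                    // a bare "--overwrite" carries no argument and means true
                    overwrite = options.hasArgument("overwrite") ? (Boolean) options.valueOf("overwrite")
                                                                 : true;
                }
                // prints: false, true, true, false
                System.out.println(java.util.Arrays.toString(argv) + " -> " + overwrite);
            }
        }
    }
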
 
@@ -660,8 +687,22 @@ public class ClusterForkLiftTool implements Runnable {
 
         }
 
+        Boolean overwrite = false;
+        if(options.has("overwrite")) {
+            if(options.hasArgument("overwrite")) {
+                overwrite = (Boolean) options.valueOf("overwrite");
+            } else {
+                overwrite = true;
+            }
+        }
+
+        if(overwrite) {
+            logger.warn(OVERWRITE_WARNING_MESSAGE);
+        }
+
         ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootstrapUrl,
                                                                    dstBootstrapUrl,
+                                                                   overwrite,
                                                                    maxPutsPerSecond,
                                                                    partitionParallelism,
                                                                    progressOps,
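
For completeness, the new third constructor argument as it would be exercised
programmatically, mirroring the updated test call sites (the URLs and store
name below are placeholders):

    // fork lift a single store with overwrite enabled; a null partition
    // list means all partitions are processed
    ClusterForkLiftTool tool = new ClusterForkLiftTool("tcp://src-host:6666", // source bootstrap URL
                                                       "tcp://dst-host:6666", // destination bootstrap URL
                                                       true,                  // overwrite destination values
                                                       10000,                 // max puts per second
                                                       1,                     // partition parallelism
                                                       1000,                  // progress interval (ops)
                                                       Lists.newArrayList("my-store"),
                                                       null,
                                                       ClusterForkLiftTool.ForkLiftTaskMode.global_resolution);
    tool.run();
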
diff --git a/src/java/voldemort/versioning/VectorClockUtils.java b/src/java/voldemort/versioning/VectorClockUtils.java
index 32bc71487..0a93a1c31 100644
--- a/src/java/voldemort/versioning/VectorClockUtils.java
+++ b/src/java/voldemort/versioning/VectorClockUtils.java
@@ -1,12 +1,12 @@
 /*
  * Copyright 2008-2013 LinkedIn, Inc
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
  * the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -33,7 +33,7 @@ public class VectorClockUtils {
      * -- Clock 1 is CONCURRENT to clock 2 if there exist nodeId1, nodeId2
      * such that c1(nodeId1) < c2(nodeId1) and c1(nodeId2) > c2(nodeId2)
      * -- Clock 1 is AFTER clock 2 otherwise
-     * 
+     *
      * @param v1 The first VectorClock
      * @param v2 The second VectorClock
      */
@@ -95,7 +95,7 @@ public class VectorClockUtils {
     /**
      * Given a set of versions, constructs a resolved list of versions based on
      * the compare function above
-     * 
+     *
      * @param values
      * @return list of values after resolution
      */
@@ -128,7 +128,7 @@ public class VectorClockUtils {
     /**
      * Generates a vector clock with the provided values
-     * 
+     *
      * @param serverIds servers in the clock
      * @param clockValue value of the clock for each server entry
      * @param timestamp ts value to be set for the clock
@@ -142,4 +142,17 @@ public class VectorClockUtils {
         return new VectorClock(clockEntries, timestamp);
     }
 
+    /**
+     * Generates a vector clock with the provided nodes and the current time
+     * stamp. This clock can be used to overwrite the existing value, avoiding
+     * obsolete version exceptions in most cases, except if the existing vector
+     * clock was generated in a custom way (i.e. the existing vector clock does
+     * not use milliseconds).
+     *
+     * @param serverIds servers in the clock
+     */
+    public static VectorClock makeClockWithCurrentTime(Set<Integer> serverIds) {
+        return makeClock(serverIds, System.currentTimeMillis(), System.currentTimeMillis());
+    }
+
 }
diff --git a/test/unit/voldemort/utils/ClusterForkLiftToolTest.java b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java
index 71822cdcc..d67a88684 100644
--- a/test/unit/voldemort/utils/ClusterForkLiftToolTest.java
+++ b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java
@@ -194,6 +194,7 @@ public class ClusterForkLiftToolTest {
         // perform the forklifting..
         ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl,
                                                                    dstBootStrapUrl,
+                                                                   false,
                                                                    10000,
                                                                    1,
                                                                    1000,
@@ -212,7 +213,7 @@ public class ClusterForkLiftToolTest {
                                  dstPrimaryResolvingStoreClient.get(firstKey).getValue(),
                                  "before forklift");
                 } else if(entry.getKey().equals(lastKey)) {
-                    assertEquals("Online write overwritten",
+                    assertEquals("can't update value after forklift",
                                  dstPrimaryResolvingStoreClient.get(lastKey).getValue(),
                                  "after forklift");
                 } else if(entry.getKey().equals(conflictKey)) {
@@ -261,6 +262,7 @@ public class ClusterForkLiftToolTest {
         // perform the forklifting..
         ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl,
                                                                    dstBootStrapUrl,
+                                                                   false,
                                                                    10000,
                                                                    1,
                                                                    1000,
@@ -279,7 +281,7 @@ public class ClusterForkLiftToolTest {
                                  dstGloballyResolvingStoreClient.get(firstKey).getValue(),
                                  "before forklift");
                 } else if(entry.getKey().equals(lastKey)) {
-                    assertEquals("Online write overwritten",
+                    assertEquals("can't update value after forklift",
                                  dstGloballyResolvingStoreClient.get(lastKey).getValue(),
                                  "after forklift");
                 } else if(entry.getKey().equals(conflictKey)) {
@@ -295,6 +297,73 @@ public class ClusterForkLiftToolTest {
     }
 
     @Test
+    public void testForkLiftOverWrite() throws Exception {
+
+        StoreRoutingPlan srcStoreInstance = new StoreRoutingPlan(srcCluster,
+                                                                 globallyResolvingStoreDef);
+
+        // populate data on the source cluster..
+        for(Map.Entry<String, String> entry: kvPairs.entrySet()) {
+            srcGloballyResolvingStoreClient.put(entry.getKey(), entry.getValue());
+        }
+
+        // generate a conflict on the primary and a secondary
+        List<Integer> nodeList = srcStoreInstance.getReplicationNodeList(srcStoreInstance.getMasterPartitionId(conflictKey.getBytes("UTF-8")));
+        VectorClock losingClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 0, 5)),
+                                                  System.currentTimeMillis());
+        VectorClock winningClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, 5)),
+                                                   losingClock.getTimestamp() + 1);
+        srcAdminClient.storeOps.putNodeKeyValue(GLOBALLY_RESOLVING_STORE_NAME,
+                                                new NodeValue<ByteArray, byte[]>(nodeList.get(0),
+                                                                                 new ByteArray(conflictKey.getBytes("UTF-8")),
+                                                                                 new Versioned<byte[]>("losing value".getBytes("UTF-8"),
+                                                                                                       losingClock)));
+        srcAdminClient.storeOps.putNodeKeyValue(GLOBALLY_RESOLVING_STORE_NAME,
+                                                new NodeValue<ByteArray, byte[]>(nodeList.get(1),
+                                                                                 new ByteArray(conflictKey.getBytes("UTF-8")),
+                                                                                 new Versioned<byte[]>("winning value".getBytes("UTF-8"),
+                                                                                                       winningClock)));
+
+        // do a write to the destination cluster. This is the main test: after
+        // the fork lift, this value should be overwritten. This is the only
+        // difference from the non-overwrite tests.
+        dstGloballyResolvingStoreClient.put(firstKey, "before forklift");
+
+        // Sleep briefly, so that the new clock generated from the current
+        // milliseconds is greater and the value must be overwritten.
+        Thread.sleep(2);
+
+        // perform the forklifting..
+        ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl,
+                                                                   dstBootStrapUrl,
+                                                                   true, // overwrite
+                                                                   10000,
+                                                                   1,
+                                                                   1000,
+                                                                   Lists.newArrayList(GLOBALLY_RESOLVING_STORE_NAME),
+                                                                   null,
+                                                                   ClusterForkLiftTool.ForkLiftTaskMode.global_resolution);
+        forkLiftTool.run();
+
+        // do a write to the destination cluster
+        dstGloballyResolvingStoreClient.put(lastKey, "after forklift");
+
+        // verify data on the destination is as expected
+        for(Map.Entry<String, String> entry: kvPairs.entrySet()) {
+            String dstClusterValue = dstGloballyResolvingStoreClient.get(entry.getKey()).getValue();
+            if(entry.getKey().equals(lastKey)) {
+                assertEquals("can't update value after forklift", dstClusterValue, "after forklift");
+            } else if(entry.getKey().equals(conflictKey)) {
+                assertEquals("Conflict resolution incorrect", dstClusterValue, "winning value");
+            } else {
+                if(!dstClusterValue.equals(entry.getValue())) {
+                    assertEquals("fork lift data missing", dstClusterValue, entry.getValue());
+                }
+            }
+        }
+    }
+
+    @Test
     public void testNoresolutionForkLift() throws Exception {
 
         int versions = 0;
@@ -320,6 +389,7 @@ public class ClusterForkLiftToolTest {
         // perform the forklifting..
         ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl,
                                                                    dstBootStrapUrl,
+                                                                   false,
                                                                    10000,
                                                                    1,
                                                                    1000,
-- 
2.11.4.GIT
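
A concrete illustration of the caveat in the makeClockWithCurrentTime javadoc:
if the destination already holds a clock whose per-node values were not derived
from epoch-milliseconds (a custom clock with larger values), the current-time
clock does not dominate it and an overwrite put can still be rejected as
obsolete. A hedged sketch, using an artificially large clock value:

    Set<Integer> nodes = Sets.newHashSet(0, 1);
    // custom clock: per-node values far above any epoch-millis timestamp
    VectorClock custom = VectorClockUtils.makeClock(nodes, Long.MAX_VALUE / 2, System.currentTimeMillis());
    VectorClock dense = VectorClockUtils.makeClockWithCurrentTime(nodes);
    // every entry of dense is smaller, so dense compares as BEFORE custom and
    // a put carrying it would throw ObsoleteVersionException
    assert VectorClockUtils.compare(dense, custom) == Occurred.BEFORE;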