1 package voldemort
.utils
;
3 import java
.util
.ArrayList
;
4 import java
.util
.Collections
;
5 import java
.util
.HashMap
;
6 import java
.util
.HashSet
;
7 import java
.util
.Iterator
;
11 import java
.util
.concurrent
.Callable
;
12 import java
.util
.concurrent
.CountDownLatch
;
13 import java
.util
.concurrent
.ExecutorService
;
14 import java
.util
.concurrent
.Executors
;
15 import java
.util
.concurrent
.TimeUnit
;
17 import joptsimple
.OptionParser
;
18 import joptsimple
.OptionSet
;
20 import org
.apache
.log4j
.Logger
;
22 import voldemort
.VoldemortException
;
23 import voldemort
.client
.ClientConfig
;
24 import voldemort
.client
.protocol
.admin
.AdminClient
;
25 import voldemort
.client
.protocol
.admin
.AdminClientConfig
;
26 import voldemort
.client
.protocol
.admin
.BaseStreamingClient
;
27 import voldemort
.client
.protocol
.admin
.QueryKeyResult
;
28 import voldemort
.client
.protocol
.admin
.StreamingClientConfig
;
29 import voldemort
.cluster
.Cluster
;
30 import voldemort
.cluster
.Node
;
31 import voldemort
.routing
.StoreRoutingPlan
;
32 import voldemort
.store
.StoreDefinition
;
33 import voldemort
.store
.StoreUtils
;
34 import voldemort
.versioning
.ChainedResolver
;
35 import voldemort
.versioning
.ObsoleteVersionException
;
36 import voldemort
.versioning
.TimeBasedInconsistencyResolver
;
37 import voldemort
.versioning
.VectorClock
;
38 import voldemort
.versioning
.VectorClockInconsistencyResolver
;
39 import voldemort
.versioning
.VectorClockUtils
;
40 import voldemort
.versioning
.Versioned
;
42 import com
.google
.common
.collect
.Lists
;
45 * Tool to fork lift data over from a source cluster to a destination cluster.
46 * When used in conjunction with a client that "double writes" to both the
47 * clusters, this can be used as a feasible store migration tool to move an
48 * existing store to a new cluster.
50 * There are two modes around how the divergent versions of a key are
51 * consolidated from the source cluster:
53 * 1) Primary only Resolution (
54 * {@link ClusterForkLiftTool#SinglePartitionForkLiftTask}: The entries on the
55 * primary partition are moved over to the destination cluster with empty vector
56 * clocks. If any key has multiple versions on the primary, they are resolved.
57 * This approach is fast and is best suited if you deem the replicas being very
58 * much in sync with each other. This is the DEFAULT mode
60 * 2) Global Resolution (
61 * {@link ClusterForkLiftTool#SinglePartitionGloballyResolvingForkLiftTask} :
62 * The keys belonging to a partition are fetched out of the primary replica, and
63 * for each such key, the corresponding values are obtained from all other
64 * replicas, using get(..) operations. These versions are then resolved and
65 * written back to the destination cluster as before. This approach is slow
66 * since it involves several roundtrips to the server for each key (some
67 * potentially cross colo) and hence should be used when thorough version
68 * resolution is necessary or the admin deems the replicas being fairly
72 * In both modes, the default chained resolver (
73 * {@link VectorClockInconsistencyResolver} +
74 * {@link TimeBasedInconsistencyResolver} is used to determine a final resolved
79 * 1) If the tool fails for some reason in the middle, the admin can restart the
80 * tool for the failed partitions alone. The keys that were already written in
81 * the failed partitions, will all experience {@link ObsoleteVersionException}
82 * and the un-inserted keys will be inserted.
84 * 2) Since the forklift writes are issued with empty vector clocks, they will
85 * always yield to online writes happening on the same key, before or during the
86 * forklift window. Of course, after the forklift window, the destination
87 * cluster resumes normal operation.
89 * 3) For now, we will fallback to fetching the key from the primary replica,
90 * fetch the values out manually, resolve and write it back. PitFalls : primary
91 * somehow does not have the key.
95 * 1) Key active after double writes: the situation is the result of slop not
96 * propagating to the primary. But double writes would write the key back to
97 * destination cluster anyway. We are good.
99 * 2) Key inactive after double writes: This indicates a problem elsewhere. This
100 * is a base guarantee voldemort should offer.
102 * 4) Zoned <-> Non Zoned forklift implications.
104 * When forklifting data from a non-zoned to zoned cluster, both destination
105 * zones will be populated with data, by simply running the tool once with the
106 * respective bootstrap urls. If you need to forklift data from zoned to
107 * non-zoned clusters (i.e your replication between datacenters is not handled
108 * by Voldemort), then you need to run the tool twice for each destination
109 * non-zoned cluster. Zoned -> Zoned and Non-Zoned -> Non-Zoned forklifts are
113 public class ClusterForkLiftTool
implements Runnable
{
116 * different modes available with the forklift tool
/**
 * Resolution strategies the tool can run with. The value held in the tool's
 * mode field selects which per-partition task class is created and
 * submitted to the worker pool.
 */
118 enum ForkLiftTaskMode
{
119 global_resolution
, /* Fetch data from all partitions and do resolution */
120 primary_resolution
, /*
121 * Fetch data from primary partition and do
124 no_resolution
/* fetch data from primary partition and do no resolution */
// Logger shared by the tool and its per-partition tasks.
127 private static Logger logger
= Logger
.getLogger(ClusterForkLiftTool
.class);
// Fallback values used by main() when the matching command line option
// is not supplied.
128 private static final int DEFAULT_MAX_PUTS_PER_SEC
= 500;
129 private static final int DEFAULT_PROGRESS_PERIOD_OPS
= 100000;
130 private static final int DEFAULT_PARTITION_PARALLELISM
= 8;
// Timeout value passed to workerPool.awaitTermination(...) after shutdown.
131 private static final int DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS
= 5;
// Used both as the help text of the "overwrite" option and as a warning
// that is logged from main().
133 private static final String OVERWRITE_WARNING_MESSAGE
= "**WARNING** If source and destination has overlapping keys, will overwrite the destination values "
134 + " using source. The option is ir-reversible. The old value if exists in the destination cluster will "
135 + " be permanently lost. This option could be useful if you are copying read only data from latest "
136 + " otherwise very dangerous option. Think twice before using this option. for keys that only exists "
137 + " in destination, they will be left un-modified. ";
// Admin client connected to the source cluster (keys/entries are read from it).
139 private final AdminClient srcAdminClient
;
// Streaming client connected to the destination cluster (entries are written through it).
140 private final BaseStreamingClient dstStreamingClient
;
// Names of the stores to process; stores missing on either cluster are
// removed from this list by checkStoresOnBothSides().
141 private final List
<String
> storesList
;
// Fixed-size pool the per-partition tasks are submitted to.
142 private final ExecutorService workerPool
;
// Modulus used by the tasks to decide when to log a progress line.
143 private final int progressOps
;
// Store name -> definition, as reported by the source cluster.
144 private final HashMap
<String
, StoreDefinition
> srcStoreDefMap
;
// Partition ids that will be fork lifted.
145 private final List
<Integer
> partitionList
;
146 private final ForkLiftTaskMode mode
;
// When false, values are streamed to the destination exactly as received;
// see SinglePartitionForkLiftTask.streamingPut for the other path.
147 private final Boolean overwrite
;
/**
 * Fetches the store definitions from one node of the cluster the given
 * admin client is attached to. The node asked is whichever id the
 * cluster's node-id collection yields first.
 *
 * NOTE(review): iterator().next() throws if the node-id collection is
 * empty; presumably a bootstrapped cluster always has at least one node -
 * confirm.
 *
 * @param adminClient admin client of the cluster to query
 * @return the value carried by the remote store definition list returned
 *         by that node
 */
149 private static List
<StoreDefinition
> getStoreDefinitions(AdminClient adminClient
) {
150 Cluster cluster
= adminClient
.getAdminClientCluster();
151 Integer nodeId
= cluster
.getNodeIds().iterator().next();
152 return adminClient
.metadataMgmtOps
.getRemoteStoreDefList(nodeId
).getValue();
/**
 * Wires up the tool: creates the AdminClient for the source cluster and
 * the BaseStreamingClient for the destination cluster, settles which
 * stores and partitions will be processed, and creates the fixed-size
 * worker pool.
 *
 * NOTE(review): the body assigns the overwrite and progressOps fields from
 * identically named values that are not among the parameters visible in
 * this excerpt; presumably they are declared on parameter lines that are
 * not shown - confirm.
 *
 * @param srcBootstrapUrl bootstrap URL handed to the source AdminClient
 * @param dstBootstrapUrl stored in the streaming client properties under
 *        "streaming.platform.bootstrapURL"
 * @param maxPutsPerSecond stored in the streaming client properties under
 *        "streaming.platform.throttle.qps"
 * @param partitionParallelism number of threads in the worker pool
 * @param storesList stores to fork lift; when null, the names are derived
 *        from the store definitions of the source cluster
 * @param partitions partitions to fork lift; when null, the partition ids
 *        of all source nodes are collected and shuffled
 * @param mode resolution mode
 * @throws VoldemortException if the collected partition list is larger
 *         than the partition count reported by the source cluster
 */
155 public ClusterForkLiftTool(String srcBootstrapUrl
,
156 String dstBootstrapUrl
,
158 int maxPutsPerSecond
,
159 int partitionParallelism
,
161 List
<String
> storesList
,
162 List
<Integer
> partitions
,
163 ForkLiftTaskMode mode
) {
164 // set up AdminClient on source cluster
165 this.srcAdminClient
= new AdminClient(srcBootstrapUrl
,
166 new AdminClientConfig(),
169 // set up streaming client to the destination cluster
170 Props props
= new Props();
171 props
.put("streaming.platform.bootstrapURL", dstBootstrapUrl
);
172 props
.put("streaming.platform.throttle.qps", maxPutsPerSecond
);
173 StreamingClientConfig config
= new StreamingClientConfig(props
);
174 this.dstStreamingClient
= new BaseStreamingClient(config
);
176 this.overwrite
= overwrite
;
178 // determine and verify final list of stores to be forklifted over
179 if(storesList
!= null) {
// The caller's list is kept by reference (no copy); checkStoresOnBothSides()
// below removes skipped stores from it, so the list must be mutable.
180 this.storesList
= storesList
;
182 this.storesList
= StoreUtils
.getStoreNames(getStoreDefinitions(srcAdminClient
),
// Validates storesList against both clusters (both clients must already
// be assigned at this point) and keeps the source definitions for later use.
185 this.srcStoreDefMap
= checkStoresOnBothSides();
187 // determine the partitions to be fetched
188 if(partitions
!= null) {
189 this.partitionList
= partitions
;
191 this.partitionList
= new ArrayList
<Integer
>(srcAdminClient
.getAdminClientCluster()
192 .getNumberOfPartitions());
193 for(Node node
: srcAdminClient
.getAdminClientCluster().getNodes())
194 this.partitionList
.addAll(node
.getPartitionIds());
195 // shuffle the partition list so the fetching will equally spread
196 // across the source cluster
197 Collections
.shuffle(this.partitionList
);
// Sanity check: the per-node partition ids must not add up to more than
// the cluster-wide partition count.
198 if(this.partitionList
.size() > srcAdminClient
.getAdminClientCluster()
199 .getNumberOfPartitions()) {
200 throw new VoldemortException("Incorrect partition mapping in source cluster");
204 // set up thread pool to parallely forklift partitions
205 this.workerPool
= Executors
.newFixedThreadPool(partitionParallelism
);
206 this.progressOps
= progressOps
;
/**
 * Checks every requested store against the store definitions of both the
 * source and the destination cluster. A store missing on either side is
 * logged with a warning and collected; the collected stores are then
 * removed from storesList (the list is modified in place).
 *
 * @return store name to store definition map built from the source
 *         cluster's definitions
 */
210 private HashMap
<String
, StoreDefinition
> checkStoresOnBothSides() {
211 List
<StoreDefinition
> srcStoreDefs
= getStoreDefinitions(srcAdminClient
);
212 HashMap
<String
, StoreDefinition
> srcStoreDefMap
= StoreUtils
.getStoreDefsAsMap(srcStoreDefs
);
// Destination definitions are read through the admin client owned by the
// streaming client.
213 List
<StoreDefinition
> dstStoreDefs
= getStoreDefinitions(dstStreamingClient
.getAdminClient());
214 HashMap
<String
, StoreDefinition
> dstStoreDefMap
= StoreUtils
.getStoreDefsAsMap(dstStoreDefs
);
// A set, so a store missing on both sides is recorded only once.
216 Set
<String
> storesToSkip
= new HashSet
<String
>();
217 for(String store
: storesList
) {
218 if(!srcStoreDefMap
.containsKey(store
)) {
219 logger
.warn("Store " + store
+ " does not exist in source cluster ");
220 storesToSkip
.add(store
);
222 if(!dstStoreDefMap
.containsKey(store
)) {
223 logger
.warn("Store " + store
+ " does not exist in destination cluster ");
224 storesToSkip
.add(store
);
// Removal happens after the loop, so storesList is not modified while it
// is being iterated.
228 if(storesToSkip
.size() > 0) {
229 logger
.warn("List of stores that will be skipped :" + storesToSkip
);
230 storesList
.removeAll(storesToSkip
);
232 return srcStoreDefMap
;
236 * TODO this base class can potentially provide some framework of execution
237 * for the subclasses, to yield a better object-oriented design (progress
/**
 * Base class of the per-partition tasks. Holds the partition id, the
 * routing plan of the store, a log prefix naming store and partition, and
 * the node ids of the destination cluster; offers the shared
 * streamingPut(...) write path and a summary log line.
 */
241 abstract class SinglePartitionForkLiftTask
{
243 protected int partitionId
;
244 protected CountDownLatch latch
;
245 protected StoreRoutingPlan storeInstance
;
// Log prefix of the form "[Store: <name>, Partition: <id>] ".
246 protected String workName
;
// Node ids of the destination cluster, used to build the clock in streamingPut.
247 private Set
<Integer
> dstServerIds
;
// Counter reported in the progress and summary log lines.
// NOTE(review): no statement that increments it is visible in this excerpt.
248 private long entriesForkLifted
= 0;
// NOTE(review): the body reads a partitionId value although no such
// parameter is visible here; presumably it is declared on a parameter line
// that is not shown - confirm.
250 SinglePartitionForkLiftTask(StoreRoutingPlan storeInstance
,
252 CountDownLatch latch
) {
253 this.partitionId
= partitionId
;
255 this.storeInstance
= storeInstance
;
256 workName
= "[Store: " + storeInstance
.getStoreDefinition().getName() + ", Partition: "
257 + this.partitionId
+ "] ";
258 dstServerIds
= dstStreamingClient
.getAdminClient().getAdminClientCluster().getNodeIds();
/**
 * Writes one entry to the destination through the streaming client. With
 * overwrite == false the value is sent as received. The remaining
 * statements wrap the same payload in a new Versioned whose clock comes
 * from VectorClockUtils.makeClockWithCurrentTime(dstServerIds) and send
 * that instead - presumably the overwrite branch; the branch keyword is
 * not part of this excerpt. A progress line is logged whenever
 * entriesForkLifted is a multiple of progressOps.
 */
261 void streamingPut(ByteArray key
, Versioned
<byte[]> value
) {
262 if(overwrite
== false) {
263 dstStreamingClient
.streamingPut(key
, value
);
265 VectorClock denseClock
= VectorClockUtils
.makeClockWithCurrentTime(dstServerIds
);
266 Versioned
<byte[]> updatedValue
= new Versioned
<byte[]>(value
.getValue(), denseClock
);
267 dstStreamingClient
.streamingPut(key
, updatedValue
);
271 if(entriesForkLifted
% progressOps
== 0) {
272 logger
.info(workName
+ " fork lifted " + entriesForkLifted
273 + " entries successfully");
/** Logs the number of records counted in entriesForkLifted for this task. */
277 void printSummary() {
278 logger
.info(workName
+ "Completed processing " + entriesForkLifted
+ " records");
283 * Fetches keys belonging to the primary partition, and then fetches values for
284 * that key from all replicas in a non-streaming fashion, applies the
285 * default resolver and writes it back to the destination cluster
287 * TODO a streaming N way merge is the more efficient & correct solution.
288 * Without this, the resolving can be very slow due to cross data center
/**
 * Per-partition task that resolves each key across replicas. Keys are
 * fetched from the node that storeInstance.getNodeIdForPartitionId(...)
 * returns for the partition; each key is then read from every node in the
 * partition's replication node list, the collected versions are resolved
 * with a ChainedResolver (VectorClockInconsistencyResolver followed by
 * TimeBasedInconsistencyResolver), and one value is written to the
 * destination via streamingPut.
 */
291 class SinglePartitionGloballyResolvingForkLiftTask
extends SinglePartitionForkLiftTask
292 implements Runnable
{
294 SinglePartitionGloballyResolvingForkLiftTask(StoreRoutingPlan storeInstance
,
296 CountDownLatch latch
) {
297 super(storeInstance
, partitionId
, latch
);
// Task body starts here (the enclosing method header is not part of this
// excerpt).
301 String storeName
= this.storeInstance
.getStoreDefinition().getName();
303 logger
.info(workName
+ "Starting processing");
304 ChainedResolver
<Versioned
<byte[]>> resolver
= new ChainedResolver
<Versioned
<byte[]>>(new VectorClockInconsistencyResolver
<byte[]>(),
305 new TimeBasedInconsistencyResolver
<byte[]>());
306 Iterator
<ByteArray
> keyItr
= srcAdminClient
.bulkFetchOps
.fetchKeys(storeInstance
.getNodeIdForPartitionId(this.partitionId
),
308 Lists
.newArrayList(this.partitionId
),
// Nodes that will be asked for every key of this partition.
311 List
<Integer
> nodeList
= storeInstance
.getReplicationNodeList(this.partitionId
);
312 while(keyItr
.hasNext()) {
313 ByteArray keyToResolve
= keyItr
.next();
314 Map
<Integer
, QueryKeyResult
> valuesMap
= doReads(nodeList
, keyToResolve
.get());
// Flatten the per-node results into one list of versions to resolve.
315 List
<Versioned
<byte[]>> values
= new ArrayList
<Versioned
<byte[]>>(valuesMap
.size());
316 for(Map
.Entry
<Integer
, QueryKeyResult
> entry
: valuesMap
.entrySet()) {
317 int nodeId
= entry
.getKey();
318 QueryKeyResult result
= entry
.getValue();
// A failed read is logged with the key (hex) and the node id.
320 if(result
.hasException()) {
321 logger
.error(workName
+ "key fetch failed for key "
322 + ByteUtils
.toHexString(keyToResolve
.get())
323 + " on node " + nodeId
,
324 result
.getException());
327 values
.addAll(result
.getValues());
330 List
<Versioned
<byte[]>> resolvedVersions
= resolver
.resolveConflicts(values
);
331 // after timestamp based resolving there should be only one
332 // version. Insert that to the destination cluster with
333 // empty vector clock
334 if(resolvedVersions
.size() > 1) {
335 throw new VoldemortException("More than one resolved versions, key: "
336 + ByteUtils
.toHexString(keyToResolve
.get())
337 + " vals:" + resolvedVersions
);
// Only the payload of the resolved version is carried over; the Versioned
// is rebuilt from the value alone.
// NOTE(review): get(0) assumes at least one version survived resolution.
339 Versioned
<byte[]> value
= new Versioned
<byte[]>(resolvedVersions
.get(0).getValue());
340 streamingPut(keyToResolve
, value
);
343 } catch(Exception e
) {
344 // all work should stop if we get here
345 logger
.error(workName
+ "Error forklifting data ", e
);
/**
 * Reads the key from each node in nodeIdList through
 * srcAdminClient.storeOps.getNodeKey(...). For every node the returned map
 * holds a QueryKeyResult carrying either the fetched values or the
 * VoldemortException raised by the read.
 *
 * @param nodeIdList ids of the nodes to query
 * @param keyInBytes raw bytes of the key
 * @return node id to query result
 */
357 private Map
<Integer
, QueryKeyResult
> doReads(final List
<Integer
> nodeIdList
,
358 final byte[] keyInBytes
) {
359 Map
<Integer
, QueryKeyResult
> nodeIdToKeyValues
= new HashMap
<Integer
, QueryKeyResult
>();
361 ByteArray key
= new ByteArray(keyInBytes
);
362 for(int nodeId
: nodeIdList
) {
363 List
<Versioned
<byte[]>> values
= null;
365 values
= srcAdminClient
.storeOps
.getNodeKey(storeInstance
.getStoreDefinition()
369 nodeIdToKeyValues
.put(nodeId
, new QueryKeyResult(key
, values
));
370 } catch(VoldemortException ve
) {
371 nodeIdToKeyValues
.put(nodeId
, new QueryKeyResult(key
, ve
));
374 return nodeIdToKeyValues
;
379 * Simply fetches the data for the partition from the primary replica and
380 * writes it into the destination cluster. Works well when the replicas are
/**
 * Per-partition task that reads entries only from the node that
 * storeInstance.getNodeIdForPartitionId(...) returns for the partition.
 * Versions of one key are resolved with a ChainedResolver
 * (VectorClockInconsistencyResolver followed by
 * TimeBasedInconsistencyResolver) and the resolved value is written to the
 * destination via streamingPut.
 *
 * NOTE(review): the grouping relies on all versions of a key arriving
 * consecutively from the entry iterator, and on statements that append to
 * vals and update prevKey; those statements are not part of this excerpt -
 * confirm against the full source.
 */
384 class SinglePartitionPrimaryResolvingForkLiftTask
extends SinglePartitionForkLiftTask
implements
387 SinglePartitionPrimaryResolvingForkLiftTask(StoreRoutingPlan storeInstance
,
389 CountDownLatch latch
) {
390 super(storeInstance
, partitionId
, latch
);
// Task body starts here (the enclosing method header is not part of this
// excerpt).
395 String storeName
= this.storeInstance
.getStoreDefinition().getName();
396 ChainedResolver
<Versioned
<byte[]>> resolver
= new ChainedResolver
<Versioned
<byte[]>>(new VectorClockInconsistencyResolver
<byte[]>(),
397 new TimeBasedInconsistencyResolver
<byte[]>());
399 logger
.info(workName
+ "Starting processing");
400 Iterator
<Pair
<ByteArray
, Versioned
<byte[]>>> entryItr
= srcAdminClient
.bulkFetchOps
.fetchEntries(storeInstance
.getNodeIdForPartitionId(this.partitionId
),
402 Lists
.newArrayList(this.partitionId
),
// prevKey: key of the versions currently buffered in vals.
405 ByteArray prevKey
= null;
406 List
<Versioned
<byte[]>> vals
= new ArrayList
<Versioned
<byte[]>>();
408 while(entryItr
.hasNext()) {
409 Pair
<ByteArray
, Versioned
<byte[]>> record
= entryItr
.next();
410 ByteArray key
= record
.getFirst();
411 Versioned
<byte[]> versioned
= record
.getSecond();
// Key changed: flush the versions buffered for the previous key.
413 if(prevKey
!= null && !prevKey
.equals(key
)) {
414 // resolve and write, if you see a new key
415 List
<Versioned
<byte[]>> resolvedVersions
= resolver
.resolveConflicts(vals
);
416 if(resolvedVersions
.size() > 1) {
417 throw new VoldemortException("More than one resolved versions, key: "
418 + ByteUtils
.toHexString(prevKey
.get())
419 + " vals:" + resolvedVersions
);
421 Versioned
<byte[]> resolvedVersioned
= resolvedVersions
.get(0);
422 // an empty vector clock will ensure, online traffic
423 // will always win over the forklift writes
// The new clock is constructed from the resolved version's timestamp only.
424 Versioned
<byte[]> newEntry
= new Versioned
<byte[]>(resolvedVersioned
.getValue(),
425 new VectorClock(((VectorClock
) resolvedVersioned
.getVersion()).getTimestamp()));
427 streamingPut(prevKey
, newEntry
);
428 vals
= new ArrayList
<Versioned
<byte[]>>();
434 // process the last record
435 if(vals
.size() > 0) {
436 List
<Versioned
<byte[]>> resolvedVals
= resolver
.resolveConflicts(vals
);
// Unlike the in-loop path (which throws VoldemortException), the single
// version expectation is only an assert here, so it is checked only when
// assertions are enabled.
437 assert resolvedVals
.size() == 1;
438 Versioned
<byte[]> resolvedVersioned
= resolvedVals
.get(0);
439 Versioned
<byte[]> newEntry
= new Versioned
<byte[]>(resolvedVersioned
.getValue(),
440 new VectorClock(((VectorClock
) resolvedVersioned
.getVersion()).getTimestamp()));
441 streamingPut(prevKey
, newEntry
);
445 } catch(Exception e
) {
446 // if for some reason this partition fails, we will have to retry
447 // again for those partitions alone.
448 logger
.error(workName
+ "Error forklifting data ", e
);
456 * Simply fetches the data for the partition from the primary replica and
457 * writes it into the destination cluster without resolving any of the
/**
 * Per-partition task that copies entries without any version resolution:
 * every (key, versioned value) pair fetched from the node that
 * storeInstance.getNodeIdForPartitionId(...) returns for the partition is
 * passed straight to streamingPut.
 */
461 class SinglePartitionNoResolutionForkLiftTask
extends SinglePartitionForkLiftTask
implements
464 SinglePartitionNoResolutionForkLiftTask(StoreRoutingPlan storeInstance
,
466 CountDownLatch latch
) {
467 super(storeInstance
, partitionId
, latch
);
// Task body starts here (the enclosing method header is not part of this
// excerpt).
472 String storeName
= this.storeInstance
.getStoreDefinition().getName();
474 logger
.info(workName
+ "Starting processing");
475 Iterator
<Pair
<ByteArray
, Versioned
<byte[]>>> entryItr
= srcAdminClient
.bulkFetchOps
.fetchEntries(storeInstance
.getNodeIdForPartitionId(this.partitionId
),
477 Lists
.newArrayList(this.partitionId
),
// Each fetched version is forwarded individually, keeping its own version.
481 while(entryItr
.hasNext()) {
482 Pair
<ByteArray
, Versioned
<byte[]>> record
= entryItr
.next();
483 ByteArray key
= record
.getFirst();
484 Versioned
<byte[]> versioned
= record
.getSecond();
485 streamingPut(key
, versioned
);
489 } catch(Exception e
) {
490 // if for some reason this partition fails, we will have to retry
491 // again for those partitions alone.
492 logger
.error(workName
+ "Error forklifting data ", e
);
502 final Cluster srcCluster
= srcAdminClient
.getAdminClientCluster();
504 // process stores one-by-one
505 for(String store
: storesList
) {
506 logger
.info("Processing store " + store
);
507 dstStreamingClient
.initStreamingSession(store
, new Callable
<Object
>() {
510 public Object
call() throws Exception
{
514 }, new Callable
<Object
>() {
517 public Object
call() throws Exception
{
523 final CountDownLatch latch
= new CountDownLatch(srcCluster
.getNumberOfPartitions());
524 StoreRoutingPlan storeInstance
= new StoreRoutingPlan(srcCluster
,
525 srcStoreDefMap
.get(store
));
527 // submit work on every partition that is to be forklifted
528 for(Integer partitionId
: partitionList
) {
529 if(this.mode
== ForkLiftTaskMode
.global_resolution
) {
530 // do thorough global resolution across replicas
531 SinglePartitionGloballyResolvingForkLiftTask work
= new SinglePartitionGloballyResolvingForkLiftTask(storeInstance
,
534 workerPool
.submit(work
);
535 } else if(this.mode
== ForkLiftTaskMode
.primary_resolution
) {
536 // do the less cleaner, but much faster route
537 SinglePartitionPrimaryResolvingForkLiftTask work
= new SinglePartitionPrimaryResolvingForkLiftTask(storeInstance
,
540 workerPool
.submit(work
);
541 } else if(this.mode
== ForkLiftTaskMode
.no_resolution
) {
542 // do the less cleaner, but much faster route
543 SinglePartitionNoResolutionForkLiftTask work
= new SinglePartitionNoResolutionForkLiftTask(storeInstance
,
546 workerPool
.submit(work
);
550 // wait till all the partitions are processed
552 dstStreamingClient
.closeStreamingSession();
553 logger
.info("Finished processing store " + store
);
555 } catch(Exception e
) {
556 logger
.error("Exception running forklift tool", e
);
558 workerPool
.shutdown();
560 workerPool
.awaitTermination(DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS
,
562 } catch(InterruptedException ie
) {
563 logger
.error("InterruptedException while waiting for worker pool to shutdown", ie
);
565 srcAdminClient
.close();
566 dstStreamingClient
.getAdminClient().close();
573 * @return program parser
/**
 * Builds the joptsimple OptionParser that declares the command line
 * options of the tool: help, src-url, dst-url, stores, partitions,
 * max-puts-per-second, progress-period-ops, parallelism, mode and
 * overwrite. The help texts embed the DEFAULT_* constants and the default
 * mode so the printed usage shows the effective defaults.
 */
575 private static OptionParser
getParser() {
576 OptionParser parser
= new OptionParser();
577 parser
.accepts("help", "print help information");
578 parser
.accepts("src-url", "[REQUIRED] bootstrap URL of source cluster")
580 .describedAs("source-bootstrap-url")
581 .ofType(String
.class);
582 parser
.accepts("dst-url", "[REQUIRED] bootstrap URL of destination cluster")
584 .describedAs("destination-bootstrap-url")
585 .ofType(String
.class);
// stores and partitions accept a comma separated list of values.
586 parser
.accepts("stores",
587 "Store names to forklift. Comma delimited list or singleton. [Default: ALL SOURCE STORES]")
589 .describedAs("stores")
590 .withValuesSeparatedBy(',')
591 .ofType(String
.class);
592 parser
.accepts("partitions",
593 "partitions to forklift. Comma delimited list or singleton. [Default: ALL SOURCE PARTITIONS]")
595 .describedAs("partitions")
596 .withValuesSeparatedBy(',')
597 .ofType(Integer
.class);
598 parser
.accepts("max-puts-per-second",
599 "Maximum number of put(...) operations issued against destination cluster per second. [Default: "
600 + DEFAULT_MAX_PUTS_PER_SEC
+ " ]")
602 .describedAs("maxPutsPerSecond")
603 .ofType(Integer
.class);
604 parser
.accepts("progress-period-ops",
605 "Number of operations between progress info is displayed. [Default: "
606 + DEFAULT_PROGRESS_PERIOD_OPS
+ " ]")
608 .describedAs("progressPeriodOps")
609 .ofType(Integer
.class);
610 parser
.accepts("parallelism",
611 "Number of partitions to fetch in parallel. [Default: "
612 + DEFAULT_PARTITION_PARALLELISM
+ " ]")
614 .describedAs("partitionParallelism")
615 .ofType(Integer
.class);
616 parser
.accepts("mode",
617 "Determines if a thorough global resolution needs to be done, by comparing all replicas. [Default: "
618 + ForkLiftTaskMode
.primary_resolution
.toString()
619 + " Fetch from primary alone ]");
// The overwrite option reuses the warning text as its description.
622 parser
.accepts("overwrite", OVERWRITE_WARNING_MESSAGE
)
624 .describedAs("overwriteExistingValue")
625 .ofType(Boolean
.class)
634 public static void main(String
[] args
) throws Exception
{
635 OptionParser parser
= null;
636 OptionSet options
= null;
638 parser
= getParser();
639 options
= parser
.parse(args
);
640 } catch(Exception oe
) {
641 logger
.error("Exception processing command line options", oe
);
642 parser
.printHelpOn(System
.out
);
646 /* validate options */
647 if(options
.has("help")) {
648 parser
.printHelpOn(System
.out
);
652 if(!options
.has("src-url") || !options
.has("dst-url")) {
653 logger
.error("Both 'src-url' and 'dst-url' options are mandatory");
654 parser
.printHelpOn(System
.out
);
658 String srcBootstrapUrl
= (String
) options
.valueOf("src-url");
659 String dstBootstrapUrl
= (String
) options
.valueOf("dst-url");
660 int maxPutsPerSecond
= DEFAULT_MAX_PUTS_PER_SEC
;
661 if(options
.has("max-puts-per-second"))
662 maxPutsPerSecond
= (Integer
) options
.valueOf("max-puts-per-second");
663 List
<String
> storesList
= null;
664 if(options
.has("stores")) {
665 storesList
= new ArrayList
<String
>((List
<String
>) options
.valuesOf("stores"));
667 List
<Integer
> partitions
= null;
668 if(options
.has("partitions")) {
669 partitions
= (List
<Integer
>) options
.valuesOf("partitions");
672 int partitionParallelism
= DEFAULT_PARTITION_PARALLELISM
;
673 if(options
.has("parallelism")) {
674 partitionParallelism
= (Integer
) options
.valueOf("parallelism");
676 int progressOps
= DEFAULT_PROGRESS_PERIOD_OPS
;
677 if(options
.has("progress-period-ops")) {
678 progressOps
= (Integer
) options
.valueOf("progress-period-ops");
681 ForkLiftTaskMode mode
;
682 mode
= ForkLiftTaskMode
.primary_resolution
;
683 if(options
.has("mode")) {
684 mode
= Utils
.getEnumFromString(ForkLiftTaskMode
.class, (String
) options
.valueOf("mode"));
686 mode
= ForkLiftTaskMode
.primary_resolution
;
690 Boolean overwrite
= false;
691 if(options
.has("overwrite")) {
692 if(options
.hasArgument("overwrite")) {
693 overwrite
= (Boolean
) options
.valueOf("overwrite");
700 logger
.warn(OVERWRITE_WARNING_MESSAGE
);
703 ClusterForkLiftTool forkLiftTool
= new ClusterForkLiftTool(srcBootstrapUrl
,
707 partitionParallelism
,
713 // TODO cleanly shut down the hanging threadpool