Add overwrite option to fork lift tool
src/java/voldemort/utils/ClusterForkLiftTool.java
1 package voldemort.utils;
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.CountDownLatch;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.TimeUnit;
17 import joptsimple.OptionParser;
18 import joptsimple.OptionSet;
20 import org.apache.log4j.Logger;
22 import voldemort.VoldemortException;
23 import voldemort.client.ClientConfig;
24 import voldemort.client.protocol.admin.AdminClient;
25 import voldemort.client.protocol.admin.AdminClientConfig;
26 import voldemort.client.protocol.admin.BaseStreamingClient;
27 import voldemort.client.protocol.admin.QueryKeyResult;
28 import voldemort.client.protocol.admin.StreamingClientConfig;
29 import voldemort.cluster.Cluster;
30 import voldemort.cluster.Node;
31 import voldemort.routing.StoreRoutingPlan;
32 import voldemort.store.StoreDefinition;
33 import voldemort.store.StoreUtils;
34 import voldemort.versioning.ChainedResolver;
35 import voldemort.versioning.ObsoleteVersionException;
36 import voldemort.versioning.TimeBasedInconsistencyResolver;
37 import voldemort.versioning.VectorClock;
38 import voldemort.versioning.VectorClockInconsistencyResolver;
39 import voldemort.versioning.VectorClockUtils;
40 import voldemort.versioning.Versioned;
42 import com.google.common.collect.Lists;
44 /**
45 * Tool to fork lift data over from a source cluster to a destination cluster.
46 * When used in conjunction with a client that "double writes" to both the
47 * clusters, this can be used as a feasible store migration tool to move an
48 * existing store to a new cluster.
50 * There are two modes for how the divergent versions of a key are
51 * consolidated from the source cluster:
53 * 1) Primary only Resolution (
54 * {@link ClusterForkLiftTool#SinglePartitionPrimaryResolvingForkLiftTask}): The
55 * entries on the primary partition are moved over to the destination cluster
56 * with empty vector clocks. If any key has multiple versions on the primary,
57 * they are resolved. This approach is fast and is best suited if you deem the
58 * replicas to be largely in sync with each other. This is the DEFAULT mode.
60 * 2) Global Resolution (
61 * {@link ClusterForkLiftTool#SinglePartitionGloballyResolvingForkLiftTask}):
62 * The keys belonging to a partition are fetched out of the primary replica, and
63 * for each such key, the corresponding values are obtained from all other
64 * replicas using get(..) operations. These versions are then resolved and
65 * written back to the destination cluster as before. This approach is slow
66 * since it involves several round trips to the server for each key (some
67 * potentially cross colo) and hence should be used when thorough version
68 * resolution is necessary or the admin deems the replicas to be fairly
69 * out-of-sync.
72 * In both modes, the default chained resolver (
73 * {@link VectorClockInconsistencyResolver} +
74 * {@link TimeBasedInconsistencyResolver}) is used to determine a final resolved
75 * version.
77 * NOTES:
79 * 1) If the tool fails for some reason in the middle, the admin can restart the
80 * tool for the failed partitions alone. The keys that were already written in
81 * the failed partitions will all experience {@link ObsoleteVersionException},
82 * and the un-inserted keys will be inserted.
84 * 2) Since the forklift writes are issued with empty vector clocks, they will
85 * always yield to online writes happening on the same key, before or during the
86 * forklift window. Of course, after the forklift window, the destination
87 * cluster resumes normal operation.
89 * 3) For now, we will fall back to fetching the key from the primary replica,
90 * fetching the values out manually, resolving them and writing back. Pitfall:
91 * the primary somehow does not have the key.
93 * Two scenarios.
95 * 1) Key active after double writes: the situation is the result of slop not
96 * propagating to the primary. But double writes would write the key back to
97 * the destination cluster anyway. We are good.
99 * 2) Key inactive after double writes: This indicates a problem elsewhere. This
100 * is a base guarantee Voldemort should offer.
102 * 4) Zoned <-> Non Zoned forklift implications.
104 * When forklifting data from a non-zoned to a zoned cluster, both destination
105 * zones will be populated with data by simply running the tool once with the
106 * respective bootstrap URLs. If you need to forklift data from zoned to
107 * non-zoned clusters (i.e. your replication between data centers is not handled
108 * by Voldemort), then you need to run the tool separately for each destination
109 * non-zoned cluster. Zoned -> Zoned and Non-Zoned -> Non-Zoned forklifts are
110 * trivial.
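* <p>
* A minimal programmatic usage sketch (the bootstrap URLs and store name below
* are hypothetical placeholders; the numeric limits are simply this tool's
* defaults):
*
* <pre>
* {@code
* ClusterForkLiftTool tool = new ClusterForkLiftTool("tcp://src-host:6666",  // source bootstrap URL
*                                                    "tcp://dst-host:6666",  // destination bootstrap URL
*                                                    false,                  // overwrite destination values?
*                                                    500,                    // max puts per second
*                                                    8,                      // partitions fetched in parallel
*                                                    100000,                 // progress log period (ops)
*                                                    Lists.newArrayList("test-store"),
*                                                    null,                   // null => all source partitions
*                                                    ForkLiftTaskMode.primary_resolution);
* tool.run();
* }
* </pre>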
113 public class ClusterForkLiftTool implements Runnable {
116 * Different modes available with the forklift tool
118 enum ForkLiftTaskMode {
119 global_resolution, /* Fetch data from all replicas and do resolution */
120 primary_resolution, /*
121 * Fetch data from primary partition and do
122 * resolution
124 no_resolution /* Fetch data from primary partition and do no resolution */
127 private static Logger logger = Logger.getLogger(ClusterForkLiftTool.class);
128 private static final int DEFAULT_MAX_PUTS_PER_SEC = 500;
129 private static final int DEFAULT_PROGRESS_PERIOD_OPS = 100000;
130 private static final int DEFAULT_PARTITION_PARALLELISM = 8;
131 private static final int DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS = 5;
133 private static final String OVERWRITE_WARNING_MESSAGE = "**WARNING** If the source and destination have overlapping keys, the destination values will be overwritten "
134 + "using the source values. This option is irreversible. Any old value that exists in the destination cluster will "
135 + "be permanently lost. This option could be useful if you are copying read-only data from the latest source; "
136 + "otherwise it is a very dangerous option. Think twice before using it. Keys that exist only "
137 + "in the destination will be left unmodified. ";
139 private final AdminClient srcAdminClient;
140 private final BaseStreamingClient dstStreamingClient;
141 private final List<String> storesList;
142 private final ExecutorService workerPool;
143 private final int progressOps;
144 private final HashMap<String, StoreDefinition> srcStoreDefMap;
145 private final List<Integer> partitionList;
146 private final ForkLiftTaskMode mode;
147 private final Boolean overwrite;
149 private static List<StoreDefinition> getStoreDefinitions(AdminClient adminClient) {
150 Cluster cluster = adminClient.getAdminClientCluster();
151 Integer nodeId = cluster.getNodeIds().iterator().next();
152 return adminClient.metadataMgmtOps.getRemoteStoreDefList(nodeId).getValue();
155 public ClusterForkLiftTool(String srcBootstrapUrl,
156 String dstBootstrapUrl,
157 Boolean overwrite,
158 int maxPutsPerSecond,
159 int partitionParallelism,
160 int progressOps,
161 List<String> storesList,
162 List<Integer> partitions,
163 ForkLiftTaskMode mode) {
164 // set up AdminClient on source cluster
165 this.srcAdminClient = new AdminClient(srcBootstrapUrl,
166 new AdminClientConfig(),
167 new ClientConfig());
169 // set up streaming client to the destination cluster
170 Props props = new Props();
171 props.put("streaming.platform.bootstrapURL", dstBootstrapUrl);
172 props.put("streaming.platform.throttle.qps", maxPutsPerSecond);
173 StreamingClientConfig config = new StreamingClientConfig(props);
174 this.dstStreamingClient = new BaseStreamingClient(config);
175 this.mode = mode;
176 this.overwrite = overwrite;
178 // determine and verify final list of stores to be forklifted over
179 if(storesList != null) {
180 this.storesList = storesList;
181 } else {
182 this.storesList = StoreUtils.getStoreNames(getStoreDefinitions(srcAdminClient),
183 true);
185 this.srcStoreDefMap = checkStoresOnBothSides();
187 // determine the partitions to be fetched
188 if(partitions != null) {
189 this.partitionList = partitions;
190 } else {
191 this.partitionList = new ArrayList<Integer>(srcAdminClient.getAdminClientCluster()
192 .getNumberOfPartitions());
193 for(Node node: srcAdminClient.getAdminClientCluster().getNodes())
194 this.partitionList.addAll(node.getPartitionIds());
195 // shuffle the partition list so the fetching is spread evenly
196 // across the source cluster
197 Collections.shuffle(this.partitionList);
198 if(this.partitionList.size() > srcAdminClient.getAdminClientCluster()
199 .getNumberOfPartitions()) {
200 throw new VoldemortException("Incorrect partition mapping in source cluster");
204 // set up thread pool to forklift partitions in parallel
205 this.workerPool = Executors.newFixedThreadPool(partitionParallelism);
206 this.progressOps = progressOps;
210 private HashMap<String, StoreDefinition> checkStoresOnBothSides() {
211 List<StoreDefinition> srcStoreDefs = getStoreDefinitions(srcAdminClient);
212 HashMap<String, StoreDefinition> srcStoreDefMap = StoreUtils.getStoreDefsAsMap(srcStoreDefs);
213 List<StoreDefinition> dstStoreDefs = getStoreDefinitions(dstStreamingClient.getAdminClient());
214 HashMap<String, StoreDefinition> dstStoreDefMap = StoreUtils.getStoreDefsAsMap(dstStoreDefs);
216 Set<String> storesToSkip = new HashSet<String>();
217 for(String store: storesList) {
218 if(!srcStoreDefMap.containsKey(store)) {
219 logger.warn("Store " + store + " does not exist in source cluster ");
220 storesToSkip.add(store);
222 if(!dstStoreDefMap.containsKey(store)) {
223 logger.warn("Store " + store + " does not exist in destination cluster ");
224 storesToSkip.add(store);
228 if(storesToSkip.size() > 0) {
229 logger.warn("List of stores that will be skipped :" + storesToSkip);
230 storesList.removeAll(storesToSkip);
232 return srcStoreDefMap;
236 * TODO this base class can potentially provide some framework of execution
237 * for the subclasses, to yield a better object-oriented design (progress
238 * tracking etc)
241 abstract class SinglePartitionForkLiftTask {
243 protected int partitionId;
244 protected CountDownLatch latch;
245 protected StoreRoutingPlan storeInstance;
246 protected String workName;
247 private Set<Integer> dstServerIds;
248 private long entriesForkLifted = 0;
250 SinglePartitionForkLiftTask(StoreRoutingPlan storeInstance,
251 int partitionId,
252 CountDownLatch latch) {
253 this.partitionId = partitionId;
254 this.latch = latch;
255 this.storeInstance = storeInstance;
256 workName = "[Store: " + storeInstance.getStoreDefinition().getName() + ", Partition: "
257 + this.partitionId + "] ";
258 dstServerIds = dstStreamingClient.getAdminClient().getAdminClientCluster().getNodeIds();
261 void streamingPut(ByteArray key, Versioned<byte[]> value) {
262 if(!overwrite) {
263 dstStreamingClient.streamingPut(key, value);
264 } else {
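// In overwrite mode, a clock with an entry for every destination node at the
// current time should not lose to any version already stored (unlike the empty
// clock used otherwise, which always yields to existing versions), so the
// source value ends up replacing the destination copy of this key.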
265 VectorClock denseClock = VectorClockUtils.makeClockWithCurrentTime(dstServerIds);
266 Versioned<byte[]> updatedValue = new Versioned<byte[]>(value.getValue(), denseClock);
267 dstStreamingClient.streamingPut(key, updatedValue);
270 entriesForkLifted++;
271 if(entriesForkLifted % progressOps == 0) {
272 logger.info(workName + " fork lifted " + entriesForkLifted
273 + " entries successfully");
277 void printSummary() {
278 logger.info(workName + "Completed processing " + entriesForkLifted + " records");
283 * Fetches keys belonging to the primary partition, and then fetches values for
284 * that key from all replicas in a non-streaming fashion, applies the
285 * default resolver and writes it back to the destination cluster
287 * TODO a streaming N way merge is the more efficient & correct solution.
288 * Without this, the resolving can be very slow due to cross data center
289 * get(..)
291 class SinglePartitionGloballyResolvingForkLiftTask extends SinglePartitionForkLiftTask
292 implements Runnable {
294 SinglePartitionGloballyResolvingForkLiftTask(StoreRoutingPlan storeInstance,
295 int partitionId,
296 CountDownLatch latch) {
297 super(storeInstance, partitionId, latch);
300 public void run() {
301 String storeName = this.storeInstance.getStoreDefinition().getName();
302 try {
303 logger.info(workName + "Starting processing");
304 ChainedResolver<Versioned<byte[]>> resolver = new ChainedResolver<Versioned<byte[]>>(new VectorClockInconsistencyResolver<byte[]>(),
305 new TimeBasedInconsistencyResolver<byte[]>());
306 Iterator<ByteArray> keyItr = srcAdminClient.bulkFetchOps.fetchKeys(storeInstance.getNodeIdForPartitionId(this.partitionId),
307 storeName,
308 Lists.newArrayList(this.partitionId),
309 null,
310 true);
311 List<Integer> nodeList = storeInstance.getReplicationNodeList(this.partitionId);
312 while(keyItr.hasNext()) {
313 ByteArray keyToResolve = keyItr.next();
314 Map<Integer, QueryKeyResult> valuesMap = doReads(nodeList, keyToResolve.get());
315 List<Versioned<byte[]>> values = new ArrayList<Versioned<byte[]>>(valuesMap.size());
316 for(Map.Entry<Integer, QueryKeyResult> entry: valuesMap.entrySet()) {
317 int nodeId = entry.getKey();
318 QueryKeyResult result = entry.getValue();
320 if(result.hasException()) {
321 logger.error(workName + "key fetch failed for key "
322 + ByteUtils.toHexString(keyToResolve.get())
323 + " on node " + nodeId,
324 result.getException());
325 break;
327 values.addAll(result.getValues());
330 List<Versioned<byte[]>> resolvedVersions = resolver.resolveConflicts(values);
331 // after timestamp based resolving there should be only one
332 // version. Insert that into the destination cluster with an
333 // empty vector clock.
334 if(resolvedVersions.size() > 1) {
335 throw new VoldemortException("More than one resolved version, key: "
336 + ByteUtils.toHexString(keyToResolve.get())
337 + " vals:" + resolvedVersions);
339 Versioned<byte[]> value = new Versioned<byte[]>(resolvedVersions.get(0).getValue());
340 streamingPut(keyToResolve, value);
342 printSummary();
343 } catch(Exception e) {
344 // all work should stop if we get here
345 logger.error(workName + "Error forklifting data ", e);
346 } finally {
347 latch.countDown();
353 * @param nodeIdList the node ids to read the key from
354 * @param keyInBytes the key to read, in raw byte form
355 * @return a map from node id to the {@link QueryKeyResult} (values or exception) obtained from that node
357 private Map<Integer, QueryKeyResult> doReads(final List<Integer> nodeIdList,
358 final byte[] keyInBytes) {
359 Map<Integer, QueryKeyResult> nodeIdToKeyValues = new HashMap<Integer, QueryKeyResult>();
361 ByteArray key = new ByteArray(keyInBytes);
362 for(int nodeId: nodeIdList) {
363 List<Versioned<byte[]>> values = null;
364 try {
365 values = srcAdminClient.storeOps.getNodeKey(storeInstance.getStoreDefinition()
366 .getName(),
367 nodeId,
368 key);
369 nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values));
370 } catch(VoldemortException ve) {
371 nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve));
374 return nodeIdToKeyValues;
379 * Simply fetches the data for the partition from the primary replica and
380 * writes it into the destination cluster. Works well when the replicas are
381 * fairly consistent.
384 class SinglePartitionPrimaryResolvingForkLiftTask extends SinglePartitionForkLiftTask implements
385 Runnable {
387 SinglePartitionPrimaryResolvingForkLiftTask(StoreRoutingPlan storeInstance,
388 int partitionId,
389 CountDownLatch latch) {
390 super(storeInstance, partitionId, latch);
393 @Override
394 public void run() {
395 String storeName = this.storeInstance.getStoreDefinition().getName();
396 ChainedResolver<Versioned<byte[]>> resolver = new ChainedResolver<Versioned<byte[]>>(new VectorClockInconsistencyResolver<byte[]>(),
397 new TimeBasedInconsistencyResolver<byte[]>());
398 try {
399 logger.info(workName + "Starting processing");
400 Iterator<Pair<ByteArray, Versioned<byte[]>>> entryItr = srcAdminClient.bulkFetchOps.fetchEntries(storeInstance.getNodeIdForPartitionId(this.partitionId),
401 storeName,
402 Lists.newArrayList(this.partitionId),
403 null,
404 true);
405 ByteArray prevKey = null;
406 List<Versioned<byte[]>> vals = new ArrayList<Versioned<byte[]>>();
408 while(entryItr.hasNext()) {
409 Pair<ByteArray, Versioned<byte[]>> record = entryItr.next();
410 ByteArray key = record.getFirst();
411 Versioned<byte[]> versioned = record.getSecond();
413 if(prevKey != null && !prevKey.equals(key)) {
414 // resolve and write, if you see a new key
415 List<Versioned<byte[]>> resolvedVersions = resolver.resolveConflicts(vals);
416 if(resolvedVersions.size() > 1) {
417 throw new VoldemortException("More than one resolved version, key: "
418 + ByteUtils.toHexString(prevKey.get())
419 + " vals:" + resolvedVersions);
421 Versioned<byte[]> resolvedVersioned = resolvedVersions.get(0);
422 // an empty vector clock ensures that online traffic
423 // will always win over the forklift writes
424 Versioned<byte[]> newEntry = new Versioned<byte[]>(resolvedVersioned.getValue(),
425 new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp()));
427 streamingPut(prevKey, newEntry);
428 vals = new ArrayList<Versioned<byte[]>>();
430 vals.add(versioned);
431 prevKey = key;
434 // process the last record
435 if(vals.size() > 0) {
436 List<Versioned<byte[]>> resolvedVals = resolver.resolveConflicts(vals);
437 assert resolvedVals.size() == 1;
438 Versioned<byte[]> resolvedVersioned = resolvedVals.get(0);
439 Versioned<byte[]> newEntry = new Versioned<byte[]>(resolvedVersioned.getValue(),
440 new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp()));
441 streamingPut(prevKey, newEntry);
444 printSummary();
445 } catch(Exception e) {
446 // if for some reason this partition fails, we will have to
447 // retry those partitions alone.
448 logger.error(workName + "Error forklifting data ", e);
449 } finally {
450 latch.countDown();
456 * Simply fetches the data for the partition from the primary replica and
457 * writes it into the destination cluster without resolving any of the
458 * conflicting values
461 class SinglePartitionNoResolutionForkLiftTask extends SinglePartitionForkLiftTask implements
462 Runnable {
464 SinglePartitionNoResolutionForkLiftTask(StoreRoutingPlan storeInstance,
465 int partitionId,
466 CountDownLatch latch) {
467 super(storeInstance, partitionId, latch);
470 @Override
471 public void run() {
472 String storeName = this.storeInstance.getStoreDefinition().getName();
473 try {
474 logger.info(workName + "Starting processing");
475 Iterator<Pair<ByteArray, Versioned<byte[]>>> entryItr = srcAdminClient.bulkFetchOps.fetchEntries(storeInstance.getNodeIdForPartitionId(this.partitionId),
476 storeName,
477 Lists.newArrayList(this.partitionId),
478 null,
479 true);
481 while(entryItr.hasNext()) {
482 Pair<ByteArray, Versioned<byte[]>> record = entryItr.next();
483 ByteArray key = record.getFirst();
484 Versioned<byte[]> versioned = record.getSecond();
485 streamingPut(key, versioned);
487 printSummary();
489 } catch(Exception e) {
490 // if for some reason this partition fails, we will have to
491 // retry those partitions alone.
492 logger.error(workName + "Error forklifting data ", e);
493 } finally {
494 latch.countDown();
500 @Override
501 public void run() {
502 final Cluster srcCluster = srcAdminClient.getAdminClientCluster();
503 try {
504 // process stores one-by-one
505 for(String store: storesList) {
506 logger.info("Processing store " + store);
507 dstStreamingClient.initStreamingSession(store, new Callable<Object>() {
509 @Override
510 public Object call() throws Exception {
512 return null;
514 }, new Callable<Object>() {
516 @Override
517 public Object call() throws Exception {
519 return null;
521 }, true);
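// one latch count per submitted partition task (partitionList may be a subset of all source partitions)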
523 final CountDownLatch latch = new CountDownLatch(partitionList.size());
524 StoreRoutingPlan storeInstance = new StoreRoutingPlan(srcCluster,
525 srcStoreDefMap.get(store));
527 // submit work on every partition that is to be forklifted
528 for(Integer partitionId: partitionList) {
529 if(this.mode == ForkLiftTaskMode.global_resolution) {
530 // do thorough global resolution across replicas
531 SinglePartitionGloballyResolvingForkLiftTask work = new SinglePartitionGloballyResolvingForkLiftTask(storeInstance,
532 partitionId,
533 latch);
534 workerPool.submit(work);
535 } else if(this.mode == ForkLiftTaskMode.primary_resolution) {
536 // take the less clean, but much faster route
537 SinglePartitionPrimaryResolvingForkLiftTask work = new SinglePartitionPrimaryResolvingForkLiftTask(storeInstance,
538 partitionId,
539 latch);
540 workerPool.submit(work);
541 } else if(this.mode == ForkLiftTaskMode.no_resolution) {
542 // take the fastest route, with no resolution at all
543 SinglePartitionNoResolutionForkLiftTask work = new SinglePartitionNoResolutionForkLiftTask(storeInstance,
544 partitionId,
545 latch);
546 workerPool.submit(work);
550 // wait till all the partitions are processed
551 latch.await();
552 dstStreamingClient.closeStreamingSession();
553 logger.info("Finished processing store " + store);
555 } catch(Exception e) {
556 logger.error("Exception running forklift tool", e);
557 } finally {
558 workerPool.shutdown();
559 try {
560 workerPool.awaitTermination(DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS,
561 TimeUnit.MINUTES);
562 } catch(InterruptedException ie) {
563 logger.error("InterruptedException while waiting for worker pool to shutdown", ie);
565 srcAdminClient.close();
566 dstStreamingClient.getAdminClient().close();
571 * Return args parser
573 * @return program parser
574 */
575 private static OptionParser getParser() {
576 OptionParser parser = new OptionParser();
577 parser.accepts("help", "print help information");
578 parser.accepts("src-url", "[REQUIRED] bootstrap URL of source cluster")
579 .withRequiredArg()
580 .describedAs("source-bootstrap-url")
581 .ofType(String.class);
582 parser.accepts("dst-url", "[REQUIRED] bootstrap URL of destination cluster")
583 .withRequiredArg()
584 .describedAs("destination-bootstrap-url")
585 .ofType(String.class);
586 parser.accepts("stores",
587 "Store names to forklift. Comma delimited list or singleton. [Default: ALL SOURCE STORES]")
588 .withRequiredArg()
589 .describedAs("stores")
590 .withValuesSeparatedBy(',')
591 .ofType(String.class);
592 parser.accepts("partitions",
593 "partitions to forklift. Comma delimited list or singleton. [Default: ALL SOURCE PARTITIONS]")
594 .withRequiredArg()
595 .describedAs("partitions")
596 .withValuesSeparatedBy(',')
597 .ofType(Integer.class);
598 parser.accepts("max-puts-per-second",
599 "Maximum number of put(...) operations issued against destination cluster per second. [Default: "
600 + DEFAULT_MAX_PUTS_PER_SEC + " ]")
601 .withRequiredArg()
602 .describedAs("maxPutsPerSecond")
603 .ofType(Integer.class);
604 parser.accepts("progress-period-ops",
605 "Number of operations between progress info is displayed. [Default: "
606 + DEFAULT_PROGRESS_PERIOD_OPS + " ]")
607 .withRequiredArg()
608 .describedAs("progressPeriodOps")
609 .ofType(Integer.class);
610 parser.accepts("parallelism",
611 "Number of partitions to fetch in parallel. [Default: "
612 + DEFAULT_PARTITION_PARALLELISM + " ]")
613 .withRequiredArg()
614 .describedAs("partitionParallelism")
615 .ofType(Integer.class);
616 parser.accepts("mode",
617 "Determines if a thorough global resolution needs to be done, by comparing all replicas. [Default: "
618 + ForkLiftTaskMode.primary_resolution.toString()
619 + " Fetch from primary alone ]");
622 parser.accepts("overwrite", OVERWRITE_WARNING_MESSAGE)
623 .withOptionalArg()
624 .describedAs("overwriteExistingValue")
625 .ofType(Boolean.class)
626 .defaultsTo(false);
628 return parser;
632 * @param args command line arguments
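* <p>
* A sketch of a command-line style invocation driven through {@code main} (the
* URLs and store name are hypothetical placeholders; note that {@code main}
* calls {@code System.exit} when it finishes):
*
* <pre>
* {@code
* ClusterForkLiftTool.main(new String[] { "--src-url", "tcp://src-host:6666",
*                                         "--dst-url", "tcp://dst-host:6666",
*                                         "--stores", "test-store",
*                                         "--mode", "primary_resolution" });
* }
* </pre>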
634 public static void main(String[] args) throws Exception {
635 OptionParser parser = null;
636 OptionSet options = null;
637 try {
638 parser = getParser();
639 options = parser.parse(args);
640 } catch(Exception oe) {
641 logger.error("Exception processing command line options", oe);
642 parser.printHelpOn(System.out);
643 return;
646 /* validate options */
647 if(options.has("help")) {
648 parser.printHelpOn(System.out);
649 return;
652 if(!options.has("src-url") || !options.has("dst-url")) {
653 logger.error("Both 'src-url' and 'dst-url' options are mandatory");
654 parser.printHelpOn(System.out);
655 return;
658 String srcBootstrapUrl = (String) options.valueOf("src-url");
659 String dstBootstrapUrl = (String) options.valueOf("dst-url");
660 int maxPutsPerSecond = DEFAULT_MAX_PUTS_PER_SEC;
661 if(options.has("max-puts-per-second"))
662 maxPutsPerSecond = (Integer) options.valueOf("max-puts-per-second");
663 List<String> storesList = null;
664 if(options.has("stores")) {
665 storesList = new ArrayList<String>((List<String>) options.valuesOf("stores"));
667 List<Integer> partitions = null;
668 if(options.has("partitions")) {
669 partitions = (List<Integer>) options.valuesOf("partitions");
672 int partitionParallelism = DEFAULT_PARTITION_PARALLELISM;
673 if(options.has("parallelism")) {
674 partitionParallelism = (Integer) options.valueOf("parallelism");
676 int progressOps = DEFAULT_PROGRESS_PERIOD_OPS;
677 if(options.has("progress-period-ops")) {
678 progressOps = (Integer) options.valueOf("progress-period-ops");
681 ForkLiftTaskMode mode;
682 mode = ForkLiftTaskMode.primary_resolution;
683 if(options.has("mode")) {
684 mode = Utils.getEnumFromString(ForkLiftTaskMode.class, (String) options.valueOf("mode"));
685 if(mode == null)
686 mode = ForkLiftTaskMode.primary_resolution;
690 Boolean overwrite = false;
691 if(options.has("overwrite")) {
692 if(options.hasArgument("overwrite")) {
693 overwrite = (Boolean) options.valueOf("overwrite");
694 } else {
695 overwrite = true;
699 if(overwrite) {
700 logger.warn(OVERWRITE_WARNING_MESSAGE);
703 ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootstrapUrl,
704 dstBootstrapUrl,
705 overwrite,
706 maxPutsPerSecond,
707 partitionParallelism,
708 progressOps,
709 storesList,
710 partitions,
711 mode);
712 forkLiftTool.run();
713 // TODO cleanly shut down the hanging threadpool
714 System.exit(0);