Fix the junit tests, bump sleep for bind issues
[voldemort/jeffpc.git] / test / unit / voldemort / client / rebalance / AbstractZonedRebalanceTest.java
blobe0a7d2db5575905f7e76d7856ef3a727a15dcb4c
1 /*
2 * Copyright 2008-2013 LinkedIn, Inc
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
17 package voldemort.client.rebalance;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotSame;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
40 import org.apache.commons.io.FileUtils;
41 import org.apache.log4j.Logger;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
46 import voldemort.ClusterTestUtils;
47 import voldemort.ServerTestUtils;
48 import voldemort.client.ClientConfig;
49 import voldemort.client.DefaultStoreClient;
50 import voldemort.client.RoutingTier;
51 import voldemort.client.SocketStoreClientFactory;
52 import voldemort.client.StoreClient;
53 import voldemort.client.protocol.admin.AdminClient;
54 import voldemort.cluster.Cluster;
55 import voldemort.cluster.Node;
56 import voldemort.routing.BaseStoreRoutingPlan;
57 import voldemort.routing.RoutingStrategyType;
58 import voldemort.serialization.SerializerDefinition;
59 import voldemort.server.VoldemortServer;
60 import voldemort.store.InvalidMetadataException;
61 import voldemort.store.Store;
62 import voldemort.store.StoreDefinition;
63 import voldemort.store.StoreDefinitionBuilder;
64 import voldemort.store.bdb.BdbStorageConfiguration;
65 import voldemort.store.metadata.MetadataStore;
66 import voldemort.store.metadata.MetadataStore.VoldemortState;
67 import voldemort.store.slop.strategy.HintedHandoffStrategyType;
68 import voldemort.utils.ByteArray;
69 import voldemort.utils.ByteUtils;
70 import voldemort.utils.RebalanceUtils;
71 import voldemort.utils.UpdateClusterUtils;
72 import voldemort.versioning.ClockEntry;
73 import voldemort.versioning.ObsoleteVersionException;
74 import voldemort.versioning.VectorClock;
75 import voldemort.versioning.Versioned;
76 import voldemort.xml.StoreDefinitionsMapper;
78 import com.google.common.collect.Lists;
80 /**
81 * Rebalancing tests with zoned configurations with cross zone moves (since
82 * {@link AbstractNonZonedRebalanceTest} should cover intra zone moves already
85 public abstract class AbstractZonedRebalanceTest extends AbstractRebalanceTest {
87 private static final Logger logger = Logger.getLogger(AbstractZonedRebalanceTest.class.getName());
89 protected static String testStoreNameRW = "test";
90 protected static String testStoreNameRW2 = "test2";
92 protected static String storeDefFileWithoutReplication;
93 protected static String storeDefFileWithReplication;
94 protected static String rwStoreDefFileWithReplication;
95 protected static String rwTwoStoreDefFileWithReplication;
97 static Cluster zzCurrent;
98 static Cluster zzShuffle;
99 static Cluster zzClusterExpansionNN;
100 static Cluster zzClusterExpansionPP;
101 static String zzStoresXml;
102 static List<StoreDefinition> zzStores;
104 static Cluster zzzCurrent;
105 static Cluster zzzShuffle;
106 static Cluster zzzClusterExpansionNNN;
107 static Cluster zzzClusterExpansionPPP;
108 static Cluster zzeZoneExpansion;
109 static Cluster zzzZoneExpansionXXP;
110 static String zzzStoresXml;
111 static List<StoreDefinition> zzzStores;
113 private List<StoreDefinition> storeDefWithoutReplication;
114 private List<StoreDefinition> storeDefWithReplication;
115 private StoreDefinition rwStoreDefWithoutReplication;
116 private StoreDefinition rwStoreDefWithReplication;
117 private StoreDefinition rwStoreDefWithReplication2;
/**
 * Creates the test fixture. Per-test state is initialised in {@code setUp()},
 * not here.
 */
public AbstractZonedRebalanceTest() {
    // Delegate to AbstractRebalanceTest's no-argument constructor.
    super();
}
123 @Before
124 public void setUp() throws IOException {
125 setUpRWStuff();
126 setupZZandZZZ();
129 public static void setupZZandZZZ() throws IOException {
130 zzCurrent = ClusterTestUtils.getZZCluster();
131 zzShuffle = ClusterTestUtils.getZZClusterWithSwappedPartitions();
132 zzClusterExpansionNN = ClusterTestUtils.getZZClusterWithNN();
133 zzClusterExpansionPP = ClusterTestUtils.getZZClusterWithPP();
135 zzStores = ClusterTestUtils.getZZStoreDefsBDB();
136 File zzfile = File.createTempFile("zz-stores-", ".xml");
137 FileUtils.writeStringToFile(zzfile, new StoreDefinitionsMapper().writeStoreList(zzStores));
138 zzStoresXml = zzfile.getAbsolutePath();
140 zzzCurrent = ClusterTestUtils.getZZZCluster();
141 zzzShuffle = ClusterTestUtils.getZZZClusterWithSwappedPartitions();
142 zzzClusterExpansionNNN = ClusterTestUtils.getZZZClusterWithNNN();
143 zzzClusterExpansionPPP = ClusterTestUtils.getZZZClusterWithPPP();
144 zzeZoneExpansion = ClusterTestUtils.getZZECluster();
145 zzzZoneExpansionXXP = ClusterTestUtils.getZZEClusterXXP();
147 zzzStores = ClusterTestUtils.getZZZStoreDefsBDB();
148 File zzzfile = File.createTempFile("zzz-stores-", ".xml");
149 FileUtils.writeStringToFile(zzzfile, new StoreDefinitionsMapper().writeStoreList(zzzStores));
150 zzzStoresXml = zzzfile.getAbsolutePath();
153 public void setUpRWStuff() throws IOException {
154 // First without replication
155 HashMap<Integer, Integer> zrfRWStoreWithoutReplication = new HashMap<Integer, Integer>();
156 zrfRWStoreWithoutReplication.put(0, 1);
157 zrfRWStoreWithoutReplication.put(1, 1);
158 rwStoreDefWithoutReplication = new StoreDefinitionBuilder().setName(testStoreNameRW)
159 .setType(BdbStorageConfiguration.TYPE_NAME)
160 .setKeySerializer(new SerializerDefinition("string"))
161 .setValueSerializer(new SerializerDefinition("string"))
162 .setRoutingPolicy(RoutingTier.CLIENT)
163 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
164 .setReplicationFactor(2)
165 .setPreferredReads(1)
166 .setRequiredReads(1)
167 .setPreferredWrites(1)
168 .setRequiredWrites(1)
169 .setZoneCountReads(0)
170 .setZoneCountWrites(0)
171 .setZoneReplicationFactor(zrfRWStoreWithoutReplication)
172 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
173 .build();
175 storeDefWithoutReplication = Lists.newArrayList(rwStoreDefWithoutReplication);
176 String storeDefWithoutReplicationString = new StoreDefinitionsMapper().writeStoreList(storeDefWithoutReplication);
177 File file = File.createTempFile("two-stores-", ".xml");
178 FileUtils.writeStringToFile(file, storeDefWithoutReplicationString);
179 storeDefFileWithoutReplication = file.getAbsolutePath();
181 // Now with replication
182 HashMap<Integer, Integer> zrfRWStoreWithReplication = new HashMap<Integer, Integer>();
183 zrfRWStoreWithReplication.put(0, 2);
184 zrfRWStoreWithReplication.put(1, 2);
185 rwStoreDefWithReplication = new StoreDefinitionBuilder().setName(testStoreNameRW)
186 .setType(BdbStorageConfiguration.TYPE_NAME)
187 .setKeySerializer(new SerializerDefinition("string"))
188 .setValueSerializer(new SerializerDefinition("string"))
189 .setRoutingPolicy(RoutingTier.CLIENT)
190 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
191 .setReplicationFactor(4)
192 .setPreferredReads(1)
193 .setRequiredReads(1)
194 .setPreferredWrites(1)
195 .setRequiredWrites(1)
196 .setZoneCountReads(0)
197 .setZoneCountWrites(0)
198 .setZoneReplicationFactor(zrfRWStoreWithReplication)
199 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
200 .build();
201 rwStoreDefWithReplication2 = new StoreDefinitionBuilder().setName(testStoreNameRW2)
202 .setType(BdbStorageConfiguration.TYPE_NAME)
203 .setKeySerializer(new SerializerDefinition("string"))
204 .setValueSerializer(new SerializerDefinition("string"))
205 .setRoutingPolicy(RoutingTier.CLIENT)
206 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
207 .setReplicationFactor(4)
208 .setPreferredReads(1)
209 .setRequiredReads(1)
210 .setPreferredWrites(1)
211 .setRequiredWrites(1)
212 .setZoneCountReads(0)
213 .setZoneCountWrites(0)
214 .setZoneReplicationFactor(zrfRWStoreWithReplication)
215 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
216 .build();
218 file = File.createTempFile("rw-stores-", ".xml");
219 FileUtils.writeStringToFile(file,
220 new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(rwStoreDefWithReplication)));
221 rwStoreDefFileWithReplication = file.getAbsolutePath();
223 file = File.createTempFile("rw-two-stores-", ".xml");
224 FileUtils.writeStringToFile(file,
225 new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(rwStoreDefWithReplication,
226 rwStoreDefWithReplication2)));
227 rwTwoStoreDefFileWithReplication = file.getAbsolutePath();
229 storeDefWithReplication = Lists.newArrayList(rwStoreDefWithReplication);
230 String storeDefWithReplicationString = new StoreDefinitionsMapper().writeStoreList(storeDefWithReplication);
231 file = File.createTempFile("two-stores-", ".xml");
232 FileUtils.writeStringToFile(file, storeDefWithReplicationString);
233 storeDefFileWithReplication = file.getAbsolutePath();
236 @After
237 public void tearDown() {
238 testEntries.clear();
239 testEntries = null;
240 socketStoreFactory.close();
241 socketStoreFactory = null;
242 ClusterTestUtils.reset();
245 // TODO: The tests based on this method are susceptible to TOCTOU
246 // BindException issue since findFreePorts is used to determine the ports
247 // for localhost:PORT of each node.
249 * Scripts the execution of a specific type of zoned rebalance test: sets up
250 * cluster based on cCluster plus any new nodes/zones in fCluster,
251 * rebalances to fCluster, verifies rebalance was correct.
253 * @param testTag For pretty printing
254 * @param cCluster current cluster
255 * @param fCluster final cluster
256 * @param cStoresXml XML file with current stores xml
257 * @param fStoresXml Unused parameter. Included for symmetry in method
258 * declaration.
259 * @param cStoreDefs store defs for current cluster (from on cStoresXml)
260 * @param fStoreDefs store defs for final cluster.
261 * @throws Exception
263 public void testZonedRebalance(String testTag,
264 Cluster cCluster,
265 Cluster fCluster,
266 String cStoresXml,
267 String fStoresXml,
268 List<StoreDefinition> cStoreDefs,
269 List<StoreDefinition> fStoreDefs) throws Exception {
270 logger.info("Starting " + testTag);
271 // Hacky work around of TOCTOU bind Exception issues. Each test that
272 // invokes this method brings servers up & down on the same ports. The
273 // OS seems to need a rest between subsequent tests...
274 Thread.sleep(TimeUnit.SECONDS.toMillis(5));
275 try {
276 Cluster interimCluster = RebalanceUtils.getInterimCluster(cCluster, fCluster);
278 // start all the servers
279 List<Integer> serverList = new ArrayList<Integer>(interimCluster.getNodeIds());
280 Map<String, String> configProps = new HashMap<String, String>();
281 configProps.put("admin.max.threads", "5");
282 interimCluster = startServers(interimCluster, cStoresXml, serverList, configProps);
284 String bootstrapUrl = getBootstrapUrl(interimCluster, 0);
285 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
286 fCluster,
287 fStoreDefs);
289 try {
290 for(StoreDefinition storeDef: cStoreDefs) {
291 populateData(cCluster, storeDef);
294 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
296 checkConsistentMetadata(fCluster, serverList);
297 } finally {
298 // stop servers
299 stopServer(serverList);
301 } catch(AssertionError ae) {
302 logger.error("Assertion broken in " + testTag + " : ", ae);
303 throw ae;
307 public void testZonedRebalance(String testTag,
308 Cluster cCluster,
309 Cluster fCluster,
310 String storesXml,
311 List<StoreDefinition> storeDefs) throws Exception {
312 testZonedRebalance(testTag, cCluster, fCluster, storesXml, storesXml, storeDefs, storeDefs);
315 @Test(timeout = 600000)
316 public void testNoopZZ() throws Exception {
317 testZonedRebalance("TestNoopZZ", zzCurrent, zzCurrent, zzStoresXml, zzStores);
320 @Test(timeout = 600000)
321 public void testShuffleZZ() throws Exception {
322 testZonedRebalance("TestShuffleZZ", zzCurrent, zzShuffle, zzStoresXml, zzStores);
325 @Test(timeout = 600000)
326 public void testShuffleZZAndShuffleAgain() throws Exception {
328 logger.info("Starting testShuffleZZAndShuffleAgain");
329 // Hacky work around of TOCTOU bind Exception issues. Each test that
330 // invokes this method brings servers up & down on the same ports. The
331 // OS seems to need a rest between subsequent tests...
332 Thread.sleep(TimeUnit.SECONDS.toMillis(5));
334 Cluster interimCluster = RebalanceUtils.getInterimCluster(zzCurrent, zzShuffle);
336 // start all the servers
337 List<Integer> serverList = new ArrayList<Integer>(interimCluster.getNodeIds());
338 Map<String, String> configProps = new HashMap<String, String>();
339 configProps.put("admin.max.threads", "5");
340 interimCluster = startServers(interimCluster, zzStoresXml, serverList, configProps);
342 // Populate cluster with data
343 for(StoreDefinition storeDef: zzStores) {
344 populateData(zzCurrent, storeDef);
347 String bootstrapUrl = getBootstrapUrl(interimCluster, 0);
349 // Shuffle cluster
350 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
351 zzShuffle,
352 zzStores);
353 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
354 checkConsistentMetadata(zzShuffle, serverList);
356 // Now, go from shuffled state, back to the original to ocnfirm
357 // subsequent rebalances can be invoked.
358 rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, zzCurrent, zzStores);
359 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
360 checkConsistentMetadata(zzCurrent, serverList);
362 // Done.
363 stopServer(serverList);
366 @Test(timeout = 600000)
367 public void testClusterExpansion() throws Exception {
368 testZonedRebalance("TestClusterExpansionZZ",
369 zzCurrent,
370 zzClusterExpansionPP,
371 zzStoresXml,
372 zzStores);
375 @Test(timeout = 600000)
376 public void testNoopZZZ() throws Exception {
377 testZonedRebalance("TestNoopZZZ", zzzCurrent, zzzCurrent, zzzStoresXml, zzzStores);
380 @Test(timeout = 600000)
381 public void testShuffleZZZ() throws Exception {
382 testZonedRebalance("TestShuffleZZZ", zzzCurrent, zzzShuffle, zzzStoresXml, zzzStores);
385 @Test(timeout = 600000)
386 public void testClusterExpansionZZZ() throws Exception {
387 testZonedRebalance("TestClusterExpansionZZZ",
388 zzzCurrent,
389 zzzClusterExpansionPPP,
390 zzzStoresXml,
391 zzzStores);
394 @Test(timeout = 600000)
395 public void testZoneExpansionZZ2ZZZ() throws Exception {
396 // Pass in an interim cluster for currentCluster
397 testZonedRebalance("TestZoneExpansionZZ2ZZZ",
398 zzeZoneExpansion,
399 zzzZoneExpansionXXP,
400 zzzStoresXml,
401 zzzStores);
/**
 * Rebalances a read-write store without replication on a 4-node zoned cluster.
 * Per the partition map passed to getLocalZonedCluster, node 0 starts with
 * partitions {0,2,4,6}, node 2 with {1,3,5,7}, and nodes 1 and 3 with none. The
 * final cluster moves partitions [2, 6] and then [3, 7] via
 * UpdateClusterUtils.createUpdatedCluster.
 */
404 @Test(timeout = 600000)
405 public void testRWRebalance() throws Exception {
406 logger.info("Starting testRWRebalance");
407 try {
409 Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4, 2, new int[] { 0, 0,
410 1, 1 }, new int[][] { { 0, 2, 4, 6 }, {}, { 1, 3, 5, 7 }, {} });
// NOTE(review): the stealer-node argument lines of the two createUpdatedCluster
// calls below appear to be missing from this copy of the file (numbering skips).
411 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
413 Lists.newArrayList(2, 6));
414 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster,
416 Lists.newArrayList(3, 7));
418 // start all the servers
419 List<Integer> serverList = Arrays.asList(0, 1, 2, 3);
420 Map<String, String> configProps = new HashMap<String, String>();
421 configProps.put("admin.max.threads", "5");
422 currentCluster = startServers(currentCluster,
423 storeDefFileWithoutReplication,
424 serverList,
425 configProps);
427 String bootstrapUrl = getBootstrapUrl(currentCluster, 0);
428 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
429 finalCluster);
431 try {
432 populateData(currentCluster, rwStoreDefWithoutReplication);
// Only nodes 1 and 2 are passed to rebalanceAndCheck, presumably the nodes
// whose data is verified; confirm against AbstractRebalanceTest.
434 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(1, 2));
436 checkConsistentMetadata(finalCluster, serverList);
437 } finally {
438 // stop servers
439 stopServer(serverList);
441 } catch(AssertionError ae) {
442 logger.error("Assertion broken in testRWRebalance ", ae);
443 throw ae;
/**
 * Shared body of the replicated read-write rebalance tests. It starts 4 servers
 * with storeDefFileWithReplication, populates rwStoreDefWithReplication,
 * rebalances with maxParallel = 5, and checks metadata on all servers.
 *
 * @param serial when true, the servers are started with
 *        "max.parallel.stores.rebalancing" set to 1
 */
447 public void testRWRebalanceWithReplication(boolean serial) throws Exception {
448 logger.info("Starting testRWRebalanceWithReplication");
// NOTE(review): some argument lines (e.g. the zone count passed to
// getLocalZonedCluster and the stealer node ids passed to createUpdatedCluster)
// appear to be missing from this copy of the file (numbering skips).
450 Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4,
452 new int[] { 0, 0, 1, 1 },
453 new int[][] { { 0, 2, 4 },
454 { 6 }, { 1, 3, 5 },
455 { 7 } });
456 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
458 Lists.newArrayList(2));
459 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster,
461 Lists.newArrayList(3));
463 // start servers
464 List<Integer> serverList = Arrays.asList(0, 1, 2, 3);
465 Map<String, String> configProps = new HashMap<String, String>();
466 configProps.put("admin.max.threads", "5");
467 if(serial)
468 configProps.put("max.parallel.stores.rebalancing", String.valueOf(1));
469 currentCluster = startServers(currentCluster,
470 storeDefFileWithReplication,
471 serverList,
472 configProps);
474 String bootstrapUrl = getBootstrapUrl(currentCluster, 0);
475 int maxParallel = 5;
476 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
477 maxParallel,
478 finalCluster);
480 try {
482 populateData(currentCluster, rwStoreDefWithReplication);
484 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(0, 1, 2, 3));
486 checkConsistentMetadata(finalCluster, serverList);
487 } finally {
488 // stop servers
489 stopServer(serverList);
493 @Test(timeout = 600000)
494 public void testRWRebalanceWithReplication() throws Exception {
495 try {
496 testRWRebalanceWithReplication(false);
497 } catch(AssertionError ae) {
498 logger.error("Assertion broken in testRWRebalanceWithReplication ", ae);
499 throw ae;
503 @Test(timeout = 600000)
504 public void testRWRebalanceWithReplicationSerial() throws Exception {
505 try {
506 testRWRebalanceWithReplication(true);
507 } catch(AssertionError ae) {
508 logger.error("Assertion broken in testRWRebalanceWithReplicationSerial ", ae);
509 throw ae;
/**
 * Rebalances a 6-node zoned cluster (node 1 starts with partitions {1,6} and
 * node 4 with {4,7}) with "enable.repair" turned on. It samples 20 keys from
 * each of partitions 6, 1, 3, 2 and 7 before rebalancing. After rebalancing it
 * runs a repair job on every node, waits for each "Repair" async operation, and
 * then checks that the sampled keys exist on the expected servers.
 *
 * NOTE(review): despite the name, only key existence is checked. No absence
 * check on the donor follows the "absence" comment below. The start log message
 * says "testRebalanceCleanPrimary", which does not match the method name.
 * Several node-id argument lines (createUpdatedCluster stealers,
 * sampleKeysFromPartition nodes) appear to be missing from this copy.
 */
513 @Test(timeout = 600000)
514 public void testRebalanceCleanPrimarySecondary() throws Exception {
515 logger.info("Starting testRebalanceCleanPrimary");
516 try {
517 Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(6, 2, new int[] { 0, 0,
518 0, 1, 1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } });
519 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
521 Lists.newArrayList(7));
522 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster,
524 Lists.newArrayList(6));
527 * original server partition ownership
529 * [s0 : p0,p3,p4,p5,p6,p7] [s1 : p1-p7] [s2 : p1,p2] [s3 :
530 * p0,p1,p2,p3,p6,p7] [s4 : p1-p7] [s5 : p4,p5]
532 * final server partition ownership
534 * [s0 : p0,p2,p3,p4,p5,p6,p7] [s1 : p0,p1] [s2 : p1-p7] [s3 :
535 * p0.p1,p2,p3,p5,p6,p7] [s4 : p0,p1,p2,p3,p4,p7] [s5 : p4,p5,p6]
538 // start servers
539 List<Integer> serverList = Arrays.asList(0, 1, 2, 3, 4, 5);
540 Map<String, String> configProps = new HashMap<String, String>();
541 configProps.put("enable.repair", "true");
542 currentCluster = startServers(currentCluster,
543 rwStoreDefFileWithReplication,
544 serverList,
545 configProps);
547 String bootstrapUrl = getBootstrapUrl(currentCluster, 0);
548 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
549 finalCluster);
551 try {
552 populateData(currentCluster, rwStoreDefWithReplication);
554 AdminClient admin = rebalanceKit.controller.getAdminClient();
// Sample up to 20 keys from each partition of interest before rebalancing.
556 List<ByteArray> p6KeySamples = sampleKeysFromPartition(admin,
558 rwStoreDefWithReplication.getName(),
559 Arrays.asList(6),
560 20);
561 List<ByteArray> p1KeySamples = sampleKeysFromPartition(admin,
563 rwStoreDefWithReplication.getName(),
564 Arrays.asList(1),
565 20);
566 List<ByteArray> p3KeySamples = sampleKeysFromPartition(admin,
568 rwStoreDefWithReplication.getName(),
569 Arrays.asList(3),
570 20);
571 List<ByteArray> p2KeySamples = sampleKeysFromPartition(admin,
573 rwStoreDefWithReplication.getName(),
574 Arrays.asList(2),
575 20);
576 List<ByteArray> p7KeySamples = sampleKeysFromPartition(admin,
578 rwStoreDefWithReplication.getName(),
579 Arrays.asList(7),
580 20);
582 rebalanceAndCheck(rebalanceKit.plan,
583 rebalanceKit.controller,
584 Arrays.asList(0, 1, 2, 3));
586 checkConsistentMetadata(finalCluster, serverList);
588 // Do the cleanup operation
589 for(int i = 0; i < 6; i++) {
590 admin.storeMntOps.repairJob(i);
592 // wait for the repairs to complete
593 for(int i = 0; i < 6; i++) {
594 ServerTestUtils.waitForAsyncOperationOnServer(serverMap.get(i), "Repair", 5000);
597 // confirm a primary movement in zone 0 : P6 : s1 -> S2. The
598 // zone 0
599 // primary changes when p6 moves cross zone
600 // check for existence of p6 in server 2,
601 checkForKeyExistence(admin, 2, rwStoreDefWithReplication.getName(), p6KeySamples);
603 // confirm a secondary movement in zone 0.. p2 : s1 -> s0
604 // check for its existence in server 0
605 checkForKeyExistence(admin, 0, rwStoreDefWithReplication.getName(), p2KeySamples);
606 // check for its absence in server 1
// NOTE(review): no absence check is performed here.
608 // also check that p1 is stable in server 1 [primary stability]
609 checkForKeyExistence(admin, 1, rwStoreDefWithReplication.getName(), p1KeySamples);
610 // check that p3 is stable in server 0 [Secondary stability]
611 checkForKeyExistence(admin, 0, rwStoreDefWithReplication.getName(), p3KeySamples);
613 // finally, test for server 4 which now became the secondary for
614 // p7
615 // from being a primary before
616 checkForKeyExistence(admin, 4, rwStoreDefWithReplication.getName(), p7KeySamples);
617 } finally {
618 // stop servers
619 stopServer(serverList);
621 } catch(AssertionError ae) {
622 logger.error("Assertion broken in testRebalanceCleanPrimarySecondary ", ae);
623 throw ae;
/**
 * Runs two threads on a 4-node zoned cluster. One repeatedly calls get() for
 * random test keys through a store client. The other rebalances to the final
 * cluster and then sets rebalancingComplete. Afterwards the test fails if either
 * thread recorded an exception.
 *
 * NOTE(review), observed issues:
 * - rebalancingComplete is set only after rebalanceAndCheck succeeds. If
 *   rebalanceAndCheck throws, the get thread's while-loop never exits and
 *   latch.await() blocks until the JUnit timeout. Setting the flag in the
 *   rebalance thread's finally block would avoid this.
 * - assertNotSame/assertEquals in the get thread, and checkConsistentMetadata in
 *   the rebalance thread, throw AssertionError, which catch(Exception) does not
 *   catch. Such failures kill the worker thread without reaching `exceptions`,
 *   so the test can pass despite them.
 * - If stopServer throws in the rebalance thread's finally block, the
 *   RuntimeException skips latch.countDown(), and latch.await() hangs.
 * - stopServer(serverList) is called both in the rebalance thread and in the
 *   outer finally block.
 * - Some argument lines (getBootstrapUrl node index, DefaultStoreClient's last
 *   argument) appear to be missing from this copy of the file.
 */
627 @Test(timeout = 600000)
628 public void testProxyGetDuringRebalancing() throws Exception {
629 logger.info("Starting testProxyGetDuringRebalancing");
630 try {
631 Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(4, 2, new int[] { 0, 0,
632 1, 1 }, new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } });
633 Cluster tmpfinalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
635 Lists.newArrayList(2));
636 final Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(tmpfinalCluster,
638 Lists.newArrayList(3));
640 final List<Integer> serverList = Arrays.asList(0, 1, 2, 3);
641 Map<String, String> configProps = new HashMap<String, String>();
642 configProps.put("admin.max.threads", "5");
643 final Cluster updatedCurrentCluster = startServers(currentCluster,
644 storeDefFileWithReplication,
645 serverList,
646 configProps);
648 ExecutorService executors = Executors.newFixedThreadPool(2);
649 final AtomicBoolean rebalancingComplete = new AtomicBoolean(false);
650 final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
652 String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0);
653 int maxParallel = 2;
654 final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
655 maxParallel,
656 finalCluster);
658 try {
660 populateData(currentCluster, rwStoreDefWithReplication);
662 final SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(currentCluster,
664 .setEnableLazy(false)
665 .setSocketTimeout(120,
666 TimeUnit.SECONDS));
668 final StoreClient<String, String> storeClientRW = new DefaultStoreClient<String, String>(rwStoreDefWithReplication.getName(),
669 null,
670 factory,
// One latch count per worker thread; each worker counts down when it finishes.
673 final CountDownLatch latch = new CountDownLatch(2);
674 // start get operation.
675 executors.execute(new Runnable() {
677 @Override
678 public void run() {
679 try {
680 List<String> keys = new ArrayList<String>(testEntries.keySet());
682 while(!rebalancingComplete.get()) {
683 // should always be able to get values.
684 int index = (int) (Math.random() * keys.size());
686 // should get a valid value
// NOTE(review): AssertionError from the asserts below is not caught by
// catch(Exception) and is therefore never added to `exceptions`.
687 try {
688 Versioned<String> value = storeClientRW.get(keys.get(index));
689 assertNotSame("StoreClient get() should not return null.",
690 null,
691 value);
692 assertEquals("Value returned should be good",
693 new Versioned<String>(testEntries.get(keys.get(index))),
694 value);
695 } catch(Exception e) {
696 logger.error("Exception in proxy get thread", e);
697 e.printStackTrace();
698 exceptions.add(e);
702 } catch(Exception e) {
703 logger.error("Exception in proxy get thread", e);
704 exceptions.add(e);
705 } finally {
706 factory.close();
707 latch.countDown();
713 executors.execute(new Runnable() {
715 @Override
716 public void run() {
717 try {
719 Thread.sleep(500);
720 rebalanceAndCheck(rebalanceKit.plan,
721 rebalanceKit.controller,
722 Arrays.asList(0, 1, 2, 3));
724 Thread.sleep(500);
// NOTE(review): not reached if rebalanceAndCheck throws, which leaves the
// get thread spinning forever.
725 rebalancingComplete.set(true);
726 checkConsistentMetadata(finalCluster, serverList);
728 } catch(Exception e) {
729 exceptions.add(e);
730 } finally {
731 // stop servers
732 try {
733 stopServer(serverList);
734 } catch(Exception e) {
// NOTE(review): throwing here skips latch.countDown() below.
735 throw new RuntimeException(e);
737 latch.countDown();
742 latch.await();
743 executors.shutdown();
744 executors.awaitTermination(300, TimeUnit.SECONDS);
746 // check No Exception
747 if(exceptions.size() > 0) {
748 for(Exception e: exceptions) {
749 e.printStackTrace();
751 fail("Should not see any exceptions.");
753 } finally {
754 // stop servers
755 stopServer(serverList);
757 } catch(AssertionError ae) {
758 logger.error("Assertion broken in testProxyGetDuringRebalancing ", ae);
759 throw ae;
/**
 * Verifies that puts issued while a rebalance is in progress are proxied
 * correctly.
 *
 * One thread polls servers 0, 2, 3 and 5 until each reports
 * REBALANCING_MASTER_SERVER. Once all have, it creates a zone-1 client and puts
 * "proxy_write" for every sampled key of partition 6. For each put it records
 * the expected value and bumps node 5 in the expected vector clock. The other
 * thread runs rebalanceKit.rebalance() and sets rebalancingComplete in a
 * finally block.
 *
 * Afterwards the test checks that:
 * - rebalancing was observed to start and at least one proxy write happened;
 * - the post-rebalance entries and metadata are as expected;
 * - after the cluster metadata is reset to currentCluster, the moved keys hold
 *   the recorded tuples and versions.
 *
 * NOTE(review), observed issues:
 * - stopServer(serverList) at the end is not in a finally block, so any failed
 *   assertion above it leaves the servers running on their ports.
 * - The polling loop in the writer thread has no sleep and busy-spins.
 * - The assertEquals calls on rebalancingStarted/proxyWritesDone pass
 *   (actual, expected) in swapped order.
 * - Several argument lines (createUpdatedCluster stealers, node ids, the
 *   bootstrap node index, DefaultStoreClient's last argument) appear to be
 *   missing from this copy of the file.
 */
763 @Test(timeout = 600000)
764 public void testProxyPutDuringRebalancing() throws Exception {
765 logger.info("Starting testProxyPutDuringRebalancing");
766 try {
767 Cluster currentCluster = ServerTestUtils.getLocalZonedCluster(6, 2, new int[] { 0, 0,
768 0, 1, 1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } });
769 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
771 Lists.newArrayList(7));
772 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster,
774 Lists.newArrayList(6));
777 * Original partition map
779 * [s0 : p0] [s1 : p1, p6] [s2 : p2]
781 * [s3 : p3] [s4 : p4, p7] [s5 : p5]
783 * final server partition ownership
785 * [s0 : p0] [s1 : p1] [s2 : p2, p7]
787 * [s3 : p3] [s4 : p4] [s5 : p5, p6]
789 * Note that rwStoreDefFileWithReplication is a "2/1/1" store def.
791 * Original server n-ary partition ownership
793 * [s0 : p0, p3-7] [s1 : p0-p7] [s2 : p1-2]
795 * [s3 : p0-3, p6-7] [s4 : p0-p7] [s5 : p4-5]
797 * final server n-ary partition ownership
799 * [s0 : p0, p2-7] [s1 : p0-1] [s2 : p1-p7]
801 * [s3 : p0-3, p5-7] [s4 : p0-4, p7] [s5 : p4-6]
803 List<Integer> serverList = Arrays.asList(0, 1, 2, 3, 4, 5);
804 Map<String, String> configProps = new HashMap<String, String>();
805 configProps.put("admin.max.threads", "5");
806 final Cluster updatedCurrentCluster = startServers(currentCluster,
807 rwStoreDefFileWithReplication,
808 serverList,
809 configProps);
811 ExecutorService executors = Executors.newFixedThreadPool(2);
812 final AtomicBoolean rebalancingComplete = new AtomicBoolean(false);
813 final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
815 // It is imperative that we test in a single shot since multiple
816 // batches would mean the proxy bridges being torn down and
817 // established multiple times and we cannot test against the source
818 // cluster topology then. getRebalanceKit uses batch size of
819 // infinite, so this should be fine.
820 String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 0);
821 int maxParallel = 2;
822 final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
823 maxParallel,
824 finalCluster);
826 populateData(currentCluster, rwStoreDefWithReplication);
827 final AdminClient adminClient = rebalanceKit.controller.getAdminClient();
828 // the plan would cause these partitions to move:
829 // Partition : Donor -> stealer
831 // p2 (Z-SEC) : s1 -> s0
832 // p3-6 (Z-PRI) : s1 -> s2
833 // p7 (Z-PRI) : s0 -> s2
835 // p5 (Z-SEC): s4 -> s3
836 // p6 (Z-PRI): s4 -> s5
838 // :. rebalancing will run on servers 0, 2, 3, & 5
839 final List<ByteArray> movingKeysList = sampleKeysFromPartition(adminClient,
841 rwStoreDefWithReplication.getName(),
842 Arrays.asList(6),
843 20);
844 assertTrue("Empty list of moving keys...", movingKeysList.size() > 0);
845 final AtomicBoolean rebalancingStarted = new AtomicBoolean(false);
846 final AtomicBoolean proxyWritesDone = new AtomicBoolean(false);
// Expected end state: starts as a copy of testEntries, each with an empty
// VectorClock, and is updated by the writer thread for every proxied put.
847 final HashMap<String, String> baselineTuples = new HashMap<String, String>(testEntries);
848 final HashMap<String, VectorClock> baselineVersions = new HashMap<String, VectorClock>();
850 for(String key: baselineTuples.keySet()) {
851 baselineVersions.put(key, new VectorClock());
854 final CountDownLatch latch = new CountDownLatch(2);
855 // start put operation.
856 executors.execute(new Runnable() {
858 @Override
859 public void run() {
860 SocketStoreClientFactory factory = null;
861 try {
862 // wait for the rebalancing to begin
863 List<VoldemortServer> serverList = Lists.newArrayList(serverMap.get(0),
864 serverMap.get(2),
865 serverMap.get(3),
866 serverMap.get(5));
// Poll each server's SERVER_STATE_KEY until all four report
// REBALANCING_MASTER_SERVER. NOTE(review): no sleep, so this busy-spins.
867 while(!rebalancingComplete.get()) {
868 Iterator<VoldemortServer> serverIterator = serverList.iterator();
869 while(serverIterator.hasNext()) {
870 VoldemortServer server = serverIterator.next();
871 if(ByteUtils.getString(server.getMetadataStore()
872 .get(MetadataStore.SERVER_STATE_KEY,
873 null)
874 .get(0)
875 .getValue(),
876 "UTF-8")
877 .compareTo(VoldemortState.REBALANCING_MASTER_SERVER.toString()) == 0) {
878 logger.info("Server " + server.getIdentityNode().getId()
879 + " transitioned into REBALANCING MODE");
880 serverIterator.remove();
883 if(serverList.size() == 0) {
884 rebalancingStarted.set(true);
885 break;
// Writes are issued only if all four servers were seen rebalancing before
// the rebalance finished.
889 if(rebalancingStarted.get()) {
890 factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(updatedCurrentCluster,
892 .setEnableLazy(false)
893 .setSocketTimeout(120,
894 TimeUnit.SECONDS)
895 .setClientZoneId(1));
897 final StoreClient<String, String> storeClientRW = new DefaultStoreClient<String, String>(testStoreNameRW,
898 null,
899 factory,
901 // Now perform some writes and determine the end
902 // state of the changed keys. Initially, all data
903 // now with zero vector clock
904 for(ByteArray movingKey: movingKeysList) {
905 try {
906 String keyStr = ByteUtils.getString(movingKey.get(), "UTF-8");
907 String valStr = "proxy_write";
908 storeClientRW.put(keyStr, valStr);
909 baselineTuples.put(keyStr, valStr);
910 // all these keys will have [5:1] vector
911 // clock since node 5 is the new pseudo master
912 baselineVersions.get(keyStr)
913 .incrementVersion(5, System.currentTimeMillis());
914 proxyWritesDone.set(true);
915 if(rebalancingComplete.get()) {
916 break;
918 } catch(InvalidMetadataException e) {
919 // let this go
920 logger.error("Encountered an invalid metadata exception.. ", e);
924 } catch(Exception e) {
925 logger.error("Exception in proxy write thread..", e);
926 exceptions.add(e);
927 } finally {
928 if(factory != null)
929 factory.close();
930 latch.countDown();
936 executors.execute(new Runnable() {
938 @Override
939 public void run() {
940 try {
941 rebalanceKit.rebalance();
942 } catch(Exception e) {
943 logger.error("Error in rebalancing... ", e);
944 exceptions.add(e);
945 } finally {
// Always released, so the writer thread's polling loop can terminate.
946 rebalancingComplete.set(true);
947 latch.countDown();
952 latch.await();
953 executors.shutdown();
954 executors.awaitTermination(300, TimeUnit.SECONDS);
// NOTE(review): arguments below are in (actual, expected) order.
956 assertEquals("Client did not see all server transition into rebalancing state",
957 rebalancingStarted.get(),
958 true);
959 assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true);
960 checkEntriesPostRebalance(updatedCurrentCluster,
961 finalCluster,
962 Lists.newArrayList(rwStoreDefWithReplication),
963 Arrays.asList(0, 1, 2, 3, 4, 5),
964 baselineTuples,
965 baselineVersions);
966 checkConsistentMetadata(finalCluster, serverList);
967 // check No Exception
968 if(exceptions.size() > 0) {
969 for(Exception e: exceptions) {
970 e.printStackTrace();
972 fail("Should not see any exceptions.");
974 // check that the proxy writes were made to the original donor, node
975 // 1
// Push currentCluster back to every node, with a vector clock containing an
// entry per node, so the admin client reads from the pre-rebalance topology.
976 List<ClockEntry> clockEntries = new ArrayList<ClockEntry>(serverList.size());
977 for(Integer nodeid: serverList)
978 clockEntries.add(new ClockEntry(nodeid.shortValue(), System.currentTimeMillis()));
979 VectorClock clusterXmlClock = new VectorClock(clockEntries, System.currentTimeMillis());
980 for(Integer nodeid: serverList)
981 adminClient.metadataMgmtOps.updateRemoteCluster(nodeid,
982 currentCluster,
983 clusterXmlClock);
984 adminClient.setAdminClientCluster(currentCluster);
985 checkForTupleEquivalence(adminClient,
987 testStoreNameRW,
988 movingKeysList,
989 baselineTuples,
990 baselineVersions);
992 // stop servers
// NOTE(review): not reached if any assertion above fails (no finally).
993 try {
994 stopServer(serverList);
995 } catch(Exception e) {
996 throw new RuntimeException(e);
998 } catch(AssertionError ae) {
999 logger.error("Assertion broken in testProxyPutDuringRebalancing ", ae);
1000 throw ae;
1004 private void populateData(Cluster cluster, StoreDefinition storeDef) throws Exception {
1006 // Create SocketStores for each Node first
1007 Map<Integer, Store<ByteArray, byte[], byte[]>> storeMap = new HashMap<Integer, Store<ByteArray, byte[], byte[]>>();
1008 for(Node node: cluster.getNodes()) {
1009 storeMap.put(node.getId(),
1010 getSocketStore(storeDef.getName(), node.getHost(), node.getSocketPort()));
1013 BaseStoreRoutingPlan storeInstance = new BaseStoreRoutingPlan(cluster, storeDef);
1014 for(Entry<String, String> entry: testEntries.entrySet()) {
1015 ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8"));
1016 List<Integer> preferenceNodes = storeInstance.getReplicationNodeList(keyBytes.get());
1017 // Go over every node
1018 for(int nodeId: preferenceNodes) {
1019 try {
1020 storeMap.get(nodeId)
1021 .put(keyBytes,
1022 new Versioned<byte[]>(ByteUtils.getBytes(entry.getValue(), "UTF-8")),
1023 null);
1024 } catch(ObsoleteVersionException e) {
1025 logger.info("Why are we seeing this at all here ?? ");
1026 e.printStackTrace();
1031 // close all socket stores
1032 for(Store<ByteArray, byte[], byte[]> store: storeMap.values()) {
1033 store.close();