Fix the junit tests, bump sleep for bind issues
[voldemort/jeffpc.git] / test / unit / voldemort / client / rebalance / ZonedRebalanceNonContiguousZonesTest.java
blob0932865ca189f5d5d0f1a22f401abd88fb44a339
1 /*
2 * Copyright 2008-2013 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
17 package voldemort.client.rebalance;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotSame;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
40 import org.apache.commons.io.FileUtils;
41 import org.apache.log4j.Logger;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
46 import voldemort.ClusterTestUtils;
47 import voldemort.ServerTestUtils;
48 import voldemort.client.ClientConfig;
49 import voldemort.client.DefaultStoreClient;
50 import voldemort.client.RoutingTier;
51 import voldemort.client.SocketStoreClientFactory;
52 import voldemort.client.StoreClient;
53 import voldemort.client.protocol.admin.AdminClient;
54 import voldemort.cluster.Cluster;
55 import voldemort.cluster.Node;
56 import voldemort.routing.BaseStoreRoutingPlan;
57 import voldemort.routing.RoutingStrategyType;
58 import voldemort.serialization.SerializerDefinition;
59 import voldemort.server.VoldemortServer;
60 import voldemort.store.InvalidMetadataException;
61 import voldemort.store.Store;
62 import voldemort.store.StoreDefinition;
63 import voldemort.store.StoreDefinitionBuilder;
64 import voldemort.store.bdb.BdbStorageConfiguration;
65 import voldemort.store.metadata.MetadataStore;
66 import voldemort.store.metadata.MetadataStore.VoldemortState;
67 import voldemort.store.slop.strategy.HintedHandoffStrategyType;
68 import voldemort.utils.ByteArray;
69 import voldemort.utils.ByteUtils;
70 import voldemort.utils.RebalanceUtils;
71 import voldemort.utils.UpdateClusterUtils;
72 import voldemort.versioning.ClockEntry;
73 import voldemort.versioning.ObsoleteVersionException;
74 import voldemort.versioning.VectorClock;
75 import voldemort.versioning.Versioned;
76 import voldemort.xml.StoreDefinitionsMapper;
78 import com.google.common.collect.Lists;
80 /**
81 * Rebalancing tests for zoned configurations with non-contiguous zone ids and node ids
83 public class ZonedRebalanceNonContiguousZonesTest extends AbstractRebalanceTest {
// Class-wide log4j logger.
85 private static final Logger logger = Logger.getLogger(ZonedRebalanceNonContiguousZonesTest.class.getName());
// Value handed back by getNumKeys().
87 private final int NUM_KEYS = 100;
// Names given to the read-write store definitions built in setUpRWStuff().
89 protected static String testStoreNameRW = "test";
90 protected static String testStoreNameRW2 = "test2";
// Absolute paths of the temporary stores.xml files written by setUpRWStuff().
92 protected static String storeDefFileWithoutReplication;
93 protected static String storeDefFileWithReplication;
94 protected static String rwStoreDefFileWithReplication;
95 protected static String rwTwoStoreDefFileWithReplication;
// Two-zone (zone ids 1 and 3) cluster fixtures, store definitions and the path of
// their temporary stores.xml file; all assigned in setupZ1Z3().
97 static Cluster z1z3Current;
98 static Cluster z1z3Shuffle;
99 static Cluster z1z3ClusterExpansionNN;
100 static Cluster z1z3ClusterExpansionPP;
101 static String z1z3StoresXml;
102 static List<StoreDefinition> z1z3Stores;
// Three-zone (zone ids 1, 3 and 5) counterparts; also assigned in setupZ1Z3().
104 static Cluster z1z3z5Current;
105 static Cluster z1z3z5Shuffle;
106 static Cluster z1z3z5ClusterExpansionNNN;
107 static Cluster z1z3z5ClusterExpansionPPP;
108 static String z1z3z5StoresXml;
109 static List<StoreDefinition> z1z3z5Stores;
// Store definitions (and single-element lists wrapping them) built in setUpRWStuff().
111 private List<StoreDefinition> storeDefWithoutReplication;
112 private List<StoreDefinition> storeDefWithReplication;
113 private StoreDefinition rwStoreDefWithoutReplication;
114 private StoreDefinition rwStoreDefWithReplication;
115 private StoreDefinition rwStoreDefWithReplication2;
/**
 * Creates the test instance; construction is delegated entirely to the
 * superclass constructor.
 */
public ZonedRebalanceNonContiguousZonesTest() {
    super();
}
121 @Override
122 protected int getNumKeys() {
123 return NUM_KEYS;
126 @Before
127 public void setUp() throws IOException {
128 setUpRWStuff();
129 setupZ1Z3();
132 public static void setupZ1Z3() throws IOException {
133 z1z3Current = ClusterTestUtils.getZ1Z3ClusterWithNonContiguousNodeIds();
134 z1z3Shuffle = ClusterTestUtils.getZ1Z3ClusterWithNonContiguousNodeIdsWithSwappedPartitions();
135 z1z3ClusterExpansionNN = ClusterTestUtils.getZ1Z3ClusterWithNonContiguousNodeIdsWithNN();
136 z1z3ClusterExpansionPP = ClusterTestUtils.getZ1Z3ClusterWithNonContiguousNodeIdsWithPP();
138 z1z3Stores = ClusterTestUtils.getZ1Z3StoreDefsBDB();
139 File z1z3file = File.createTempFile("z1z3-stores-", ".xml");
140 FileUtils.writeStringToFile(z1z3file, new StoreDefinitionsMapper().writeStoreList(z1z3Stores));
141 z1z3StoresXml = z1z3file.getAbsolutePath();
143 z1z3z5Current = ClusterTestUtils.getZ1Z3Z5ClusterWithNonContiguousNodeIds();
144 z1z3z5Shuffle = ClusterTestUtils.getZ1Z3Z5ClusterWithNonContiguousNodeIdsWithSwappedPartitions();
145 z1z3z5ClusterExpansionNNN = ClusterTestUtils.getZ1Z3Z5ClusterWithNonContiguousNodeIdsWithNNN();
146 z1z3z5ClusterExpansionPPP = ClusterTestUtils.getZ1Z3Z5ClusterWithNonContiguousNodeIdsWithPPP();
148 z1z3z5Stores = ClusterTestUtils.getZ1Z3Z5StoreDefsBDB();
149 File z1z3z5file = File.createTempFile("z1z3z5-stores-", ".xml");
150 FileUtils.writeStringToFile(z1z3z5file, new StoreDefinitionsMapper().writeStoreList(z1z3z5Stores));
151 z1z3z5StoresXml = z1z3z5file.getAbsolutePath();
154 public void setUpRWStuff() throws IOException {
155 // First without replication
156 HashMap<Integer, Integer> zrfRWStoreWithoutReplication = new HashMap<Integer, Integer>();
157 zrfRWStoreWithoutReplication.put(1, 1);
158 zrfRWStoreWithoutReplication.put(3, 1);
159 rwStoreDefWithoutReplication = new StoreDefinitionBuilder().setName(testStoreNameRW)
160 .setType(BdbStorageConfiguration.TYPE_NAME)
161 .setKeySerializer(new SerializerDefinition("string"))
162 .setValueSerializer(new SerializerDefinition("string"))
163 .setRoutingPolicy(RoutingTier.CLIENT)
164 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
165 .setReplicationFactor(2)
166 .setPreferredReads(1)
167 .setRequiredReads(1)
168 .setPreferredWrites(1)
169 .setRequiredWrites(1)
170 .setZoneCountReads(0)
171 .setZoneCountWrites(0)
172 .setZoneReplicationFactor(zrfRWStoreWithoutReplication)
173 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
174 .build();
176 storeDefWithoutReplication = Lists.newArrayList(rwStoreDefWithoutReplication);
177 String storeDefWithoutReplicationString = new StoreDefinitionsMapper().writeStoreList(storeDefWithoutReplication);
178 File file = File.createTempFile("two-stores-", ".xml");
179 FileUtils.writeStringToFile(file, storeDefWithoutReplicationString);
180 storeDefFileWithoutReplication = file.getAbsolutePath();
182 // Now with replication
183 HashMap<Integer, Integer> zrfRWStoreWithReplication = new HashMap<Integer, Integer>();
184 zrfRWStoreWithReplication.put(1, 2);
185 zrfRWStoreWithReplication.put(3, 2);
186 rwStoreDefWithReplication = new StoreDefinitionBuilder().setName(testStoreNameRW)
187 .setType(BdbStorageConfiguration.TYPE_NAME)
188 .setKeySerializer(new SerializerDefinition("string"))
189 .setValueSerializer(new SerializerDefinition("string"))
190 .setRoutingPolicy(RoutingTier.CLIENT)
191 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
192 .setReplicationFactor(4)
193 .setPreferredReads(1)
194 .setRequiredReads(1)
195 .setPreferredWrites(1)
196 .setRequiredWrites(1)
197 .setZoneCountReads(0)
198 .setZoneCountWrites(0)
199 .setZoneReplicationFactor(zrfRWStoreWithReplication)
200 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
201 .build();
202 rwStoreDefWithReplication2 = new StoreDefinitionBuilder().setName(testStoreNameRW2)
203 .setType(BdbStorageConfiguration.TYPE_NAME)
204 .setKeySerializer(new SerializerDefinition("string"))
205 .setValueSerializer(new SerializerDefinition("string"))
206 .setRoutingPolicy(RoutingTier.CLIENT)
207 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
208 .setReplicationFactor(4)
209 .setPreferredReads(1)
210 .setRequiredReads(1)
211 .setPreferredWrites(1)
212 .setRequiredWrites(1)
213 .setZoneCountReads(0)
214 .setZoneCountWrites(0)
215 .setZoneReplicationFactor(zrfRWStoreWithReplication)
216 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
217 .build();
219 file = File.createTempFile("rw-stores-", ".xml");
220 FileUtils.writeStringToFile(file,
221 new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(rwStoreDefWithReplication)));
222 rwStoreDefFileWithReplication = file.getAbsolutePath();
224 file = File.createTempFile("rw-two-stores-", ".xml");
225 FileUtils.writeStringToFile(file,
226 new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(rwStoreDefWithReplication,
227 rwStoreDefWithReplication2)));
228 rwTwoStoreDefFileWithReplication = file.getAbsolutePath();
230 storeDefWithReplication = Lists.newArrayList(rwStoreDefWithReplication);
231 String storeDefWithReplicationString = new StoreDefinitionsMapper().writeStoreList(storeDefWithReplication);
232 file = File.createTempFile("two-stores-", ".xml");
233 FileUtils.writeStringToFile(file, storeDefWithReplicationString);
234 storeDefFileWithReplication = file.getAbsolutePath();
237 @After
238 public void tearDown() {
239 testEntries.clear();
240 testEntries = null;
241 socketStoreFactory.close();
242 socketStoreFactory = null;
243 ClusterTestUtils.reset();
246 // TODO: The tests based on this method are susceptible to TOCTOU
247 // BindException issue since findFreePorts is used to determine the ports for localhost:PORT of each node.
249 * Scripts the execution of a specific type of zoned rebalance test: sets up
250 * cluster based on cCluster plus any new nodes/zones in fCluster,
251 * rebalances to fCluster, verifies rebalance was correct.
253 * @param testTag For pretty printing
254 * @param cCluster current cluster
255 * @param fCluster final cluster
256 * @param cStoresXml XML file with current stores xml
257 * @param fStoresXml Unused parameter. Included for symmetry in method
258 * declaration.
259 * @param cStoreDefs store defs for current cluster (from on cStoresXml)
260 * @param fStoreDefs store defs for final cluster.
261 * @throws Exception
263 public void testZonedRebalance(String testTag,
264 Cluster cCluster,
265 Cluster fCluster,
266 String cStoresXml,
267 String fStoresXml,
268 List<StoreDefinition> cStoreDefs,
269 List<StoreDefinition> fStoreDefs) throws Exception {
270 logger.info("Starting " + testTag);
271 // Hacky work around of TOCTOU bind Exception issues. Each test that invokes this method brings
272 // servers up & down on the same ports. The OS seems to need a rest between subsequent tests...
273 Thread.sleep(TimeUnit.SECONDS.toMillis(5));
274 try {
275 Cluster interimCluster = RebalanceUtils.getInterimCluster(cCluster, fCluster);
276 List<Integer> serverList = new ArrayList<Integer>(interimCluster.getNodeIds());
277 Map<String, String> configProps = new HashMap<String, String>();
278 configProps.put("admin.max.threads", "5");
279 interimCluster = startServers(interimCluster, cStoresXml, serverList, configProps);
280 String bootstrapUrl = getBootstrapUrl(interimCluster, 3);
281 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, fCluster,
282 fStoreDefs);
283 try {
284 for(StoreDefinition storeDef: cStoreDefs) {
285 populateData(cCluster, storeDef);
287 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
288 checkConsistentMetadata(fCluster, serverList);
289 } finally {
290 stopServer(serverList);
292 } catch(AssertionError ae) {
293 logger.error("Assertion broken in " + testTag + " : ", ae);
294 throw ae;
298 public void testZonedRebalance(String testTag,
299 Cluster cCluster,
300 Cluster fCluster,
301 String storesXml,
302 List<StoreDefinition> storeDefs) throws Exception {
303 testZonedRebalance(testTag, cCluster, fCluster, storesXml, storesXml, storeDefs, storeDefs);
306 @Test(timeout = 600000)
307 public void testNoopZ1Z3() throws Exception {
308 testZonedRebalance("TestNoopZ1Z3", z1z3Current, z1z3Current, z1z3StoresXml, z1z3Stores);
311 @Test(timeout = 600000)
312 public void testShuffleZ1Z3() throws Exception {
313 testZonedRebalance("TestShuffleZ1Z3", z1z3Current, z1z3Shuffle, z1z3StoresXml, z1z3Stores);
316 @Test(timeout = 600000)
317 public void testShuffleZ1Z3AndShuffleAgain() throws Exception {
319 logger.info("Starting testShuffleZZAndShuffleAgain");
320 // Hacky work around of TOCTOU bind Exception issues. Each test that invokes this method brings servers
321 // up & down on the same ports. The OS seems to need a rest between subsequent tests...
322 Thread.sleep(TimeUnit.SECONDS.toMillis(5));
324 Cluster interimCluster = RebalanceUtils.getInterimCluster(z1z3Current, z1z3Shuffle);
326 // start all the servers
327 List<Integer> serverList = new ArrayList<Integer>(interimCluster.getNodeIds());
328 Map<String, String> configProps = new HashMap<String, String>();
329 configProps.put("admin.max.threads", "5");
330 interimCluster = startServers(interimCluster, z1z3StoresXml, serverList, configProps);
332 // Populate cluster with data
333 for(StoreDefinition storeDef: z1z3Stores) {
334 populateData(z1z3Current, storeDef);
336 String bootstrapUrl = getBootstrapUrl(interimCluster, 3);
337 // Shuffle cluster
338 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, z1z3Shuffle, z1z3Stores);
339 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
340 checkConsistentMetadata(z1z3Shuffle, serverList);
342 // Now, go from shuffled state, back to the original to confirm subsequent rebalances can be invoked.
343 rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, z1z3Current, z1z3Stores);
344 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, serverList);
345 checkConsistentMetadata(z1z3Current, serverList);
346 // Done.
347 stopServer(serverList);
350 @Test(timeout = 600000)
351 public void testClusterExpansion() throws Exception {
352 testZonedRebalance("TestClusterExpansionZ1Z3", z1z3Current, z1z3ClusterExpansionPP, z1z3StoresXml, z1z3Stores);
355 @Test(timeout = 600000)
356 public void testNoopZ1Z3Z5() throws Exception {
357 testZonedRebalance("TestNoopZ1Z3Z5", z1z3z5Current, z1z3z5Current, z1z3z5StoresXml, z1z3z5Stores);
360 @Test(timeout = 600000)
361 public void testShuffleZ1Z3Z5() throws Exception {
362 testZonedRebalance("TestShuffleZ1Z3Z5", z1z3z5Current, z1z3z5Shuffle, z1z3z5StoresXml, z1z3z5Stores);
365 @Test(timeout = 600000)
366 public void testClusterExpansionZ1Z3Z5() throws Exception {
367 testZonedRebalance("TestClusterExpansionZZZ", z1z3z5Current, z1z3z5ClusterExpansionPPP, z1z3z5StoresXml, z1z3z5Stores);
370 @Test(timeout = 600000)
371 public void testRWRebalance() throws Exception {
372 logger.info("Starting testRWRebalance");
373 try {
374 int zoneIds[] = new int[] { 1, 3 };
375 int nodesPerZone[][] = new int[][] { { 3, 4 }, { 9, 10 } };
376 int partitionMap[][] = new int[][] { { 0, 2, 4, 6 }, {}, { 1, 3, 5, 7 }, {} };
377 Cluster currentCluster = ServerTestUtils.getLocalNonContiguousZonedCluster(zoneIds,
378 nodesPerZone,
379 partitionMap,
380 ClusterTestUtils.getClusterPorts());
382 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster, 10, Lists.newArrayList(2, 6));
383 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster, 4, Lists.newArrayList(3, 7));
384 // start all the servers
385 List<Integer> serverList = Arrays.asList(3, 4, 9, 10);
386 Map<String, String> configProps = new HashMap<String, String>();
387 configProps.put("admin.max.threads", "5");
388 currentCluster = startServers(currentCluster, storeDefFileWithoutReplication, serverList, configProps);
389 String bootstrapUrl = getBootstrapUrl(currentCluster, 3);
390 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, finalCluster);
391 try {
392 populateData(currentCluster, rwStoreDefWithoutReplication);
393 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(4, 9));
394 checkConsistentMetadata(finalCluster, serverList);
395 } finally {
396 stopServer(serverList);
398 } catch(AssertionError ae) {
399 logger.error("Assertion broken in testRWRebalance ", ae);
400 throw ae;
404 public void testRWRebalanceWithReplication(boolean serial) throws Exception {
405 logger.info("Starting testRWRebalanceWithReplication");
406 int zoneIds[] = new int[] { 1, 3 };
407 int nodesPerZone[][] = new int[][] { { 3, 4 }, { 9, 10 } };
408 int partitionMap[][] = new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } };
409 Cluster currentCluster = ServerTestUtils.getLocalNonContiguousZonedCluster(zoneIds,
410 nodesPerZone,
411 partitionMap,
412 ClusterTestUtils.getClusterPorts());
413 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster, 10, Lists.newArrayList(2));
414 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster, 4, Lists.newArrayList(3));
415 // start servers
416 List<Integer> serverList = Arrays.asList(3, 4, 9, 10);
417 Map<String, String> configProps = new HashMap<String, String>();
418 configProps.put("admin.max.threads", "5");
419 if(serial) {
420 configProps.put("max.parallel.stores.rebalancing", String.valueOf(1));
422 currentCluster = startServers(currentCluster, storeDefFileWithReplication, serverList, configProps);
423 String bootstrapUrl = getBootstrapUrl(currentCluster, 3);
424 int maxParallel = 5;
425 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl, maxParallel, finalCluster);
426 try {
427 populateData(currentCluster, rwStoreDefWithReplication);
428 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(3, 4, 9, 10));
429 checkConsistentMetadata(finalCluster, serverList);
430 } finally {
431 stopServer(serverList);
435 @Test(timeout = 600000)
436 public void testRWRebalanceWithReplication() throws Exception {
437 try {
438 testRWRebalanceWithReplication(false);
439 } catch(AssertionError ae) {
440 logger.error("Assertion broken in testRWRebalanceWithReplication ", ae);
441 throw ae;
445 @Test(timeout = 600000)
446 public void testRWRebalanceWithReplicationSerial() throws Exception {
447 try {
448 testRWRebalanceWithReplication(true);
449 } catch(AssertionError ae) {
450 logger.error("Assertion broken in testRWRebalanceWithReplicationSerial ", ae);
451 throw ae;
455 @Test(timeout = 600000)
456 public void testRebalanceCleanPrimarySecondary() throws Exception {
457 logger.info("Starting testRebalanceCleanPrimary");
458 try {
460 int zoneIds[] = new int[] { 1, 3 };
461 int nodesPerZone[][] = new int[][] { { 3, 4, 5 }, { 9, 10, 11 } };
462 int partitionMap[][] = new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } };
464 Cluster currentCluster = ServerTestUtils.getLocalNonContiguousZonedCluster(zoneIds,
465 nodesPerZone,
466 partitionMap,
467 ClusterTestUtils.getClusterPorts());
469 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster, 5, Lists.newArrayList(7));
470 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster, 11, Lists.newArrayList(6));
473 * original server partition ownership
475 * [s3 : p0,p3,p4,p5,p6,p7] [s4 : p1-p7] [s5 : p1,p2] [s9 : p0,p1,p2,p3,p6,p7] [s10 : p1-p7] [s11 : p4,p5]
477 * final server partition ownership
479 * [s3 : p0,p2,p3,p4,p5,p6,p7] [s4 : p0,p1] [s5 : p1-p7] [s9 : p0.p1,p2,p3,p5,p6,p7]
480 * [s10 : p0,p1,p2,p3,p4,p7] [s11 : p4,p5,p6]
483 // start servers
484 List<Integer> serverList = Arrays.asList(3, 4, 5, 9, 10, 11);
485 Map<String, String> configProps = new HashMap<String, String>();
486 configProps.put("enable.repair", "true");
487 currentCluster = startServers(currentCluster,
488 rwStoreDefFileWithReplication,
489 serverList,
490 configProps);
492 String bootstrapUrl = getBootstrapUrl(currentCluster, 3);
493 ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
494 finalCluster);
496 try {
497 populateData(currentCluster, rwStoreDefWithReplication);
498 AdminClient admin = rebalanceKit.controller.getAdminClient();
500 List<ByteArray> p6KeySamples = sampleKeysFromPartition(admin, 4, rwStoreDefWithReplication.getName(),
501 Arrays.asList(6), 20);
502 List<ByteArray> p1KeySamples = sampleKeysFromPartition(admin, 4, rwStoreDefWithReplication.getName(),
503 Arrays.asList(1), 20);
504 List<ByteArray> p3KeySamples = sampleKeysFromPartition(admin, 3, rwStoreDefWithReplication.getName(),
505 Arrays.asList(3), 20);
506 List<ByteArray> p2KeySamples = sampleKeysFromPartition(admin, 4, rwStoreDefWithReplication.getName(),
507 Arrays.asList(2), 20);
508 List<ByteArray> p7KeySamples = sampleKeysFromPartition(admin, 10, rwStoreDefWithReplication.getName(),
509 Arrays.asList(7), 20);
511 rebalanceAndCheck(rebalanceKit.plan, rebalanceKit.controller, Arrays.asList(3, 4, 5, 9));
512 checkConsistentMetadata(finalCluster, serverList);
513 // Do the cleanup operation
514 for(int i = 0; i < 6; i++) {
515 admin.storeMntOps.repairJob(serverList.get(i));
517 // wait for the repairs to complete
518 for(int i = 0; i < 6; i++) {
519 ServerTestUtils.waitForAsyncOperationOnServer(serverMap.get(serverList.get(i)), "Repair", 5000);
522 // confirm a primary movement in zone 1 : P6 : s4 -> S5. The zone 1 primary changes when
523 // p6 moves cross zone check for existence of p6 in server 5,
524 checkForKeyExistence(admin, 5, rwStoreDefWithReplication.getName(), p6KeySamples);
526 // confirm a secondary movement in zone 1.. p2 : s4 -> s3 check
527 // for its existence in server 3
528 checkForKeyExistence(admin, 3, rwStoreDefWithReplication.getName(), p2KeySamples);
530 // check for its absernce in server 4
531 // also check that p1 is stable in server 4 [primary stability]
532 checkForKeyExistence(admin, 4, rwStoreDefWithReplication.getName(), p1KeySamples);
534 // check that p3 is stable in server 3 [Secondary stability]
535 checkForKeyExistence(admin, 3, rwStoreDefWithReplication.getName(), p3KeySamples);
537 // finally, test for server 10 which now became the secondary
538 // for p7 from being a primary before
539 checkForKeyExistence(admin, 10, rwStoreDefWithReplication.getName(), p7KeySamples);
540 } finally {
541 // stop servers
542 stopServer(serverList);
544 } catch(AssertionError ae) {
545 logger.error("Assertion broken in testRebalanceCleanPrimarySecondary ", ae);
546 throw ae;
550 @Test(timeout = 600000)
551 public void testProxyGetDuringRebalancing() throws Exception {
552 logger.info("Starting testProxyGetDuringRebalancing");
553 try {
555 int zoneIds[] = new int[] { 1, 3 };
556 int nodesPerZone[][] = new int[][] { { 3, 4 }, { 9, 10 } };
557 int partitionMap[][] = new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } };
558 Cluster currentCluster = ServerTestUtils.getLocalNonContiguousZonedCluster(zoneIds,
559 nodesPerZone,
560 partitionMap,
561 ClusterTestUtils.getClusterPorts());
562 Cluster tmpfinalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
564 Lists.newArrayList(2));
565 final Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(tmpfinalCluster,
567 Lists.newArrayList(3));
569 // start servers
570 final List<Integer> serverList = Arrays.asList(3, 4, 9, 10);
571 Map<String, String> configProps = new HashMap<String, String>();
572 configProps.put("admin.max.threads", "5");
573 final Cluster updatedCurrentCluster = startServers(currentCluster,
574 storeDefFileWithReplication,
575 serverList,
576 configProps);
578 ExecutorService executors = Executors.newFixedThreadPool(2);
579 final AtomicBoolean rebalancingComplete = new AtomicBoolean(false);
580 final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
582 String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 3);
583 int maxParallel = 2;
584 final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
585 maxParallel,
586 finalCluster);
588 try {
590 populateData(currentCluster, rwStoreDefWithReplication);
592 final SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(currentCluster,
594 .setEnableLazy(false)
595 .setSocketTimeout(120,
596 TimeUnit.SECONDS));
598 final StoreClient<String, String> storeClientRW = new DefaultStoreClient<String, String>(rwStoreDefWithReplication.getName(),
599 null,
600 factory,
603 final CountDownLatch latch = new CountDownLatch(2);
604 // start get operation.
605 executors.execute(new Runnable() {
607 @Override
608 public void run() {
609 try {
610 List<String> keys = new ArrayList<String>(testEntries.keySet());
612 while(!rebalancingComplete.get()) {
613 // should always able to get values.
614 int index = (int) (Math.random() * keys.size());
616 // should get a valid value
617 try {
618 Versioned<String> value = storeClientRW.get(keys.get(index));
619 assertNotSame("StoreClient get() should not return null.",
620 null,
621 value);
622 assertEquals("Value returned should be good",
623 new Versioned<String>(testEntries.get(keys.get(index))),
624 value);
625 } catch(Exception e) {
626 logger.error("Exception in proxy get thread", e);
627 e.printStackTrace();
628 exceptions.add(e);
632 } catch(Exception e) {
633 logger.error("Exception in proxy get thread", e);
634 exceptions.add(e);
635 } finally {
636 factory.close();
637 latch.countDown();
643 executors.execute(new Runnable() {
645 @Override
646 public void run() {
647 try {
649 Thread.sleep(500);
650 rebalanceAndCheck(rebalanceKit.plan,
651 rebalanceKit.controller,
652 Arrays.asList(3, 4, 9, 10));
654 Thread.sleep(500);
655 rebalancingComplete.set(true);
656 checkConsistentMetadata(finalCluster, serverList);
658 } catch(Exception e) {
659 exceptions.add(e);
660 } finally {
661 // stop servers
662 try {
663 stopServer(serverList);
664 } catch(Exception e) {
665 throw new RuntimeException(e);
667 latch.countDown();
672 latch.await();
673 executors.shutdown();
674 executors.awaitTermination(300, TimeUnit.SECONDS);
676 // check No Exception
677 if(exceptions.size() > 0) {
678 for(Exception e: exceptions) {
679 e.printStackTrace();
681 fail("Should not see any exceptions.");
683 } finally {
684 // stop servers
685 stopServer(serverList);
687 } catch(AssertionError ae) {
688 logger.error("Assertion broken in testProxyGetDuringRebalancing ", ae);
689 throw ae;
693 @Test(timeout = 600000)
694 public void testProxyPutDuringRebalancing() throws Exception {
695 logger.info("Starting testProxyPutDuringRebalancing");
696 try {
698 int zoneIds[] = new int[] { 1, 3 };
699 int nodesPerZone[][] = new int[][] { { 3, 4, 5 }, { 9, 10, 11 } };
700 int partitionMap[][] = new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } };
701 Cluster currentCluster = ServerTestUtils.getLocalNonContiguousZonedCluster(zoneIds,
702 nodesPerZone,
703 partitionMap,
704 ClusterTestUtils.getClusterPorts());
706 Cluster finalCluster = UpdateClusterUtils.createUpdatedCluster(currentCluster,
708 Lists.newArrayList(7));
709 finalCluster = UpdateClusterUtils.createUpdatedCluster(finalCluster,
711 Lists.newArrayList(6));
714 * Original partition map
716 * [s3 : p0] [s4 : p1, p6] [s5 : p2]
718 * [s9 : p3] [s10 : p4, p7] [s11 : p5]
720 * final server partition ownership
722 * [s3 : p0] [s4 : p1] [s5 : p2, p7]
724 * [s9 : p3] [s10 : p4] [s11 : p5, p6]
726 * Note that rwStoreDefFileWithReplication is a "2/1/1" store def.
728 * Original server n-ary partition ownership
730 * [s3 : p0, p3-7] [s4 : p0-p7] [s5 : p1-2]
732 * [s9 : p0-3, p6-7] [s10 : p0-p7] [s11 : p4-5]
734 * final server n-ary partition ownership
736 * [s3 : p0, p2-7] [s4 : p0-1] [s5 : p1-p7]
738 * [s9 : p0-3, p5-7] [s10 : p0-4, p7] [s11 : p4-6]
740 List<Integer> serverList = Arrays.asList(3, 4, 5, 9, 10, 11);
741 Map<String, String> configProps = new HashMap<String, String>();
742 configProps.put("admin.max.threads", "5");
743 final Cluster updatedCurrentCluster = startServers(currentCluster,
744 rwStoreDefFileWithReplication,
745 serverList,
746 configProps);
747 ExecutorService executors = Executors.newFixedThreadPool(2);
748 final AtomicBoolean rebalancingComplete = new AtomicBoolean(false);
749 final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
751 // It is imperative that we test in a single shot since multiple batches would mean the proxy bridges
752 // being torn down and established multiple times and we cannot test against the source
753 // cluster topology then. getRebalanceKit uses batch size of infinite, so this should be fine.
754 String bootstrapUrl = getBootstrapUrl(updatedCurrentCluster, 3);
755 int maxParallel = 2;
756 final ClusterTestUtils.RebalanceKit rebalanceKit = ClusterTestUtils.getRebalanceKit(bootstrapUrl,
757 maxParallel,
758 finalCluster);
759 populateData(currentCluster, rwStoreDefWithReplication);
760 final AdminClient adminClient = rebalanceKit.controller.getAdminClient();
761 // the plan would cause these partitions to move:
762 // Partition : Donor -> stealer
764 // p2 (Z-SEC) : s4 -> s3
765 // p3-6 (Z-PRI) : s4 -> s5
766 // p7 (Z-PRI) : s3 -> s5
768 // p5 (Z-SEC): s10 -> s9
769 // p6 (Z-PRI): s10 -> s11
771 // Rebalancing will run on servers 3, 5, 9, & 11
772 final List<ByteArray> movingKeysList = sampleKeysFromPartition(adminClient,
774 rwStoreDefWithReplication.getName(),
775 Arrays.asList(6),
776 20);
777 assertTrue("Empty list of moving keys...", movingKeysList.size() > 0);
778 final AtomicBoolean rebalancingStarted = new AtomicBoolean(false);
779 final AtomicBoolean proxyWritesDone = new AtomicBoolean(false);
780 final HashMap<String, String> baselineTuples = new HashMap<String, String>(testEntries);
781 final HashMap<String, VectorClock> baselineVersions = new HashMap<String, VectorClock>();
782 for(String key: baselineTuples.keySet()) {
783 baselineVersions.put(key, new VectorClock());
785 final CountDownLatch latch = new CountDownLatch(2);
786 // start get operation.
787 executors.execute(new Runnable() {
788 @Override
789 public void run() {
790 SocketStoreClientFactory factory = null;
791 try {
792 // wait for the rebalancing to begin
793 List<VoldemortServer> serverList = Lists.newArrayList(serverMap.get(3), serverMap.get(5),
794 serverMap.get(9), serverMap.get(11));
795 while(!rebalancingComplete.get()) {
796 Iterator<VoldemortServer> serverIterator = serverList.iterator();
797 while(serverIterator.hasNext()) {
798 VoldemortServer server = serverIterator.next();
799 if(ByteUtils.getString(server.getMetadataStore()
800 .get(MetadataStore.SERVER_STATE_KEY, null)
801 .get(0)
802 .getValue(),
803 "UTF-8")
804 .compareTo(VoldemortState.REBALANCING_MASTER_SERVER.toString()) == 0) {
805 logger.info("Server " + server.getIdentityNode().getId()
806 + " transitioned into REBALANCING MODE");
807 serverIterator.remove();
810 if(serverList.size() == 0) {
811 rebalancingStarted.set(true);
812 break;
815 if(rebalancingStarted.get()) {
816 factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls
817 (getBootstrapUrl(updatedCurrentCluster, 3))
818 .setEnableLazy(false)
819 .setSocketTimeout(120, TimeUnit.SECONDS)
820 .setClientZoneId(3));
822 final StoreClient<String, String> storeClientRW = new DefaultStoreClient
823 <String, String>(testStoreNameRW,
824 null,
825 factory,
827 // Now perform some writes and determine the end state of the changed keys.
828 // Initially, all data now with zero vector clock
829 for(ByteArray movingKey: movingKeysList) {
830 try {
831 String keyStr = ByteUtils.getString(movingKey.get(), "UTF-8");
832 String valStr = "proxy_write";
833 storeClientRW.put(keyStr, valStr);
834 baselineTuples.put(keyStr, valStr);
835 baselineVersions.get(keyStr)
836 .incrementVersion(11, System.currentTimeMillis());
837 proxyWritesDone.set(true);
838 if(rebalancingComplete.get()) {
839 break;
841 } catch(InvalidMetadataException e) {
842 logger.error("Encountered an invalid metadata exception.. ", e);
846 } catch(Exception e) {
847 logger.error("Exception in proxy write thread..", e);
848 exceptions.add(e);
849 } finally {
850 if(factory != null)
851 factory.close();
852 latch.countDown();
857 executors.execute(new Runnable() {
859 @Override
860 public void run() {
861 try {
862 rebalanceKit.rebalance();
863 } catch(Exception e) {
864 logger.error("Error in rebalancing... ", e);
865 exceptions.add(e);
866 } finally {
867 rebalancingComplete.set(true);
868 latch.countDown();
872 latch.await();
873 executors.shutdown();
874 executors.awaitTermination(300, TimeUnit.SECONDS);
876 assertEquals("Client did not see all server transition into rebalancing state",
877 rebalancingStarted.get(), true);
878 assertEquals("Not enough time to begin proxy writing", proxyWritesDone.get(), true);
879 checkEntriesPostRebalance(updatedCurrentCluster, finalCluster,
880 Lists.newArrayList(rwStoreDefWithReplication),
881 Arrays.asList(3, 4, 5, 9, 10, 11), baselineTuples, baselineVersions);
882 checkConsistentMetadata(finalCluster, serverList);
883 // check No Exception
884 if(exceptions.size() > 0) {
885 for(Exception e: exceptions) {
886 e.printStackTrace();
888 fail("Should not see any exceptions.");
890 // check that the proxy writes were made to the original donor, node 4
891 List<ClockEntry> clockEntries = new ArrayList<ClockEntry>(serverList.size());
892 for(Integer nodeid: serverList)
893 clockEntries.add(new ClockEntry(nodeid.shortValue(), System.currentTimeMillis()));
894 VectorClock clusterXmlClock = new VectorClock(clockEntries, System.currentTimeMillis());
895 for(Integer nodeid: serverList)
896 adminClient.metadataMgmtOps.updateRemoteCluster(nodeid, currentCluster, clusterXmlClock);
897 adminClient.setAdminClientCluster(currentCluster);
898 checkForTupleEquivalence(adminClient, 4, testStoreNameRW, movingKeysList, baselineTuples, baselineVersions);
899 // stop servers
900 try {
901 stopServer(serverList);
902 } catch(Exception e) {
903 throw new RuntimeException(e);
905 } catch(AssertionError ae) {
906 logger.error("Assertion broken in testProxyPutDuringRebalancing ", ae);
907 throw ae;
912 private void populateData(Cluster cluster, StoreDefinition storeDef) throws Exception {
913 // Create SocketStores for each Node first
914 Map<Integer, Store<ByteArray, byte[], byte[]>> storeMap = new HashMap<Integer, Store<ByteArray, byte[], byte[]>>();
915 for(Node node: cluster.getNodes()) {
916 storeMap.put(node.getId(),
917 getSocketStore(storeDef.getName(), node.getHost(), node.getSocketPort()));
919 BaseStoreRoutingPlan storeInstance = new BaseStoreRoutingPlan(cluster, storeDef);
920 for(Entry<String, String> entry: testEntries.entrySet()) {
921 ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8"));
922 List<Integer> preferenceNodes = storeInstance.getReplicationNodeList(keyBytes.get());
923 // Go over every node
924 for(int nodeId: preferenceNodes) {
925 try {
926 storeMap.get(nodeId)
927 .put(keyBytes,
928 new Versioned<byte[]>(ByteUtils.getBytes(entry.getValue(), "UTF-8")),
929 null);
930 } catch(ObsoleteVersionException e) {
931 logger.info("Why are we seeing this at all here ?? ");
932 e.printStackTrace();
936 // close all socket stores
937 for(Store<ByteArray, byte[], byte[]> store: storeMap.values()) {
938 store.close();