2 * Copyright 2008-2013 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
17 package voldemort
.client
.rebalance
;
19 import static org
.junit
.Assert
.assertEquals
;
20 import static org
.junit
.Assert
.assertNotSame
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import static org
.junit
.Assert
.fail
;
25 import java
.io
.IOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.Arrays
;
28 import java
.util
.Collections
;
29 import java
.util
.HashMap
;
30 import java
.util
.Iterator
;
31 import java
.util
.List
;
33 import java
.util
.Map
.Entry
;
34 import java
.util
.concurrent
.CountDownLatch
;
35 import java
.util
.concurrent
.ExecutorService
;
36 import java
.util
.concurrent
.Executors
;
37 import java
.util
.concurrent
.TimeUnit
;
38 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
40 import org
.apache
.commons
.io
.FileUtils
;
41 import org
.apache
.log4j
.Logger
;
42 import org
.junit
.After
;
43 import org
.junit
.Before
;
44 import org
.junit
.Test
;
46 import voldemort
.ClusterTestUtils
;
47 import voldemort
.ServerTestUtils
;
48 import voldemort
.client
.ClientConfig
;
49 import voldemort
.client
.DefaultStoreClient
;
50 import voldemort
.client
.RoutingTier
;
51 import voldemort
.client
.SocketStoreClientFactory
;
52 import voldemort
.client
.StoreClient
;
53 import voldemort
.client
.protocol
.admin
.AdminClient
;
54 import voldemort
.cluster
.Cluster
;
55 import voldemort
.cluster
.Node
;
56 import voldemort
.routing
.BaseStoreRoutingPlan
;
57 import voldemort
.routing
.RoutingStrategyType
;
58 import voldemort
.serialization
.SerializerDefinition
;
59 import voldemort
.server
.VoldemortServer
;
60 import voldemort
.store
.InvalidMetadataException
;
61 import voldemort
.store
.Store
;
62 import voldemort
.store
.StoreDefinition
;
63 import voldemort
.store
.StoreDefinitionBuilder
;
64 import voldemort
.store
.bdb
.BdbStorageConfiguration
;
65 import voldemort
.store
.metadata
.MetadataStore
;
66 import voldemort
.store
.metadata
.MetadataStore
.VoldemortState
;
67 import voldemort
.store
.slop
.strategy
.HintedHandoffStrategyType
;
68 import voldemort
.utils
.ByteArray
;
69 import voldemort
.utils
.ByteUtils
;
70 import voldemort
.utils
.RebalanceUtils
;
71 import voldemort
.utils
.UpdateClusterUtils
;
72 import voldemort
.versioning
.ClockEntry
;
73 import voldemort
.versioning
.ObsoleteVersionException
;
74 import voldemort
.versioning
.VectorClock
;
75 import voldemort
.versioning
.Versioned
;
76 import voldemort
.xml
.StoreDefinitionsMapper
;
78 import com
.google
.common
.collect
.Lists
;
81 * Rebalancing tests for zoned configurations with non-contiguous zone and node ids
83 public class ZonedRebalanceNonContiguousZonesTest
extends AbstractRebalanceTest
{
85 private static final Logger logger
= Logger
.getLogger(ZonedRebalanceNonContiguousZonesTest
.class.getName());
87 private final int NUM_KEYS
= 100;
89 protected static String testStoreNameRW
= "test";
90 protected static String testStoreNameRW2
= "test2";
92 protected static String storeDefFileWithoutReplication
;
93 protected static String storeDefFileWithReplication
;
94 protected static String rwStoreDefFileWithReplication
;
95 protected static String rwTwoStoreDefFileWithReplication
;
97 static Cluster z1z3Current
;
98 static Cluster z1z3Shuffle
;
99 static Cluster z1z3ClusterExpansionNN
;
100 static Cluster z1z3ClusterExpansionPP
;
101 static String z1z3StoresXml
;
102 static List
<StoreDefinition
> z1z3Stores
;
104 static Cluster z1z3z5Current
;
105 static Cluster z1z3z5Shuffle
;
106 static Cluster z1z3z5ClusterExpansionNNN
;
107 static Cluster z1z3z5ClusterExpansionPPP
;
108 static String z1z3z5StoresXml
;
109 static List
<StoreDefinition
> z1z3z5Stores
;
111 private List
<StoreDefinition
> storeDefWithoutReplication
;
112 private List
<StoreDefinition
> storeDefWithReplication
;
113 private StoreDefinition rwStoreDefWithoutReplication
;
114 private StoreDefinition rwStoreDefWithReplication
;
115 private StoreDefinition rwStoreDefWithReplication2
;
117 public ZonedRebalanceNonContiguousZonesTest() {
122 protected int getNumKeys() {
127 public void setUp() throws IOException
{
132 public static void setupZ1Z3() throws IOException
{
133 z1z3Current
= ClusterTestUtils
.getZ1Z3ClusterWithNonContiguousNodeIds();
134 z1z3Shuffle
= ClusterTestUtils
.getZ1Z3ClusterWithNonContiguousNodeIdsWithSwappedPartitions();
135 z1z3ClusterExpansionNN
= ClusterTestUtils
.getZ1Z3ClusterWithNonContiguousNodeIdsWithNN();
136 z1z3ClusterExpansionPP
= ClusterTestUtils
.getZ1Z3ClusterWithNonContiguousNodeIdsWithPP();
138 z1z3Stores
= ClusterTestUtils
.getZ1Z3StoreDefsBDB();
139 File z1z3file
= File
.createTempFile("z1z3-stores-", ".xml");
140 FileUtils
.writeStringToFile(z1z3file
, new StoreDefinitionsMapper().writeStoreList(z1z3Stores
));
141 z1z3StoresXml
= z1z3file
.getAbsolutePath();
143 z1z3z5Current
= ClusterTestUtils
.getZ1Z3Z5ClusterWithNonContiguousNodeIds();
144 z1z3z5Shuffle
= ClusterTestUtils
.getZ1Z3Z5ClusterWithNonContiguousNodeIdsWithSwappedPartitions();
145 z1z3z5ClusterExpansionNNN
= ClusterTestUtils
.getZ1Z3Z5ClusterWithNonContiguousNodeIdsWithNNN();
146 z1z3z5ClusterExpansionPPP
= ClusterTestUtils
.getZ1Z3Z5ClusterWithNonContiguousNodeIdsWithPPP();
148 z1z3z5Stores
= ClusterTestUtils
.getZ1Z3Z5StoreDefsBDB();
149 File z1z3z5file
= File
.createTempFile("z1z3z5-stores-", ".xml");
150 FileUtils
.writeStringToFile(z1z3z5file
, new StoreDefinitionsMapper().writeStoreList(z1z3z5Stores
));
151 z1z3z5StoresXml
= z1z3z5file
.getAbsolutePath();
154 public void setUpRWStuff() throws IOException
{
155 // First without replication
156 HashMap
<Integer
, Integer
> zrfRWStoreWithoutReplication
= new HashMap
<Integer
, Integer
>();
157 zrfRWStoreWithoutReplication
.put(1, 1);
158 zrfRWStoreWithoutReplication
.put(3, 1);
159 rwStoreDefWithoutReplication
= new StoreDefinitionBuilder().setName(testStoreNameRW
)
160 .setType(BdbStorageConfiguration
.TYPE_NAME
)
161 .setKeySerializer(new SerializerDefinition("string"))
162 .setValueSerializer(new SerializerDefinition("string"))
163 .setRoutingPolicy(RoutingTier
.CLIENT
)
164 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
165 .setReplicationFactor(2)
166 .setPreferredReads(1)
168 .setPreferredWrites(1)
169 .setRequiredWrites(1)
170 .setZoneCountReads(0)
171 .setZoneCountWrites(0)
172 .setZoneReplicationFactor(zrfRWStoreWithoutReplication
)
173 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
176 storeDefWithoutReplication
= Lists
.newArrayList(rwStoreDefWithoutReplication
);
177 String storeDefWithoutReplicationString
= new StoreDefinitionsMapper().writeStoreList(storeDefWithoutReplication
);
178 File file
= File
.createTempFile("two-stores-", ".xml");
179 FileUtils
.writeStringToFile(file
, storeDefWithoutReplicationString
);
180 storeDefFileWithoutReplication
= file
.getAbsolutePath();
182 // Now with replication
183 HashMap
<Integer
, Integer
> zrfRWStoreWithReplication
= new HashMap
<Integer
, Integer
>();
184 zrfRWStoreWithReplication
.put(1, 2);
185 zrfRWStoreWithReplication
.put(3, 2);
186 rwStoreDefWithReplication
= new StoreDefinitionBuilder().setName(testStoreNameRW
)
187 .setType(BdbStorageConfiguration
.TYPE_NAME
)
188 .setKeySerializer(new SerializerDefinition("string"))
189 .setValueSerializer(new SerializerDefinition("string"))
190 .setRoutingPolicy(RoutingTier
.CLIENT
)
191 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
192 .setReplicationFactor(4)
193 .setPreferredReads(1)
195 .setPreferredWrites(1)
196 .setRequiredWrites(1)
197 .setZoneCountReads(0)
198 .setZoneCountWrites(0)
199 .setZoneReplicationFactor(zrfRWStoreWithReplication
)
200 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
202 rwStoreDefWithReplication2
= new StoreDefinitionBuilder().setName(testStoreNameRW2
)
203 .setType(BdbStorageConfiguration
.TYPE_NAME
)
204 .setKeySerializer(new SerializerDefinition("string"))
205 .setValueSerializer(new SerializerDefinition("string"))
206 .setRoutingPolicy(RoutingTier
.CLIENT
)
207 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
208 .setReplicationFactor(4)
209 .setPreferredReads(1)
211 .setPreferredWrites(1)
212 .setRequiredWrites(1)
213 .setZoneCountReads(0)
214 .setZoneCountWrites(0)
215 .setZoneReplicationFactor(zrfRWStoreWithReplication
)
216 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
219 file
= File
.createTempFile("rw-stores-", ".xml");
220 FileUtils
.writeStringToFile(file
,
221 new StoreDefinitionsMapper().writeStoreList(Lists
.newArrayList(rwStoreDefWithReplication
)));
222 rwStoreDefFileWithReplication
= file
.getAbsolutePath();
224 file
= File
.createTempFile("rw-two-stores-", ".xml");
225 FileUtils
.writeStringToFile(file
,
226 new StoreDefinitionsMapper().writeStoreList(Lists
.newArrayList(rwStoreDefWithReplication
,
227 rwStoreDefWithReplication2
)));
228 rwTwoStoreDefFileWithReplication
= file
.getAbsolutePath();
230 storeDefWithReplication
= Lists
.newArrayList(rwStoreDefWithReplication
);
231 String storeDefWithReplicationString
= new StoreDefinitionsMapper().writeStoreList(storeDefWithReplication
);
232 file
= File
.createTempFile("two-stores-", ".xml");
233 FileUtils
.writeStringToFile(file
, storeDefWithReplicationString
);
234 storeDefFileWithReplication
= file
.getAbsolutePath();
238 public void tearDown() {
241 socketStoreFactory
.close();
242 socketStoreFactory
= null;
243 ClusterTestUtils
.reset();
246 // TODO: The tests based on this method are susceptible to TOCTOU
247 // BindException issue since findFreePorts is used to determine the ports for localhost:PORT of each node.
249 * Scripts the execution of a specific type of zoned rebalance test: sets up
250 * cluster based on cCluster plus any new nodes/zones in fCluster,
251 * rebalances to fCluster, verifies rebalance was correct.
253 * @param testTag For pretty printing
254 * @param cCluster current cluster
255 * @param fCluster final cluster
256 * @param cStoresXml XML file with current stores xml
257 * @param fStoresXml Unused parameter. Included for symmetry in method
259 * @param cStoreDefs store defs for current cluster (from cStoresXml)
260 * @param fStoreDefs store defs for final cluster.
263 public void testZonedRebalance(String testTag
,
268 List
<StoreDefinition
> cStoreDefs
,
269 List
<StoreDefinition
> fStoreDefs
) throws Exception
{
270 logger
.info("Starting " + testTag
);
271 // Hacky work around of TOCTOU bind Exception issues. Each test that invokes this method brings
272 // servers up & down on the same ports. The OS seems to need a rest between subsequent tests...
273 Thread
.sleep(TimeUnit
.SECONDS
.toMillis(5));
275 Cluster interimCluster
= RebalanceUtils
.getInterimCluster(cCluster
, fCluster
);
276 List
<Integer
> serverList
= new ArrayList
<Integer
>(interimCluster
.getNodeIds());
277 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
278 configProps
.put("admin.max.threads", "5");
279 interimCluster
= startServers(interimCluster
, cStoresXml
, serverList
, configProps
);
280 String bootstrapUrl
= getBootstrapUrl(interimCluster
, 3);
281 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
, fCluster
,
284 for(StoreDefinition storeDef
: cStoreDefs
) {
285 populateData(cCluster
, storeDef
);
287 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, serverList
);
288 checkConsistentMetadata(fCluster
, serverList
);
290 stopServer(serverList
);
292 } catch(AssertionError ae
) {
293 logger
.error("Assertion broken in " + testTag
+ " : ", ae
);
298 public void testZonedRebalance(String testTag
,
302 List
<StoreDefinition
> storeDefs
) throws Exception
{
303 testZonedRebalance(testTag
, cCluster
, fCluster
, storesXml
, storesXml
, storeDefs
, storeDefs
);
306 @Test(timeout
= 600000)
307 public void testNoopZ1Z3() throws Exception
{
308 testZonedRebalance("TestNoopZ1Z3", z1z3Current
, z1z3Current
, z1z3StoresXml
, z1z3Stores
);
311 @Test(timeout
= 600000)
312 public void testShuffleZ1Z3() throws Exception
{
313 testZonedRebalance("TestShuffleZ1Z3", z1z3Current
, z1z3Shuffle
, z1z3StoresXml
, z1z3Stores
);
316 @Test(timeout
= 600000)
317 public void testShuffleZ1Z3AndShuffleAgain() throws Exception
{
319 logger
.info("Starting testShuffleZZAndShuffleAgain");
320 // Hacky work around of TOCTOU bind Exception issues. Each test that invokes this method brings servers
321 // up & down on the same ports. The OS seems to need a rest between subsequent tests...
322 Thread
.sleep(TimeUnit
.SECONDS
.toMillis(5));
324 Cluster interimCluster
= RebalanceUtils
.getInterimCluster(z1z3Current
, z1z3Shuffle
);
326 // start all the servers
327 List
<Integer
> serverList
= new ArrayList
<Integer
>(interimCluster
.getNodeIds());
328 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
329 configProps
.put("admin.max.threads", "5");
330 interimCluster
= startServers(interimCluster
, z1z3StoresXml
, serverList
, configProps
);
332 // Populate cluster with data
333 for(StoreDefinition storeDef
: z1z3Stores
) {
334 populateData(z1z3Current
, storeDef
);
336 String bootstrapUrl
= getBootstrapUrl(interimCluster
, 3);
338 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
, z1z3Shuffle
, z1z3Stores
);
339 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, serverList
);
340 checkConsistentMetadata(z1z3Shuffle
, serverList
);
342 // Now, go from shuffled state, back to the original to confirm subsequent rebalances can be invoked.
343 rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
, z1z3Current
, z1z3Stores
);
344 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, serverList
);
345 checkConsistentMetadata(z1z3Current
, serverList
);
347 stopServer(serverList
);
350 @Test(timeout
= 600000)
351 public void testClusterExpansion() throws Exception
{
352 testZonedRebalance("TestClusterExpansionZ1Z3", z1z3Current
, z1z3ClusterExpansionPP
, z1z3StoresXml
, z1z3Stores
);
355 @Test(timeout
= 600000)
356 public void testNoopZ1Z3Z5() throws Exception
{
357 testZonedRebalance("TestNoopZ1Z3Z5", z1z3z5Current
, z1z3z5Current
, z1z3z5StoresXml
, z1z3z5Stores
);
360 @Test(timeout
= 600000)
361 public void testShuffleZ1Z3Z5() throws Exception
{
362 testZonedRebalance("TestShuffleZ1Z3Z5", z1z3z5Current
, z1z3z5Shuffle
, z1z3z5StoresXml
, z1z3z5Stores
);
365 @Test(timeout
= 600000)
366 public void testClusterExpansionZ1Z3Z5() throws Exception
{
367 testZonedRebalance("TestClusterExpansionZZZ", z1z3z5Current
, z1z3z5ClusterExpansionPPP
, z1z3z5StoresXml
, z1z3z5Stores
);
370 @Test(timeout
= 600000)
371 public void testRWRebalance() throws Exception
{
372 logger
.info("Starting testRWRebalance");
374 int zoneIds
[] = new int[] { 1, 3 };
375 int nodesPerZone
[][] = new int[][] { { 3, 4 }, { 9, 10 } };
376 int partitionMap
[][] = new int[][] { { 0, 2, 4, 6 }, {}, { 1, 3, 5, 7 }, {} };
377 Cluster currentCluster
= ServerTestUtils
.getLocalNonContiguousZonedCluster(zoneIds
,
380 ClusterTestUtils
.getClusterPorts());
382 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
, 10, Lists
.newArrayList(2, 6));
383 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
, 4, Lists
.newArrayList(3, 7));
384 // start all the servers
385 List
<Integer
> serverList
= Arrays
.asList(3, 4, 9, 10);
386 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
387 configProps
.put("admin.max.threads", "5");
388 currentCluster
= startServers(currentCluster
, storeDefFileWithoutReplication
, serverList
, configProps
);
389 String bootstrapUrl
= getBootstrapUrl(currentCluster
, 3);
390 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
, finalCluster
);
392 populateData(currentCluster
, rwStoreDefWithoutReplication
);
393 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, Arrays
.asList(4, 9));
394 checkConsistentMetadata(finalCluster
, serverList
);
396 stopServer(serverList
);
398 } catch(AssertionError ae
) {
399 logger
.error("Assertion broken in testRWRebalance ", ae
);
404 public void testRWRebalanceWithReplication(boolean serial
) throws Exception
{
405 logger
.info("Starting testRWRebalanceWithReplication");
406 int zoneIds
[] = new int[] { 1, 3 };
407 int nodesPerZone
[][] = new int[][] { { 3, 4 }, { 9, 10 } };
408 int partitionMap
[][] = new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } };
409 Cluster currentCluster
= ServerTestUtils
.getLocalNonContiguousZonedCluster(zoneIds
,
412 ClusterTestUtils
.getClusterPorts());
413 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
, 10, Lists
.newArrayList(2));
414 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
, 4, Lists
.newArrayList(3));
416 List
<Integer
> serverList
= Arrays
.asList(3, 4, 9, 10);
417 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
418 configProps
.put("admin.max.threads", "5");
420 configProps
.put("max.parallel.stores.rebalancing", String
.valueOf(1));
422 currentCluster
= startServers(currentCluster
, storeDefFileWithReplication
, serverList
, configProps
);
423 String bootstrapUrl
= getBootstrapUrl(currentCluster
, 3);
425 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
, maxParallel
, finalCluster
);
427 populateData(currentCluster
, rwStoreDefWithReplication
);
428 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, Arrays
.asList(3, 4, 9, 10));
429 checkConsistentMetadata(finalCluster
, serverList
);
431 stopServer(serverList
);
435 @Test(timeout
= 600000)
436 public void testRWRebalanceWithReplication() throws Exception
{
438 testRWRebalanceWithReplication(false);
439 } catch(AssertionError ae
) {
440 logger
.error("Assertion broken in testRWRebalanceWithReplication ", ae
);
445 @Test(timeout
= 600000)
446 public void testRWRebalanceWithReplicationSerial() throws Exception
{
448 testRWRebalanceWithReplication(true);
449 } catch(AssertionError ae
) {
450 logger
.error("Assertion broken in testRWRebalanceWithReplicationSerial ", ae
);
455 @Test(timeout
= 600000)
456 public void testRebalanceCleanPrimarySecondary() throws Exception
{
457 logger
.info("Starting testRebalanceCleanPrimary");
460 int zoneIds
[] = new int[] { 1, 3 };
461 int nodesPerZone
[][] = new int[][] { { 3, 4, 5 }, { 9, 10, 11 } };
462 int partitionMap
[][] = new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } };
464 Cluster currentCluster
= ServerTestUtils
.getLocalNonContiguousZonedCluster(zoneIds
,
467 ClusterTestUtils
.getClusterPorts());
469 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
, 5, Lists
.newArrayList(7));
470 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
, 11, Lists
.newArrayList(6));
473 * original server partition ownership
475 * [s3 : p0,p3,p4,p5,p6,p7] [s4 : p1-p7] [s5 : p1,p2] [s9 : p0,p1,p2,p3,p6,p7] [s10 : p1-p7] [s11 : p4,p5]
477 * final server partition ownership
479 * [s3 : p0,p2,p3,p4,p5,p6,p7] [s4 : p0,p1] [s5 : p1-p7] [s9 : p0,p1,p2,p3,p5,p6,p7]
480 * [s10 : p0,p1,p2,p3,p4,p7] [s11 : p4,p5,p6]
484 List
<Integer
> serverList
= Arrays
.asList(3, 4, 5, 9, 10, 11);
485 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
486 configProps
.put("enable.repair", "true");
487 currentCluster
= startServers(currentCluster
,
488 rwStoreDefFileWithReplication
,
492 String bootstrapUrl
= getBootstrapUrl(currentCluster
, 3);
493 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
497 populateData(currentCluster
, rwStoreDefWithReplication
);
498 AdminClient admin
= rebalanceKit
.controller
.getAdminClient();
500 List
<ByteArray
> p6KeySamples
= sampleKeysFromPartition(admin
, 4, rwStoreDefWithReplication
.getName(),
501 Arrays
.asList(6), 20);
502 List
<ByteArray
> p1KeySamples
= sampleKeysFromPartition(admin
, 4, rwStoreDefWithReplication
.getName(),
503 Arrays
.asList(1), 20);
504 List
<ByteArray
> p3KeySamples
= sampleKeysFromPartition(admin
, 3, rwStoreDefWithReplication
.getName(),
505 Arrays
.asList(3), 20);
506 List
<ByteArray
> p2KeySamples
= sampleKeysFromPartition(admin
, 4, rwStoreDefWithReplication
.getName(),
507 Arrays
.asList(2), 20);
508 List
<ByteArray
> p7KeySamples
= sampleKeysFromPartition(admin
, 10, rwStoreDefWithReplication
.getName(),
509 Arrays
.asList(7), 20);
511 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, Arrays
.asList(3, 4, 5, 9));
512 checkConsistentMetadata(finalCluster
, serverList
);
513 // Do the cleanup operation
514 for(int i
= 0; i
< 6; i
++) {
515 admin
.storeMntOps
.repairJob(serverList
.get(i
));
517 // wait for the repairs to complete
518 for(int i
= 0; i
< 6; i
++) {
519 ServerTestUtils
.waitForAsyncOperationOnServer(serverMap
.get(serverList
.get(i
)), "Repair", 5000);
522 // confirm a primary movement in zone 1: p6 : s4 -> s5. The zone 1 primary changes when
523 // p6 moves cross-zone; check for the existence of p6 in server 5.
524 checkForKeyExistence(admin
, 5, rwStoreDefWithReplication
.getName(), p6KeySamples
);
526 // confirm a secondary movement in zone 1: p2 : s4 -> s3; check
527 // for its existence in server 3
528 checkForKeyExistence(admin
, 3, rwStoreDefWithReplication
.getName(), p2KeySamples
);
530 // TODO: check for its absence in server 4 (not currently asserted)
531 // also check that p1 is stable in server 4 [primary stability]
532 checkForKeyExistence(admin
, 4, rwStoreDefWithReplication
.getName(), p1KeySamples
);
534 // check that p3 is stable in server 3 [Secondary stability]
535 checkForKeyExistence(admin
, 3, rwStoreDefWithReplication
.getName(), p3KeySamples
);
537 // finally, check server 10, which went from being the primary
538 // for p7 to being its secondary
539 checkForKeyExistence(admin
, 10, rwStoreDefWithReplication
.getName(), p7KeySamples
);
542 stopServer(serverList
);
544 } catch(AssertionError ae
) {
545 logger
.error("Assertion broken in testRebalanceCleanPrimarySecondary ", ae
);
550 @Test(timeout
= 600000)
551 public void testProxyGetDuringRebalancing() throws Exception
{
552 logger
.info("Starting testProxyGetDuringRebalancing");
555 int zoneIds
[] = new int[] { 1, 3 };
556 int nodesPerZone
[][] = new int[][] { { 3, 4 }, { 9, 10 } };
557 int partitionMap
[][] = new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } };
558 Cluster currentCluster
= ServerTestUtils
.getLocalNonContiguousZonedCluster(zoneIds
,
561 ClusterTestUtils
.getClusterPorts());
562 Cluster tmpfinalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
564 Lists
.newArrayList(2));
565 final Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(tmpfinalCluster
,
567 Lists
.newArrayList(3));
570 final List
<Integer
> serverList
= Arrays
.asList(3, 4, 9, 10);
571 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
572 configProps
.put("admin.max.threads", "5");
573 final Cluster updatedCurrentCluster
= startServers(currentCluster
,
574 storeDefFileWithReplication
,
578 ExecutorService executors
= Executors
.newFixedThreadPool(2);
579 final AtomicBoolean rebalancingComplete
= new AtomicBoolean(false);
580 final List
<Exception
> exceptions
= Collections
.synchronizedList(new ArrayList
<Exception
>());
582 String bootstrapUrl
= getBootstrapUrl(updatedCurrentCluster
, 3);
584 final ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
590 populateData(currentCluster
, rwStoreDefWithReplication
);
592 final SocketStoreClientFactory factory
= new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(currentCluster
,
594 .setEnableLazy(false)
595 .setSocketTimeout(120,
598 final StoreClient
<String
, String
> storeClientRW
= new DefaultStoreClient
<String
, String
>(rwStoreDefWithReplication
.getName(),
603 final CountDownLatch latch
= new CountDownLatch(2);
604 // start get operation.
605 executors
.execute(new Runnable() {
610 List
<String
> keys
= new ArrayList
<String
>(testEntries
.keySet());
612 while(!rebalancingComplete
.get()) {
613 // should always be able to get values.
614 int index
= (int) (Math
.random() * keys
.size());
616 // should get a valid value
618 Versioned
<String
> value
= storeClientRW
.get(keys
.get(index
));
619 assertNotSame("StoreClient get() should not return null.",
622 assertEquals("Value returned should be good",
623 new Versioned
<String
>(testEntries
.get(keys
.get(index
))),
625 } catch(Exception e
) {
626 logger
.error("Exception in proxy get thread", e
);
632 } catch(Exception e
) {
633 logger
.error("Exception in proxy get thread", e
);
643 executors
.execute(new Runnable() {
650 rebalanceAndCheck(rebalanceKit
.plan
,
651 rebalanceKit
.controller
,
652 Arrays
.asList(3, 4, 9, 10));
655 rebalancingComplete
.set(true);
656 checkConsistentMetadata(finalCluster
, serverList
);
658 } catch(Exception e
) {
663 stopServer(serverList
);
664 } catch(Exception e
) {
665 throw new RuntimeException(e
);
673 executors
.shutdown();
674 executors
.awaitTermination(300, TimeUnit
.SECONDS
);
676 // check No Exception
677 if(exceptions
.size() > 0) {
678 for(Exception e
: exceptions
) {
681 fail("Should not see any exceptions.");
685 stopServer(serverList
);
687 } catch(AssertionError ae
) {
688 logger
.error("Assertion broken in testProxyGetDuringRebalancing ", ae
);
693 @Test(timeout
= 600000)
694 public void testProxyPutDuringRebalancing() throws Exception
{
695 logger
.info("Starting testProxyPutDuringRebalancing");
698 int zoneIds
[] = new int[] { 1, 3 };
699 int nodesPerZone
[][] = new int[][] { { 3, 4, 5 }, { 9, 10, 11 } };
700 int partitionMap
[][] = new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } };
701 Cluster currentCluster
= ServerTestUtils
.getLocalNonContiguousZonedCluster(zoneIds
,
704 ClusterTestUtils
.getClusterPorts());
706 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
708 Lists
.newArrayList(7));
709 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
,
711 Lists
.newArrayList(6));
714 * Original partition map
716 * [s3 : p0] [s4 : p1, p6] [s5 : p2]
718 * [s9 : p3] [s10 : p4, p7] [s11 : p5]
720 * final server partition ownership
722 * [s3 : p0] [s4 : p1] [s5 : p2, p7]
724 * [s9 : p3] [s10 : p4] [s11 : p5, p6]
726 * Note that rwStoreDefFileWithReplication is a "2/1/1" store def.
728 * Original server n-ary partition ownership
730 * [s3 : p0, p3-7] [s4 : p0-p7] [s5 : p1-2]
732 * [s9 : p0-3, p6-7] [s10 : p0-p7] [s11 : p4-5]
734 * final server n-ary partition ownership
736 * [s3 : p0, p2-7] [s4 : p0-1] [s5 : p1-p7]
738 * [s9 : p0-3, p5-7] [s10 : p0-4, p7] [s11 : p4-6]
740 List
<Integer
> serverList
= Arrays
.asList(3, 4, 5, 9, 10, 11);
741 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
742 configProps
.put("admin.max.threads", "5");
743 final Cluster updatedCurrentCluster
= startServers(currentCluster
,
744 rwStoreDefFileWithReplication
,
747 ExecutorService executors
= Executors
.newFixedThreadPool(2);
748 final AtomicBoolean rebalancingComplete
= new AtomicBoolean(false);
749 final List
<Exception
> exceptions
= Collections
.synchronizedList(new ArrayList
<Exception
>());
751 // It is imperative that we test in a single shot since multiple batches would mean the proxy bridges
752 // being torn down and established multiple times and we cannot test against the source
753 // cluster topology then. getRebalanceKit uses batch size of infinite, so this should be fine.
754 String bootstrapUrl
= getBootstrapUrl(updatedCurrentCluster
, 3);
756 final ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
759 populateData(currentCluster
, rwStoreDefWithReplication
);
760 final AdminClient adminClient
= rebalanceKit
.controller
.getAdminClient();
761 // the plan would cause these partitions to move:
762 // Partition : Donor -> stealer
764 // p2 (Z-SEC) : s4 -> s3
765 // p3-6 (Z-PRI) : s4 -> s5
766 // p7 (Z-PRI) : s3 -> s5
768 // p5 (Z-SEC): s10 -> s9
769 // p6 (Z-PRI): s10 -> s11
771 // Rebalancing will run on servers 3, 5, 9, & 11
772 final List
<ByteArray
> movingKeysList
= sampleKeysFromPartition(adminClient
,
774 rwStoreDefWithReplication
.getName(),
777 assertTrue("Empty list of moving keys...", movingKeysList
.size() > 0);
778 final AtomicBoolean rebalancingStarted
= new AtomicBoolean(false);
779 final AtomicBoolean proxyWritesDone
= new AtomicBoolean(false);
780 final HashMap
<String
, String
> baselineTuples
= new HashMap
<String
, String
>(testEntries
);
781 final HashMap
<String
, VectorClock
> baselineVersions
= new HashMap
<String
, VectorClock
>();
782 for(String key
: baselineTuples
.keySet()) {
783 baselineVersions
.put(key
, new VectorClock());
785 final CountDownLatch latch
= new CountDownLatch(2);
786 // start get operation.
787 executors
.execute(new Runnable() {
790 SocketStoreClientFactory factory
= null;
792 // wait for the rebalancing to begin
793 List
<VoldemortServer
> serverList
= Lists
.newArrayList(serverMap
.get(3), serverMap
.get(5),
794 serverMap
.get(9), serverMap
.get(11));
795 while(!rebalancingComplete
.get()) {
796 Iterator
<VoldemortServer
> serverIterator
= serverList
.iterator();
797 while(serverIterator
.hasNext()) {
798 VoldemortServer server
= serverIterator
.next();
799 if(ByteUtils
.getString(server
.getMetadataStore()
800 .get(MetadataStore
.SERVER_STATE_KEY
, null)
804 .compareTo(VoldemortState
.REBALANCING_MASTER_SERVER
.toString()) == 0) {
805 logger
.info("Server " + server
.getIdentityNode().getId()
806 + " transitioned into REBALANCING MODE");
807 serverIterator
.remove();
810 if(serverList
.size() == 0) {
811 rebalancingStarted
.set(true);
815 if(rebalancingStarted
.get()) {
816 factory
= new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls
817 (getBootstrapUrl(updatedCurrentCluster
, 3))
818 .setEnableLazy(false)
819 .setSocketTimeout(120, TimeUnit
.SECONDS
)
820 .setClientZoneId(3));
822 final StoreClient
<String
, String
> storeClientRW
= new DefaultStoreClient
823 <String
, String
>(testStoreNameRW
,
827 // Now perform some writes and determine the end state of the changed keys.
828 // Initially, all data has a zero vector clock
829 for(ByteArray movingKey
: movingKeysList
) {
831 String keyStr
= ByteUtils
.getString(movingKey
.get(), "UTF-8");
832 String valStr
= "proxy_write";
833 storeClientRW
.put(keyStr
, valStr
);
834 baselineTuples
.put(keyStr
, valStr
);
835 baselineVersions
.get(keyStr
)
836 .incrementVersion(11, System
.currentTimeMillis());
837 proxyWritesDone
.set(true);
838 if(rebalancingComplete
.get()) {
841 } catch(InvalidMetadataException e
) {
842 logger
.error("Encountered an invalid metadata exception.. ", e
);
846 } catch(Exception e
) {
847 logger
.error("Exception in proxy write thread..", e
);
857 executors
.execute(new Runnable() {
862 rebalanceKit
.rebalance();
863 } catch(Exception e
) {
864 logger
.error("Error in rebalancing... ", e
);
867 rebalancingComplete
.set(true);
873 executors
.shutdown();
874 executors
.awaitTermination(300, TimeUnit
.SECONDS
);
876 assertEquals("Client did not see all server transition into rebalancing state",
877 rebalancingStarted
.get(), true);
878 assertEquals("Not enough time to begin proxy writing", proxyWritesDone
.get(), true);
879 checkEntriesPostRebalance(updatedCurrentCluster
, finalCluster
,
880 Lists
.newArrayList(rwStoreDefWithReplication
),
881 Arrays
.asList(3, 4, 5, 9, 10, 11), baselineTuples
, baselineVersions
);
882 checkConsistentMetadata(finalCluster
, serverList
);
883 // check No Exception
884 if(exceptions
.size() > 0) {
885 for(Exception e
: exceptions
) {
888 fail("Should not see any exceptions.");
890 // check that the proxy writes were made to the original donor, node 4
891 List
<ClockEntry
> clockEntries
= new ArrayList
<ClockEntry
>(serverList
.size());
892 for(Integer nodeid
: serverList
)
893 clockEntries
.add(new ClockEntry(nodeid
.shortValue(), System
.currentTimeMillis()));
894 VectorClock clusterXmlClock
= new VectorClock(clockEntries
, System
.currentTimeMillis());
895 for(Integer nodeid
: serverList
)
896 adminClient
.metadataMgmtOps
.updateRemoteCluster(nodeid
, currentCluster
, clusterXmlClock
);
897 adminClient
.setAdminClientCluster(currentCluster
);
898 checkForTupleEquivalence(adminClient
, 4, testStoreNameRW
, movingKeysList
, baselineTuples
, baselineVersions
);
901 stopServer(serverList
);
902 } catch(Exception e
) {
903 throw new RuntimeException(e
);
905 } catch(AssertionError ae
) {
906 logger
.error("Assertion broken in testProxyPutDuringRebalancing ", ae
);
912 private void populateData(Cluster cluster
, StoreDefinition storeDef
) throws Exception
{
913 // Create SocketStores for each Node first
914 Map
<Integer
, Store
<ByteArray
, byte[], byte[]>> storeMap
= new HashMap
<Integer
, Store
<ByteArray
, byte[], byte[]>>();
915 for(Node node
: cluster
.getNodes()) {
916 storeMap
.put(node
.getId(),
917 getSocketStore(storeDef
.getName(), node
.getHost(), node
.getSocketPort()));
919 BaseStoreRoutingPlan storeInstance
= new BaseStoreRoutingPlan(cluster
, storeDef
);
920 for(Entry
<String
, String
> entry
: testEntries
.entrySet()) {
921 ByteArray keyBytes
= new ByteArray(ByteUtils
.getBytes(entry
.getKey(), "UTF-8"));
922 List
<Integer
> preferenceNodes
= storeInstance
.getReplicationNodeList(keyBytes
.get());
923 // Go over every node
924 for(int nodeId
: preferenceNodes
) {
928 new Versioned
<byte[]>(ByteUtils
.getBytes(entry
.getValue(), "UTF-8")),
930 } catch(ObsoleteVersionException e
) {
931 logger
.info("Why are we seeing this at all here ?? ");
936 // close all socket stores
937 for(Store
<ByteArray
, byte[], byte[]> store
: storeMap
.values()) {