2 * Copyright 2008-2013 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
17 package voldemort
.client
.rebalance
;
19 import static org
.junit
.Assert
.assertEquals
;
20 import static org
.junit
.Assert
.assertNotSame
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import static org
.junit
.Assert
.fail
;
25 import java
.io
.IOException
;
26 import java
.util
.ArrayList
;
27 import java
.util
.Arrays
;
28 import java
.util
.Collections
;
29 import java
.util
.HashMap
;
30 import java
.util
.Iterator
;
31 import java
.util
.List
;
33 import java
.util
.Map
.Entry
;
34 import java
.util
.concurrent
.CountDownLatch
;
35 import java
.util
.concurrent
.ExecutorService
;
36 import java
.util
.concurrent
.Executors
;
37 import java
.util
.concurrent
.TimeUnit
;
38 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
40 import org
.apache
.commons
.io
.FileUtils
;
41 import org
.apache
.log4j
.Logger
;
42 import org
.junit
.After
;
43 import org
.junit
.Before
;
44 import org
.junit
.Test
;
46 import voldemort
.ClusterTestUtils
;
47 import voldemort
.ServerTestUtils
;
48 import voldemort
.client
.ClientConfig
;
49 import voldemort
.client
.DefaultStoreClient
;
50 import voldemort
.client
.RoutingTier
;
51 import voldemort
.client
.SocketStoreClientFactory
;
52 import voldemort
.client
.StoreClient
;
53 import voldemort
.client
.protocol
.admin
.AdminClient
;
54 import voldemort
.cluster
.Cluster
;
55 import voldemort
.cluster
.Node
;
56 import voldemort
.routing
.BaseStoreRoutingPlan
;
57 import voldemort
.routing
.RoutingStrategyType
;
58 import voldemort
.serialization
.SerializerDefinition
;
59 import voldemort
.server
.VoldemortServer
;
60 import voldemort
.store
.InvalidMetadataException
;
61 import voldemort
.store
.Store
;
62 import voldemort
.store
.StoreDefinition
;
63 import voldemort
.store
.StoreDefinitionBuilder
;
64 import voldemort
.store
.bdb
.BdbStorageConfiguration
;
65 import voldemort
.store
.metadata
.MetadataStore
;
66 import voldemort
.store
.metadata
.MetadataStore
.VoldemortState
;
67 import voldemort
.store
.slop
.strategy
.HintedHandoffStrategyType
;
68 import voldemort
.utils
.ByteArray
;
69 import voldemort
.utils
.ByteUtils
;
70 import voldemort
.utils
.RebalanceUtils
;
71 import voldemort
.utils
.UpdateClusterUtils
;
72 import voldemort
.versioning
.ClockEntry
;
73 import voldemort
.versioning
.ObsoleteVersionException
;
74 import voldemort
.versioning
.VectorClock
;
75 import voldemort
.versioning
.Versioned
;
76 import voldemort
.xml
.StoreDefinitionsMapper
;
78 import com
.google
.common
.collect
.Lists
;
81 * Rebalancing tests with zoned configurations with cross zone moves (since
82 * {@link AbstractNonZonedRebalanceTest} should cover intra zone moves already
85 public abstract class AbstractZonedRebalanceTest
extends AbstractRebalanceTest
{
// Logger for this test class; used for progress and assertion-failure messages.
87 private static final Logger logger
= Logger
.getLogger(AbstractZonedRebalanceTest
.class.getName());
// Names given to the read-write stores built in setUpRWStuff().
89 protected static String testStoreNameRW
= "test";
90 protected static String testStoreNameRW2
= "test2";
// Absolute paths of temporary stores.xml files written by setUpRWStuff():
// one for the store list without replication, one for the list with replication.
92 protected static String storeDefFileWithoutReplication
;
93 protected static String storeDefFileWithReplication
;
// Path of the temp file holding only rwStoreDefWithReplication.
94 protected static String rwStoreDefFileWithReplication
;
// Path of the temp file holding both rwStoreDefWithReplication and rwStoreDefWithReplication2.
95 protected static String rwTwoStoreDefFileWithReplication
;
// "ZZ" fixtures, all populated by setupZZandZZZ() from ClusterTestUtils.
97 static Cluster zzCurrent
;
98 static Cluster zzShuffle
;
99 static Cluster zzClusterExpansionNN
;
100 static Cluster zzClusterExpansionPP
;
// Absolute path of the temp XML file that zzStores is serialized to.
101 static String zzStoresXml
;
102 static List
<StoreDefinition
> zzStores
;
// "ZZZ" fixtures, all populated by setupZZandZZZ() from ClusterTestUtils.
104 static Cluster zzzCurrent
;
105 static Cluster zzzShuffle
;
106 static Cluster zzzClusterExpansionNNN
;
107 static Cluster zzzClusterExpansionPPP
;
// Assigned from ClusterTestUtils.getZZECluster() and getZZEClusterXXP() respectively.
108 static Cluster zzeZoneExpansion
;
109 static Cluster zzzZoneExpansionXXP
;
// Absolute path of the temp XML file that zzzStores is serialized to.
110 static String zzzStoresXml
;
111 static List
<StoreDefinition
> zzzStores
;
// Single-element store lists built in setUpRWStuff() and serialized to the
// storeDefFileWith(out)Replication files above.
113 private List
<StoreDefinition
> storeDefWithoutReplication
;
114 private List
<StoreDefinition
> storeDefWithReplication
;
// Store "test" with replication factor 2 and zone replication factor 1 in zones 0 and 1.
115 private StoreDefinition rwStoreDefWithoutReplication
;
// Store "test" with replication factor 4 and zone replication factor 2 in zones 0 and 1.
116 private StoreDefinition rwStoreDefWithReplication
;
// Same settings as rwStoreDefWithReplication, but named "test2".
117 private StoreDefinition rwStoreDefWithReplication2
;
119 public AbstractZonedRebalanceTest() {
124 public void setUp() throws IOException
{
129 public static void setupZZandZZZ() throws IOException
{
130 zzCurrent
= ClusterTestUtils
.getZZCluster();
131 zzShuffle
= ClusterTestUtils
.getZZClusterWithSwappedPartitions();
132 zzClusterExpansionNN
= ClusterTestUtils
.getZZClusterWithNN();
133 zzClusterExpansionPP
= ClusterTestUtils
.getZZClusterWithPP();
135 zzStores
= ClusterTestUtils
.getZZStoreDefsBDB();
136 File zzfile
= File
.createTempFile("zz-stores-", ".xml");
137 FileUtils
.writeStringToFile(zzfile
, new StoreDefinitionsMapper().writeStoreList(zzStores
));
138 zzStoresXml
= zzfile
.getAbsolutePath();
140 zzzCurrent
= ClusterTestUtils
.getZZZCluster();
141 zzzShuffle
= ClusterTestUtils
.getZZZClusterWithSwappedPartitions();
142 zzzClusterExpansionNNN
= ClusterTestUtils
.getZZZClusterWithNNN();
143 zzzClusterExpansionPPP
= ClusterTestUtils
.getZZZClusterWithPPP();
144 zzeZoneExpansion
= ClusterTestUtils
.getZZECluster();
145 zzzZoneExpansionXXP
= ClusterTestUtils
.getZZEClusterXXP();
147 zzzStores
= ClusterTestUtils
.getZZZStoreDefsBDB();
148 File zzzfile
= File
.createTempFile("zzz-stores-", ".xml");
149 FileUtils
.writeStringToFile(zzzfile
, new StoreDefinitionsMapper().writeStoreList(zzzStores
));
150 zzzStoresXml
= zzzfile
.getAbsolutePath();
153 public void setUpRWStuff() throws IOException
{
154 // First without replication
155 HashMap
<Integer
, Integer
> zrfRWStoreWithoutReplication
= new HashMap
<Integer
, Integer
>();
156 zrfRWStoreWithoutReplication
.put(0, 1);
157 zrfRWStoreWithoutReplication
.put(1, 1);
158 rwStoreDefWithoutReplication
= new StoreDefinitionBuilder().setName(testStoreNameRW
)
159 .setType(BdbStorageConfiguration
.TYPE_NAME
)
160 .setKeySerializer(new SerializerDefinition("string"))
161 .setValueSerializer(new SerializerDefinition("string"))
162 .setRoutingPolicy(RoutingTier
.CLIENT
)
163 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
164 .setReplicationFactor(2)
165 .setPreferredReads(1)
167 .setPreferredWrites(1)
168 .setRequiredWrites(1)
169 .setZoneCountReads(0)
170 .setZoneCountWrites(0)
171 .setZoneReplicationFactor(zrfRWStoreWithoutReplication
)
172 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
175 storeDefWithoutReplication
= Lists
.newArrayList(rwStoreDefWithoutReplication
);
176 String storeDefWithoutReplicationString
= new StoreDefinitionsMapper().writeStoreList(storeDefWithoutReplication
);
177 File file
= File
.createTempFile("two-stores-", ".xml");
178 FileUtils
.writeStringToFile(file
, storeDefWithoutReplicationString
);
179 storeDefFileWithoutReplication
= file
.getAbsolutePath();
181 // Now with replication
182 HashMap
<Integer
, Integer
> zrfRWStoreWithReplication
= new HashMap
<Integer
, Integer
>();
183 zrfRWStoreWithReplication
.put(0, 2);
184 zrfRWStoreWithReplication
.put(1, 2);
185 rwStoreDefWithReplication
= new StoreDefinitionBuilder().setName(testStoreNameRW
)
186 .setType(BdbStorageConfiguration
.TYPE_NAME
)
187 .setKeySerializer(new SerializerDefinition("string"))
188 .setValueSerializer(new SerializerDefinition("string"))
189 .setRoutingPolicy(RoutingTier
.CLIENT
)
190 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
191 .setReplicationFactor(4)
192 .setPreferredReads(1)
194 .setPreferredWrites(1)
195 .setRequiredWrites(1)
196 .setZoneCountReads(0)
197 .setZoneCountWrites(0)
198 .setZoneReplicationFactor(zrfRWStoreWithReplication
)
199 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
201 rwStoreDefWithReplication2
= new StoreDefinitionBuilder().setName(testStoreNameRW2
)
202 .setType(BdbStorageConfiguration
.TYPE_NAME
)
203 .setKeySerializer(new SerializerDefinition("string"))
204 .setValueSerializer(new SerializerDefinition("string"))
205 .setRoutingPolicy(RoutingTier
.CLIENT
)
206 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
207 .setReplicationFactor(4)
208 .setPreferredReads(1)
210 .setPreferredWrites(1)
211 .setRequiredWrites(1)
212 .setZoneCountReads(0)
213 .setZoneCountWrites(0)
214 .setZoneReplicationFactor(zrfRWStoreWithReplication
)
215 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
218 file
= File
.createTempFile("rw-stores-", ".xml");
219 FileUtils
.writeStringToFile(file
,
220 new StoreDefinitionsMapper().writeStoreList(Lists
.newArrayList(rwStoreDefWithReplication
)));
221 rwStoreDefFileWithReplication
= file
.getAbsolutePath();
223 file
= File
.createTempFile("rw-two-stores-", ".xml");
224 FileUtils
.writeStringToFile(file
,
225 new StoreDefinitionsMapper().writeStoreList(Lists
.newArrayList(rwStoreDefWithReplication
,
226 rwStoreDefWithReplication2
)));
227 rwTwoStoreDefFileWithReplication
= file
.getAbsolutePath();
229 storeDefWithReplication
= Lists
.newArrayList(rwStoreDefWithReplication
);
230 String storeDefWithReplicationString
= new StoreDefinitionsMapper().writeStoreList(storeDefWithReplication
);
231 file
= File
.createTempFile("two-stores-", ".xml");
232 FileUtils
.writeStringToFile(file
, storeDefWithReplicationString
);
233 storeDefFileWithReplication
= file
.getAbsolutePath();
237 public void tearDown() {
240 socketStoreFactory
.close();
241 socketStoreFactory
= null;
242 ClusterTestUtils
.reset();
245 // TODO: The tests based on this method are susceptible to TOCTOU
246 // BindException issue since findFreePorts is used to determine the ports
247 // for localhost:PORT of each node.
249 * Scripts the execution of a specific type of zoned rebalance test: sets up
250 * cluster based on cCluster plus any new nodes/zones in fCluster,
251 * rebalances to fCluster, verifies rebalance was correct.
253 * @param testTag For pretty printing
254 * @param cCluster current cluster
255 * @param fCluster final cluster
256 * @param cStoresXml XML file with current stores xml
257 * @param fStoresXml Unused parameter. Included for symmetry in method
259 * @param cStoreDefs store defs for current cluster (based on cStoresXml)
260 * @param fStoreDefs store defs for final cluster.
263 public void testZonedRebalance(String testTag
,
268 List
<StoreDefinition
> cStoreDefs
,
269 List
<StoreDefinition
> fStoreDefs
) throws Exception
{
270 logger
.info("Starting " + testTag
);
271 // Hacky work around of TOCTOU bind Exception issues. Each test that
272 // invokes this method brings servers up & down on the same ports. The
273 // OS seems to need a rest between subsequent tests...
274 Thread
.sleep(TimeUnit
.SECONDS
.toMillis(5));
276 Cluster interimCluster
= RebalanceUtils
.getInterimCluster(cCluster
, fCluster
);
278 // start all the servers
279 List
<Integer
> serverList
= new ArrayList
<Integer
>(interimCluster
.getNodeIds());
280 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
281 configProps
.put("admin.max.threads", "5");
282 interimCluster
= startServers(interimCluster
, cStoresXml
, serverList
, configProps
);
284 String bootstrapUrl
= getBootstrapUrl(interimCluster
, 0);
285 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
290 for(StoreDefinition storeDef
: cStoreDefs
) {
291 populateData(cCluster
, storeDef
);
294 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, serverList
);
296 checkConsistentMetadata(fCluster
, serverList
);
299 stopServer(serverList
);
301 } catch(AssertionError ae
) {
302 logger
.error("Assertion broken in " + testTag
+ " : ", ae
);
307 public void testZonedRebalance(String testTag
,
311 List
<StoreDefinition
> storeDefs
) throws Exception
{
312 testZonedRebalance(testTag
, cCluster
, fCluster
, storesXml
, storesXml
, storeDefs
, storeDefs
);
315 @Test(timeout
= 600000)
316 public void testNoopZZ() throws Exception
{
317 testZonedRebalance("TestNoopZZ", zzCurrent
, zzCurrent
, zzStoresXml
, zzStores
);
320 @Test(timeout
= 600000)
321 public void testShuffleZZ() throws Exception
{
322 testZonedRebalance("TestShuffleZZ", zzCurrent
, zzShuffle
, zzStoresXml
, zzStores
);
325 @Test(timeout
= 600000)
326 public void testShuffleZZAndShuffleAgain() throws Exception
{
328 logger
.info("Starting testShuffleZZAndShuffleAgain");
329 // Hacky work around of TOCTOU bind Exception issues. Each test that
330 // invokes this method brings servers up & down on the same ports. The
331 // OS seems to need a rest between subsequent tests...
332 Thread
.sleep(TimeUnit
.SECONDS
.toMillis(2));
334 Cluster interimCluster
= RebalanceUtils
.getInterimCluster(zzCurrent
, zzShuffle
);
336 // start all the servers
337 List
<Integer
> serverList
= new ArrayList
<Integer
>(interimCluster
.getNodeIds());
338 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
339 configProps
.put("admin.max.threads", "5");
340 interimCluster
= startServers(interimCluster
, zzStoresXml
, serverList
, configProps
);
342 // Populate cluster with data
343 for(StoreDefinition storeDef
: zzStores
) {
344 populateData(zzCurrent
, storeDef
);
347 String bootstrapUrl
= getBootstrapUrl(interimCluster
, 0);
350 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
353 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, serverList
);
354 checkConsistentMetadata(zzShuffle
, serverList
);
356 // Now, go from shuffled state, back to the original to confirm
357 // subsequent rebalances can be invoked.
358 rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
, zzCurrent
, zzStores
);
359 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, serverList
);
360 checkConsistentMetadata(zzCurrent
, serverList
);
363 stopServer(serverList
);
366 @Test(timeout
= 600000)
367 public void testClusterExpansion() throws Exception
{
368 testZonedRebalance("TestClusterExpansionZZ",
370 zzClusterExpansionPP
,
375 @Test(timeout
= 600000)
376 public void testNoopZZZ() throws Exception
{
377 testZonedRebalance("TestNoopZZZ", zzzCurrent
, zzzCurrent
, zzzStoresXml
, zzzStores
);
380 @Test(timeout
= 600000)
381 public void testShuffleZZZ() throws Exception
{
382 testZonedRebalance("TestShuffleZZZ", zzzCurrent
, zzzShuffle
, zzzStoresXml
, zzzStores
);
385 @Test(timeout
= 600000)
386 public void testClusterExpansionZZZ() throws Exception
{
387 testZonedRebalance("TestClusterExpansionZZZ",
389 zzzClusterExpansionPPP
,
394 @Test(timeout
= 600000)
395 public void testZoneExpansionZZ2ZZZ() throws Exception
{
396 // Pass in an interim cluster for currentCluster
397 testZonedRebalance("TestZoneExpansionZZ2ZZZ",
404 @Test(timeout
= 600000)
405 public void testRWRebalance() throws Exception
{
406 logger
.info("Starting testRWRebalance");
409 Cluster currentCluster
= ServerTestUtils
.getLocalZonedCluster(4, 2, new int[] { 0, 0,
410 1, 1 }, new int[][] { { 0, 2, 4, 6 }, {}, { 1, 3, 5, 7 }, {} });
411 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
413 Lists
.newArrayList(2, 6));
414 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
,
416 Lists
.newArrayList(3, 7));
418 // start all the servers
419 List
<Integer
> serverList
= Arrays
.asList(0, 1, 2, 3);
420 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
421 configProps
.put("admin.max.threads", "5");
422 currentCluster
= startServers(currentCluster
,
423 storeDefFileWithoutReplication
,
427 String bootstrapUrl
= getBootstrapUrl(currentCluster
, 0);
428 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
432 populateData(currentCluster
, rwStoreDefWithoutReplication
);
434 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, Arrays
.asList(1, 2));
436 checkConsistentMetadata(finalCluster
, serverList
);
439 stopServer(serverList
);
441 } catch(AssertionError ae
) {
442 logger
.error("Assertion broken in testRWRebalance ", ae
);
447 public void testRWRebalanceWithReplication(boolean serial
) throws Exception
{
448 logger
.info("Starting testRWRebalanceWithReplication");
450 Cluster currentCluster
= ServerTestUtils
.getLocalZonedCluster(4,
452 new int[] { 0, 0, 1, 1 },
453 new int[][] { { 0, 2, 4 },
456 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
458 Lists
.newArrayList(2));
459 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
,
461 Lists
.newArrayList(3));
464 List
<Integer
> serverList
= Arrays
.asList(0, 1, 2, 3);
465 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
466 configProps
.put("admin.max.threads", "5");
468 configProps
.put("max.parallel.stores.rebalancing", String
.valueOf(1));
469 currentCluster
= startServers(currentCluster
,
470 storeDefFileWithReplication
,
474 String bootstrapUrl
= getBootstrapUrl(currentCluster
, 0);
476 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
482 populateData(currentCluster
, rwStoreDefWithReplication
);
484 rebalanceAndCheck(rebalanceKit
.plan
, rebalanceKit
.controller
, Arrays
.asList(0, 1, 2, 3));
486 checkConsistentMetadata(finalCluster
, serverList
);
489 stopServer(serverList
);
493 @Test(timeout
= 600000)
494 public void testRWRebalanceWithReplication() throws Exception
{
496 testRWRebalanceWithReplication(false);
497 } catch(AssertionError ae
) {
498 logger
.error("Assertion broken in testRWRebalanceWithReplication ", ae
);
503 @Test(timeout
= 600000)
504 public void testRWRebalanceWithReplicationSerial() throws Exception
{
506 testRWRebalanceWithReplication(true);
507 } catch(AssertionError ae
) {
508 logger
.error("Assertion broken in testRWRebalanceWithReplicationSerial ", ae
);
513 @Test(timeout
= 600000)
514 public void testRebalanceCleanPrimarySecondary() throws Exception
{
515 logger
.info("Starting testRebalanceCleanPrimary");
517 Cluster currentCluster
= ServerTestUtils
.getLocalZonedCluster(6, 2, new int[] { 0, 0,
518 0, 1, 1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } });
519 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
521 Lists
.newArrayList(7));
522 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
,
524 Lists
.newArrayList(6));
527 * original server partition ownership
529 * [s0 : p0,p3,p4,p5,p6,p7] [s1 : p1-p7] [s2 : p1,p2] [s3 :
530 * p0,p1,p2,p3,p6,p7] [s4 : p1-p7] [s5 : p4,p5]
532 * final server partition ownership
534 * [s0 : p0,p2,p3,p4,p5,p6,p7] [s1 : p0,p1] [s2 : p1-p7] [s3 :
535 * p0,p1,p2,p3,p5,p6,p7] [s4 : p0,p1,p2,p3,p4,p7] [s5 : p4,p5,p6]
539 List
<Integer
> serverList
= Arrays
.asList(0, 1, 2, 3, 4, 5);
540 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
541 configProps
.put("enable.repair", "true");
542 currentCluster
= startServers(currentCluster
,
543 rwStoreDefFileWithReplication
,
547 String bootstrapUrl
= getBootstrapUrl(currentCluster
, 0);
548 ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
552 populateData(currentCluster
, rwStoreDefWithReplication
);
554 AdminClient admin
= rebalanceKit
.controller
.getAdminClient();
556 List
<ByteArray
> p6KeySamples
= sampleKeysFromPartition(admin
,
558 rwStoreDefWithReplication
.getName(),
561 List
<ByteArray
> p1KeySamples
= sampleKeysFromPartition(admin
,
563 rwStoreDefWithReplication
.getName(),
566 List
<ByteArray
> p3KeySamples
= sampleKeysFromPartition(admin
,
568 rwStoreDefWithReplication
.getName(),
571 List
<ByteArray
> p2KeySamples
= sampleKeysFromPartition(admin
,
573 rwStoreDefWithReplication
.getName(),
576 List
<ByteArray
> p7KeySamples
= sampleKeysFromPartition(admin
,
578 rwStoreDefWithReplication
.getName(),
582 rebalanceAndCheck(rebalanceKit
.plan
,
583 rebalanceKit
.controller
,
584 Arrays
.asList(0, 1, 2, 3));
586 checkConsistentMetadata(finalCluster
, serverList
);
588 // Do the cleanup operation
589 for(int i
= 0; i
< 6; i
++) {
590 admin
.storeMntOps
.repairJob(i
);
592 // wait for the repairs to complete
593 for(int i
= 0; i
< 6; i
++) {
594 ServerTestUtils
.waitForAsyncOperationOnServer(serverMap
.get(i
), "Repair", 5000);
597 // confirm a primary movement in zone 0 : P6 : s1 -> S2. The
599 // primary changes when p6 moves cross zone
600 // check for existence of p6 in server 2,
601 checkForKeyExistence(admin
, 2, rwStoreDefWithReplication
.getName(), p6KeySamples
);
603 // confirm a secondary movement in zone 0.. p2 : s1 -> s0
604 // check for its existence in server 0
605 checkForKeyExistence(admin
, 0, rwStoreDefWithReplication
.getName(), p2KeySamples
);
606 // check for its absence in server 1
608 // also check that p1 is stable in server 1 [primary stability]
609 checkForKeyExistence(admin
, 1, rwStoreDefWithReplication
.getName(), p1KeySamples
);
610 // check that p3 is stable in server 0 [Secondary stability]
611 checkForKeyExistence(admin
, 0, rwStoreDefWithReplication
.getName(), p3KeySamples
);
613 // finally, test for server 4 which now became the secondary for
615 // from being a primary before
616 checkForKeyExistence(admin
, 4, rwStoreDefWithReplication
.getName(), p7KeySamples
);
619 stopServer(serverList
);
621 } catch(AssertionError ae
) {
622 logger
.error("Assertion broken in testRebalanceCleanPrimarySecondary ", ae
);
627 @Test(timeout
= 600000)
628 public void testProxyGetDuringRebalancing() throws Exception
{
629 logger
.info("Starting testProxyGetDuringRebalancing");
631 Cluster currentCluster
= ServerTestUtils
.getLocalZonedCluster(4, 2, new int[] { 0, 0,
632 1, 1 }, new int[][] { { 0, 2, 4 }, { 6 }, { 1, 3, 5 }, { 7 } });
633 Cluster tmpfinalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
635 Lists
.newArrayList(2));
636 final Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(tmpfinalCluster
,
638 Lists
.newArrayList(3));
640 final List
<Integer
> serverList
= Arrays
.asList(0, 1, 2, 3);
641 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
642 configProps
.put("admin.max.threads", "5");
643 final Cluster updatedCurrentCluster
= startServers(currentCluster
,
644 storeDefFileWithReplication
,
648 ExecutorService executors
= Executors
.newFixedThreadPool(2);
649 final AtomicBoolean rebalancingComplete
= new AtomicBoolean(false);
650 final List
<Exception
> exceptions
= Collections
.synchronizedList(new ArrayList
<Exception
>());
652 String bootstrapUrl
= getBootstrapUrl(updatedCurrentCluster
, 0);
654 final ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
660 populateData(currentCluster
, rwStoreDefWithReplication
);
662 final SocketStoreClientFactory factory
= new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(currentCluster
,
664 .setEnableLazy(false)
665 .setSocketTimeout(120,
668 final StoreClient
<String
, String
> storeClientRW
= new DefaultStoreClient
<String
, String
>(rwStoreDefWithReplication
.getName(),
673 final CountDownLatch latch
= new CountDownLatch(2);
674 // start get operation.
675 executors
.execute(new Runnable() {
680 List
<String
> keys
= new ArrayList
<String
>(testEntries
.keySet());
682 while(!rebalancingComplete
.get()) {
683 // should always be able to get values.
684 int index
= (int) (Math
.random() * keys
.size());
686 // should get a valid value
688 Versioned
<String
> value
= storeClientRW
.get(keys
.get(index
));
689 assertNotSame("StoreClient get() should not return null.",
692 assertEquals("Value returned should be good",
693 new Versioned
<String
>(testEntries
.get(keys
.get(index
))),
695 } catch(Exception e
) {
696 logger
.error("Exception in proxy get thread", e
);
702 } catch(Exception e
) {
703 logger
.error("Exception in proxy get thread", e
);
713 executors
.execute(new Runnable() {
720 rebalanceAndCheck(rebalanceKit
.plan
,
721 rebalanceKit
.controller
,
722 Arrays
.asList(0, 1, 2, 3));
725 rebalancingComplete
.set(true);
726 checkConsistentMetadata(finalCluster
, serverList
);
728 } catch(Exception e
) {
733 stopServer(serverList
);
734 } catch(Exception e
) {
735 throw new RuntimeException(e
);
743 executors
.shutdown();
744 executors
.awaitTermination(300, TimeUnit
.SECONDS
);
746 // check No Exception
747 if(exceptions
.size() > 0) {
748 for(Exception e
: exceptions
) {
751 fail("Should not see any exceptions.");
755 stopServer(serverList
);
757 } catch(AssertionError ae
) {
758 logger
.error("Assertion broken in testProxyGetDuringRebalancing ", ae
);
763 @Test(timeout
= 600000)
764 public void testProxyPutDuringRebalancing() throws Exception
{
765 logger
.info("Starting testProxyPutDuringRebalancing");
767 Cluster currentCluster
= ServerTestUtils
.getLocalZonedCluster(6, 2, new int[] { 0, 0,
768 0, 1, 1, 1 }, new int[][] { { 0 }, { 1, 6 }, { 2 }, { 3 }, { 4, 7 }, { 5 } });
769 Cluster finalCluster
= UpdateClusterUtils
.createUpdatedCluster(currentCluster
,
771 Lists
.newArrayList(7));
772 finalCluster
= UpdateClusterUtils
.createUpdatedCluster(finalCluster
,
774 Lists
.newArrayList(6));
777 * Original partition map
779 * [s0 : p0] [s1 : p1, p6] [s2 : p2]
781 * [s3 : p3] [s4 : p4, p7] [s5 : p5]
783 * final server partition ownership
785 * [s0 : p0] [s1 : p1] [s2 : p2, p7]
787 * [s3 : p3] [s4 : p4] [s5 : p5, p6]
789 * Note that rwStoreDefFileWithReplication is a "2/1/1" store def.
791 * Original server n-ary partition ownership
793 * [s0 : p0, p3-7] [s1 : p0-p7] [s2 : p1-2]
795 * [s3 : p0-3, p6-7] [s4 : p0-p7] [s5 : p4-5]
797 * final server n-ary partition ownership
799 * [s0 : p0, p2-7] [s1 : p0-1] [s2 : p1-p7]
801 * [s3 : p0-3, p5-7] [s4 : p0-4, p7] [s5 : p4-6]
803 List
<Integer
> serverList
= Arrays
.asList(0, 1, 2, 3, 4, 5);
804 Map
<String
, String
> configProps
= new HashMap
<String
, String
>();
805 configProps
.put("admin.max.threads", "5");
806 final Cluster updatedCurrentCluster
= startServers(currentCluster
,
807 rwStoreDefFileWithReplication
,
811 ExecutorService executors
= Executors
.newFixedThreadPool(2);
812 final AtomicBoolean rebalancingComplete
= new AtomicBoolean(false);
813 final List
<Exception
> exceptions
= Collections
.synchronizedList(new ArrayList
<Exception
>());
815 // It is imperative that we test in a single shot since multiple
816 // batches would mean the proxy bridges being torn down and
817 // established multiple times and we cannot test against the source
818 // cluster topology then. getRebalanceKit uses batch size of
819 // infinite, so this should be fine.
820 String bootstrapUrl
= getBootstrapUrl(updatedCurrentCluster
, 0);
822 final ClusterTestUtils
.RebalanceKit rebalanceKit
= ClusterTestUtils
.getRebalanceKit(bootstrapUrl
,
826 populateData(currentCluster
, rwStoreDefWithReplication
);
827 final AdminClient adminClient
= rebalanceKit
.controller
.getAdminClient();
828 // the plan would cause these partitions to move:
829 // Partition : Donor -> stealer
831 // p2 (Z-SEC) : s1 -> s0
832 // p3-6 (Z-PRI) : s1 -> s2
833 // p7 (Z-PRI) : s0 -> s2
835 // p5 (Z-SEC): s4 -> s3
836 // p6 (Z-PRI): s4 -> s5
838 // :. rebalancing will run on servers 0, 2, 3, & 5
839 final List
<ByteArray
> movingKeysList
= sampleKeysFromPartition(adminClient
,
841 rwStoreDefWithReplication
.getName(),
844 assertTrue("Empty list of moving keys...", movingKeysList
.size() > 0);
845 final AtomicBoolean rebalancingStarted
= new AtomicBoolean(false);
846 final AtomicBoolean proxyWritesDone
= new AtomicBoolean(false);
847 final HashMap
<String
, String
> baselineTuples
= new HashMap
<String
, String
>(testEntries
);
848 final HashMap
<String
, VectorClock
> baselineVersions
= new HashMap
<String
, VectorClock
>();
850 for(String key
: baselineTuples
.keySet()) {
851 baselineVersions
.put(key
, new VectorClock());
854 final CountDownLatch latch
= new CountDownLatch(2);
855 // start proxy put operation.
856 executors
.execute(new Runnable() {
860 SocketStoreClientFactory factory
= null;
862 // wait for the rebalancing to begin
863 List
<VoldemortServer
> serverList
= Lists
.newArrayList(serverMap
.get(0),
867 while(!rebalancingComplete
.get()) {
868 Iterator
<VoldemortServer
> serverIterator
= serverList
.iterator();
869 while(serverIterator
.hasNext()) {
870 VoldemortServer server
= serverIterator
.next();
871 if(ByteUtils
.getString(server
.getMetadataStore()
872 .get(MetadataStore
.SERVER_STATE_KEY
,
877 .compareTo(VoldemortState
.REBALANCING_MASTER_SERVER
.toString()) == 0) {
878 logger
.info("Server " + server
.getIdentityNode().getId()
879 + " transitioned into REBALANCING MODE");
880 serverIterator
.remove();
883 if(serverList
.size() == 0) {
884 rebalancingStarted
.set(true);
889 if(rebalancingStarted
.get()) {
890 factory
= new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(getBootstrapUrl(updatedCurrentCluster
,
892 .setEnableLazy(false)
893 .setSocketTimeout(120,
895 .setClientZoneId(1));
897 final StoreClient
<String
, String
> storeClientRW
= new DefaultStoreClient
<String
, String
>(testStoreNameRW
,
901 // Now perform some writes and determine the end
902 // state of the changed keys. Initially, all data
903 // now with zero vector clock
904 for(ByteArray movingKey
: movingKeysList
) {
906 String keyStr
= ByteUtils
.getString(movingKey
.get(), "UTF-8");
907 String valStr
= "proxy_write";
908 storeClientRW
.put(keyStr
, valStr
);
909 baselineTuples
.put(keyStr
, valStr
);
910 // all these keys will have [5:1] vector
911 // clock as node 5 is the new pseudo master
912 baselineVersions
.get(keyStr
)
913 .incrementVersion(5, System
.currentTimeMillis());
914 proxyWritesDone
.set(true);
915 if(rebalancingComplete
.get()) {
918 } catch(InvalidMetadataException e
) {
920 logger
.error("Encountered an invalid metadata exception.. ", e
);
924 } catch(Exception e
) {
925 logger
.error("Exception in proxy write thread..", e
);
936 executors
.execute(new Runnable() {
941 rebalanceKit
.rebalance();
942 } catch(Exception e
) {
943 logger
.error("Error in rebalancing... ", e
);
946 rebalancingComplete
.set(true);
953 executors
.shutdown();
954 executors
.awaitTermination(300, TimeUnit
.SECONDS
);
956 assertEquals("Client did not see all server transition into rebalancing state",
957 rebalancingStarted
.get(),
959 assertEquals("Not enough time to begin proxy writing", proxyWritesDone
.get(), true);
960 checkEntriesPostRebalance(updatedCurrentCluster
,
962 Lists
.newArrayList(rwStoreDefWithReplication
),
963 Arrays
.asList(0, 1, 2, 3, 4, 5),
966 checkConsistentMetadata(finalCluster
, serverList
);
967 // check No Exception
968 if(exceptions
.size() > 0) {
969 for(Exception e
: exceptions
) {
972 fail("Should not see any exceptions.");
974 // check that the proxy writes were made to the original donor, node
976 List
<ClockEntry
> clockEntries
= new ArrayList
<ClockEntry
>(serverList
.size());
977 for(Integer nodeid
: serverList
)
978 clockEntries
.add(new ClockEntry(nodeid
.shortValue(), System
.currentTimeMillis()));
979 VectorClock clusterXmlClock
= new VectorClock(clockEntries
, System
.currentTimeMillis());
980 for(Integer nodeid
: serverList
)
981 adminClient
.metadataMgmtOps
.updateRemoteCluster(nodeid
,
984 adminClient
.setAdminClientCluster(currentCluster
);
985 checkForTupleEquivalence(adminClient
,
994 stopServer(serverList
);
995 } catch(Exception e
) {
996 throw new RuntimeException(e
);
998 } catch(AssertionError ae
) {
999 logger
.error("Assertion broken in testProxyPutDuringRebalancing ", ae
);
1004 private void populateData(Cluster cluster
, StoreDefinition storeDef
) throws Exception
{
1006 // Create SocketStores for each Node first
1007 Map
<Integer
, Store
<ByteArray
, byte[], byte[]>> storeMap
= new HashMap
<Integer
, Store
<ByteArray
, byte[], byte[]>>();
1008 for(Node node
: cluster
.getNodes()) {
1009 storeMap
.put(node
.getId(),
1010 getSocketStore(storeDef
.getName(), node
.getHost(), node
.getSocketPort()));
1013 BaseStoreRoutingPlan storeInstance
= new BaseStoreRoutingPlan(cluster
, storeDef
);
1014 for(Entry
<String
, String
> entry
: testEntries
.entrySet()) {
1015 ByteArray keyBytes
= new ByteArray(ByteUtils
.getBytes(entry
.getKey(), "UTF-8"));
1016 List
<Integer
> preferenceNodes
= storeInstance
.getReplicationNodeList(keyBytes
.get());
1017 // Go over every node
1018 for(int nodeId
: preferenceNodes
) {
1020 storeMap
.get(nodeId
)
1022 new Versioned
<byte[]>(ByteUtils
.getBytes(entry
.getValue(), "UTF-8")),
1024 } catch(ObsoleteVersionException e
) {
1025 logger
.info("Why are we seeing this at all here ?? ");
1026 e
.printStackTrace();
1031 // close all socket stores
1032 for(Store
<ByteArray
, byte[], byte[]> store
: storeMap
.values()) {