2 * Copyright 2008-2013 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
16 package voldemort
.client
.rebalance
;
18 import static org
.junit
.Assert
.assertEquals
;
19 import static org
.junit
.Assert
.fail
;
21 import java
.util
.ArrayList
;
22 import java
.util
.HashMap
;
23 import java
.util
.Iterator
;
24 import java
.util
.List
;
26 import java
.util
.Map
.Entry
;
27 import java
.util
.Properties
;
30 import org
.junit
.After
;
32 import voldemort
.ROTestUtils
;
33 import voldemort
.ServerTestUtils
;
34 import voldemort
.TestUtils
;
35 import voldemort
.VoldemortException
;
36 import voldemort
.client
.protocol
.RequestFormatType
;
37 import voldemort
.client
.protocol
.admin
.AdminClient
;
38 import voldemort
.client
.protocol
.admin
.QueryKeyResult
;
39 import voldemort
.cluster
.Cluster
;
40 import voldemort
.cluster
.Node
;
41 import voldemort
.routing
.RoutingStrategy
;
42 import voldemort
.routing
.RoutingStrategyFactory
;
43 import voldemort
.routing
.StoreRoutingPlan
;
44 import voldemort
.server
.VoldemortConfig
;
45 import voldemort
.server
.VoldemortServer
;
46 import voldemort
.store
.Store
;
47 import voldemort
.store
.StoreDefinition
;
48 import voldemort
.store
.metadata
.MetadataStore
;
49 import voldemort
.store
.metadata
.MetadataStore
.VoldemortState
;
50 import voldemort
.store
.socket
.SocketStoreFactory
;
51 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
52 import voldemort
.utils
.ByteArray
;
53 import voldemort
.utils
.ByteUtils
;
54 import voldemort
.utils
.Pair
;
55 import voldemort
.utils
.Utils
;
56 import voldemort
.versioning
.VectorClock
;
57 import voldemort
.versioning
.Versioned
;
59 public abstract class AbstractRebalanceTest
{
61 Map
<Integer
, VoldemortServer
> serverMap
;
62 HashMap
<String
, String
> testEntries
;
63 protected SocketStoreFactory socketStoreFactory
;
65 public AbstractRebalanceTest() {
66 this.serverMap
= new HashMap
<Integer
, VoldemortServer
>();
67 testEntries
= ServerTestUtils
.createRandomKeyValueString(getNumKeys());
68 socketStoreFactory
= new ClientRequestExecutorPool(2, 1000, 1000, 32 * 1024);
71 // This method is susceptible to BindException issues due to TOCTOU
72 // problem with getLocalCluster (which is used to construct cluster that is
74 // TODO: (refactor) AbstractRebalanceTest to take advantage of
75 // ServerTestUtils.startVoldemortCluster.
76 protected Cluster
startServers(Cluster cluster
,
78 List
<Integer
> nodeToStart
,
79 Map
<String
, String
> configProps
) throws Exception
{
80 for(int node
: nodeToStart
) {
81 Properties properties
= new Properties();
82 if(null != configProps
) {
83 for(Entry
<String
, String
> property
: configProps
.entrySet()) {
84 properties
.put(property
.getKey(), property
.getValue());
88 properties
.put("proxy.puts.during.rebalance", "true");
89 properties
.put("bdb.cache.size", "" + (5 * 1024 * 1024));
90 properties
.put("bdb.one.env.per.store", "true");
92 VoldemortConfig config
= ServerTestUtils
.createServerConfig(true,
94 TestUtils
.createTempDir()
100 VoldemortServer server
= ServerTestUtils
.startVoldemortServer(socketStoreFactory
,
103 serverMap
.put(node
, server
);
110 public void tearDown() throws Exception
{
111 for(VoldemortServer vs
: serverMap
.values()) {
116 protected Store
<ByteArray
, byte[], byte[]> getSocketStore(String storeName
,
119 return getSocketStore(storeName
, host
, port
, false);
122 protected Store
<ByteArray
, byte[], byte[]> getSocketStore(String storeName
,
126 return ServerTestUtils
.getSocketStore(socketStoreFactory
,
130 RequestFormatType
.PROTOCOL_BUFFERS
,
134 protected VoldemortState
getCurrentState(int nodeId
) {
135 VoldemortServer server
= serverMap
.get(nodeId
);
137 throw new VoldemortException("Node id " + nodeId
+ " does not exist");
139 return server
.getMetadataStore().getServerStateUnlocked();
143 protected Cluster
getCurrentCluster(int nodeId
) {
144 VoldemortServer server
= serverMap
.get(nodeId
);
146 throw new VoldemortException("Node id " + nodeId
+ " does not exist");
148 return server
.getMetadataStore().getCluster();
152 public void checkConsistentMetadata(Cluster finalCluster
, List
<Integer
> serverList
) {
153 for(int nodeId
: serverList
) {
154 assertEquals(finalCluster
, getCurrentCluster(nodeId
));
155 assertEquals(MetadataStore
.VoldemortState
.NORMAL_SERVER
, getCurrentState(nodeId
));
159 protected void stopServer(List
<Integer
> nodesToStop
) throws Exception
{
160 for(int node
: nodesToStop
) {
162 ServerTestUtils
.stopVoldemortServer(serverMap
.get(node
));
163 } catch(VoldemortException e
) {
164 // ignore these at stop time
170 * This method determines the "size" of the test to run...
174 protected abstract int getNumKeys();
176 protected String
getBootstrapUrl(Cluster cluster
, int nodeId
) {
177 Node node
= cluster
.getNodeById(nodeId
);
178 return "tcp://" + node
.getHost() + ":" + node
.getSocketPort();
182 * Does the rebalance and then checks that it succeeded.
184 * @param rebalancePlan
185 * @param rebalanceClient
186 * @param nodeCheckList
188 protected void rebalanceAndCheck(RebalancePlan rebalancePlan
,
189 RebalanceController rebalanceClient
,
190 List
<Integer
> nodeCheckList
) {
191 rebalanceClient
.rebalance(rebalancePlan
);
192 checkEntriesPostRebalance(rebalancePlan
.getCurrentCluster(),
193 rebalancePlan
.getFinalCluster(),
194 rebalancePlan
.getCurrentStores(),
201 * Makes sure that all expected partition-stores are on each server after
204 * @param currentCluster
205 * @param finalCluster
207 * @param nodeCheckList
208 * @param baselineTuples
209 * @param baselineVersions
211 protected void checkEntriesPostRebalance(Cluster currentCluster
,
212 Cluster finalCluster
,
213 List
<StoreDefinition
> storeDefs
,
214 List
<Integer
> nodeCheckList
,
215 HashMap
<String
, String
> baselineTuples
,
216 HashMap
<String
, VectorClock
> baselineVersions
) {
217 for(StoreDefinition storeDef
: storeDefs
) {
218 Map
<Integer
, Set
<Pair
<Integer
, Integer
>>> currentNodeToPartitionTuples
= ROTestUtils
.getNodeIdToAllPartitions(currentCluster
,
221 Map
<Integer
, Set
<Pair
<Integer
, Integer
>>> finalNodeToPartitionTuples
= ROTestUtils
.getNodeIdToAllPartitions(finalCluster
,
225 for(int nodeId
: nodeCheckList
) {
226 Set
<Pair
<Integer
, Integer
>> currentPartitionTuples
= currentNodeToPartitionTuples
.get(nodeId
);
227 Set
<Pair
<Integer
, Integer
>> finalPartitionTuples
= finalNodeToPartitionTuples
.get(nodeId
);
229 HashMap
<Integer
, List
<Integer
>> flattenedPresentTuples
= ROTestUtils
.flattenPartitionTuples(Utils
.getAddedInTarget(currentPartitionTuples
,
230 finalPartitionTuples
));
231 Store
<ByteArray
, byte[], byte[]> store
= getSocketStore(storeDef
.getName(),
232 finalCluster
.getNodeById(nodeId
)
234 finalCluster
.getNodeById(nodeId
)
236 checkGetEntries(finalCluster
.getNodeById(nodeId
),
240 flattenedPresentTuples
,
247 protected void checkGetEntries(Node node
,
250 Store
<ByteArray
, byte[], byte[]> store
,
251 HashMap
<Integer
, List
<Integer
>> flattenedPresentTuples
,
252 HashMap
<String
, String
> baselineTuples
,
253 HashMap
<String
, VectorClock
> baselineVersions
) {
254 RoutingStrategy routing
= new RoutingStrategyFactory().updateRoutingStrategy(def
, cluster
);
256 for(Entry
<String
, String
> entry
: baselineTuples
.entrySet()) {
257 ByteArray keyBytes
= new ByteArray(ByteUtils
.getBytes(entry
.getKey(), "UTF-8"));
259 List
<Integer
> partitions
= routing
.getPartitionList(keyBytes
.get());
261 if(StoreRoutingPlan
.checkKeyBelongsToPartition(partitions
,
262 node
.getPartitionIds(),
263 flattenedPresentTuples
)) {
264 List
<Versioned
<byte[]>> values
= store
.get(keyBytes
, null);
266 // expecting exactly one version
267 if(values
.size() == 0) {
268 fail("unable to find value for key=" + entry
.getKey() + " on node="
271 assertEquals("Expecting exactly one version", 1, values
.size());
272 Versioned
<byte[]> value
= values
.get(0);
273 // check version matches
274 if(baselineVersions
== null) {
275 // expecting base version for all
276 assertEquals("Value version should match",
280 assertEquals("Value version should match",
281 baselineVersions
.get(entry
.getKey()),
285 // check value matches.
286 assertEquals("Value bytes should match",
288 ByteUtils
.getString(value
.getValue(), "UTF-8"));
294 protected List
<ByteArray
> sampleKeysFromPartition(AdminClient admin
,
297 List
<Integer
> partitionsToSample
,
299 List
<ByteArray
> samples
= new ArrayList
<ByteArray
>(numSamples
);
300 Iterator
<ByteArray
> keys
= admin
.bulkFetchOps
.fetchKeys(serverId
,
306 while(keys
.hasNext() && count
< numSamples
) {
307 samples
.add(keys
.next());
 * REFACTOR: these should belong to AdminClient so that existence checks can
 * be done easily across the board
322 protected void checkForKeyExistence(AdminClient admin
,
325 List
<ByteArray
> keyList
) {
326 // do the positive tests
327 Iterator
<QueryKeyResult
> positiveTestResultsItr
= admin
.streamingOps
.queryKeys(serverId
,
330 while(positiveTestResultsItr
.hasNext()) {
331 QueryKeyResult item
= positiveTestResultsItr
.next();
332 ByteArray key
= item
.getKey();
333 List
<Versioned
<byte[]>> vals
= item
.getValues();
334 Exception e
= item
.getException();
336 assertEquals("Error fetching key " + key
, null, e
);
// Short-circuit && is required here: with the non-short-circuit & operator
// vals.size() was evaluated even when vals was null, so a missing value
// list surfaced as a NullPointerException instead of this assertion
// failure and its message.
assertEquals("Value not found for key " + key, true, vals != null && vals.size() != 0);
 * REFACTOR: these should belong to AdminClient so that existence checks can
 * be done easily across the board
351 protected void checkForTupleEquivalence(AdminClient admin
,
354 List
<ByteArray
> keyList
,
355 HashMap
<String
, String
> baselineTuples
,
356 HashMap
<String
, VectorClock
> baselineVersions
) {
357 // do the positive tests
358 Iterator
<QueryKeyResult
> positiveTestResultsItr
= admin
.streamingOps
.queryKeys(serverId
,
361 while(positiveTestResultsItr
.hasNext()) {
362 QueryKeyResult item
= positiveTestResultsItr
.next();
363 ByteArray key
= item
.getKey();
364 List
<Versioned
<byte[]>> vals
= item
.getValues();
365 Exception e
= item
.getException();
367 assertEquals("Error fetching key " + key
, null, e
);
// Short-circuit && is required here: with the non-short-circuit & operator
// vals.size() was evaluated even when vals was null, so a missing value
// list surfaced as a NullPointerException instead of this assertion
// failure and its message.
assertEquals("Value not found for key " + key, true, vals != null && vals.size() != 0);
370 String keyStr
= ByteUtils
.getString(key
.get(), "UTF-8");
371 if(baselineTuples
!= null)
372 assertEquals("Value does not match up ",
373 baselineTuples
.get(keyStr
),
374 ByteUtils
.getString(vals
.get(0).getValue(), "UTF-8"));
375 if(baselineVersions
!= null)
376 assertEquals("Version does not match up",
377 baselineVersions
.get(keyStr
),
378 vals
.get(0).getVersion());
 * REFACTOR: these should belong to AdminClient so that existence checks can
 * be done easily across the board
391 protected void checkForKeyNonExistence(AdminClient admin
,
394 List
<ByteArray
> keyList
) {
395 Iterator
<QueryKeyResult
> negativeTestResultsItr
= admin
.streamingOps
.queryKeys(serverId
,
398 while(negativeTestResultsItr
.hasNext()) {
399 QueryKeyResult item
= negativeTestResultsItr
.next();
400 ByteArray key
= item
.getKey();
401 List
<Versioned
<byte[]>> vals
= item
.getValues();
402 Exception e
= item
.getException();
404 assertEquals("Error fetching key " + key
, null, e
);
405 assertEquals("Value " + vals
+ "found for key " + key
,
407 vals
== null || vals
.size() == 0);