bind failures while running tests
[voldemort/jeffpc.git] / test / unit / voldemort / client / rebalance / AbstractRebalanceTest.java
blobbefd39c7471e56394b738cdbfe10dfe69d019c34
1 /*
2 * Copyright 2008-2013 LinkedIn, Inc
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
16 package voldemort.client.rebalance;
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.fail;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Properties;
28 import java.util.Set;
30 import org.junit.After;
32 import voldemort.ROTestUtils;
33 import voldemort.ServerTestUtils;
34 import voldemort.TestUtils;
35 import voldemort.VoldemortException;
36 import voldemort.client.protocol.RequestFormatType;
37 import voldemort.client.protocol.admin.AdminClient;
38 import voldemort.client.protocol.admin.QueryKeyResult;
39 import voldemort.cluster.Cluster;
40 import voldemort.cluster.Node;
41 import voldemort.routing.RoutingStrategy;
42 import voldemort.routing.RoutingStrategyFactory;
43 import voldemort.routing.StoreRoutingPlan;
44 import voldemort.server.VoldemortConfig;
45 import voldemort.server.VoldemortServer;
46 import voldemort.store.Store;
47 import voldemort.store.StoreDefinition;
48 import voldemort.store.metadata.MetadataStore;
49 import voldemort.store.metadata.MetadataStore.VoldemortState;
50 import voldemort.store.socket.SocketStoreFactory;
51 import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
52 import voldemort.utils.ByteArray;
53 import voldemort.utils.ByteUtils;
54 import voldemort.utils.Pair;
55 import voldemort.utils.Utils;
56 import voldemort.versioning.VectorClock;
57 import voldemort.versioning.Versioned;
59 public abstract class AbstractRebalanceTest {
61 Map<Integer, VoldemortServer> serverMap;
62 HashMap<String, String> testEntries;
63 protected SocketStoreFactory socketStoreFactory;
65 public AbstractRebalanceTest() {
66 this.serverMap = new HashMap<Integer, VoldemortServer>();
67 testEntries = ServerTestUtils.createRandomKeyValueString(getNumKeys());
68 socketStoreFactory = new ClientRequestExecutorPool(2, 1000, 1000, 32 * 1024);
71 // This method is susceptible to BindException issues due to TOCTOU
72 // problem with getLocalCluster (which is used to construct cluster that is
73 // passed in).
74 // TODO: (refactor) AbstractRebalanceTest to take advantage of
75 // ServerTestUtils.startVoldemortCluster.
76 protected Cluster startServers(Cluster cluster,
77 String storeXmlFile,
78 List<Integer> nodeToStart,
79 Map<String, String> configProps) throws Exception {
80 for(int node: nodeToStart) {
81 Properties properties = new Properties();
82 if(null != configProps) {
83 for(Entry<String, String> property: configProps.entrySet()) {
84 properties.put(property.getKey(), property.getValue());
87 // turn proxy puts on
88 properties.put("proxy.puts.during.rebalance", "true");
89 properties.put("bdb.cache.size", "" + (5 * 1024 * 1024));
90 properties.put("bdb.one.env.per.store", "true");
92 VoldemortConfig config = ServerTestUtils.createServerConfig(true,
93 node,
94 TestUtils.createTempDir()
95 .getAbsolutePath(),
96 null,
97 storeXmlFile,
98 properties);
100 VoldemortServer server = ServerTestUtils.startVoldemortServer(socketStoreFactory,
101 config,
102 cluster);
103 serverMap.put(node, server);
106 return cluster;
109 @After
110 public void tearDown() throws Exception {
111 for(VoldemortServer vs: serverMap.values()) {
112 vs.stop();
116 protected Store<ByteArray, byte[], byte[]> getSocketStore(String storeName,
117 String host,
118 int port) {
119 return getSocketStore(storeName, host, port, false);
122 protected Store<ByteArray, byte[], byte[]> getSocketStore(String storeName,
123 String host,
124 int port,
125 boolean isRouted) {
126 return ServerTestUtils.getSocketStore(socketStoreFactory,
127 storeName,
128 host,
129 port,
130 RequestFormatType.PROTOCOL_BUFFERS,
131 isRouted);
134 protected VoldemortState getCurrentState(int nodeId) {
135 VoldemortServer server = serverMap.get(nodeId);
136 if(server == null) {
137 throw new VoldemortException("Node id " + nodeId + " does not exist");
138 } else {
139 return server.getMetadataStore().getServerStateUnlocked();
143 protected Cluster getCurrentCluster(int nodeId) {
144 VoldemortServer server = serverMap.get(nodeId);
145 if(server == null) {
146 throw new VoldemortException("Node id " + nodeId + " does not exist");
147 } else {
148 return server.getMetadataStore().getCluster();
152 public void checkConsistentMetadata(Cluster finalCluster, List<Integer> serverList) {
153 for(int nodeId: serverList) {
154 assertEquals(finalCluster, getCurrentCluster(nodeId));
155 assertEquals(MetadataStore.VoldemortState.NORMAL_SERVER, getCurrentState(nodeId));
159 protected void stopServer(List<Integer> nodesToStop) throws Exception {
160 for(int node: nodesToStop) {
161 try {
162 ServerTestUtils.stopVoldemortServer(serverMap.get(node));
163 } catch(VoldemortException e) {
164 // ignore these at stop time
170 * This method determines the "size" of the test to run...
172 * @return
174 protected abstract int getNumKeys();
176 protected String getBootstrapUrl(Cluster cluster, int nodeId) {
177 Node node = cluster.getNodeById(nodeId);
178 return "tcp://" + node.getHost() + ":" + node.getSocketPort();
182 * Does the rebalance and then checks that it succeeded.
184 * @param rebalancePlan
185 * @param rebalanceClient
186 * @param nodeCheckList
188 protected void rebalanceAndCheck(RebalancePlan rebalancePlan,
189 RebalanceController rebalanceClient,
190 List<Integer> nodeCheckList) {
191 rebalanceClient.rebalance(rebalancePlan);
192 checkEntriesPostRebalance(rebalancePlan.getCurrentCluster(),
193 rebalancePlan.getFinalCluster(),
194 rebalancePlan.getCurrentStores(),
195 nodeCheckList,
196 testEntries,
197 null);
201 * Makes sure that all expected partition-stores are on each server after
202 * the rebalance.
204 * @param currentCluster
205 * @param finalCluster
206 * @param storeDefs
207 * @param nodeCheckList
208 * @param baselineTuples
209 * @param baselineVersions
211 protected void checkEntriesPostRebalance(Cluster currentCluster,
212 Cluster finalCluster,
213 List<StoreDefinition> storeDefs,
214 List<Integer> nodeCheckList,
215 HashMap<String, String> baselineTuples,
216 HashMap<String, VectorClock> baselineVersions) {
217 for(StoreDefinition storeDef: storeDefs) {
218 Map<Integer, Set<Pair<Integer, Integer>>> currentNodeToPartitionTuples = ROTestUtils.getNodeIdToAllPartitions(currentCluster,
219 storeDef,
220 true);
221 Map<Integer, Set<Pair<Integer, Integer>>> finalNodeToPartitionTuples = ROTestUtils.getNodeIdToAllPartitions(finalCluster,
222 storeDef,
223 true);
225 for(int nodeId: nodeCheckList) {
226 Set<Pair<Integer, Integer>> currentPartitionTuples = currentNodeToPartitionTuples.get(nodeId);
227 Set<Pair<Integer, Integer>> finalPartitionTuples = finalNodeToPartitionTuples.get(nodeId);
229 HashMap<Integer, List<Integer>> flattenedPresentTuples = ROTestUtils.flattenPartitionTuples(Utils.getAddedInTarget(currentPartitionTuples,
230 finalPartitionTuples));
231 Store<ByteArray, byte[], byte[]> store = getSocketStore(storeDef.getName(),
232 finalCluster.getNodeById(nodeId)
233 .getHost(),
234 finalCluster.getNodeById(nodeId)
235 .getSocketPort());
236 checkGetEntries(finalCluster.getNodeById(nodeId),
237 finalCluster,
238 storeDef,
239 store,
240 flattenedPresentTuples,
241 baselineTuples,
242 baselineVersions);
247 protected void checkGetEntries(Node node,
248 Cluster cluster,
249 StoreDefinition def,
250 Store<ByteArray, byte[], byte[]> store,
251 HashMap<Integer, List<Integer>> flattenedPresentTuples,
252 HashMap<String, String> baselineTuples,
253 HashMap<String, VectorClock> baselineVersions) {
254 RoutingStrategy routing = new RoutingStrategyFactory().updateRoutingStrategy(def, cluster);
256 for(Entry<String, String> entry: baselineTuples.entrySet()) {
257 ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(entry.getKey(), "UTF-8"));
259 List<Integer> partitions = routing.getPartitionList(keyBytes.get());
261 if(StoreRoutingPlan.checkKeyBelongsToPartition(partitions,
262 node.getPartitionIds(),
263 flattenedPresentTuples)) {
264 List<Versioned<byte[]>> values = store.get(keyBytes, null);
266 // expecting exactly one version
267 if(values.size() == 0) {
268 fail("unable to find value for key=" + entry.getKey() + " on node="
269 + node.getId());
271 assertEquals("Expecting exactly one version", 1, values.size());
272 Versioned<byte[]> value = values.get(0);
273 // check version matches
274 if(baselineVersions == null) {
275 // expecting base version for all
276 assertEquals("Value version should match",
277 new VectorClock(),
278 value.getVersion());
279 } else {
280 assertEquals("Value version should match",
281 baselineVersions.get(entry.getKey()),
282 value.getVersion());
285 // check value matches.
286 assertEquals("Value bytes should match",
287 entry.getValue(),
288 ByteUtils.getString(value.getValue(), "UTF-8"));
294 protected List<ByteArray> sampleKeysFromPartition(AdminClient admin,
295 int serverId,
296 String store,
297 List<Integer> partitionsToSample,
298 int numSamples) {
299 List<ByteArray> samples = new ArrayList<ByteArray>(numSamples);
300 Iterator<ByteArray> keys = admin.bulkFetchOps.fetchKeys(serverId,
301 store,
302 partitionsToSample,
303 null,
304 false);
305 int count = 0;
306 while(keys.hasNext() && count < numSamples) {
307 samples.add(keys.next());
308 count++;
310 return samples;
314 * REFACTOR: these should belong AdminClient so existence checks can be done
315 * easily across the board
317 * @param admin
318 * @param serverId
319 * @param store
320 * @param keyList
322 protected void checkForKeyExistence(AdminClient admin,
323 int serverId,
324 String store,
325 List<ByteArray> keyList) {
326 // do the positive tests
327 Iterator<QueryKeyResult> positiveTestResultsItr = admin.streamingOps.queryKeys(serverId,
328 store,
329 keyList.iterator());
330 while(positiveTestResultsItr.hasNext()) {
331 QueryKeyResult item = positiveTestResultsItr.next();
332 ByteArray key = item.getKey();
333 List<Versioned<byte[]>> vals = item.getValues();
334 Exception e = item.getException();
336 assertEquals("Error fetching key " + key, null, e);
337 assertEquals("Value not found for key " + key, true, vals != null & vals.size() != 0);
343 * REFACTOR: these should belong AdminClient so existence checks can be done
344 * easily across the board
346 * @param admin
347 * @param serverId
348 * @param store
349 * @param keyList
351 protected void checkForTupleEquivalence(AdminClient admin,
352 int serverId,
353 String store,
354 List<ByteArray> keyList,
355 HashMap<String, String> baselineTuples,
356 HashMap<String, VectorClock> baselineVersions) {
357 // do the positive tests
358 Iterator<QueryKeyResult> positiveTestResultsItr = admin.streamingOps.queryKeys(serverId,
359 store,
360 keyList.iterator());
361 while(positiveTestResultsItr.hasNext()) {
362 QueryKeyResult item = positiveTestResultsItr.next();
363 ByteArray key = item.getKey();
364 List<Versioned<byte[]>> vals = item.getValues();
365 Exception e = item.getException();
367 assertEquals("Error fetching key " + key, null, e);
368 assertEquals("Value not found for key " + key, true, vals != null & vals.size() != 0);
370 String keyStr = ByteUtils.getString(key.get(), "UTF-8");
371 if(baselineTuples != null)
372 assertEquals("Value does not match up ",
373 baselineTuples.get(keyStr),
374 ByteUtils.getString(vals.get(0).getValue(), "UTF-8"));
375 if(baselineVersions != null)
376 assertEquals("Version does not match up",
377 baselineVersions.get(keyStr),
378 vals.get(0).getVersion());
383 * REFACTOR: these should belong AdminClient so existence checks can be done
384 * easily across the board
386 * @param admin
387 * @param serverId
388 * @param store
389 * @param keyList
391 protected void checkForKeyNonExistence(AdminClient admin,
392 int serverId,
393 String store,
394 List<ByteArray> keyList) {
395 Iterator<QueryKeyResult> negativeTestResultsItr = admin.streamingOps.queryKeys(serverId,
396 store,
397 keyList.iterator());
398 while(negativeTestResultsItr.hasNext()) {
399 QueryKeyResult item = negativeTestResultsItr.next();
400 ByteArray key = item.getKey();
401 List<Versioned<byte[]>> vals = item.getValues();
402 Exception e = item.getException();
404 assertEquals("Error fetching key " + key, null, e);
405 assertEquals("Value " + vals + "found for key " + key,
406 true,
407 vals == null || vals.size() == 0);