bind failures while running tests
[voldemort/jeffpc.git] / test / unit / voldemort / client / protocol / admin / StreamingClientTest.java
blob8b604b6a2a6ccb6a6a6aa219a4f353e2a8342e17
1 package voldemort.client.protocol.admin;
3 import static org.junit.Assert.assertEquals;
5 import java.io.File;
6 import java.io.IOException;
7 import java.util.ArrayList;
8 import java.util.Arrays;
9 import java.util.Collection;
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.List;
13 import java.util.Properties;
14 import java.util.concurrent.Callable;
16 import org.apache.commons.io.FileUtils;
17 import org.junit.After;
18 import org.junit.Assert;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.junit.runner.RunWith;
22 import org.junit.runners.Parameterized;
24 import voldemort.ClusterTestUtils;
25 import voldemort.ServerTestUtils;
26 import voldemort.TestUtils;
27 import voldemort.client.RoutingTier;
28 import voldemort.cluster.Cluster;
29 import voldemort.cluster.Node;
30 import voldemort.routing.RoutingStrategy;
31 import voldemort.routing.RoutingStrategyFactory;
32 import voldemort.routing.RoutingStrategyType;
33 import voldemort.serialization.DefaultSerializerFactory;
34 import voldemort.serialization.Serializer;
35 import voldemort.serialization.SerializerDefinition;
36 import voldemort.serialization.SerializerFactory;
37 import voldemort.server.VoldemortServer;
38 import voldemort.store.StoreDefinition;
39 import voldemort.store.StoreDefinitionBuilder;
40 import voldemort.store.bdb.BdbStorageConfiguration;
41 import voldemort.store.compress.CompressionStrategy;
42 import voldemort.store.compress.CompressionStrategyFactory;
43 import voldemort.store.slop.strategy.HintedHandoffStrategyType;
44 import voldemort.store.socket.SocketStoreFactory;
45 import voldemort.store.socket.TestSocketStoreFactory;
46 import voldemort.utils.ByteArray;
47 import voldemort.utils.Props;
48 import voldemort.versioning.Versioned;
49 import voldemort.xml.StoreDefinitionsMapper;
51 import com.google.common.collect.Lists;
54 * Starts a streaming session and inserts some keys. Using fetchKeys we check
55 * whether the keys made it to the responsible node.
56 * Three scenarios are tested:
57 * (i) Non-zoned cluster with contiguous node ids
58 * (ii) Non-zoned cluster with non-contiguous node ids
59 * (iii) Zoned cluster with non-contiguous zone/node ids
62 @RunWith(Parameterized.class)
63 public class StreamingClientTest {
65 private long startTime;
66 public final String SERVER_LOCAL_URL = "tcp://localhost:";
67 public final static String TEST_STORE_NAME = "test-store-streaming-1";
68 public final String STORES_XML_FILE = "test/common/voldemort/config/stores.xml";
69 public int numServers;
70 private int NUM_KEYS_1 = 4000;
71 private SocketStoreFactory socketStoreFactory = new TestSocketStoreFactory();
72 private VoldemortServer[] servers = null;
73 private int[] serverPorts = null;
74 private Cluster cluster;
75 private AdminClient adminClient;
76 private SerializerFactory serializerFactory = new DefaultSerializerFactory();
77 private StoreDefinition storeDef;
78 private int nodeIdOnWhichToVerifyKey;
/**
 * Captures one parameter set supplied by the Parameterized runner.
 *
 * @param numServers number of servers to start
 * @param cluster cluster the servers are started with
 * @param nodeIdOnWhichToVerifyKey id of the node on which streamed keys are verified
 * @param storeDef definition of the store under test
 */
public StreamingClientTest(int numServers,
                           Cluster cluster,
                           int nodeIdOnWhichToVerifyKey,
                           StoreDefinition storeDef) {
    this.storeDef = storeDef;
    this.nodeIdOnWhichToVerifyKey = nodeIdOnWhichToVerifyKey;
    this.cluster = cluster;
    this.numServers = numServers;
}
// Supplies the parameter sets for the Parameterized runner. Each entry lists,
// in constructor order: numServers, cluster, nodeIdOnWhichToVerifyKey, storeDef.
// NOTE(review): the embedded line numbering skips 130, 132-134, 140 and 143, so
// some short lines of the array literal (most likely the node-id arguments and
// the braces between entries) are not visible here - confirm against the
// original file before editing the parameter sets.
90 @Parameterized.Parameters
91 public static Collection<Object[]> configs() {
// Store routed with the consistent strategy: replication factor 2, one read
// required, and both replicas required (and preferred) for a write.
92 StoreDefinition storeDefConsistestStrategy = new StoreDefinitionBuilder().setName(TEST_STORE_NAME)
93 .setType(BdbStorageConfiguration.TYPE_NAME)
94 .setKeySerializer(new SerializerDefinition("string"))
95 .setValueSerializer(new SerializerDefinition("string"))
96 .setRoutingPolicy(RoutingTier.CLIENT)
97 .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY)
98 .setReplicationFactor(2)
99 .setPreferredReads(1)
100 .setRequiredReads(1)
101 .setPreferredWrites(2)
102 .setRequiredWrites(2)
103 .build();
// Replication factor 2 for each of the keys 1 and 3 (presumably zone ids,
// matching the Z1Z3 cluster used below - confirm).
105 HashMap<Integer, Integer> zoneReplicationFactor = new HashMap<Integer, Integer>();
106 zoneReplicationFactor.put(1, 2);
107 zoneReplicationFactor.put(3, 2);
// Store routed with the zone strategy: total replication factor 4, a single
// read and a single write required, and zone counts of 0 for reads and writes.
108 StoreDefinition storeDefZoneStrategy = new StoreDefinitionBuilder().setName(TEST_STORE_NAME)
109 .setType(BdbStorageConfiguration.TYPE_NAME)
110 .setKeySerializer(new SerializerDefinition("string"))
111 .setValueSerializer(new SerializerDefinition("string"))
112 .setRoutingPolicy(RoutingTier.CLIENT)
113 .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY)
114 .setReplicationFactor(4)
115 .setPreferredReads(1)
116 .setRequiredReads(1)
117 .setPreferredWrites(1)
118 .setRequiredWrites(1)
119 .setZoneCountReads(0)
120 .setZoneCountWrites(0)
121 .setZoneReplicationFactor(zoneReplicationFactor)
122 .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY)
123 .build();
125 return Arrays.asList(new Object[][] {
// Entry 1: two servers, local cluster with partitions 0-3 and 4-7, consistent routing.
127 { 2,
128 ServerTestUtils.getLocalCluster(2, new int[][] { { 0, 1, 2, 3 },
129 { 4, 5, 6, 7 } }),
131 storeDefConsistestStrategy
// Entry 2: same partition layout on a cluster built from the ids 1 and 3
// (presumably non-contiguous node ids, per the helper's name); verified on node 1.
135 ServerTestUtils.getLocalNonContiguousNodesCluster(new int[] { 1, 3 },
136 new int[][] {
137 { 0, 1, 2, 3 },
138 { 4, 5, 6, 7 } }),
139 1, storeDefConsistestStrategy
// Entry 3: six servers on the Z1/Z3 cluster with non-contiguous node ids, zone routing.
141 { 6,
142 ClusterTestUtils.getZ1Z3ClusterWithNonContiguousNodeIds(),
144 storeDefZoneStrategy
149 @Before
150 public void testSetup() throws IOException {
152 if(null == servers) {
153 servers = new VoldemortServer[numServers];
154 serverPorts = new int[numServers];
156 File tempStoreXml = new File(TestUtils.createTempDir(), "stores.xml");
157 try {
158 FileUtils.writeStringToFile(tempStoreXml,
159 new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(storeDef)));
160 } catch(IOException e1) {
161 // TODO Auto-generated catch block
162 e1.printStackTrace();
163 throw e1;
166 int count = 0;
167 for(int nodeId: cluster.getNodeIds()) {
168 try {
169 servers[count] = ServerTestUtils.startVoldemortServer(socketStoreFactory,
170 ServerTestUtils.createServerConfig(true,
171 nodeId,
172 TestUtils.createTempDir()
173 .getAbsolutePath(),
174 null,
175 tempStoreXml.getAbsolutePath(),
176 new Properties()),
177 cluster);
178 } catch(IOException e) {
179 e.printStackTrace();
180 throw e;
182 serverPorts[count] = servers[count].getIdentityNode().getSocketPort();
183 count++;
185 adminClient = ServerTestUtils.getAdminClient(cluster);
188 startTime = System.currentTimeMillis();
192 @After
193 public void testCleanup() {
194 // Teardown for data used by the unit tests
195 if(servers != null) {
196 for(VoldemortServer server: servers) {
197 server.stop();
202 @Test
203 public void testStreaming() {
205 Props property = new Props();
206 property.put("streaming.platform.bootstrapURL", SERVER_LOCAL_URL + serverPorts[0]);
207 StreamingClientConfig config = new StreamingClientConfig(property);
209 BaseStreamingClient streamer = new BaseStreamingClient(config);
211 streamer.initStreamingSession(TEST_STORE_NAME, new Callable<Object>() {
213 @Override
214 public Object call() throws Exception {
216 return null;
218 }, new Callable<Object>() {
220 @Override
221 public Object call() throws Exception {
223 return null;
225 }, true);
227 for(int i = 0; i < NUM_KEYS_1; i++) {
228 String key = i + "";
229 String value = key;
231 Versioned<byte[]> outputValue = Versioned.value(value.getBytes());
232 // adminClient.streamingPut(new ByteArray(key.getBytes()),
233 // outputValue);
234 streamer.streamingPut(new ByteArray(key.getBytes()), outputValue);
236 streamer.commitToVoldemort();
237 streamer.closeStreamingSession();
238 assertEquals(verifyKeysExist(nodeIdOnWhichToVerifyKey), true);
244 * Checks if the streamingClient stays calm and not throw NPE when calling
245 * commit before it has been initialized
247 @Test
248 public void testUnInitializedClientPreventNPE() {
250 Props property = new Props();
251 property.put("streaming.platform.bootstrapURL", SERVER_LOCAL_URL + serverPorts[0]);
252 StreamingClientConfig config = new StreamingClientConfig(property);
253 BaseStreamingClient streamer = new BaseStreamingClient(config);
254 try {
255 streamer.commitToVoldemort();
256 } catch (NullPointerException e) {
257 e.printStackTrace();
258 Assert.fail("Should not throw NPE at this stage even though streamingSession not initialized");
265 * Checks if each node has the keys it is reponsible for returns false
266 * otherwise
268 public boolean verifyKeysExist(int nodeIdToVerifyOn) {
269 RoutingStrategyFactory factory = new RoutingStrategyFactory();
270 RoutingStrategy storeRoutingStrategy = factory.updateRoutingStrategy(storeDef,
271 adminClient.getAdminClientCluster());
273 HashMap<Integer, ArrayList<String>> expectedNodeIdToKeys;
274 expectedNodeIdToKeys = new HashMap();
275 Collection<Node> nodesInCluster = adminClient.getAdminClientCluster().getNodes();
276 for(Node node: nodesInCluster) {
277 ArrayList<String> keysForNode = new ArrayList();
278 expectedNodeIdToKeys.put(node.getId(), keysForNode);
280 for(int i = 0; i < NUM_KEYS_1; i++) {
281 String key = i + "";
282 String value = key;
283 List<Node> nodeList = storeRoutingStrategy.routeRequest(key.getBytes());
284 for(Node node: nodeList) {
285 ArrayList<String> keysForNode = expectedNodeIdToKeys.get(node.getId());
286 keysForNode.add(key);
290 ArrayList<String> fetchedKeysForNode = new ArrayList();
291 for(Node node: nodesInCluster) {
293 List<Integer> partitionIdList = Lists.newArrayList();
294 partitionIdList.addAll(node.getPartitionIds());
296 Iterator<ByteArray> keyIteratorRef = null;
297 keyIteratorRef = adminClient.bulkFetchOps.fetchKeys(node.getId(),
298 TEST_STORE_NAME,
299 partitionIdList,
300 null,
301 false);
303 final SerializerDefinition serializerDef = storeDef.getKeySerializer();
304 final SerializerFactory serializerFactory = new DefaultSerializerFactory();
305 @SuppressWarnings("unchecked")
306 final Serializer<Object> serializer = (Serializer<Object>) serializerFactory.getSerializer(serializerDef);
308 final CompressionStrategy keysCompressionStrategy;
309 if(serializerDef != null && serializerDef.hasCompression()) {
310 keysCompressionStrategy = new CompressionStrategyFactory().get(serializerDef.getCompression());
311 } else {
312 keysCompressionStrategy = null;
314 final Iterator<ByteArray> keyIterator = keyIteratorRef;
315 while(keyIterator.hasNext()) {
317 byte[] keyBytes = keyIterator.next().get();
318 try {
319 Object keyObject = serializer.toObject((null == keysCompressionStrategy) ? keyBytes
320 : keysCompressionStrategy.inflate(keyBytes));
321 fetchedKeysForNode.add((String) keyObject);
323 } catch(IOException e) {
325 e.printStackTrace();
331 ArrayList<String> keysForNode = expectedNodeIdToKeys.get(nodeIdToVerifyOn);
332 if(!fetchedKeysForNode.containsAll(keysForNode))
333 return false;
334 else
335 return true;