2 * Copyright 2013 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
17 package voldemort
.coordinator
;
19 import static org
.junit
.Assert
.assertEquals
;
20 import static org
.junit
.Assert
.fail
;
22 import java
.io
.OutputStream
;
23 import java
.net
.HttpURLConnection
;
25 import java
.util
.ArrayList
;
26 import java
.util
.HashMap
;
27 import java
.util
.List
;
29 import java
.util
.Properties
;
31 import javax
.mail
.internet
.MimeBodyPart
;
32 import javax
.mail
.internet
.MimeMultipart
;
33 import javax
.mail
.util
.ByteArrayDataSource
;
35 import org
.apache
.commons
.codec
.binary
.Base64
;
36 import org
.junit
.After
;
37 import org
.junit
.Before
;
38 import org
.junit
.Test
;
40 import voldemort
.ServerTestUtils
;
41 import voldemort
.rest
.RestMessageHeaders
;
42 import voldemort
.rest
.RestUtils
;
43 import voldemort
.rest
.coordinator
.CoordinatorConfig
;
44 import voldemort
.rest
.coordinator
.CoordinatorService
;
45 import voldemort
.server
.VoldemortServer
;
46 import voldemort
.store
.socket
.SocketStoreFactory
;
47 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
48 import voldemort
.versioning
.Occurred
;
49 import voldemort
.versioning
.VectorClock
;
51 public class CoordinatorRestAPITest
{
// Voldemort server instances; the array is allocated in setUp().
53 private VoldemortServer
[] servers
;
// Socket URL of servers[0]; assigned in setUp() and added to the coordinator's bootstrap URLs.
54 public static String socketUrl
= "";
// Store name placed in the path of every REST request issued by doPut/doGet/doDelete.
55 private static final String STORE_NAME
= "slow-store-test";
// NOTE(review): no use of STORES_XML is visible in this view; presumably it is handed to
// the cluster start-up call in setUp() -- confirm.
56 private static final String STORES_XML
= "test/common/voldemort/config/single-slow-store.xml";
// Passed to CoordinatorConfig.setFatClientConfigPath() in setUp().
57 private static final String FAT_CLIENT_CONFIG_FILE_PATH
= "test/common/voldemort/config/fat-client-config.avro";
// Closed in tearDown(). The remaining constructor arguments are not visible in this view.
58 private final SocketStoreFactory socketStoreFactory
= new ClientRequestExecutorPool(2,
// Coordinator under test; created and started in setUp(), stopped in tearDown().
62 private CoordinatorService coordinator
= null;
// Base URL that doPut/doGet/doDelete prepend to "/<store>/<base64 key>".
63 private final String coordinatorURL
= "http://localhost:8080";
/**
 * Holder pairing a String value with the VectorClock it was read at.
 * doGet() constructs one from the body and vector-clock header of a GET
 * response; the tests read it back through getValue() and getVc().
 *
 * NOTE(review): declared as a non-static inner class; the member bodies are
 * not visible in this view, so whether it needs the enclosing instance (and
 * could be made static) should be confirmed.
 */
65 private class TestVersionedValue
{
68 private VectorClock vc
;
// val: the value text; vc: the version associated with it.
70 public TestVersionedValue(String val
, VectorClock vc
) {
75 public String
getValue() {
79 public void setValue(String value
) {
83 public VectorClock
getVc() {
87 public void setVc(VectorClock vc
) {
/**
 * Test fixture set-up: starts a Voldemort cluster through
 * ServerTestUtils.startVoldemortCluster and then creates and starts a
 * CoordinatorService bootstrapped from the first server's socket URL.
 *
 * @throws Exception declared by the signature; the throwing calls are not all visible here
 */
93 public void setUp() throws Exception
{
// NOTE(review): the declaration of numServers is not visible in this view.
95 servers
= new VoldemortServer
[numServers
];
96 int partitionMap
[][] = { { 0, 2, 4, 6, 1, 3, 5, 7 } };
// Server properties: enable the BDB and "slow" storage configurations and set the
// four testing.slow.queueing.*.ms properties to "4" (presumably a per-operation
// delay in milliseconds -- confirm against SlowStorageConfiguration).
97 Properties props
= new Properties();
98 props
.setProperty("storage.configs",
99 "voldemort.store.bdb.BdbStorageConfiguration,voldemort.store.slow.SlowStorageConfiguration");
100 props
.setProperty("testing.slow.queueing.get.ms", "4");
101 props
.setProperty("testing.slow.queueing.getall.ms", "4");
102 props
.setProperty("testing.slow.queueing.put.ms", "4");
103 props
.setProperty("testing.slow.queueing.delete.ms", "4");
// Remaining arguments of the cluster start-up call are not visible in this view.
105 ServerTestUtils
.startVoldemortCluster(numServers
,
// Coordinator configuration: a single bootstrap URL taken from servers[0].
114 CoordinatorConfig config
= new CoordinatorConfig();
115 List
<String
> bootstrapUrls
= new ArrayList
<String
>();
116 socketUrl
= servers
[0].getIdentityNode().getSocketUrl().toString();
117 bootstrapUrls
.add(socketUrl
);
119 System
.out
.println("\n\n************************ Starting the Coordinator *************************");
121 config
.setBootstrapURLs(bootstrapUrls
);
122 config
.setFatClientConfigPath(FAT_CLIENT_CONFIG_FILE_PATH
);
// Start the coordinator only if it is not already reported as started.
124 this.coordinator
= new CoordinatorService(config
);
125 if(!this.coordinator
.isStarted()) {
126 this.coordinator
.start();
/**
 * Test fixture tear-down: closes the socket store factory when it is
 * non-null and stops the coordinator when it is non-null and started.
 *
 * @throws Exception declared by the signature
 */
131 public void tearDown() throws Exception
{
132 if(this.socketStoreFactory
!= null) {
133 this.socketStoreFactory
.close();
// Guarded so a coordinator that never started is not stopped.
136 if(this.coordinator
!= null && this.coordinator
.isStarted()) {
137 this.coordinator
.stop();
// Type argument accepted by generateRandomString(). The enum constants are not
// visible in this view.
141 public static enum ValueType
{
/**
 * Builds a string of {@code length} characters, each drawn with
 * Math.random() from the alphabet held in {@code characters}.
 *
 * Three alphabets are assigned below (letters, letters plus digits, digits).
 * The construct that picks one is not visible in this view; presumably it
 * branches on {@code type} -- confirm.
 *
 * @param length number of characters to generate
 * @param type kind of alphabet requested
 * @return the generated string
 */
147 public static String
generateRandomString(int length
, ValueType type
) {
149 StringBuffer buffer
= new StringBuffer();
150 String characters
= "";
155 characters
= "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
159 characters
= "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
163 characters
= "1234567890";
167 int charactersLength
= characters
.length();
// Math.random() is in [0, 1), so the (int) cast below yields an index in
// [0, charactersLength).
169 for(int i
= 0; i
< length
; i
++) {
170 double index
= Math
.random() * charactersLength
;
171 buffer
.append(characters
.charAt((int) index
));
173 return buffer
.toString();
/**
 * Convenience overload: forwards to the four-argument doPut with
 * {@code null} options, so that overload's defaults apply.
 *
 * @param key key to write
 * @param payload value to write
 * @param vc vector clock forwarded to the four-argument overload
 * @return whatever the four-argument overload returns
 */
176 private VectorClock
doPut(String key
, String payload
, VectorClock vc
) {
177 return doPut(key
, payload
, vc
, null);
/**
 * Sends an HTTP POST carrying {@code payload} to
 * {@code coordinatorURL/STORE_NAME/<base64 key>} and fails the test when the
 * response code differs from the expected one (201 unless overridden).
 *
 * Recognised {@code options} entries:
 *   "timeout" (String) - replaces the default "1000" request-timeout header;
 *   "responseCode" (Integer) - replaces the expected response code.
 *
 * Two of the parameter declarations are not visible in this view; judging by
 * the three-argument overload's call they are the payload and the vector clock.
 *
 * @param key key to write; base64-encoded into the URL
 * @param options optional overrides, may be null
 * @return successfulPutVC; NOTE(review): no assignment other than the
 *         initial null is visible in this view
 */
180 private VectorClock
doPut(String key
,
183 Map
<String
, Object
> options
) {
184 VectorClock successfulPutVC
= null;
185 int expectedResponseCode
= 201;
187 // Create the right URL and Http connection
188 HttpURLConnection conn
= null;
189 String base64Key
= new String(Base64
.encodeBase64(key
.getBytes()));
190 URL url
= new URL(this.coordinatorURL
+ "/" + STORE_NAME
+ "/" + base64Key
);
191 conn
= (HttpURLConnection
) url
.openConnection();
193 // Set the right headers
194 conn
.setRequestMethod("POST");
195 conn
.setDoOutput(true);
196 conn
.setDoInput(true);
197 conn
.setRequestProperty("Content-Type", "binary");
// NOTE(review): Content-Length is the char count of the payload, while the body
// written below is payload.getBytes(); the two differ for payloads that do not
// encode to one byte per char.
198 conn
.setRequestProperty("Content-Length", "" + payload
.length());
199 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_TIMEOUT_MS
, "1000");
200 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_ORIGIN_TIME_MS
,
201 Long
.toString(System
.currentTimeMillis()));
// Per-call overrides; entries of an unexpected type are ignored.
204 if(options
!= null) {
205 if(options
.get("timeout") != null && options
.get("timeout") instanceof String
) {
206 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_TIMEOUT_MS
,
207 (String
) options
.get("timeout"));
209 if(options
.get("responseCode") != null
210 && options
.get("responseCode") instanceof Integer
) {
211 expectedResponseCode
= (Integer
) options
.get("responseCode");
// The serialized vector clock travels in the ETag request header.
216 String eTag
= RestUtils
.getSerializedVectorClock(vc
);
217 conn
.setRequestProperty("ETag", eTag
);
// Write the request body.
221 OutputStream out
= conn
.getOutputStream();
222 out
.write(payload
.getBytes());
225 // Check for the right response code
227 if(conn
.getResponseCode() != expectedResponseCode
) {
228 System
.err
.println("Illegal response during PUT : " + conn
.getResponseMessage());
229 fail("Incorrect response received for a HTTP put request :"
230 + conn
.getResponseCode());
// Any exception raised while building or sending the request fails the test.
233 } catch(Exception e
) {
235 fail("Error in sending the REST request");
238 return successfulPutVC
;
/**
 * Convenience overload: forwards to the two-argument doDelete with
 * {@code null} options, so that overload's defaults apply.
 *
 * @param key key to delete
 * @return whatever the two-argument overload returns
 */
241 private boolean doDelete(String key
) {
242 return doDelete(key
, null);
/**
 * Sends an HTTP DELETE to {@code coordinatorURL/STORE_NAME/<base64 key>} and
 * fails the test when the response code differs from the expected one (204
 * unless overridden).
 *
 * Recognised {@code options} entries:
 *   "timeout" (String) - replaces the default "1000" request-timeout header;
 *   "responseCode" (Integer) - replaces the expected response code.
 *
 * @param key key to delete; base64-encoded into the URL
 * @param options optional overrides, may be null
 * @return a boolean; the return statements are not visible in this view
 */
245 private boolean doDelete(String key
, Map
<String
, Object
> options
) {
246 int expectedResponseCode
= 204;
249 // Create the right URL and Http connection
250 HttpURLConnection conn
= null;
251 String base64Key
= new String(Base64
.encodeBase64(key
.getBytes()));
252 URL url
= new URL(this.coordinatorURL
+ "/" + STORE_NAME
+ "/" + base64Key
);
253 conn
= (HttpURLConnection
) url
.openConnection();
255 // Set the right headers
256 conn
.setRequestMethod("DELETE");
257 conn
.setDoInput(true);
258 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_TIMEOUT_MS
, "1000");
259 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_ORIGIN_TIME_MS
,
260 Long
.toString(System
.currentTimeMillis()));
// Per-call overrides; entries of an unexpected type are ignored.
263 if(options
!= null) {
264 if(options
.get("timeout") != null && options
.get("timeout") instanceof String
) {
265 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_TIMEOUT_MS
,
266 (String
) options
.get("timeout"));
268 if(options
.get("responseCode") != null
269 && options
.get("responseCode") instanceof Integer
) {
270 expectedResponseCode
= (Integer
) options
.get("responseCode");
274 // Check for the right response code
275 if(conn
.getResponseCode() != expectedResponseCode
) {
276 System
.err
.println("Illegal response during DELETE : " + conn
.getResponseMessage());
// NOTE(review): the message below says "put request" although this is the
// DELETE path -- looks like a copy/paste slip from doPut.
277 fail("Incorrect response received for a HTTP put request :"
278 + conn
.getResponseCode());
// Any exception raised while building or sending the request fails the test.
283 } catch(Exception e
) {
285 fail("Error in sending the REST request");
/**
 * Convenience overload: forwards to the two-argument doGet with
 * {@code null} options, so that overload's defaults apply.
 *
 * @param key key to read
 * @return whatever the two-argument overload returns
 */
291 private TestVersionedValue
doGet(String key
) {
292 return doGet(key
, null);
/**
 * Sends an HTTP GET to {@code coordinatorURL/STORE_NAME/<base64 key>}, fails
 * the test when the response code differs from the expected one (200 unless
 * overridden), and parses the multipart response into a TestVersionedValue
 * built from the single body part's content and its vector-clock header.
 *
 * Recognised {@code options} entries:
 *   "timeout" (String) - replaces the default "1000" request-timeout header;
 *   "responseCode" (Integer) - replaces the expected response code.
 *
 * The return statements are not visible in this view. Callers in this file
 * treat a null result as "key not found" when they expect a 404.
 *
 * @param key key to read; base64-encoded into the URL
 * @param options optional overrides, may be null
 * @return the value and version read, or null (see above)
 */
295 private TestVersionedValue
doGet(String key
, Map
<String
, Object
> options
) {
296 HttpURLConnection conn
= null;
297 String response
= null;
298 TestVersionedValue responseObj
= null;
299 int expectedResponseCode
= 200;
302 // Create the right URL and Http connection
303 String base64Key
= new String(Base64
.encodeBase64(key
.getBytes()));
304 URL url
= new URL(this.coordinatorURL
+ "/" + STORE_NAME
+ "/" + base64Key
);
305 conn
= (HttpURLConnection
) url
.openConnection();
307 // Set the right headers
308 conn
.setRequestMethod("GET");
309 conn
.setDoInput(true);
310 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_TIMEOUT_MS
, "1000");
311 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_ORIGIN_TIME_MS
,
312 Long
.toString(System
.currentTimeMillis()));
// Per-call overrides; entries of an unexpected type are ignored.
315 if(options
!= null) {
316 if(options
.get("timeout") != null && options
.get("timeout") instanceof String
) {
317 conn
.setRequestProperty(RestMessageHeaders
.X_VOLD_REQUEST_TIMEOUT_MS
,
318 (String
) options
.get("timeout"));
320 if(options
.get("responseCode") != null
321 && options
.get("responseCode") instanceof Integer
) {
322 expectedResponseCode
= (Integer
) options
.get("responseCode");
326 // Check for the right response code
327 if(conn
.getResponseCode() != expectedResponseCode
) {
328 System
.err
.println("Illegal response during GET : " + conn
.getResponseMessage());
329 fail("Incorrect response received for a HTTP GET request :"
330 + conn
.getResponseCode());
// 404 and 408 carry no body to parse; the branch body is not visible in this view.
333 if(conn
.getResponseCode() == 404 || conn
.getResponseCode() == 408) {
337 // Buffer the result into a string
338 ByteArrayDataSource ds
= new ByteArrayDataSource(conn
.getInputStream(),
340 MimeMultipart mp
= new MimeMultipart(ds
);
341 assertEquals("The number of body parts expected is not 1", 1, mp
.getCount());
// The version and the content length are read from headers of the body part.
343 MimeBodyPart part
= (MimeBodyPart
) mp
.getBodyPart(0);
344 VectorClock vc
= RestUtils
.deserializeVectorClock(part
.getHeader(RestMessageHeaders
.X_VOLD_VECTOR_CLOCK
)[0]);
345 int contentLength
= Integer
.parseInt(part
.getHeader(RestMessageHeaders
.CONTENT_LENGTH
)[0]);
346 byte[] bodyPartBytes
= new byte[contentLength
];
// NOTE(review): a single InputStream.read(byte[]) call may return fewer than
// contentLength bytes and its result is ignored; large values may need a read loop.
348 part
.getInputStream().read(bodyPartBytes
);
// NOTE(review): decodes with the platform default charset.
349 response
= new String(bodyPartBytes
);
351 responseObj
= new TestVersionedValue(response
, vc
);
// Any exception raised while sending the request or parsing the reply fails the test.
353 } catch(Exception e
) {
355 fail("Error in sending the REST request");
/**
 * Writes a value with doPut, reads the same key back with doGet, and fails
 * when nothing is returned or the returned value differs from what was written.
 */
365 public void testReadAfterWrite() {
366 String key
= "Which_Imperial_IPA_do_I_want_to_drink";
367 String payload
= "Pliny the Younger";
// 1. Unversioned put (null vector clock).
370 doPut(key
, payload
, null);
372 // 2. Do a get on the same key
373 TestVersionedValue response
= doGet(key
);
374 if(response
== null) {
375 fail("key does not exist after a put. ");
378 System
.out
.println("Received value: " + response
.getValue());
379 if(!response
.getValue().equals(payload
)) {
380 fail("Received value is incorrect ! Expected : " + payload
+ " but got : "
381 + response
.getValue());
/**
 * Writes a value, verifies it can be read back, deletes the key, and then
 * checks that a further GET (expecting response code 404) returns nothing.
 */
386 public void testDelete() {
387 String key
= "Which_sour_beer_do_I_want_to_drink";
388 String payload
= "Duchesse De Bourgogne";
389 Map
<String
, Object
> options
= new HashMap
<String
, Object
>();
// 1. Unversioned put (null vector clock).
392 doPut(key
, payload
, null);
394 // 2. Do a get on the same key
395 TestVersionedValue response
= doGet(key
);
396 if(response
== null) {
397 fail("key does not exist after a put. ");
399 System
.out
.println("Received value: " + response
.getValue());
400 if(!response
.getValue().equals(payload
)) {
401 fail("Received value is incorrect ! Expected : " + payload
+ " but got : "
402 + response
.getValue());
// 3. Delete the key. The condition guarding the fail() below is not visible in
// this view; presumably it tests isDeleted -- confirm.
406 boolean isDeleted
= doDelete(key
);
408 fail("Could not delete the key. Error !");
411 // 4. Do a get on the same key : this should fail
412 options
.put("responseCode", 404);
413 response
= doGet(key
, options
);
414 if(response
!= null) {
415 fail("key still exists after deletion. ");
/**
 * Writes a value, reads it back, writes a new value using the vector clock
 * from that read, reads again, and checks both the ordering of the two
 * vector clocks (via compare() in each direction) and the final value.
 */
420 public void testVersionedPut() {
421 String key
= "Which_Porter_do_I_want_to_drink";
422 String payload
= "Founders Porter";
423 String newPayload
= "Samuel Smith Taddy Porter";
// 1. Unversioned put (null vector clock).
426 doPut(key
, payload
, null);
428 // 2. Do a get on the same key
429 TestVersionedValue response
= doGet(key
, null);
430 if(response
== null) {
431 fail("key does not exist after a put. ");
433 System
.out
.println("Received value: " + response
.getValue());
435 // 3. Do a versioned put based on the version received previously
436 doPut(key
, newPayload
, response
.getVc());
438 // 4. Do a get again on the same key
439 TestVersionedValue newResponse
= doGet(key
);
440 if(newResponse
== null) {
441 fail("key does not exist after the versioned put. ");
// The expected values of the two comparisons are not visible in this view.
// NOTE(review): "higer" in the assertion messages is a typo for "higher".
443 assertEquals("Returned response does not have a higer version",
445 newResponse
.getVc().compare(response
.getVc()));
446 assertEquals("Returned response does not have a higer version",
448 response
.getVc().compare(newResponse
.getVc()));
450 System
.out
.println("Received value after the Versioned put: " + newResponse
.getValue());
451 if(!newResponse
.getValue().equals(newPayload
)) {
452 fail("Received value is incorrect ! Expected : " + newPayload
+ " but got : "
453 + newResponse
.getValue());
/**
 * Same flow as a versioned put/get round trip, but with generated payloads
 * whose length is ten times CoordinatorConfig's HTTP message decoder max
 * chunk size, so each value spans several chunks.
 */
458 public void testLargeValueSizeVersionedPut() {
459 String key
= "amigo";
// The second argument of both generateRandomString calls is not visible in this view.
460 String payload
= generateRandomString(new CoordinatorConfig().getHttpMessageDecoderMaxChunkSize() * 10,
462 String newPayload
= generateRandomString(new CoordinatorConfig().getHttpMessageDecoderMaxChunkSize() * 10,
// 1. Unversioned put (null vector clock).
466 doPut(key
, payload
, null);
468 // 2. Do a get on the same key
469 TestVersionedValue response
= doGet(key
, null);
470 if(response
== null) {
471 fail("key does not exist after a put. ");
473 System
.out
.println("Received value: " + response
.getValue());
475 // 3. Do a versioned put based on the version received previously
476 doPut(key
, newPayload
, response
.getVc());
478 // 4. Do a get again on the same key
479 TestVersionedValue newResponse
= doGet(key
);
480 if(newResponse
== null) {
481 fail("key does not exist after the versioned put. ");
// The expected values of the two comparisons are not visible in this view.
// NOTE(review): "higer" in the assertion messages is a typo for "higher".
483 assertEquals("Returned response does not have a higer version",
485 newResponse
.getVc().compare(response
.getVc()));
486 assertEquals("Returned response does not have a higer version",
488 response
.getVc().compare(newResponse
.getVc()));
490 System
.out
.println("Received value after the Versioned put: " + newResponse
.getValue());
491 if(!newResponse
.getValue().equals(newPayload
)) {
492 fail("Received value is incorrect ! Expected : " + newPayload
+ " but got : "
493 + newResponse
.getValue());
498 public void testWriteWithTimeout() {
499 String key
= "Which_Imperial_IPA_do_I_want_to_drink";
500 String payload
= "Pliny the Younger";
501 Map
<String
, Object
> options
= new HashMap
<String
, Object
>();
503 // 1. Do a put (timeout)
504 options
.put("timeout", "1");
505 options
.put("responseCode", 408);
506 doPut(key
, payload
, null, options
);
508 // 2. Do a get on the same key
510 options
.put("responseCode", 404);
511 TestVersionedValue response
= doGet(key
, options
);
512 if(response
!= null) {
513 fail("key should not exist after a put. ");
517 doPut(key
, payload
, null);
519 // 4. Do a get on the same key with timeout
521 options
.put("timeout", "1");
522 options
.put("responseCode", 408);
523 response
= doGet(key
, options
);
525 // 5. Do a get on the same key
526 response
= doGet(key
);
527 if(response
== null) {
528 fail("key does not exist after a put. ");