1 package voldemort
.store
.routed
;
3 import static org
.junit
.Assert
.assertEquals
;
4 import static org
.junit
.Assert
.fail
;
6 import java
.io
.IOException
;
7 import java
.util
.Arrays
;
8 import java
.util
.Collection
;
9 import java
.util
.HashMap
;
10 import java
.util
.List
;
12 import java
.util
.Properties
;
15 import org
.junit
.After
;
16 import org
.junit
.Before
;
17 import org
.junit
.Test
;
18 import org
.junit
.runner
.RunWith
;
19 import org
.junit
.runners
.Parameterized
;
21 import voldemort
.ClusterTestUtils
;
22 import voldemort
.ServerTestUtils
;
23 import voldemort
.TestUtils
;
24 import voldemort
.client
.ClientConfig
;
25 import voldemort
.client
.SocketStoreClientFactory
;
26 import voldemort
.cluster
.Cluster
;
27 import voldemort
.cluster
.Node
;
28 import voldemort
.server
.VoldemortConfig
;
29 import voldemort
.server
.VoldemortServer
;
30 import voldemort
.store
.InsufficientOperationalNodesException
;
31 import voldemort
.store
.Store
;
32 import voldemort
.store
.StoreDefinition
;
33 import voldemort
.store
.socket
.SocketStoreFactory
;
34 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
35 import voldemort
.utils
.ByteArray
;
36 import voldemort
.versioning
.VectorClock
;
37 import voldemort
.versioning
.Versioned
;
39 @RunWith(Parameterized
.class)
40 public class ZoneAffinityGetTest
{
// Raw store client under test; assigned at the end of setup() from
// socketStoreClientFactory.getRawStore(...).
42 private Store
<String
, String
, byte[]> client
;
// Servers started by setup(), keyed by node id, so tests can stop individual nodes.
43 private Map
<Integer
, VoldemortServer
> vservers
= new HashMap
<Integer
, VoldemortServer
>();
// Cluster obtained in setup() from ClusterTestUtils.getZZZCluster().
44 private Cluster cluster
;
// Zone id of the client for this run; set once by the constructor.
45 private final Integer clientZoneId
;
/**
 * Creates a test instance bound to one client zone.
 *
 * @param clientZoneId zone id stored on this instance; setup() passes it to the
 *        client config and the tests use it to select the nodes to stop
 */
47 public ZoneAffinityGetTest(Integer clientZoneId
) {
48 this.clientZoneId
= clientZoneId
;
/**
 * Supplies the parameter sets for this parameterized test class.
 *
 * @return three single-element argument arrays holding 0, 1 and 2; each array
 *         lines up with the constructor's single clientZoneId parameter
 */
51 @Parameterized.Parameters
52 public static Collection
<Object
[]> configs() {
53 return Arrays
.asList(new Object
[][] { { 0 }, { 1 }, { 2 } });
/**
 * Builds the fixture used by the tests: starts one server per cluster node,
 * writes a value directly into each node's local store, and creates the raw
 * store client.
 * NOTE(review): presumably annotated @Before on a line not visible here - confirm.
 *
 * @throws IOException declared on the signature; the visible lines do not show
 *         which call raises it
 */
57 public void setup() throws IOException
{
// bytes1 ('A','B') is used both as the stored key and as the value written on
// nodes in the client's zone; bytes2 ('C','D') is the other value written below.
58 byte[] bytes1
= { (byte) 'A', (byte) 'B' };
59 byte[] bytes2
= { (byte) 'C', (byte) 'D' };
// Work with the first store definition returned for the "memory" storage type.
60 List
<StoreDefinition
> stores
= ClusterTestUtils
.getZZZ322StoreDefs("memory");
61 StoreDefinition storeDef
= stores
.get(0);
62 cluster
= ClusterTestUtils
.getZZZCluster();
// The client bootstraps from node 0, has zone affinity enabled for get
// operations, and is placed in the zone under test.
63 ClientConfig clientConfig
= new ClientConfig();
64 clientConfig
.setBootstrapUrls(cluster
.getNodeById(0).getSocketUrl().toString());
65 clientConfig
.getZoneAffinity().setEnableGetOpZoneAffinity(true);
66 clientConfig
.setClientZoneId(clientZoneId
);
67 SocketStoreClientFactory socketStoreClientFactory
= new SocketStoreClientFactory(clientConfig
);
// One server per node id in the cluster.
// NOTE(review): several argument lines of the three calls below are not visible
// in this view, so their full parameter lists cannot be described here.
68 for(Integer nodeId
: cluster
.getNodeIds()) {
69 SocketStoreFactory socketStoreFactory
= new ClientRequestExecutorPool(2,
73 VoldemortConfig config
= ServerTestUtils
.createServerConfigWithDefs(true,
75 TestUtils
.createTempDir()
80 VoldemortServer vs
= ServerTestUtils
.startVoldemortServer(socketStoreFactory
,
// Remember the server by node id so individual tests can stop it.
83 vservers
.put(nodeId
, vs
);
// Write straight into this node's local store for the chosen store definition.
84 Store
<ByteArray
, byte[], byte[]> store
= vs
.getStoreRepository()
85 .getLocalStore(storeDef
.getName());
86 Node node
= cluster
.getNodeById(nodeId
);
// version2 is obtained from version1.incremented(0, ...); presumably it is the
// later of the two clocks - confirm against VectorClock.
88 VectorClock version1
= new VectorClock();
89 version1
.incrementVersion(0, System
.currentTimeMillis());
90 VectorClock version2
= version1
.incremented(0, System
.currentTimeMillis());
// Nodes in the client's zone get key bytes1 -> value bytes1 at version1.
92 if(node
.getZoneId() == clientZoneId
) {
93 store
.put(new ByteArray(bytes1
), new Versioned
<byte[]>(bytes1
, version1
), null);
// NOTE(review): the line between the two puts is not visible; presumably it is an
// else branch, so nodes outside the client's zone get value bytes2 at version2 - confirm.
95 store
.put(new ByteArray(bytes1
), new Versioned
<byte[]>(bytes2
, version2
), null);
// Raw store client for the same store definition; the tests issue their gets through it.
99 client
= socketStoreClientFactory
.getRawStore(storeDef
.getName(), null);
/**
 * Walks every server recorded in vservers.
 * NOTE(review): the loop body is not visible in this view - presumably it stops
 * each server, and the method is presumably annotated @After; confirm both.
 */
103 public void tearDown() {
104 for(VoldemortServer vs
: this.vservers
.values()) {
/**
 * Reads key "AB" without stopping any server first and checks that the first
 * returned value is "AB". An InsufficientOperationalNodesException fails the test.
 * NOTE(review): setup() writes value bytes1 ('A','B') only on nodes in the
 * client's zone, so "AB" presumably shows the read was served from that zone -
 * this depends on how the store serializes strings; confirm.
 */
110 public void testAllUp() {
112 List
<Versioned
<String
>> versioneds
= client
.get("AB", null);
// NOTE(review): JUnit's assertEquals signature is (expected, actual); the
// arguments below are in the opposite order, which only affects the failure message.
113 assertEquals(versioneds
.get(0).getValue(), "AB");
114 } catch(InsufficientOperationalNodesException e
) {
115 fail("Failed with exception: " + e
);
/**
 * Stops every server whose node id is in the client's zone, then reads key "AB".
 * The read is expected to throw InsufficientOperationalNodesException; if it
 * returns normally the test fails with "Did not fail fast".
 */
120 public void testLocalZoneDown() {
// Take down all servers in the client's own zone.
121 for(Integer nodeId
: cluster
.getNodeIdsInZone(clientZoneId
)) {
122 this.vservers
.get(nodeId
).stop();
125 client
.get("AB", null);
126 fail("Did not fail fast");
// NOTE(review): the catch body is not visible in this view.
127 } catch(InsufficientOperationalNodesException e
) {
/**
 * Stops a single server in the client's zone (the first node id the zone's
 * iterator yields) and then reads key "AB". The read is expected to complete;
 * an InsufficientOperationalNodesException fails the test.
 */
133 public void testLocalZonePartialDownSufficientReads() {
134 // turn off one node in same zone as client so that reads can still
136 this.vservers
.get(cluster
.getNodeIdsInZone(clientZoneId
).iterator().next()).stop();
138 client
.get("AB", null);
139 } catch(InsufficientOperationalNodesException e
) {
140 fail("Failed with exception: " + e
);
145 public void testLocalZonePartialDownInSufficientReads() {
146 // Stop all but one node in same zone as client. This is not sufficient
148 Set
<Integer
> nodeIds
= cluster
.getNodeIdsInZone(clientZoneId
);
149 nodeIds
.remove(nodeIds
.iterator().next());
150 for(Integer nodeId
: nodeIds
) {
151 this.vservers
.get(nodeId
).stop();
154 client
.get("AB", null);
155 fail("Did not fail fast");
156 } catch(InsufficientOperationalNodesException e
) {