1 package voldemort
.store
.routed
;
3 import static org
.junit
.Assert
.fail
;
5 import java
.io
.IOException
;
6 import java
.util
.Arrays
;
7 import java
.util
.Collection
;
8 import java
.util
.HashMap
;
11 import java
.util
.Properties
;
14 import org
.junit
.After
;
15 import org
.junit
.Before
;
16 import org
.junit
.Test
;
17 import org
.junit
.runner
.RunWith
;
18 import org
.junit
.runners
.Parameterized
;
20 import voldemort
.ClusterTestUtils
;
21 import voldemort
.ServerTestUtils
;
22 import voldemort
.TestUtils
;
23 import voldemort
.client
.ClientConfig
;
24 import voldemort
.client
.SocketStoreClientFactory
;
25 import voldemort
.cluster
.Cluster
;
26 import voldemort
.cluster
.Node
;
27 import voldemort
.server
.VoldemortConfig
;
28 import voldemort
.server
.VoldemortServer
;
29 import voldemort
.store
.InsufficientOperationalNodesException
;
30 import voldemort
.store
.Store
;
31 import voldemort
.store
.StoreDefinition
;
32 import voldemort
.store
.socket
.SocketStoreFactory
;
33 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
34 import voldemort
.utils
.ByteArray
;
35 import voldemort
.versioning
.VectorClock
;
36 import voldemort
.versioning
.Versioned
;
38 @RunWith(Parameterized
.class)
39 public class ZoneAffinityGetVersionsTest
{
41 private Store
<String
, String
, byte[]> client
;
42 private Map
<Integer
, VoldemortServer
> vservers
= new HashMap
<Integer
, VoldemortServer
>();
43 private Map
<Integer
, SocketStoreFactory
> socketStoreFactories
= new HashMap
<Integer
, SocketStoreFactory
>();
44 private Cluster cluster
;
45 private final Integer clientZoneId
;
/**
 * Creates one test instance for a given client zone.
 *
 * @param clientZoneId zone id that setup() configures on the client
 */
public ZoneAffinityGetVersionsTest(Integer clientZoneId) {
    this.clientZoneId = clientZoneId;
}
51 @Parameterized.Parameters
52 public static Collection
<Object
[]> configs() {
53 return Arrays
.asList(new Object
[][] { { 0 }, { 1 }, { 2 } });
// Builds the fixture used by the tests: starts a Voldemort server for each
// node of the ZZZ test cluster, writes test data straight into each node's
// local store, and finally obtains the raw client store.
// NOTE(review): several original lines of this method are not visible in
// this view (e.g. the arguments that follow `2,` and `true,` below, and the
// lines between the two put calls) — confirm against the full file.
57 public void setup() throws IOException
{
// Payloads: the bytes of "AB" and "CD". bytes1 doubles as the key below.
58 byte[] bytes1
= { (byte) 'A', (byte) 'B' };
59 byte[] bytes2
= { (byte) 'C', (byte) 'D' };
// Only the first store definition returned for the "memory" engine is used.
60 List
<StoreDefinition
> stores
= ClusterTestUtils
.getZZZ322StoreDefs("memory");
61 StoreDefinition storeDef
= stores
.get(0);
62 cluster
= ClusterTestUtils
.getZZZCluster();
// Client configuration: bootstrap from node 0, switch on the getVersions
// zone-affinity flag, and use the zone id this test instance was built with.
63 ClientConfig clientConfig
= new ClientConfig();
64 clientConfig
.setBootstrapUrls(cluster
.getNodeById(0).getSocketUrl().toString());
65 clientConfig
.getZoneAffinity().setEnableGetVersionsOpZoneAffinity(true);
66 clientConfig
.setClientZoneId(clientZoneId
);
67 SocketStoreClientFactory socketStoreClientFactory
= new SocketStoreClientFactory(clientConfig
);
// Start one server per node id and record both the server and its socket
// store factory under that node id.
68 for(Integer nodeId
: cluster
.getNodeIds()) {
69 SocketStoreFactory socketStoreFactory
= new ClientRequestExecutorPool(2,
// NOTE(review): remaining constructor arguments are not visible here.
73 VoldemortConfig config
= ServerTestUtils
.createServerConfigWithDefs(true,
75 TestUtils
.createTempDir()
80 VoldemortServer vs
= ServerTestUtils
.startVoldemortServer(socketStoreFactory
,
83 vservers
.put(nodeId
, vs
);
84 socketStoreFactories
.put(nodeId
, socketStoreFactory
);
// Second pass over the nodes: write directly into each server's local store
// (looked up via the server's store repository), not through the client.
87 for(Integer nodeId
: cluster
.getNodeIds()) {
88 VoldemortServer vs
= vservers
.get(nodeId
);
89 Store
<ByteArray
, byte[], byte[]> store
= vs
.getStoreRepository()
90 .getLocalStore(storeDef
.getName());
91 Node node
= cluster
.getNodeById(nodeId
);
// version1 has node 0 incremented once; version2 is obtained from version1
// via incremented(0, ...).
93 VectorClock version1
= new VectorClock();
94 version1
.incrementVersion(0, System
.currentTimeMillis());
95 VectorClock version2
= version1
.incremented(0, System
.currentTimeMillis());
// Nodes in the client's zone get key bytes1 -> bytes1 at version1.
97 if(node
.getZoneId() == clientZoneId
) {
98 store
.put(new ByteArray(bytes1
), new Versioned
<byte[]>(bytes1
, version1
), null);
// Same key, value bytes2 at version2.
// NOTE(review): the lines between the two puts are not visible, so whether
// this put is an else-branch or unconditional cannot be confirmed here.
100 store
.put(new ByteArray(bytes1
), new Versioned
<byte[]>(bytes2
, version2
), null);
// The raw store obtained here is what the test methods call getVersions on.
104 client
= socketStoreClientFactory
.getRawStore(storeDef
.getName(), null);
// Tear-down counterpart of setup(): iterates over every started server and
// every socket store factory, then calls ClusterTestUtils.reset().
// NOTE(review): both loop bodies are not visible in this view — presumably
// they stop each server and close each factory; confirm against the full file.
108 public void tearDown() {
110 for(VoldemortServer vs
: this.vservers
.values()) {
113 for(SocketStoreFactory ssf
: this.socketStoreFactories
.values()) {
116 ClusterTestUtils
.reset();
120 public void testAllUp() {
122 client
.getVersions("AB");
123 } catch(InsufficientOperationalNodesException e
) {
124 fail("Failed with exception: " + e
);
// Stops every server in the client's zone, then calls getVersions("AB").
// The call is expected to throw InsufficientOperationalNodesException;
// reaching fail() means it returned normally instead.
// NOTE(review): the body of the catch block is not visible in this view.
129 public void testLocalZoneDown() {
130 for(Integer nodeId
: cluster
.getNodeIdsInZone(clientZoneId
)) {
131 this.vservers
.get(nodeId
).stop();
134 client
.getVersions("AB");
135 fail("Did not fail fast");
136 } catch(InsufficientOperationalNodesException e
) {
142 public void testLocalZonePartialDownSufficientReads() {
143 // turn off one node in same zone as client so that reads can still
145 this.vservers
.get(cluster
.getNodeIdsInZone(clientZoneId
).iterator().next()).stop();
147 client
.getVersions("AB");
148 } catch(InsufficientOperationalNodesException e
) {
149 fail("Failed with exception: " + e
);
154 public void testLocalZonePartialDownInSufficientReads() {
155 // Stop all but one node in same zone as client. This is not sufficient
157 Set
<Integer
> nodeIds
= cluster
.getNodeIdsInZone(clientZoneId
);
158 nodeIds
.remove(nodeIds
.iterator().next());
159 for(Integer nodeId
: nodeIds
) {
160 this.vservers
.get(nodeId
).stop();
163 client
.getVersions("AB");
164 fail("Did not fail fast");
165 } catch(InsufficientOperationalNodesException e
) {