1 package voldemort
.store
.routed
;
3 import static org
.junit
.Assert
.assertEquals
;
4 import static org
.junit
.Assert
.fail
;
6 import java
.io
.IOException
;
7 import java
.util
.Arrays
;
8 import java
.util
.Collection
;
9 import java
.util
.HashMap
;
10 import java
.util
.List
;
12 import java
.util
.Properties
;
15 import org
.junit
.After
;
16 import org
.junit
.Before
;
17 import org
.junit
.Test
;
18 import org
.junit
.runner
.RunWith
;
19 import org
.junit
.runners
.Parameterized
;
21 import voldemort
.ClusterTestUtils
;
22 import voldemort
.ServerTestUtils
;
23 import voldemort
.TestUtils
;
24 import voldemort
.client
.ClientConfig
;
25 import voldemort
.client
.SocketStoreClientFactory
;
26 import voldemort
.cluster
.Cluster
;
27 import voldemort
.cluster
.Node
;
28 import voldemort
.server
.VoldemortConfig
;
29 import voldemort
.server
.VoldemortServer
;
30 import voldemort
.store
.InsufficientOperationalNodesException
;
31 import voldemort
.store
.Store
;
32 import voldemort
.store
.StoreDefinition
;
33 import voldemort
.store
.socket
.SocketStoreFactory
;
34 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
35 import voldemort
.utils
.ByteArray
;
36 import voldemort
.versioning
.VectorClock
;
37 import voldemort
.versioning
.Versioned
;
39 @RunWith(Parameterized
.class)
40 public class ZoneAffinityGetTest
{
42 private Store
<String
, String
, byte[]> client
;
43 private Map
<Integer
, VoldemortServer
> vservers
= new HashMap
<Integer
, VoldemortServer
>();
44 private Map
<Integer
, SocketStoreFactory
> socketStoreFactories
= new HashMap
<Integer
, SocketStoreFactory
>();
46 private Cluster cluster
;
47 private final Integer clientZoneId
;
49 public ZoneAffinityGetTest(Integer clientZoneId
) {
50 this.clientZoneId
= clientZoneId
;
53 @Parameterized.Parameters
54 public static Collection
<Object
[]> configs() {
55 return Arrays
.asList(new Object
[][] { { 0 }, { 1 }, { 2 } });
59 public void setup() throws IOException
{
60 byte[] bytes1
= { (byte) 'A', (byte) 'B' };
61 byte[] bytes2
= { (byte) 'C', (byte) 'D' };
62 List
<StoreDefinition
> stores
= ClusterTestUtils
.getZZZ322StoreDefs("memory");
63 StoreDefinition storeDef
= stores
.get(0);
64 cluster
= ClusterTestUtils
.getZZZCluster();
65 ClientConfig clientConfig
= new ClientConfig();
66 clientConfig
.setBootstrapUrls(cluster
.getNodeById(0).getSocketUrl().toString());
67 clientConfig
.getZoneAffinity().setEnableGetOpZoneAffinity(true);
68 clientConfig
.setClientZoneId(clientZoneId
);
69 SocketStoreClientFactory socketStoreClientFactory
= new SocketStoreClientFactory(clientConfig
);
70 for(Integer nodeId
: cluster
.getNodeIds()) {
71 SocketStoreFactory socketStoreFactory
= new ClientRequestExecutorPool(2,
75 VoldemortConfig config
= ServerTestUtils
.createServerConfigWithDefs(true,
77 TestUtils
.createTempDir()
82 VoldemortServer vs
= ServerTestUtils
.startVoldemortServer(socketStoreFactory
,
85 vservers
.put(nodeId
, vs
);
86 socketStoreFactories
.put(nodeId
, socketStoreFactory
);
87 Store
<ByteArray
, byte[], byte[]> store
= vs
.getStoreRepository()
88 .getLocalStore(storeDef
.getName());
89 Node node
= cluster
.getNodeById(nodeId
);
91 VectorClock version1
= new VectorClock();
92 version1
.incrementVersion(0, System
.currentTimeMillis());
93 VectorClock version2
= version1
.incremented(0, System
.currentTimeMillis());
95 if(node
.getZoneId() == clientZoneId
) {
96 store
.put(new ByteArray(bytes1
), new Versioned
<byte[]>(bytes1
, version1
), null);
98 store
.put(new ByteArray(bytes1
), new Versioned
<byte[]>(bytes2
, version2
), null);
102 client
= socketStoreClientFactory
.getRawStore(storeDef
.getName(), null);
106 public void tearDown() {
108 for(VoldemortServer vs
: this.vservers
.values()) {
111 for(SocketStoreFactory ssf
: this.socketStoreFactories
.values()) {
114 ClusterTestUtils
.reset();
118 public void testAllUp() {
120 List
<Versioned
<String
>> versioneds
= client
.get("AB", null);
121 assertEquals(versioneds
.get(0).getValue(), "AB");
122 } catch(InsufficientOperationalNodesException e
) {
123 fail("Failed with exception: " + e
);
128 public void testLocalZoneDown() {
129 for(Integer nodeId
: cluster
.getNodeIdsInZone(clientZoneId
)) {
130 this.vservers
.get(nodeId
).stop();
133 client
.get("AB", null);
134 fail("Did not fail fast");
135 } catch(InsufficientOperationalNodesException e
) {
141 public void testLocalZonePartialDownSufficientReads() {
142 // turn off one node in same zone as client so that reads can still
144 this.vservers
.get(cluster
.getNodeIdsInZone(clientZoneId
).iterator().next()).stop();
146 client
.get("AB", null);
147 } catch(InsufficientOperationalNodesException e
) {
148 fail("Failed with exception: " + e
);
153 public void testLocalZonePartialDownInSufficientReads() {
154 // Stop all but one node in same zone as client. This is not sufficient
156 Set
<Integer
> nodeIds
= cluster
.getNodeIdsInZone(clientZoneId
);
157 nodeIds
.remove(nodeIds
.iterator().next());
158 for(Integer nodeId
: nodeIds
) {
159 this.vservers
.get(nodeId
).stop();
162 client
.get("AB", null);
163 fail("Did not fail fast");
164 } catch(InsufficientOperationalNodesException e
) {