1 package voldemort
.client
.protocol
.admin
;
3 import static org
.junit
.Assert
.assertEquals
;
6 import java
.io
.IOException
;
7 import java
.util
.ArrayList
;
8 import java
.util
.Arrays
;
9 import java
.util
.Collection
;
10 import java
.util
.HashMap
;
11 import java
.util
.Iterator
;
12 import java
.util
.List
;
13 import java
.util
.Properties
;
14 import java
.util
.concurrent
.Callable
;
16 import org
.apache
.commons
.io
.FileUtils
;
17 import org
.junit
.After
;
18 import org
.junit
.Assert
;
19 import org
.junit
.Before
;
20 import org
.junit
.Test
;
21 import org
.junit
.runner
.RunWith
;
22 import org
.junit
.runners
.Parameterized
;
24 import voldemort
.ClusterTestUtils
;
25 import voldemort
.ServerTestUtils
;
26 import voldemort
.TestUtils
;
27 import voldemort
.client
.RoutingTier
;
28 import voldemort
.cluster
.Cluster
;
29 import voldemort
.cluster
.Node
;
30 import voldemort
.routing
.RoutingStrategy
;
31 import voldemort
.routing
.RoutingStrategyFactory
;
32 import voldemort
.routing
.RoutingStrategyType
;
33 import voldemort
.serialization
.DefaultSerializerFactory
;
34 import voldemort
.serialization
.Serializer
;
35 import voldemort
.serialization
.SerializerDefinition
;
36 import voldemort
.serialization
.SerializerFactory
;
37 import voldemort
.server
.VoldemortServer
;
38 import voldemort
.store
.StoreDefinition
;
39 import voldemort
.store
.StoreDefinitionBuilder
;
40 import voldemort
.store
.bdb
.BdbStorageConfiguration
;
41 import voldemort
.store
.compress
.CompressionStrategy
;
42 import voldemort
.store
.compress
.CompressionStrategyFactory
;
43 import voldemort
.store
.slop
.strategy
.HintedHandoffStrategyType
;
44 import voldemort
.store
.socket
.SocketStoreFactory
;
45 import voldemort
.store
.socket
.TestSocketStoreFactory
;
46 import voldemort
.utils
.ByteArray
;
47 import voldemort
.utils
.Props
;
48 import voldemort
.versioning
.Versioned
;
49 import voldemort
.xml
.StoreDefinitionsMapper
;
51 import com
.google
.common
.collect
.Lists
;
54 * Starts a streaming session and inserts some keys. Using fetchKeys we check if
55 * the keys made it to the responsible node.
56 * Three scenarios are tested :
57 * (i) Non zoned cluster with contiguous node ids
58 * (ii) Non zoned cluster with non contiguous node ids
59 * (iii) Zoned cluster with non contiguous zone/node ids
62 @RunWith(Parameterized
.class)
63 public class StreamingClientTest
{
65 private long startTime
;
66 public final String SERVER_LOCAL_URL
= "tcp://localhost:";
67 public final static String TEST_STORE_NAME
= "test-store-streaming-1";
68 public final String STORES_XML_FILE
= "test/common/voldemort/config/stores.xml";
69 public int numServers
;
70 private int NUM_KEYS_1
= 4000;
71 private SocketStoreFactory socketStoreFactory
= new TestSocketStoreFactory();
72 private VoldemortServer
[] servers
= null;
73 private int[] serverPorts
= null;
74 private Cluster cluster
;
75 private AdminClient adminClient
;
76 private SerializerFactory serializerFactory
= new DefaultSerializerFactory();
77 private StoreDefinition storeDef
;
78 private int nodeIdOnWhichToVerifyKey
;
80 public StreamingClientTest(int numServers
,
82 int nodeIdOnWhichToVerifyKey
,
83 StoreDefinition storeDef
) {
84 this.numServers
= numServers
;
85 this.cluster
= cluster
;
86 this.nodeIdOnWhichToVerifyKey
= nodeIdOnWhichToVerifyKey
;
87 this.storeDef
= storeDef
;
90 @Parameterized.Parameters
91 public static Collection
<Object
[]> configs() {
92 StoreDefinition storeDefConsistestStrategy
= new StoreDefinitionBuilder().setName(TEST_STORE_NAME
)
93 .setType(BdbStorageConfiguration
.TYPE_NAME
)
94 .setKeySerializer(new SerializerDefinition("string"))
95 .setValueSerializer(new SerializerDefinition("string"))
96 .setRoutingPolicy(RoutingTier
.CLIENT
)
97 .setRoutingStrategyType(RoutingStrategyType
.CONSISTENT_STRATEGY
)
98 .setReplicationFactor(2)
101 .setPreferredWrites(2)
102 .setRequiredWrites(2)
105 HashMap
<Integer
, Integer
> zoneReplicationFactor
= new HashMap
<Integer
, Integer
>();
106 zoneReplicationFactor
.put(1, 2);
107 zoneReplicationFactor
.put(3, 2);
108 StoreDefinition storeDefZoneStrategy
= new StoreDefinitionBuilder().setName(TEST_STORE_NAME
)
109 .setType(BdbStorageConfiguration
.TYPE_NAME
)
110 .setKeySerializer(new SerializerDefinition("string"))
111 .setValueSerializer(new SerializerDefinition("string"))
112 .setRoutingPolicy(RoutingTier
.CLIENT
)
113 .setRoutingStrategyType(RoutingStrategyType
.ZONE_STRATEGY
)
114 .setReplicationFactor(4)
115 .setPreferredReads(1)
117 .setPreferredWrites(1)
118 .setRequiredWrites(1)
119 .setZoneCountReads(0)
120 .setZoneCountWrites(0)
121 .setZoneReplicationFactor(zoneReplicationFactor
)
122 .setHintedHandoffStrategy(HintedHandoffStrategyType
.PROXIMITY_STRATEGY
)
125 return Arrays
.asList(new Object
[][] {
128 ServerTestUtils
.getLocalCluster(2, new int[][] { { 0, 1, 2, 3 },
131 storeDefConsistestStrategy
135 ServerTestUtils
.getLocalNonContiguousNodesCluster(new int[] { 1, 3 },
139 1, storeDefConsistestStrategy
142 ClusterTestUtils
.getZ1Z3ClusterWithNonContiguousNodeIds(),
150 public void testSetup() throws IOException
{
152 if(null == servers
) {
153 servers
= new VoldemortServer
[numServers
];
154 serverPorts
= new int[numServers
];
156 File tempStoreXml
= new File(TestUtils
.createTempDir(), "stores.xml");
158 FileUtils
.writeStringToFile(tempStoreXml
,
159 new StoreDefinitionsMapper().writeStoreList(Lists
.newArrayList(storeDef
)));
160 } catch(IOException e1
) {
161 // TODO Auto-generated catch block
162 e1
.printStackTrace();
167 for(int nodeId
: cluster
.getNodeIds()) {
169 servers
[count
] = ServerTestUtils
.startVoldemortServer(socketStoreFactory
,
170 ServerTestUtils
.createServerConfig(true,
172 TestUtils
.createTempDir()
175 tempStoreXml
.getAbsolutePath(),
178 } catch(IOException e
) {
182 serverPorts
[count
] = servers
[count
].getIdentityNode().getSocketPort();
185 adminClient
= ServerTestUtils
.getAdminClient(cluster
);
188 startTime
= System
.currentTimeMillis();
193 public void testCleanup() {
194 // Teardown for data used by the unit tests
195 if(servers
!= null) {
196 for(VoldemortServer server
: servers
) {
203 public void testStreaming() {
205 Props property
= new Props();
206 property
.put("streaming.platform.bootstrapURL", SERVER_LOCAL_URL
+ serverPorts
[0]);
207 StreamingClientConfig config
= new StreamingClientConfig(property
);
209 BaseStreamingClient streamer
= new BaseStreamingClient(config
);
211 streamer
.initStreamingSession(TEST_STORE_NAME
, new Callable
<Object
>() {
214 public Object
call() throws Exception
{
218 }, new Callable
<Object
>() {
221 public Object
call() throws Exception
{
227 for(int i
= 0; i
< NUM_KEYS_1
; i
++) {
231 Versioned
<byte[]> outputValue
= Versioned
.value(value
.getBytes());
232 // adminClient.streamingPut(new ByteArray(key.getBytes()),
234 streamer
.streamingPut(new ByteArray(key
.getBytes()), outputValue
);
236 streamer
.commitToVoldemort();
237 streamer
.closeStreamingSession();
238 assertEquals(verifyKeysExist(nodeIdOnWhichToVerifyKey
), true);
244 * Checks that the streaming client stays calm and does not throw an NPE when
245 * commitToVoldemort is called before the streaming session has been initialized
248 public void testUnInitializedClientPreventNPE() {
250 Props property
= new Props();
251 property
.put("streaming.platform.bootstrapURL", SERVER_LOCAL_URL
+ serverPorts
[0]);
252 StreamingClientConfig config
= new StreamingClientConfig(property
);
253 BaseStreamingClient streamer
= new BaseStreamingClient(config
);
255 streamer
.commitToVoldemort();
256 } catch (NullPointerException e
) {
258 Assert
.fail("Should not throw NPE at this stage even though streamingSession not initialized");
265 * Checks if each node has the keys it is responsible for; returns false
268 public boolean verifyKeysExist(int nodeIdToVerifyOn
) {
269 RoutingStrategyFactory factory
= new RoutingStrategyFactory();
270 RoutingStrategy storeRoutingStrategy
= factory
.updateRoutingStrategy(storeDef
,
271 adminClient
.getAdminClientCluster());
273 HashMap
<Integer
, ArrayList
<String
>> expectedNodeIdToKeys
;
274 expectedNodeIdToKeys
= new HashMap();
275 Collection
<Node
> nodesInCluster
= adminClient
.getAdminClientCluster().getNodes();
276 for(Node node
: nodesInCluster
) {
277 ArrayList
<String
> keysForNode
= new ArrayList();
278 expectedNodeIdToKeys
.put(node
.getId(), keysForNode
);
280 for(int i
= 0; i
< NUM_KEYS_1
; i
++) {
283 List
<Node
> nodeList
= storeRoutingStrategy
.routeRequest(key
.getBytes());
284 for(Node node
: nodeList
) {
285 ArrayList
<String
> keysForNode
= expectedNodeIdToKeys
.get(node
.getId());
286 keysForNode
.add(key
);
290 ArrayList
<String
> fetchedKeysForNode
= new ArrayList();
291 for(Node node
: nodesInCluster
) {
293 List
<Integer
> partitionIdList
= Lists
.newArrayList();
294 partitionIdList
.addAll(node
.getPartitionIds());
296 Iterator
<ByteArray
> keyIteratorRef
= null;
297 keyIteratorRef
= adminClient
.bulkFetchOps
.fetchKeys(node
.getId(),
303 final SerializerDefinition serializerDef
= storeDef
.getKeySerializer();
304 final SerializerFactory serializerFactory
= new DefaultSerializerFactory();
305 @SuppressWarnings("unchecked")
306 final Serializer
<Object
> serializer
= (Serializer
<Object
>) serializerFactory
.getSerializer(serializerDef
);
308 final CompressionStrategy keysCompressionStrategy
;
309 if(serializerDef
!= null && serializerDef
.hasCompression()) {
310 keysCompressionStrategy
= new CompressionStrategyFactory().get(serializerDef
.getCompression());
312 keysCompressionStrategy
= null;
314 final Iterator
<ByteArray
> keyIterator
= keyIteratorRef
;
315 while(keyIterator
.hasNext()) {
317 byte[] keyBytes
= keyIterator
.next().get();
319 Object keyObject
= serializer
.toObject((null == keysCompressionStrategy
) ? keyBytes
320 : keysCompressionStrategy
.inflate(keyBytes
));
321 fetchedKeysForNode
.add((String
) keyObject
);
323 } catch(IOException e
) {
331 ArrayList
<String
> keysForNode
= expectedNodeIdToKeys
.get(nodeIdToVerifyOn
);
332 if(!fetchedKeysForNode
.containsAll(keysForNode
))