2 * Copyright 2013 LinkedIn, Inc
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
17 package voldemort
.server
.storage
;
19 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.StringReader
;
23 import java
.util
.ArrayList
;
24 import java
.util
.Arrays
;
25 import java
.util
.HashMap
;
26 import java
.util
.List
;
28 import java
.util
.Map
.Entry
;
29 import java
.util
.Properties
;
31 import org
.apache
.commons
.io
.FileUtils
;
32 import org
.junit
.After
;
33 import org
.junit
.Test
;
35 import voldemort
.MockTime
;
36 import voldemort
.ServerTestUtils
;
37 import voldemort
.TestUtils
;
38 import voldemort
.VoldemortTestConstants
;
39 import voldemort
.client
.ClientConfig
;
40 import voldemort
.client
.protocol
.RequestFormatType
;
41 import voldemort
.client
.protocol
.admin
.AdminClient
;
42 import voldemort
.client
.protocol
.admin
.AdminClientConfig
;
43 import voldemort
.cluster
.Cluster
;
44 import voldemort
.cluster
.Node
;
45 import voldemort
.common
.service
.SchedulerService
;
46 import voldemort
.routing
.BaseStoreRoutingPlan
;
47 import voldemort
.server
.RequestRoutingType
;
48 import voldemort
.server
.StoreRepository
;
49 import voldemort
.server
.VoldemortConfig
;
50 import voldemort
.server
.VoldemortServer
;
51 import voldemort
.store
.Store
;
52 import voldemort
.store
.StoreDefinition
;
53 import voldemort
.store
.metadata
.MetadataStore
;
54 import voldemort
.store
.socket
.SocketStoreFactory
;
55 import voldemort
.store
.socket
.clientrequest
.ClientRequestExecutorPool
;
56 import voldemort
.utils
.ByteArray
;
57 import voldemort
.utils
.ByteUtils
;
58 import voldemort
.versioning
.Versioned
;
59 import voldemort
.xml
.StoreDefinitionsMapper
;
62 * This class tests the repair job tool. The basic workflow is as follows: 1.
63 * Start a 9-node cluster with the single-store "322" storedef. 2. Generate 128
64 * random <key,value> pairs and put all of them on every server, so that at this
65 * point every server holds all the data. 3. Run the repair job on all 9
66 * nodes. 4. For each key, check that the repair job deleted that key from the
67 * nodes that do not belong to the preference list for that key. 5. Verify that
68 * the key still exists on the nodes that belong to the preference list.
// Test fixture state for the repair-job test. The fields below are populated
// by the fixture initialisation code and by the helper methods of this class.
71 public class RepairJobTest
{
// Cluster topology under test; assigned from
// VoldemortTestConstants.getNineNodeCluster().
73 private Cluster cluster
;
// Repository handed to the StorageService constructor.
74 private StoreRepository storeRepository
;
// Storage service built from storeRepository, metadataStore, scheduler and a
// VoldemortConfig.
75 private StorageService storage
;
// Scheduler created with a MockTime instance and passed to the StorageService.
76 private SchedulerService scheduler
;
// Store definitions parsed from VoldemortTestConstants.getSingleStore322Xml();
// the test only uses element 0.
77 private List
<StoreDefinition
> storeDefs
;
// Metadata store created by ServerTestUtils.createMetadataStore(cluster, storeDefs).
78 private MetadataStore metadataStore
;
// Factory used both when starting servers and when creating per-node socket
// stores.
79 private SocketStoreFactory socketStoreFactory
;
// Node id -> socket store for that node; (re)built by createSocketStore().
80 private Map
<Integer
, Store
<ByteArray
, byte[], byte[]>> storeMap
;
// Node id -> running server; filled by startServers(). Declared without an
// access modifier, i.e. package-private.
81 Map
<Integer
, VoldemortServer
> serverMap
;
// Fixture initialisation. NOTE(review): the enclosing method header is not
// visible in this excerpt; presumably this is the JUnit set-up method -- confirm.
// Visible steps: create a temp directory and config, build the scheduler,
// cluster, store definitions, metadata store and storage service, write the
// store definitions to a temp XML file, and start one server per node id.
85 File temp
= TestUtils
.createTempDir();
86 VoldemortConfig config
= new VoldemortConfig(0, temp
.getAbsolutePath());
// Make sure the metadata directory named by the config exists on disk.
87 new File(config
.getMetadataDirectory()).mkdir();
88 this.serverMap
= new HashMap
<Integer
, VoldemortServer
>();
89 this.scheduler
= new SchedulerService(1, new MockTime());
90 this.cluster
= VoldemortTestConstants
.getNineNodeCluster();
// Parse the store definitions from the XML string supplied by the test constants.
91 StoreDefinitionsMapper mapper
= new StoreDefinitionsMapper();
92 this.storeDefs
= mapper
.readStoreList(new StringReader((VoldemortTestConstants
.getSingleStore322Xml())));
93 this.storeRepository
= new StoreRepository();
94 this.metadataStore
= ServerTestUtils
.createMetadataStore(cluster
, storeDefs
);
95 storage
= new StorageService(storeRepository
, metadataStore
, scheduler
, config
);
// NOTE(review): the statement this comment refers to is not visible in this
// excerpt.
96 // Start the storage service
// NOTE(review): the meaning of the four numeric constructor arguments is not
// shown here -- check ClientRequestExecutorPool before changing them.
98 this.socketStoreFactory
= new ClientRequestExecutorPool(2, 10000, 100000, 32 * 1024);
// Serialise the store definitions back to XML so they can be written to a file
// whose path is handed to startServers().
99 String storeDefsString
= mapper
.writeStoreList(storeDefs
);
102 file
= File
.createTempFile("single-store-", ".xml");
103 FileUtils
.writeStringToFile(file
, storeDefsString
);
104 String storeDefFile
= file
.getAbsolutePath();
// Node ids 0..8: nine ids, one per node of the nine-node cluster.
105 List
<Integer
> nodesToStart
= Arrays
.asList(0, 1, 2, 3, 4, 5, 6, 7, 8);
// No extra configuration properties are supplied (last argument is null).
107 startServers(cluster
, storeDefFile
, nodesToStart
, null);
// NOTE(review): the handler body is not visible in this excerpt; confirm that a
// failure to start the servers is reported rather than silently ignored.
108 } catch(Exception e
) {
// Starts one Voldemort server for every node id in nodeToStart and records it
// in serverMap under that id.
//
// cluster      - the cluster passed in by the caller
// nodeToStart  - ids of the nodes to start
// configProps  - optional extra properties; may be null, in which case the
//                Properties object built for each node stays empty
//
// NOTE(review): a parameter between `cluster` and `nodeToStart` is not visible
// in this excerpt (the caller passes a store-definition file path in that
// position), and neither is the return statement -- confirm both.
113 private Cluster
startServers(Cluster cluster
,
115 List
<Integer
> nodeToStart
,
116 Map
<String
, String
> configProps
) throws Exception
{
117 for(int node
: nodeToStart
) {
// Copy the caller-supplied properties (if any) into a fresh Properties object.
118 Properties properties
= new Properties();
119 if(null != configProps
) {
120 for(Entry
<String
, String
> property
: configProps
.entrySet()) {
121 properties
.put(property
.getKey(), property
.getValue());
// Build the server configuration; a new temp directory is created per node.
// NOTE(review): the remaining createServerConfig arguments are not visible here.
124 VoldemortConfig config
= ServerTestUtils
.createServerConfig(true,
126 TestUtils
.createTempDir()
// Start the server using the shared socket store factory.
// NOTE(review): the remaining startVoldemortServer arguments are not visible here.
131 VoldemortServer server
= ServerTestUtils
.startVoldemortServer(socketStoreFactory
,
// Remember the server so later code can look it up by node id.
134 serverMap
.put(node
, server
);
// Tear-down hook: walks over every server recorded in serverMap.
// NOTE(review): the loop body is not visible in this excerpt; presumably each
// server is stopped here -- confirm.
140 public void tearDown() throws Exception
{
141 for(VoldemortServer vs
: serverMap
.values()) {
// Creates a socket store for the named store through socketStoreFactory, using
// the PROTOCOL_BUFFERS request format and the IGNORE_CHECKS routing type.
//
// storeName - name of the store to connect to
// host      - host of the target node
// port      - socket port of the target node
//
// NOTE(review): the arguments between storeName and the request format are not
// visible in this excerpt; presumably host and port are passed there -- confirm.
146 private Store
<ByteArray
, byte[], byte[]> getSocketStore(String storeName
, String host
, int port
) {
147 return socketStoreFactory
.create(storeName
,
150 RequestFormatType
.PROTOCOL_BUFFERS
,
151 RequestRoutingType
.IGNORE_CHECKS
);
// Builds one socket store per cluster node for the given store definition and
// keeps them in the storeMap field, keyed by node id.
//
// storeDef - definition whose name identifies the store to open on every node
//
// Side effect: the storeMap field is replaced with a new HashMap.
// NOTE(review): the return statement is not visible in this excerpt; the caller
// assigns the result back to storeMap, so presumably storeMap itself is
// returned -- confirm.
154 private Map
<Integer
, Store
<ByteArray
, byte[], byte[]>> createSocketStore(StoreDefinition storeDef
) {
155 storeMap
= new HashMap
<Integer
, Store
<ByteArray
, byte[], byte[]>>();
// Each store is opened against the node's own host and socket port.
156 for(Node node
: cluster
.getNodes()) {
157 storeMap
.put(node
.getId(),
158 getSocketStore(storeDef
.getName(), node
.getHost(), node
.getSocketPort()));
// Pushes every test entry to every node, regardless of which nodes actually
// own the key.
//
// testEntries - key/value strings to write; keys and values are encoded as
//               UTF-8 bytes
//
// NOTE(review): the store call that performs the write, the trailing loop body
// and the return statement are not visible in this excerpt.
163 private HashMap
<String
, String
> populateData(HashMap
<String
, String
> testEntries
) {
164 for(Entry
<String
, String
> entry
: testEntries
.entrySet()) {
165 ByteArray keyBytes
= new ByteArray(ByteUtils
.getBytes(entry
.getKey(), "UTF-8"));
// Node ids are hard-coded as 0..8; keep in sync with the nine-node cluster.
166 List
<Integer
> allNodes
= Arrays
.asList(0, 1, 2, 3, 4, 5, 6, 7, 8);
167 for(int nodeId
: allNodes
) {
// The value is wrapped in a Versioned before being handed to the store.
171 new Versioned
<byte[]>(ByteUtils
.getBytes(entry
.getValue(), "UTF-8")),
173 } catch(Exception e
) {
174 // Don't do anything with the exception. Exceptions are
175 // expected here as we are
176 // putting all keys on all nodes.
// NOTE(review): this loop's body is not visible in this excerpt; presumably the
// socket stores are closed after the writes -- confirm.
180 for(Store
<ByteArray
, byte[], byte[]> store
: storeMap
.values()) {
// End-to-end check of the repair job:
//   1. open socket stores to every node and write 128 random entries to all
//      of them;
//   2. trigger the repair job on node ids 0..8 through the admin client and
//      wait for each one to finish;
//   3. for every key, verify it is gone from the nodes outside its replication
//      node list and (second loop) still present on the nodes inside it.
// NOTE(review): several lines of this method (the `try` openers, the second
// assertion's arguments and the final loop body) are not visible in this excerpt.
187 public void testRepairJob() {
190 // Create socket store
191 storeMap
= createSocketStore(storeDefs
.get(0));
192 // Generate random data, populate cluster with it.
193 HashMap
<String
, String
> testEntries
= ServerTestUtils
.createRandomKeyValueString(128);
194 populateData(testEntries
);
196 // create admin client and run repair on all nodes
197 AdminClient admin
= new AdminClient(cluster
, new AdminClientConfig(), new ClientConfig());
// Node count is hard-coded as 9; keep in sync with the nine-node cluster.
198 for(int i
= 0; i
< 9; i
++) {
199 admin
.storeMntOps
.repairJob(i
);
202 // wait for the repair to complete
// NOTE(review): 5000 is presumably a timeout in milliseconds -- confirm against
// ServerTestUtils.waitForAsyncOperationOnServer.
203 for(int i
= 0; i
< 9; i
++) {
204 ServerTestUtils
.waitForAsyncOperationOnServer(serverMap
.get(i
), "Repair", 5000);
// Routing plan used to compute, per key, the list of nodes that should hold it.
206 BaseStoreRoutingPlan storeInstance
= new BaseStoreRoutingPlan(cluster
, storeDefs
.get(0));
207 for(Entry
<String
, String
> entry
: testEntries
.entrySet()) {
208 ByteArray keyBytes
= new ByteArray(ByteUtils
.getBytes(entry
.getKey(), "UTF-8"));
209 List
<Integer
> preferenceNodes
= storeInstance
.getReplicationNodeList(keyBytes
.get());
// Copied into an ArrayList because the list returned by Arrays.asList is
// fixed-size and would not support the removeAll call below.
210 List
<Integer
> allNodes
= new ArrayList
<Integer
>(Arrays
.asList(0, 1, 2, 3, 4, 5, 6, 7, 8));
212 // Repair job should have deleted the keys on the nodes that
213 // shouldn't have been
214 // hosting the key. Go over all these remaining nodes to make sure
// After this call allNodes holds only the nodes that should NOT host the key.
216 allNodes
.removeAll(preferenceNodes
);
217 for(int nodeId
: allNodes
) {
219 List
<Versioned
<byte[]>> retVal
= storeMap
.get(nodeId
).get(keyBytes
, null);
// NOTE(review): JUnit's assertEquals(message, expected, actual) takes the
// expected value first; here the actual value (retVal.isEmpty()) sits in the
// expected position. Pass/fail is unaffected, but a failure message would read
// backwards. A failed assertion throws AssertionError, which is not an
// Exception, so the catch below does not swallow it.
220 assertEquals("Repair did not run properly as it left the key it should have"
221 + " deleted", retVal
.isEmpty(), true);
222 } catch(Exception e
) {
223 // We expect a bunch of invalidmetadata exceptions as we are
225 // that doesn't belong to the nodes. Hence leaving the catch
229 // The repair job should not have deleted the keys from nodes on the
// Second pass: nodes that are in the key's replication node list.
231 for(int nodeId
: preferenceNodes
) {
233 List
<Versioned
<byte[]>> retVal
= storeMap
.get(nodeId
).get(keyBytes
, null);
// NOTE(review): the remaining assertEquals arguments are not visible in this
// excerpt.
234 assertEquals("Repair job has deleted keys that it should not have",
237 } catch(Exception e
) {
// NOTE(review): loop body not visible in this excerpt; presumably the socket
// stores are closed here -- confirm.
242 for(Store
<ByteArray
, byte[], byte[]> store
: storeMap
.values()) {