/*
 * Copyright 2013 LinkedIn, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
16 package voldemort
.tools
;
18 import joptsimple
.OptionParser
;
19 import joptsimple
.OptionSet
;
20 import org
.apache
.commons
.codec
.DecoderException
;
21 import voldemort
.VoldemortException
;
22 import voldemort
.cluster
.Cluster
;
23 import voldemort
.cluster
.Node
;
24 import voldemort
.routing
.RoutingStrategy
;
25 import voldemort
.routing
.RoutingStrategyFactory
;
26 import voldemort
.server
.VoldemortConfig
;
27 import voldemort
.store
.StorageEngine
;
28 import voldemort
.store
.StoreDefinition
;
29 import voldemort
.store
.bdb
.BdbStorageConfiguration
;
30 import voldemort
.utils
.ByteArray
;
31 import voldemort
.utils
.ByteUtils
;
32 import voldemort
.utils
.Pair
;
33 import voldemort
.versioning
.ObsoleteVersionException
;
34 import voldemort
.versioning
.VectorClock
;
35 import voldemort
.versioning
.Version
;
36 import voldemort
.versioning
.Versioned
;
37 import voldemort
.xml
.ClusterMapper
;
38 import voldemort
.xml
.StoreDefinitionsMapper
;
42 import java
.util
.concurrent
.Callable
;
44 public class ImportTextDumpToBDB
{
45 static Integer READER_BUFFER_SIZE
= 16777216;
47 public static OptionParser
getParser() {
48 OptionParser parser
= new OptionParser();
49 parser
.acceptsAll(Arrays
.asList("input"), "Input file or containing folder of data dump")
52 .describedAs("input-file-or-folder");
53 parser
.acceptsAll(Arrays
.asList("bdb"), "BDB folder store folder (not master)")
56 .describedAs("bdb-folder");
57 parser
.acceptsAll(Arrays
.asList("stores-xml"), "Location of stores xml")
60 .describedAs("stores-xml");
61 parser
.acceptsAll(Arrays
.asList("cluster-xml"), "Location of cluster xml")
64 .describedAs("cluster-xml");
65 parser
.acceptsAll(Arrays
.asList("node-id"), "Current node id")
67 .ofType(Integer
.class)
68 .describedAs("node-id");
73 public static void validateOptions(OptionSet options
) throws IOException
{
74 Integer exitStatus
= null;
75 if(options
.has("help")) {
77 System
.out
.println("This programs loads a data dump to a Voldemort Store.");
78 System
.out
.println("Supported data dump should be a file or a folder containing files with lines of data.");
79 System
.out
.println("Each line should be in the format of the following (all in Hex Decimal format)");
80 System
.out
.println("\n\tKEY_BINARY VECTOR_CLOCK_BINARY VALUE_BINARY\n");
82 else if(!options
.has("input")) {
83 System
.err
.println("Option \"input\" is required");
86 else if(!new File((String
)options
.valueOf("input")).isDirectory()) {
87 System
.err
.println("Not a directory: " + options
.valueOf("input") );
90 else if(!options
.has("bdb")) {
91 System
.err
.println("Option \"bdb\" is required");
94 else if(!options
.has("stores-xml")) {
95 System
.err
.println("Option \"stores-xml\" is required");
98 else if(!new File((String
)options
.valueOf("stores-xml")).isFile()) {
99 System
.err
.println("Not a file: " + options
.valueOf("stores-xml") );
102 else if(!options
.has("cluster-xml")) {
103 System
.err
.println("Option \"cluster-xml\" is required");
106 else if(!new File((String
)options
.valueOf("cluster-xml")).isFile()) {
107 System
.err
.println("Not a file: " + options
.valueOf("cluster-xml") );
110 if(exitStatus
!= null) {
112 getParser().printHelpOn(System
.out
);
114 getParser().printHelpOn(System
.err
);
115 System
.exit(exitStatus
);
119 public static void main(String
[] argv
) throws Exception
{
120 OptionParser parser
= getParser();
121 OptionSet options
= parser
.parse(argv
);
122 validateOptions(options
);
124 String inputPath
= (String
) options
.valueOf("input");
125 String storeBdbFolderPath
= (String
) options
.valueOf("bdb");
126 String clusterXmlPath
= (String
) options
.valueOf("cluster-xml");
127 String storesXmlPath
= (String
) options
.valueOf("stores-xml");
128 Integer nodeId
= (Integer
) options
.valueOf("node-id");
129 File input
= new File(inputPath
);
130 List
<File
> dataFiles
= new ArrayList
<File
>();
131 if(input
.isDirectory()) {
132 File
[] files
= input
.listFiles();
134 Collections
.addAll(dataFiles
, files
);
135 } else if(input
.isFile()) {
136 dataFiles
.add(input
);
138 System
.err
.println(inputPath
+ "is not file or directory");
140 File storeBdbFolder
= new File(storeBdbFolderPath
);
141 final String storeName
= storeBdbFolder
.getName();
143 Cluster cluster
= new ClusterMapper().readCluster(new File(clusterXmlPath
));
144 List
<StoreDefinition
> storeDefs
= new StoreDefinitionsMapper().readStoreList(new File(storesXmlPath
));
145 StoreDefinition storeDef
= null;
146 for(StoreDefinition sd
: storeDefs
) {
147 if(sd
.getName() != null && sd
.getName().equals(storeName
)) {
151 if(storeDef
== null) {
152 throw new VoldemortException("StoreNotfound: " + storeName
);
154 RoutingStrategy routingStrategy
= new RoutingStrategyFactory().updateRoutingStrategy(storeDef
, cluster
);
156 Properties properties
= new Properties();
157 properties
.put("node.id","0");
158 properties
.put("voldemort.home",storeBdbFolder
.getParent());
159 VoldemortConfig voldemortConfig
= new VoldemortConfig(properties
);
160 voldemortConfig
.setBdbDataDirectory(storeBdbFolder
.getParent());
161 voldemortConfig
.setEnableJmx(false);
162 voldemortConfig
.setBdbOneEnvPerStore(true);
163 BdbStorageConfiguration bdbConfiguration
= new BdbStorageConfiguration(voldemortConfig
);
164 class MockStoreDefinition
extends StoreDefinition
{
165 public MockStoreDefinition() {
166 super(storeName
,null,null,null,null,null,null,null,0,null,0,null,0,null,null,null,null,null,null,null,null,null,null,null,null,0);
169 public boolean hasMemoryFootprint() {
173 StoreDefinition mockStoreDef
= new MockStoreDefinition();
174 StorageEngine
<ByteArray
, byte[], byte[]> engine
= bdbConfiguration
.getStore(mockStoreDef
, routingStrategy
);
175 long reportIntervalMs
= 10000L;
177 Reporter
<Boolean
> rp
= new Reporter
<Boolean
>(reportIntervalMs
);
180 for(File f
: dataFiles
) {
182 BufferedReader bufferedReader
= new BufferedReader(new FileReader(f
), READER_BUFFER_SIZE
);
183 engine
.beginBatchModifications();
185 String line
= bufferedReader
.readLine();
189 Pair
<ByteArray
, Versioned
<byte[]>> entry
;
191 entry
= lineToEntry(line
);
192 } catch (Exception e
) {
193 System
.err
.println("Skipping line: " + line
);
197 ByteArray key
= entry
.getFirst();
198 List
<Node
> nodeList
= routingStrategy
.routeRequest(key
.get());
199 for(Node node
: nodeList
) {
200 if(nodeId
== node
.getId()) {
202 engine
.put(key
, entry
.getSecond(), null);
203 } catch(ObsoleteVersionException e
) {
210 final Long countObject
= count
;
211 Boolean reported
= rp
.tryReport(new Callable
<Boolean
>() {
213 public Boolean
call() throws Exception
{
214 System
.out
.print(String
.format("Imported %15d entries", countObject
));
218 if(reported
!= null) {
219 System
.out
.println(String
.format("; Speed: %8d/s", (count
- lastCount
)/ (reportIntervalMs
/ 1000)));
223 bufferedReader
.close();
224 } catch (IOException e
) {
227 engine
.endBatchModifications();
232 System
.out
.println(String
.format("Finished exporting %d entries", count
));
235 static private class Reporter
<T
> {
238 Reporter(long intervalMs
) {
239 this.intervalMs
= intervalMs
;
242 public T
tryReport(Callable
<T
> callable
) throws Exception
{
243 if(lastReport
== null) {
244 lastReport
= System
.currentTimeMillis();
247 if(lastReport
+ intervalMs
< System
.currentTimeMillis()) {
248 T result
= callable
.call();
249 lastReport
= System
.currentTimeMillis();
257 public static Pair
<ByteArray
, Versioned
<byte[]>> lineToEntry(String line
) throws DecoderException
{
258 String
[] components
= line
.split(" ");
260 String keyBytesString
= components
[0];
261 byte[] keyBytes
= ByteUtils
.fromHexString(keyBytesString
);
262 ByteArray key
= new ByteArray(keyBytes
);
264 String versionBytesString
= components
[1];
265 byte[] versionBytes
= ByteUtils
.fromHexString(versionBytesString
);
266 Version version
= new VectorClock(versionBytes
, 0);
268 String valueBytesString
= components
[1];
269 byte[] value
= ByteUtils
.fromHexString(valueBytesString
);
271 return new Pair
<ByteArray
, Versioned
<byte[]>>(key
, new Versioned
<byte[]>(value
, version
));