From a33b280fc8068284ea1f7bcc3a6b47d5db65db80 Mon Sep 17 00:00:00 2001 From: Zhongjie Wu Date: Wed, 5 Sep 2012 17:03:31 -0700 Subject: [PATCH] Reuses --stores option and added multiple store query support. Fixed bug that prevents queryKeys, fetchEntries, fetchKeys to print results beyond the first store --- src/java/voldemort/VoldemortAdminTool.java | 206 ++++++++++++++++------------- 1 file changed, 116 insertions(+), 90 deletions(-) diff --git a/src/java/voldemort/VoldemortAdminTool.java b/src/java/voldemort/VoldemortAdminTool.java index 8e7c23dd2..080d2f3e3 100644 --- a/src/java/voldemort/VoldemortAdminTool.java +++ b/src/java/voldemort/VoldemortAdminTool.java @@ -27,6 +27,7 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileWriter; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -512,14 +513,11 @@ public class VoldemortAdminTool { adminClient.reserveMemory(nodeId, storeNames, reserveMB); } if(ops.contains("q")) { - List keyList = null; - String storeName = (String) options.valueOf("store"); - if(storeName == null) { - throw new VoldemortException("Must specify store name using --store option (NOT --stores)"); + List keyList = (List) options.valuesOf("query-keys"); + if(storeNames == null || storeNames.size() == 0) { + throw new VoldemortException("Must specify store name using --stores option"); } - if(options.hasArgument("query-keys")) - keyList = (List) options.valuesOf("query-keys"); - executeQueryKeys(nodeId, adminClient, storeName, keyList); + executeQueryKeys(nodeId, adminClient, storeNames, keyList); } } catch(Exception e) { e.printStackTrace(); @@ -1322,8 +1320,18 @@ public class VoldemortAdminTool { } private static void writeBinary(File outputFile, Printable printable) throws IOException { - OutputStream outputStream = (outputFile == null) ? System.out - : (new FileOutputStream(outputFile)); + OutputStream outputStream = null; + if(outputFile == null) { + outputStream = new FilterOutputStream(System.out) { + + @Override + public void close() throws IOException { + flush(); + } + }; + } else { + outputStream = new FileOutputStream(outputFile); + } DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream)); try { printable.printTo(dataOutputStream); @@ -1333,8 +1341,18 @@ public class VoldemortAdminTool { } private static void writeAscii(File outputFile, Writable writable) throws IOException { - Writer writer = (outputFile == null) ? (new OutputStreamWriter(System.out)) - : (new FileWriter(outputFile)); + Writer writer = null; + if(outputFile == null) { + writer = new OutputStreamWriter(new FilterOutputStream(System.out) { + + @Override + public void close() throws IOException { + flush(); + } + }); + } else { + writer = new FileWriter(outputFile); + } BufferedWriter bufferedWriter = new BufferedWriter(writer); try { writable.writeTo(bufferedWriter); @@ -1364,98 +1382,106 @@ public class VoldemortAdminTool { } } - private static void executeQueryKeys(Integer nodeId, + private static void executeQueryKeys(final Integer nodeId, AdminClient adminClient, - String storeName, + List storeNames, List keys) throws IOException { Serializer serializer = new StringSerializer(); List listKeys = new ArrayList(); for(String key: keys) { listKeys.add(new ByteArray(serializer.toBytes(key))); } - final Iterator>, Exception>>> iterator = adminClient.queryKeys(nodeId.intValue(), - storeName, - listKeys.iterator()); - List storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId) - .getValue(); - StoreDefinition storeDefinition = null; - for(StoreDefinition storeDef: storeDefinitionList) { - if(storeDef.getName().equals(storeName)) - storeDefinition = storeDef; - } - - // k-v serializer - SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer(); - SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer(); - SerializerFactory serializerFactory = new DefaultSerializerFactory(); - @SuppressWarnings("unchecked") - final Serializer keySerializer = (Serializer) serializerFactory.getSerializer(keySerializerDef); - @SuppressWarnings("unchecked") - final Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(valueSerializerDef); - - // compression strategy - final CompressionStrategy keyCompressionStrategy; - final CompressionStrategy valueCompressionStrategy; - if(keySerializerDef != null && keySerializerDef.hasCompression()) { - keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression()); - } else { - keyCompressionStrategy = null; - } - if(valueSerializerDef != null && valueSerializerDef.hasCompression()) { - valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression()); - } else { - valueCompressionStrategy = null; - } - - // write to stdout - writeAscii(null, new Writable() { + for(final String storeName: storeNames) { + final Iterator>, Exception>>> iterator = adminClient.queryKeys(nodeId.intValue(), + storeName, + listKeys.iterator()); + List storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId) + .getValue(); + StoreDefinition storeDefinition = null; + for(StoreDefinition storeDef: storeDefinitionList) { + if(storeDef.getName().equals(storeName)) + storeDefinition = storeDef; + } - @Override - public void writeTo(BufferedWriter out) throws IOException { - final StringWriter stringWriter = new StringWriter(); - final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter); - - while(iterator.hasNext()) { - Pair>, Exception>> kvPair = iterator.next(); - // unserialize and write key - byte[] keyBytes = kvPair.getFirst().get(); - Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes - : keyCompressionStrategy.inflate(keyBytes)); - generator.writeObject(keyObject); - - // iterate through, unserialize and write values - List> values = kvPair.getSecond().getFirst(); - if(values != null) { - for(Versioned versioned: values) { - VectorClock version = (VectorClock) versioned.getVersion(); - byte[] valueBytes = versioned.getValue(); - Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes - : valueCompressionStrategy.inflate(valueBytes)); + // k-v serializer + SerializerDefinition keySerializerDef = storeDefinition.getKeySerializer(); + SerializerDefinition valueSerializerDef = storeDefinition.getValueSerializer(); + SerializerFactory serializerFactory = new DefaultSerializerFactory(); + @SuppressWarnings("unchecked") + final Serializer keySerializer = (Serializer) serializerFactory.getSerializer(keySerializerDef); + @SuppressWarnings("unchecked") + final Serializer valueSerializer = (Serializer) serializerFactory.getSerializer(valueSerializerDef); + + // compression strategy + final CompressionStrategy keyCompressionStrategy; + final CompressionStrategy valueCompressionStrategy; + if(keySerializerDef != null && keySerializerDef.hasCompression()) { + keyCompressionStrategy = new CompressionStrategyFactory().get(keySerializerDef.getCompression()); + } else { + keyCompressionStrategy = null; + } + if(valueSerializerDef != null && valueSerializerDef.hasCompression()) { + valueCompressionStrategy = new CompressionStrategyFactory().get(valueSerializerDef.getCompression()); + } else { + valueCompressionStrategy = null; + } + // write to stdout + writeAscii(null, new Writable() { + + @Override + public void writeTo(BufferedWriter out) throws IOException { + final StringWriter stringWriter = new StringWriter(); + final JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter); + stringWriter.write("Querying keys in node " + nodeId + " of " + storeName + + "\n"); + + while(iterator.hasNext()) { + Pair>, Exception>> kvPair = iterator.next(); + // unserialize and write key + byte[] keyBytes = kvPair.getFirst().get(); + Object keyObject = keySerializer.toObject((null == keyCompressionStrategy) ? keyBytes + : keyCompressionStrategy.inflate(keyBytes)); + generator.writeObject(keyObject); + + // iterate through, unserialize and write values + List> values = kvPair.getSecond().getFirst(); + if(values != null) { + if(values.size() == 0) { + stringWriter.write(", null"); + } + for(Versioned versioned: values) { + VectorClock version = (VectorClock) versioned.getVersion(); + byte[] valueBytes = versioned.getValue(); + Object valueObject = valueSerializer.toObject((null == valueCompressionStrategy) ? valueBytes + : valueCompressionStrategy.inflate(valueBytes)); + + stringWriter.write(", "); + stringWriter.write(version.toString()); + stringWriter.write('['); + stringWriter.write(new Date(version.getTimestamp()).toString()); + stringWriter.write(']'); + generator.writeObject(valueObject); + } + } else { + stringWriter.write(", null"); + } + // write out exception + if(kvPair.getSecond().getSecond() != null) { stringWriter.write(", "); - stringWriter.write(version.toString()); - stringWriter.write('['); - stringWriter.write(new Date(version.getTimestamp()).toString()); - stringWriter.write(']'); - generator.writeObject(valueObject); - + stringWriter.write(kvPair.getSecond().getSecond().toString()); } - } - // write out exception - if(kvPair.getSecond().getSecond() != null) { - stringWriter.write(", "); - stringWriter.write(kvPair.getSecond().getSecond().toString()); - } - StringBuffer buf = stringWriter.getBuffer(); - if(buf.charAt(0) == ' ') { - buf.setCharAt(0, '\n'); + StringBuffer buf = stringWriter.getBuffer(); + if(buf.charAt(0) == ' ') { + buf.setCharAt(0, '\n'); + } + out.write(buf.toString()); + buf.setLength(0); } - out.write(buf.toString()); - buf.setLength(0); + out.write('\n'); } - out.write('\n'); - } - }); + }); + } } } -- 2.11.4.GIT