Improved BnP logging.
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/azkaban/VoldemortBuildAndPushJob.java
1 /*
2 * Copyright 2008-2013 LinkedIn, Inc
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
17 package voldemort.store.readonly.mr.azkaban;
19 import azkaban.jobExecutor.AbstractJob;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import com.google.protobuf.UninitializedMessageException;
24 import org.apache.avro.Schema;
25 import org.apache.commons.lang.Validate;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.mapred.JobConf;
28 import org.apache.log4j.Logger;
29 import voldemort.VoldemortException;
30 import voldemort.client.ClientConfig;
31 import voldemort.client.protocol.admin.AdminClient;
32 import voldemort.client.protocol.admin.AdminClientConfig;
33 import voldemort.client.protocol.pb.VAdminProto;
34 import voldemort.cluster.Cluster;
35 import voldemort.cluster.Node;
36 import voldemort.serialization.DefaultSerializerFactory;
37 import voldemort.serialization.SerializerDefinition;
38 import voldemort.serialization.json.JsonTypeDefinition;
39 import voldemort.server.VoldemortConfig;
40 import voldemort.store.StoreDefinition;
41 import voldemort.store.readonly.checksum.CheckSum;
42 import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
43 import voldemort.store.readonly.disk.KeyValueWriter;
44 import voldemort.store.readonly.hooks.BuildAndPushHook;
45 import voldemort.store.readonly.hooks.BuildAndPushStatus;
46 import voldemort.store.readonly.mr.azkaban.VoldemortStoreBuilderJob.VoldemortStoreBuilderConf;
47 import voldemort.store.readonly.mr.utils.AvroUtils;
48 import voldemort.store.readonly.mr.utils.HadoopUtils;
49 import voldemort.store.readonly.mr.utils.JsonSchema;
50 import voldemort.store.readonly.mr.utils.VoldemortUtils;
51 import voldemort.store.readonly.swapper.DeleteAllFailedFetchStrategy;
52 import voldemort.store.readonly.swapper.DisableStoreOnFailedNodeFailedFetchStrategy;
53 import voldemort.store.readonly.swapper.FailedFetchLock;
54 import voldemort.store.readonly.swapper.FailedFetchStrategy;
55 import voldemort.store.readonly.swapper.RecoverableFailedFetchException;
56 import voldemort.utils.Props;
57 import voldemort.utils.ReflectUtils;
58 import voldemort.utils.Utils;
60 import java.io.Closeable;
61 import java.io.IOException;
62 import java.net.URI;
63 import java.text.DateFormat;
64 import java.text.SimpleDateFormat;
65 import java.util.ArrayList;
66 import java.util.Date;
67 import java.util.HashMap;
68 import java.util.HashSet;
69 import java.util.List;
70 import java.util.Locale;
71 import java.util.Map;
72 import java.util.Random;
73 import java.util.Set;
74 import java.util.concurrent.Callable;
75 import java.util.concurrent.ExecutorService;
76 import java.util.concurrent.Executors;
77 import java.util.concurrent.Future;
79 public class VoldemortBuildAndPushJob extends AbstractJob {
81 private final Logger log;
83 // CONFIG NAME CONSTANTS
85 // build.required
86 public final static String BUILD_INPUT_PATH = "build.input.path";
87 public final static String BUILD_OUTPUT_DIR = "build.output.dir";
88 // build.optional
89 public final static String BUILD_TEMP_DIR = "build.temp.dir";
90 public final static String BUILD_REPLICATION_FACTOR = "build.replication.factor";
91 public final static String BUILD_COMPRESS_VALUE = "build.compress.value";
92 public final static String BUILD_CHUNK_SIZE = "build.chunk.size";
93 public final static String BUILD_OUTPUT_KEEP = "build.output.keep";
94 public final static String BUILD_TYPE_AVRO = "build.type.avro";
95 public final static String BUILD_REQUIRED_READS = "build.required.reads";
96 public final static String BUILD_REQUIRED_WRITES = "build.required.writes";
97 public final static String BUILD_FORCE_SCHEMA_KEY = "build.force.schema.key";
98 public final static String BUILD_FORCE_SCHEMA_VALUE = "build.force.schema.value";
99 public final static String BUILD_PREFERRED_READS = "build.preferred.reads";
100 public final static String BUILD_PREFERRED_WRITES = "build.preferred.writes";
101 // push.required
102 public final static String PUSH_STORE_NAME = "push.store.name";
103 public final static String PUSH_CLUSTER = "push.cluster";
104 public final static String PUSH_STORE_OWNERS = "push.store.owners";
105 public final static String PUSH_STORE_DESCRIPTION = "push.store.description";
106 // push.optional
107 public final static String PUSH_HTTP_TIMEOUT_SECONDS = "push.http.timeout.seconds";
108 public final static String PUSH_NODE = "push.node";
109 public final static String PUSH_VERSION = "push.version";
110 public final static String PUSH_VERSION_TIMESTAMP = "push.version.timestamp";
111 public final static String PUSH_BACKOFF_DELAY_SECONDS = "push.backoff.delay.seconds";
112 public final static String PUSH_ROLLBACK = "push.rollback";
113 public final static String PUSH_FORCE_SCHEMA_KEY = "push.force.schema.key";
114 public final static String PUSH_FORCE_SCHEMA_VALUE = "push.force.schema.value";
115 // others.optional
116 public final static String KEY_SELECTION = "key.selection";
117 public final static String VALUE_SELECTION = "value.selection";
118 public final static String NUM_CHUNKS = "num.chunks";
119 public final static String BUILD = "build";
120 public final static String PUSH = "push";
121 public final static String VOLDEMORT_FETCHER_PROTOCOL = "voldemort.fetcher.protocol";
122 public final static String VOLDEMORT_FETCHER_PORT = "voldemort.fetcher.port";
123 public final static String AVRO_SERIALIZER_VERSIONED = "avro.serializer.versioned";
124 public final static String AVRO_KEY_FIELD = "avro.key.field";
125 public final static String AVRO_VALUE_FIELD = "avro.value.field";
126 public final static String HADOOP_JOB_UGI = "hadoop.job.ugi";
127 public final static String REDUCER_PER_BUCKET = "reducer.per.bucket";
128 public final static String CHECKSUM_TYPE = "checksum.type";
129 public final static String SAVE_KEYS = "save.keys";
130 public final static String HEARTBEAT_HOOK_INTERVAL_MS = "heartbeat.hook.interval.ms";
131 public final static String HOOKS = "hooks";
132 public final static String MIN_NUMBER_OF_RECORDS = "min.number.of.records";
133 public final static String REDUCER_OUTPUT_COMPRESS_CODEC = "reducer.output.compress.codec";
134 public final static String REDUCER_OUTPUT_COMPRESS = "reducer.output.compress";
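// A minimal sketch of how these properties might be wired together in an Azkaban job file.
// The values (and the exact Azkaban job-type wiring) below are purely illustrative, not defaults
// enforced by this class:
//
//   type=java
//   job.class=voldemort.store.readonly.mr.azkaban.VoldemortBuildAndPushJob
//   build.input.path=/user/example/my-store-input
//   build.output.dir=/tmp/example/my-store-output
//   push.store.name=my-example-store
//   push.cluster=tcp://voldemort-host-1:6666,tcp://voldemort-host-2:6666
//   push.store.owners=owner@example.com
//   push.store.description=Example read-only store
//   build.replication.factor=2
//   build.type.avro=false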
136 // CONFIG VALUES (and other immutable state)
137 private final Props props;
138 private final String storeName;
139 private final List<String> clusterURLs;
140 private final Map<String, AdminClient> adminClientPerCluster;
141 private final int nodeId;
142 private final List<String> dataDirs;
143 private final boolean isAvroJob;
144 private final String keyFieldName;
145 private final String valueFieldName;
146 private final boolean isAvroVersioned;
147 private final long minNumberOfRecords;
148 private final String hdfsFetcherPort;
149 private final String hdfsFetcherProtocol;
150 private final String jsonKeyField;
151 private final String jsonValueField;
152 private final Set<BuildAndPushHook> hooks = new HashSet<BuildAndPushHook>();
153 private final int heartBeatHookIntervalTime;
154 private final HeartBeatHookRunnable heartBeatHookRunnable;
155 private final boolean pushHighAvailability;
156 private final List<Closeable> closeables = Lists.newArrayList();
157 private final ExecutorService executorService;
159 // Mutable state
160 private List<StoreDefinition> storeDefs;
161 private Path sanitizedInputPath = null;
162 private Schema inputPathSchema = null;
164 public VoldemortBuildAndPushJob(String name, azkaban.utils.Props azkabanProps) {
165 super(name, Logger.getLogger(name));
166 this.log = getLog();
167 log.info("Job props.toString(): " + azkabanProps.toString());
169 this.props = new Props(azkabanProps.toProperties());
170 this.storeName = props.getString(PUSH_STORE_NAME).trim();
171 this.clusterURLs = new ArrayList<String>();
172 this.dataDirs = new ArrayList<String>();
173 this.adminClientPerCluster = Maps.newHashMap();
175 String clusterUrlText = props.getString(PUSH_CLUSTER);
176 for(String url: Utils.COMMA_SEP.split(clusterUrlText.trim())) {
177 if(url.trim().length() > 0) {
178 this.clusterURLs.add(url);
179 AdminClient adminClient = new AdminClient(url, new AdminClientConfig(), new ClientConfig());
180 this.adminClientPerCluster.put(url, adminClient);
181 this.closeables.add(adminClient);
185 int numberOfClusters = this.clusterURLs.size();
187 if (numberOfClusters <= 0) {
188 throw new RuntimeException("Number of URLs should be at least 1");
191 // Support multiple output dirs if the user mentions only PUSH, no BUILD.
192 // If the user mentions both, then there should be only one
193 String dataDirText = props.getString(BUILD_OUTPUT_DIR);
194 for(String dataDir: Utils.COMMA_SEP.split(dataDirText.trim()))
195 if(dataDir.trim().length() > 0)
196 this.dataDirs.add(dataDir);
198 if(this.dataDirs.size() <= 0)
199 throw new RuntimeException("Number of data dirs should be at least 1");
201 this.nodeId = props.getInt(PUSH_NODE, 0);
203 this.hdfsFetcherProtocol = props.getString(VOLDEMORT_FETCHER_PROTOCOL, "hftp");
204 this.hdfsFetcherPort = props.getString(VOLDEMORT_FETCHER_PORT, "50070");
206 log.info(VOLDEMORT_FETCHER_PROTOCOL + " is set to: " + hdfsFetcherProtocol);
207 log.info(VOLDEMORT_FETCHER_PORT + " is set to: " + hdfsFetcherPort);
209 // Serialization configs
211 isAvroJob = props.getBoolean(BUILD_TYPE_AVRO, false);
212 // Set the default to false; this ensures existing clients who are not aware of
213 // the new serializer type don't bail out
214 this.isAvroVersioned = props.getBoolean(AVRO_SERIALIZER_VERSIONED, false);
215 this.keyFieldName = props.getString(AVRO_KEY_FIELD, null);
216 this.valueFieldName = props.getString(AVRO_VALUE_FIELD, null);
217 if(this.isAvroJob) {
218 if(this.keyFieldName == null)
219 throw new RuntimeException("The key field must be specified in the properties for the Avro build and push job!");
220 if(this.valueFieldName == null)
221 throw new RuntimeException("The value field must be specified in the properties for the Avro build and push job!");
223 this.jsonKeyField = props.getString(KEY_SELECTION, null);
224 this.jsonValueField = props.getString(VALUE_SELECTION, null);
226 // Other configs
228 this.minNumberOfRecords = props.getLong(MIN_NUMBER_OF_RECORDS, 1);
230 // By default, Push HA will be enabled if the server says so.
231 // If the job sets Push HA to false, then it will be disabled, no matter what the server asks for.
232 this.pushHighAvailability = props.getBoolean(VoldemortConfig.PUSH_HA_ENABLED, true);
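// Note: this flag alone does not force HA on; the effective behaviour is decided later in
// runPushStore(), where the server-side HA settings are fetched and honoured as well.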
234 // Initializing hooks
235 this.heartBeatHookIntervalTime = props.getInt(HEARTBEAT_HOOK_INTERVAL_MS, 60000);
236 this.heartBeatHookRunnable = new HeartBeatHookRunnable(heartBeatHookIntervalTime);
237 String hookNamesText = this.props.getString(HOOKS, null);
238 if (hookNamesText != null && !hookNamesText.isEmpty()) {
239 for (String hookName : Utils.COMMA_SEP.split(hookNamesText.trim())) {
240 try {
241 BuildAndPushHook hook = (BuildAndPushHook) ReflectUtils.callConstructor(Class.forName(hookName));
242 try {
243 hook.init(props);
244 log.info("Initialized BuildAndPushHook [" + hook.getName() + "]");
245 this.hooks.add(hook);
246 } catch (Exception e) {
247 log.warn("Failed to initialize BuildAndPushHook [" + hook.getName() + "]. It will not be invoked.", e);
249 } catch (ClassNotFoundException e) {
250 log.error("The requested BuildAndPushHook [" + hookName + "] was not found! Check your classpath and config!", e);
255 this.executorService = Executors.newFixedThreadPool(numberOfClusters);
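// The pool is sized to one thread per cluster URL so that each StorePushTask submitted in
// run() can push to its cluster concurrently with the others.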
257 log.info("Build and Push Job constructed for " + numberOfClusters + " cluster(s).");
260 private void invokeHooks(BuildAndPushStatus status) {
261 invokeHooks(status, null);
264 private void invokeHooks(BuildAndPushStatus status, String details) {
265 for (BuildAndPushHook hook : hooks) {
266 try {
267 hook.invoke(status, details);
268 } catch (Exception e) {
269 // Hooks are never allowed to fail a job...
270 log.warn("Failed to invoke BuildAndPushHook [" + hook.getName() + "] because of exception: ", e);
277 * Compare two clusters to see if they have an equal number of partitions,
278 * an equal number of nodes, and whether each node hosts the same partition ids.
280 * @param lhs Left hand side Cluster object
281 * @param rhs Right hand side Cluster object
282 * @return True if the clusters are congruent (equal number of partitions,
283 * equal number of nodes and same partition ids)
285 private boolean areTwoClustersEqual(final Cluster lhs, final Cluster rhs) {
286 if (lhs.getNumberOfPartitions() != rhs.getNumberOfPartitions())
287 return false;
288 if (!lhs.getNodeIds().equals(rhs.getNodeIds()))
289 return false;
290 for (Node lhsNode: lhs.getNodes()) {
291 Node rhsNode = rhs.getNodeById(lhsNode.getId());
292 if (!rhsNode.getPartitionIds().equals(lhsNode.getPartitionIds())) {
293 return false;
296 return true;
300 * Check if all cluster objects in the list are congruent.
302 * @param clusterUrls list of cluster URLs whose Cluster objects are compared
303 * @throws VoldemortException if any two clusters are not congruent
306 private void allClustersEqual(final List<String> clusterUrls) {
307 Validate.notEmpty(clusterUrls, "clusterUrls cannot be null or empty");
308 // If only one clusterUrl return immediately
309 if (clusterUrls.size() == 1)
310 return;
311 AdminClient adminClientLhs = adminClientPerCluster.get(clusterUrls.get(0));
312 Cluster clusterLhs = adminClientLhs.getAdminClientCluster();
313 for (int index = 1; index < clusterUrls.size(); index++) {
314 AdminClient adminClientRhs = adminClientPerCluster.get(clusterUrls.get(index));
315 Cluster clusterRhs = adminClientRhs.getAdminClientCluster();
316 if (!areTwoClustersEqual(clusterLhs, clusterRhs))
317 throw new VoldemortException("Cluster " + clusterLhs.getName()
318 + " is not the same as " + clusterRhs.getName());
322 private void checkForPreconditions(boolean build, boolean push) {
323 if (!build && !push) {
324 throw new RuntimeException("Both build and push cannot be false");
326 else if (build && push && dataDirs.size() != 1) {
327 // Should have only one data directory (which acts as the parent directory for all urls)
328 throw new RuntimeException("Should have only one data directory (which acts as the root"
329 + " directory) since output directories are auto-generated during the build phase");
330 } else if (!build && push && dataDirs.size() != clusterURLs.size()) {
331 // Since we are only pushing, the number of data directories should be equal to the number of cluster urls
332 throw new RuntimeException("Since we are only pushing, the number of data directories"
333 + " (comma separated) should be equal to the number of cluster"
334 + " urls");
336 if ((!build && push) || (build && !push)) {
337 log.warn("DEPRECATED: Creating one build job and a separate push job is a deprecated strategy. Instead, create"
338 + " just one job with both build and push set to true and pass a list of cluster urls.");
342 private String getMatchingServerSupportedCompressionCodec(int nodeId) {
344 * Strict operational assumption made by this method:
346 * All servers have symmetrical settings.
348 * TODO Currently this method requests only one server in one of the
349 * clusters to check for the server-supported compression codec. This could
350 * be a problem if we were to do a rolling upgrade on RO servers AND still
351 * allow BnP jobs to progress.
353 * Fix: The ideal solution is to check all nodes in all colos to ensure
354 * all of them support the same configs for compression.
356 * Currently this is okay since we don't do rolling bounces anyway, and
357 * we stop all BnP jobs for any kind of maintenance.
360 log.info("Requesting block-level compression codec expected by Server");
362 List<String> supportedCodecs;
363 try{
364 supportedCodecs = adminClientPerCluster.get(clusterURLs.get(0))
365 .readonlyOps.getSupportedROStorageCompressionCodecs(nodeId);
366 } catch(Exception e) {
367 log.error("Exception thrown when requesting the supported block-level compression codecs. " +
368 "Server might be running an older version. Exception: "
369 + e.getMessage());
370 // return here
371 return null;
374 String codecList = "[ ";
375 for(String str: supportedCodecs) {
376 codecList += str + " ";
378 codecList += "]";
379 log.info("Server responded with block-level compression codecs: " + codecList);
381 * TODO for now only checking if there is a match between the server
382 * supported codec and the one that we support. Later this method can be
383 * extended to add more compression types or pick the first type
384 * returned by the server.
387 for(String codecStr: supportedCodecs) {
388 if(codecStr.toUpperCase(Locale.ENGLISH).equals(KeyValueWriter.COMPRESSION_CODEC)) {
389 return codecStr;
392 return null; // no matching compression codec. defaults to uncompressed data.
395 private class StorePushTask implements Callable<Boolean> {
396 final Props props;
397 final String url;
398 final String buildOutputDir;
400 StorePushTask(Props props, String url, String buildOutputDir) {
401 this.props = props;
402 this.url = url;
403 this.buildOutputDir = buildOutputDir;
405 log.debug("StorePushTask constructed for URL: " + url);
408 public Boolean call() throws Exception {
409 log.info("StorePushTask.call() invoked for URL: " + url);
410 invokeHooks(BuildAndPushStatus.PUSHING, url);
411 try {
412 runPushStore(props, url, buildOutputDir);
413 } catch (RecoverableFailedFetchException e) {
414 log.warn("There was a problem with some of the fetches, " +
415 "but a swap was still able to go through for URL: " + url, e);
416 invokeHooks(BuildAndPushStatus.SWAPPED_WITH_FAILURES, url);
417 throw e;
418 } catch(Exception e) {
419 log.error("Exception during push for URL: " + url, e);
420 throw e;
422 invokeHooks(BuildAndPushStatus.SWAPPED, url);
423 log.info("StorePushTask.call() finished for URL: " + url);
424 return true;
429 @Override
430 public void run() throws Exception {
431 invokeHooks(BuildAndPushStatus.STARTING);
432 if (hooks.size() > 0) {
433 Thread t = new Thread(heartBeatHookRunnable);
434 t.setDaemon(true);
435 t.start();
438 try {
439 // These two options control the build and push phases of the job respectively.
440 boolean build = props.getBoolean(BUILD, true);
441 boolean push = props.getBoolean(PUSH, true);
443 checkForPreconditions(build, push);
445 try {
446 allClustersEqual(clusterURLs);
447 } catch(VoldemortException e) {
448 log.error("Exception during cluster equality check", e);
449 fail("Exception during cluster equality check: " + e.toString());
450 return;
453 String reducerOutputCompressionCodec = getMatchingServerSupportedCompressionCodec(nodeId);
454 if(reducerOutputCompressionCodec != null) {
455 log.info("Using block-level compression codec: " + reducerOutputCompressionCodec);
456 props.put(REDUCER_OUTPUT_COMPRESS, "true");
457 props.put(REDUCER_OUTPUT_COMPRESS_CODEC, reducerOutputCompressionCodec);
458 } else {
459 log.info("Using no block-level compression");
462 // Create a hashmap to capture exception per url
463 HashMap<String, Exception> exceptions = Maps.newHashMap();
464 String buildOutputDir = null;
465 Map<String, Future<Boolean>> tasks = Maps.newHashMap();
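// High-level flow of the loop below: for every cluster URL we (1) verify or create the
// store definition, (2) run the Hadoop build at most once when pushing (or once per
// cluster when only building), and (3) submit an asynchronous StorePushTask whose
// outcome is collected further down.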
466 for (int index = 0; index < clusterURLs.size(); index++) {
467 String url = clusterURLs.get(index);
468 if (isAvroJob) {
469 // Verify the schema if the store exists or else add the new store
470 verifyOrAddStoreAvro(url, isAvroVersioned);
471 } else {
472 // Verify the schema if the store exists or else add the new store
473 verifyOrAddStore(url);
475 if (build) {
476 // If we are only building and not pushing, then we want the build to
477 // happen for all clusters; otherwise, if we are also pushing, we want to
478 // build only once
479 if (!push || buildOutputDir == null) {
480 try {
481 invokeHooks(BuildAndPushStatus.BUILDING);
482 buildOutputDir = runBuildStore(props, url);
483 } catch(Exception e) {
484 log.error("Exception during build for URL: " + url, e);
485 exceptions.put(url, e);
489 if (push) {
490 log.info("Pushing to cluster URL: " + clusterURLs.get(index));
491 // If we are not building and just pushing, then we want to get the built
492 // data from the dataDirs; otherwise we just use the one that we built earlier
493 if (!build) {
494 buildOutputDir = dataDirs.get(index);
496 // If there was an exception during the build phase, buildOutputDir might be null; check
497 // if that's the case, and if so, continue to the next URL without trying to push
498 if (buildOutputDir == null) {
499 continue;
501 tasks.put(url, executorService.submit(new StorePushTask(props, url, buildOutputDir)));
505 for (Map.Entry<String, Future<Boolean>> task: tasks.entrySet()) {
506 String url = task.getKey();
507 Boolean success = false;
508 try {
509 success = task.getValue().get();
510 } catch(Exception e) {
511 exceptions.put(url, e);
513 if (success) {
514 log.info("Successfully pushed to URL: " + url);
518 if(build && push && buildOutputDir != null
519 && !props.getBoolean(BUILD_OUTPUT_KEEP, false)) {
520 JobConf jobConf = new JobConf();
521 if(props.containsKey(HADOOP_JOB_UGI)) {
522 jobConf.set(HADOOP_JOB_UGI, props.getString(HADOOP_JOB_UGI));
524 log.info("Deleting build output directory: " + buildOutputDir);
525 HadoopUtils.deletePathIfExists(jobConf, buildOutputDir);
526 log.info("Deleted " + buildOutputDir);
529 if (exceptions.size() == 0) {
530 invokeHooks(BuildAndPushStatus.FINISHED);
531 cleanUp();
532 } else {
533 log.error("Got exceptions during Build and Push:");
534 for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
535 log.error("Exception for cluster: " + entry.getKey(), entry.getValue());
537 throw new VoldemortException("Got exceptions during Build and Push");
539 } catch (Exception e) {
540 fail(e.toString());
541 throw new VoldemortException("An exception occurred during Build and Push !!", e);
542 } catch (Throwable t) {
543 // This is for OOMs, StackOverflows and other uber nasties...
544 // We'll try to invoke hooks but all bets are off at this point :/
545 fail(t.toString());
546 // N.B.: Azkaban's AbstractJob#run throws Exception, not Throwable, so we can't rethrow directly...
547 throw new Exception("A non-Exception Throwable was caught! Bubbling it up as an Exception...", t);
551 @Override
552 public void cancel() throws java.lang.Exception {
553 log.info("VoldemortBuildAndPushJob.cancel() has been called!");
554 invokeHooks(BuildAndPushStatus.CANCELLED);
555 cleanUp();
558 private void fail(String details) {
559 invokeHooks(BuildAndPushStatus.FAILED, details);
560 cleanUp();
563 private void cleanUp() {
564 heartBeatHookRunnable.stop();
565 for (Closeable closeable: this.closeables) {
566 try {
567 log.info("Closing " + closeable.toString());
568 closeable.close();
569 } catch (Exception e) {
570 log.error("Got an error while trying to close " + closeable.toString(), e);
573 this.executorService.shutdownNow();
577 * Checks whether the store is already present in the cluster and verifies its schema; otherwise, adds it.
579 * @param url cluster URL to check
583 private void verifyOrAddStore(String url) throws Exception {
584 // create new json store def with schema from the metadata in the input path
585 JsonSchema schema = HadoopUtils.getSchemaFromPath(getInputPath());
586 int replicationFactor = props.getInt(BUILD_REPLICATION_FACTOR, 2);
587 int requiredReads = props.getInt(BUILD_REQUIRED_READS, 1);
588 int requiredWrites = props.getInt(BUILD_REQUIRED_WRITES, 1);
589 String description = props.getString(PUSH_STORE_DESCRIPTION, "");
590 String owners = props.getString(PUSH_STORE_OWNERS, "");
591 String keySchema = "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">"
592 + schema.getKeyType() + "</schema-info>\n\t";
594 if(jsonKeyField != null && jsonKeyField.length() > 0) {
595 keySchema = "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">"
596 + schema.getKeyType().subtype(jsonKeyField) + "</schema-info>\n\t";
599 String valSchema = "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">"
600 + schema.getValueType() + "</schema-info>\n\t";
602 if (jsonValueField != null && jsonValueField.length() > 0) {
603 valSchema = "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">"
604 + schema.getValueType().subtype(jsonValueField) + "</schema-info>\n\t";
607 boolean hasCompression = false;
608 if(props.containsKey(BUILD_COMPRESS_VALUE)) {
609 hasCompression = true;
612 if(hasCompression) {
613 valSchema += "\t<compression><type>gzip</type></compression>\n\t";
616 if(props.containsKey(BUILD_FORCE_SCHEMA_KEY)) {
617 keySchema = props.get(BUILD_FORCE_SCHEMA_KEY);
620 if(props.containsKey(BUILD_FORCE_SCHEMA_VALUE)) {
621 valSchema = props.get(BUILD_FORCE_SCHEMA_VALUE);
624 String newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
625 replicationFactor,
626 requiredReads,
627 requiredWrites,
628 props.containsKey(BUILD_PREFERRED_READS) ? props.getInt(BUILD_PREFERRED_READS)
629 : null,
630 props.containsKey(BUILD_PREFERRED_WRITES) ? props.getInt(BUILD_PREFERRED_WRITES)
631 : null,
632 (props.containsKey(PUSH_FORCE_SCHEMA_KEY)) ? props.getString(PUSH_FORCE_SCHEMA_KEY)
633 : keySchema,
634 (props.containsKey(PUSH_FORCE_SCHEMA_VALUE)) ? props.getString(PUSH_FORCE_SCHEMA_VALUE)
635 : valSchema,
636 description,
637 owners);
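// Roughly speaking (the exact formatting is up to VoldemortUtils.getStoreDefXml), the
// generated definition is a read-only <store> element along the lines of:
//
//   <store>
//     <name>myStore</name>
//     <persistence>read-only</persistence>
//     <routing>client</routing>
//     <replication-factor>2</replication-factor>
//     <required-reads>1</required-reads>
//     <required-writes>1</required-writes>
//     <key-serializer>
//       <type>json</type>
//       <schema-info version="0">"int32"</schema-info>
//     </key-serializer>
//     <value-serializer>
//       <type>json</type>
//       <schema-info version="0">"string"</schema-info>
//     </value-serializer>
//   </store>
//
// where the store name, schemas, and numeric values are illustrative here and come from the
// job properties above.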
638 boolean foundStore = findAndVerify(url,
639 newStoreDefXml,
640 hasCompression,
641 replicationFactor,
642 requiredReads,
643 requiredWrites);
644 if (!foundStore) {
645 try {
646 StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
647 addStore(description, owners, url, newStoreDef);
649 catch(RuntimeException e) {
650 log.error("Error while creating or adding the store definition for cluster URL: " + url + " (node id " + this.nodeId + ")", e);
651 fail("Failed to add store");
652 throw new VoldemortException("Failed to add store", e);
655 // don't use newStoreDef because we want to ALWAYS use the JSON definition since the store
656 // builder assumes that you are using JsonTypeSerializer. This allows you to tweak your
657 // value/key store xml as you see fit, but still uses the json sequence file meta data
658 // to build the store.
659 storeDefs = ImmutableList.of(VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(
660 storeName,
661 replicationFactor,
662 requiredReads,
663 requiredWrites,
664 props.containsKey(BUILD_PREFERRED_READS) ? props.getInt(BUILD_PREFERRED_READS) : null,
665 props.containsKey(BUILD_PREFERRED_WRITES) ? props.getInt(BUILD_PREFERRED_WRITES) : null,
666 keySchema,
667 valSchema)));
671 * Check if the store exists and then verify its schema. Returns false if the store doesn't exist.
673 * @param url cluster URL to check
674 * @param newStoreDefXml proposed store definition XML
675 * @param hasCompression whether value compression is enabled
676 * @param replicationFactor replication factor of the store
677 * @param requiredReads required reads of the store
678 * @param requiredWrites required writes of the store
679 * @return true if the store exists (and its definition matches), false otherwise
683 private boolean findAndVerify(String url,
684 String newStoreDefXml,
685 boolean hasCompression,
686 int replicationFactor,
687 int requiredReads,
688 int requiredWrites) {
689 log.info("Verifying store against cluster URL: " + url +
690 " (node id " + this.nodeId + ")\n" + newStoreDefXml.toString());
691 StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
692 List<StoreDefinition> remoteStoreDefs =
693 adminClientPerCluster.get(url).metadataMgmtOps.getRemoteStoreDefList(this.nodeId).getValue();
694 boolean foundStore = false;
695 // go over all store defs and see if one has the same name as the store we're trying to build
696 for(StoreDefinition remoteStoreDef: remoteStoreDefs) {
697 if(remoteStoreDef.getName().equals(storeName)) {
698 // if the store already exists, but doesn't match what we want to push, we need to worry
699 if(!remoteStoreDef.equals(newStoreDef)) {
700 // it is possible that the stores actually DO match, but the json in the key/value
701 // serializers is out of order (eg {'a': 'int32', 'b': 'int32'} could have a/b reversed.
702 // This is just a reflection of the fact that voldemort json type defs use hashmaps that
703 // are unordered, and pig uses bags that are unordered as well. it's therefore unpredictable
704 // what order the keys will come out of pig. let's check to see if the key/value
705 // serializers are REALLY equal.
706 SerializerDefinition localKeySerializerDef = newStoreDef.getKeySerializer();
707 SerializerDefinition localValueSerializerDef = newStoreDef.getValueSerializer();
708 SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
709 SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();
711 if(remoteKeySerializerDef.getName().equals("json")
712 && remoteValueSerializerDef.getName().equals("json")
713 && remoteKeySerializerDef.getAllSchemaInfoVersions().size() == 1
714 && remoteValueSerializerDef.getAllSchemaInfoVersions().size() == 1) {
715 JsonTypeDefinition remoteKeyDef = JsonTypeDefinition.fromJson(remoteKeySerializerDef.getCurrentSchemaInfo());
716 JsonTypeDefinition remoteValDef = JsonTypeDefinition.fromJson(remoteValueSerializerDef.getCurrentSchemaInfo());
717 JsonTypeDefinition localKeyDef = JsonTypeDefinition.fromJson(localKeySerializerDef.getCurrentSchemaInfo());
718 JsonTypeDefinition localValDef = JsonTypeDefinition.fromJson(localValueSerializerDef.getCurrentSchemaInfo());
720 if(remoteKeyDef.equals(localKeyDef) && remoteValDef.equals(localValDef)) {
721 String compressionPolicy = "";
722 if(hasCompression) {
723 compressionPolicy = "\n\t\t<compression><type>gzip</type></compression>";
725 // if the key/value serializers are REALLY equal (even though the strings may not match), then
726 // just use the remote stores to GUARANTEE that they match, and try again.
727 newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
728 replicationFactor,
729 requiredReads,
730 requiredWrites,
731 props.containsKey(BUILD_PREFERRED_READS) ? props.getInt(BUILD_PREFERRED_READS)
732 : null,
733 props.containsKey(BUILD_PREFERRED_WRITES) ? props.getInt(BUILD_PREFERRED_WRITES)
734 : null,
735 "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">"
736 + remoteKeySerializerDef.getCurrentSchemaInfo()
737 + "</schema-info>\n\t",
738 "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">"
739 + remoteValueSerializerDef.getCurrentSchemaInfo()
740 + "</schema-info>"
741 + compressionPolicy
742 + "\n\t");
744 newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
745 if(!remoteStoreDef.equals(newStoreDef)) {
746 // if we still get a fail, then we know that the store defs don't match for reasons
747 // OTHER than the key/value serializer
748 String errorMessage = "Your store schema is identical, " +
749 "but the store definition does not match on cluster URL: " + url;
750 log.error(errorMessage + diffMessage(newStoreDef, remoteStoreDef));
751 throw new VoldemortException(errorMessage);
753 } else {
754 // if the key/value serializers are not equal (even in java, not just json strings),
755 // then fail
756 String errorMessage = "Your data schema does not match the schema which is already " +
757 "defined on cluster URL " + url;
758 log.error(errorMessage + diffMessage(newStoreDef, remoteStoreDef));
759 throw new VoldemortException(errorMessage);
761 } else {
762 String errorMessage = "Your store definition does not match the store definition that is " +
763 "already defined on cluster URL: " + url;
764 log.error(errorMessage + diffMessage(newStoreDef, remoteStoreDef));
765 throw new VoldemortException(errorMessage);
768 foundStore = true;
769 break;
772 return foundStore;
775 private String diffMessage(StoreDefinition newStoreDef, StoreDefinition remoteStoreDef) {
776 String thisName = "BnP config/data has";
777 String otherName = "Voldemort server has";
778 String message = "\n" + thisName + ":\t" + newStoreDef +
779 "\n" + otherName + ":\t" + remoteStoreDef +
780 "\n" + newStoreDef.diff(remoteStoreDef, thisName, otherName);
781 return message;
784 private void addStore(String description, String owners, String url, StoreDefinition newStoreDef) {
785 if (description.length() == 0) {
786 throw new RuntimeException("Description field missing in store definition. "
787 + "Please add \"" + PUSH_STORE_DESCRIPTION
788 + "\" with a line describing your store");
790 if (owners.length() == 0) {
791 throw new RuntimeException("Owner field missing in store definition. "
792 + "Please add \""
793 + PUSH_STORE_OWNERS
794 + "\" with value being a comma-separated list of email addresses.");
797 log.info("Could not find store " + storeName + " on Voldemort. Adding it to all nodes in cluster URL " + url);
798 try {
799 adminClientPerCluster.get(url).storeMgmtOps.addStore(newStoreDef);
801 catch(VoldemortException ve) {
802 throw new RuntimeException("Exception while adding store to cluster URL " + url, ve);
806 public String runBuildStore(Props props, String url) throws Exception {
807 int replicationFactor = props.getInt(BUILD_REPLICATION_FACTOR, 2);
808 int chunkSize = props.getInt(BUILD_CHUNK_SIZE, 1024 * 1024 * 1024);
809 Path tempDir = new Path(props.getString(BUILD_TEMP_DIR, "/tmp/vold-build-and-push-"
810 + new Random().nextLong()));
811 URI uri = new URI(url);
812 Path outputDir = new Path(props.getString(BUILD_OUTPUT_DIR), uri.getHost());
813 Path inputPath = getInputPath();
814 CheckSumType checkSumType = CheckSum.fromString(props.getString(CHECKSUM_TYPE,
815 CheckSum.toString(CheckSumType.MD5)));
816 boolean saveKeys = props.getBoolean(SAVE_KEYS, true);
817 boolean reducerPerBucket = props.getBoolean(REDUCER_PER_BUCKET, false);
818 int numChunks = props.getInt(NUM_CHUNKS, -1);
820 String recSchema = null;
821 String keySchema = null;
822 String valSchema = null;
824 if(isAvroJob) {
825 recSchema = getRecordSchema();
826 keySchema = getKeySchema();
827 valSchema = getValueSchema();
830 Cluster cluster = adminClientPerCluster.get(url).getAdminClientCluster();
832 new VoldemortStoreBuilderJob(
833 this.getId() + "-build-store",
834 props,
835 new VoldemortStoreBuilderConf(
836 replicationFactor,
837 chunkSize,
838 tempDir,
839 outputDir,
840 inputPath,
841 cluster,
842 storeDefs,
843 storeName,
844 checkSumType,
845 saveKeys,
846 reducerPerBucket,
847 numChunks,
848 keyFieldName,
849 valueFieldName,
850 recSchema,
851 keySchema,
852 valSchema,
853 isAvroJob,
854 minNumberOfRecords)).run();
855 return outputDir.toString();
858 public void runPushStore(Props props, String url, String dataDir) throws Exception {
859 // For backwards compatibility http timeout = admin timeout
860 int httpTimeoutMs = 1000 * props.getInt(PUSH_HTTP_TIMEOUT_SECONDS, 24 * 60 * 60);
861 long pushVersion = props.getLong(PUSH_VERSION, -1L);
862 if(props.containsKey(PUSH_VERSION_TIMESTAMP)) {
863 DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
864 pushVersion = Long.parseLong(format.format(new Date()));
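// This produces a monotonically increasing numeric version such as 20150410173000
// (yyyyMMddHHmmss), so a later push always gets a higher version than an earlier one.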
866 int maxBackoffDelayMs = 1000 * props.getInt(PUSH_BACKOFF_DELAY_SECONDS, 60);
867 List<FailedFetchStrategy> failedFetchStrategyList = Lists.newArrayList();
868 int maxNodeFailures = 0;
870 if (!pushHighAvailability) {
871 log.info("pushHighAvailability is disabled by the job config.");
872 } else {
873 // HA is enabled by the BnP job config
874 try {
875 VAdminProto.GetHighAvailabilitySettingsResponse serverSettings =
876 adminClientPerCluster.get(url).readonlyOps.getHighAvailabilitySettings(nodeId);
878 if (!serverSettings.getEnabled()) {
879 log.warn("The server requested pushHighAvailability to be DISABLED on cluster: " + url);
880 } else {
881 // HA is enabled by the server config
882 maxNodeFailures = serverSettings.getMaxNodeFailure();
883 Class<? extends FailedFetchLock> failedFetchLockClass =
884 (Class<? extends FailedFetchLock>) Class.forName(serverSettings.getLockImplementation());
885 Props propsForCluster = new Props(props);
886 propsForCluster.put(VoldemortConfig.PUSH_HA_LOCK_PATH, serverSettings.getLockPath());
887 propsForCluster.put(VoldemortConfig.PUSH_HA_CLUSTER_ID, serverSettings.getClusterId());
888 FailedFetchLock failedFetchLock =
889 ReflectUtils.callConstructor(failedFetchLockClass, new Object[]{propsForCluster});
890 failedFetchStrategyList.add(
891 new DisableStoreOnFailedNodeFailedFetchStrategy(
892 adminClientPerCluster.get(url),
893 failedFetchLock,
894 maxNodeFailures,
895 propsForCluster.toString()));
896 closeables.add(failedFetchLock);
897 log.info("pushHighAvailability is enabled for cluster URL: " + url +
898 " with cluster ID: " + serverSettings.getClusterId());
900 } catch (UninitializedMessageException e) {
901 // Not printing out the exception in the logs as that is a benign error.
902 log.error("The server does not support HA (introduced in release 1.9.18), so " +
903 "pushHighAvailability will be DISABLED on cluster: " + url);
904 } catch (ClassNotFoundException e) {
905 log.error("Failed to find requested FailedFetchLock implementation, so " +
906 "pushHighAvailability will be DISABLED on cluster: " + url, e);
907 } catch (Exception e) {
908 log.error("Got exception while trying to determine pushHighAvailability settings on cluster: " + url, e);
912 boolean rollback = props.getBoolean(PUSH_ROLLBACK, true);
914 if (rollback) {
915 failedFetchStrategyList.add(
916 new DeleteAllFailedFetchStrategy(adminClientPerCluster.get(url)));
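// Note (assumption, based on the order of list insertion): with HA enabled, the
// disable-store-on-failed-node strategy appears before the delete-all (rollback) strategy
// in this list, so node failures within the maxNodeFailures budget can be absorbed without
// rolling the whole push back.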
919 Cluster cluster = adminClientPerCluster.get(url).getAdminClientCluster();
921 log.info("Push starting for cluster: " + url);
923 new VoldemortSwapJob(
924 this.getId() + "-push-store",
925 cluster,
926 dataDir,
927 storeName,
928 httpTimeoutMs,
929 pushVersion,
930 maxBackoffDelayMs,
931 rollback,
932 hdfsFetcherProtocol,
933 hdfsFetcherPort,
934 maxNodeFailures,
935 failedFetchStrategyList,
936 url).run();
940 * Get the sanitized input path. At the moment of writing, this means the
941 * #LATEST tag is expanded.
943 private Path getInputPath() throws IOException {
944 if (sanitizedInputPath == null) {
945 // No need to query Hadoop more than once as this shouldn't change mid-run,
946 // thus, we can lazily initialize and cache the result.
947 Path path = new Path(props.getString(BUILD_INPUT_PATH));
948 sanitizedInputPath = HadoopUtils.getSanitizedPath(path);
950 return sanitizedInputPath;
954 * Get the Avro Schema of the input path, assuming the path contains just one
955 * schema version in all files under that path.
957 private Schema getInputPathSchema() throws IOException {
958 if (inputPathSchema == null) {
959 // No need to query Hadoop more than once as this shouldn't change mid-run,
960 // thus, we can lazily initialize and cache the result.
961 inputPathSchema = AvroUtils.getAvroSchemaFromPath(getInputPath());
963 return inputPathSchema;
966 // Get the schema for the Avro Record from the object container file
967 public String getRecordSchema() throws IOException {
968 Schema schema = getInputPathSchema();
969 String recSchema = schema.toString();
970 return recSchema;
973 // Extract schema of the key field
974 public String getKeySchema() throws IOException {
975 Schema schema = getInputPathSchema();
976 String keySchema = schema.getField(keyFieldName).schema().toString();
977 return keySchema;
980 // Extract schema of the value field
981 public String getValueSchema() throws IOException {
982 Schema schema = getInputPathSchema();
983 String valueSchema = schema.getField(valueFieldName).schema().toString();
984 return valueSchema;
987 private static class KeyValueSchema {
988 String keySchema;
989 String valSchema;
990 KeyValueSchema(String key, String val) {
991 keySchema = key;
992 valSchema = val;
996 // Verify that the new Avro schema being pushed is the same as the latest version present on the server;
997 // supports schema evolution
999 public void verifyOrAddStoreAvro(String url, boolean isVersioned) throws Exception {
1000 // create new store def with schema from the metadata in the input path
1001 Schema schema = getInputPathSchema();
1002 int replicationFactor = props.getInt(BUILD_REPLICATION_FACTOR, 2);
1003 int requiredReads = props.getInt(BUILD_REQUIRED_READS, 1);
1004 int requiredWrites = props.getInt(BUILD_REQUIRED_WRITES, 1);
1005 String description = props.getString(PUSH_STORE_DESCRIPTION, "");
1006 String owners = props.getString(PUSH_STORE_OWNERS, "");
1007 String serializerName;
1008 if (isVersioned)
1009 serializerName = DefaultSerializerFactory.AVRO_GENERIC_VERSIONED_TYPE_NAME;
1010 else
1011 serializerName = DefaultSerializerFactory.AVRO_GENERIC_TYPE_NAME;
1013 boolean hasCompression = false;
1014 if(props.containsKey(BUILD_COMPRESS_VALUE)) {
1015 hasCompression = true;
1018 String keySchema, valSchema;
1020 try {
1021 if(props.containsKey(BUILD_FORCE_SCHEMA_KEY)) {
1022 keySchema = props.get(BUILD_FORCE_SCHEMA_KEY);
1023 } else {
1024 Schema.Field keyField = schema.getField(keyFieldName);
1025 if (keyField == null) {
1026 throw new VoldemortException("The configured key field (" + keyFieldName + ") was not found in the input data.");
1027 } else {
1028 keySchema = "\n\t\t<type>" + serializerName + "</type>\n\t\t<schema-info version=\"0\">"
1029 + keyField.schema() + "</schema-info>\n\t";
1032 } catch (VoldemortException e) {
1033 throw e;
1034 } catch (Exception e) {
1035 throw new VoldemortException("Error while trying to extract the key field", e);
1038 try {
1039 if(props.containsKey(BUILD_FORCE_SCHEMA_VALUE)) {
1040 valSchema = props.get(BUILD_FORCE_SCHEMA_VALUE);
1041 } else {
1042 Schema.Field valueField = schema.getField(valueFieldName);
1043 if (valueField == null) {
1044 throw new VoldemortException("The configured value field (" + valueFieldName + ") was not found in the input data.");
1045 } else {
1046 valSchema = "\n\t\t<type>" + serializerName + "</type>\n\t\t<schema-info version=\"0\">"
1047 + valueField.schema() + "</schema-info>\n\t";
1049 if(hasCompression) {
1050 valSchema += "\t<compression><type>gzip</type></compression>\n\t";
1054 } catch (VoldemortException e) {
1055 throw e;
1056 } catch (Exception e) {
1057 throw new VoldemortException("Error while trying to extract the value field", e);
1060 if (keySchema == null || valSchema == null) {
1061 // This should already have failed on previous exceptions, but just in case...
1062 throw new VoldemortException("There was a problem defining the key or value schema for this job.");
1063 } else {
1064 String newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
1065 replicationFactor,
1066 requiredReads,
1067 requiredWrites,
1068 props.containsKey(BUILD_PREFERRED_READS) ? props.getInt(BUILD_PREFERRED_READS)
1069 : null,
1070 props.containsKey(BUILD_PREFERRED_WRITES) ? props.getInt(BUILD_PREFERRED_WRITES)
1071 : null,
1072 (props.containsKey(PUSH_FORCE_SCHEMA_KEY)) ? props.getString(PUSH_FORCE_SCHEMA_KEY)
1073 : keySchema,
1074 (props.containsKey(PUSH_FORCE_SCHEMA_VALUE)) ? props.getString(PUSH_FORCE_SCHEMA_VALUE)
1075 : valSchema,
1076 description,
1077 owners);
1078 KeyValueSchema returnSchemaObj = new KeyValueSchema(keySchema, valSchema);
1079 boolean foundStore = findAndVerifyAvro(url,
1080 newStoreDefXml,
1081 hasCompression,
1082 replicationFactor,
1083 requiredReads,
1084 requiredWrites,
1085 serializerName,
1086 returnSchemaObj);
1087 if (!foundStore) {
1088 try {
1089 StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
1090 addStore(description, owners, url, newStoreDef);
1092 catch(RuntimeException e) {
1093 log.error("Error adding store definition to cluster URL: " + url, e);
1094 throw new VoldemortException("Error adding store definition to cluster URL: " + url, e);
1097 // don't use newStoreDef because we want to ALWAYS use the JSON definition since the store
1098 // builder assumes that you are using JsonTypeSerializer. This allows you to tweak your
1099 // value/key store xml as you see fit, but still uses the json sequence file meta data
1100 // to build the store.
1101 storeDefs = ImmutableList.of(VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(storeName,
1102 replicationFactor,
1103 requiredReads,
1104 requiredWrites,
1105 props.containsKey(BUILD_PREFERRED_READS) ? props.getInt(BUILD_PREFERRED_READS)
1106 : null,
1107 props.containsKey(BUILD_PREFERRED_WRITES) ? props.getInt(BUILD_PREFERRED_WRITES)
1108 : null,
1109 returnSchemaObj.keySchema,
1110 returnSchemaObj.valSchema)));
1115 * Check if the store exists and then verify its schema. Returns false if the store doesn't exist.
1117 * @param url cluster URL to check
1118 * @param newStoreDefXml proposed store definition XML
1119 * @param hasCompression whether value compression is enabled
1120 * @param replicationFactor replication factor of the store
1121 * @param requiredReads required reads of the store
1122 * @param requiredWrites required writes of the store
1123 * @param serializerName Avro serializer type name (versioned or not)
1124 * @param schemaObj key/value schema object, updated in place with the schemas actually used
1125 * @return true if the store exists (and its definition matches), false otherwise
1128 private boolean findAndVerifyAvro(String url,
1129 String newStoreDefXml,
1130 boolean hasCompression,
1131 int replicationFactor,
1132 int requiredReads,
1133 int requiredWrites,
1134 String serializerName,
1135 KeyValueSchema schemaObj) {
1136 log.info("Verifying store against cluster URL: " + url +
1137 " (node id " + this.nodeId + ")\n" + newStoreDefXml.toString());
1138 StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
1139 // get store def from cluster
1140 List<StoreDefinition> remoteStoreDefs =
1141 adminClientPerCluster.get(url).metadataMgmtOps.getRemoteStoreDefList(this.nodeId).getValue();
1142 boolean foundStore = false;
1143 // go over all store defs and see if one has the same name as the store we're trying to build
1144 for(StoreDefinition remoteStoreDef: remoteStoreDefs) {
1145 if(remoteStoreDef.getName().equals(storeName)) {
1146 // if the store already exists, but doesn't match what we want to push, we need to worry
1147 if(!remoteStoreDef.equals(newStoreDef)) {
1148 // let's check to see if the key/value serializers are
1149 // REALLY equal.
1150 SerializerDefinition localKeySerializerDef = newStoreDef.getKeySerializer();
1151 SerializerDefinition localValueSerializerDef = newStoreDef.getValueSerializer();
1152 SerializerDefinition remoteKeySerializerDef = remoteStoreDef.getKeySerializer();
1153 SerializerDefinition remoteValueSerializerDef = remoteStoreDef.getValueSerializer();
1154 if(remoteKeySerializerDef.getName().equals(serializerName)
1155 && remoteValueSerializerDef.getName().equals(serializerName)) {
1157 Schema remoteKeyDef = Schema.parse(remoteKeySerializerDef.getCurrentSchemaInfo());
1158 Schema remoteValDef = Schema.parse(remoteValueSerializerDef.getCurrentSchemaInfo());
1159 Schema localKeyDef = Schema.parse(localKeySerializerDef.getCurrentSchemaInfo());
1160 Schema localValDef = Schema.parse(localValueSerializerDef.getCurrentSchemaInfo());
1162 if(remoteKeyDef.equals(localKeyDef) && remoteValDef.equals(localValDef)) {
1164 String compressionPolicy = "";
1165 if(hasCompression) {
1166 compressionPolicy = "\n\t\t<compression><type>gzip</type></compression>";
1169 // if the key/value serializers are REALLY equal
1170 // (even though the strings may not match), then
1171 // just use the remote stores to GUARANTEE that
1172 // they match, and try again.
1175 String keySerializerStr = "\n\t\t<type>"
1176 + remoteKeySerializerDef.getName()
1177 + "</type>";
1179 if(remoteKeySerializerDef.hasVersion()) {
1181 Map<Integer, String> versions = new HashMap<Integer, String>();
1182 for(Map.Entry<Integer, String> entry: remoteKeySerializerDef.getAllSchemaInfoVersions()
1183 .entrySet()) {
1184 keySerializerStr += "\n\t\t <schema-info version=\""
1185 + entry.getKey() + "\">"
1186 + entry.getValue()
1187 + "</schema-info>\n\t";
1190 } else {
1191 keySerializerStr = "\n\t\t<type>"
1192 + serializerName
1193 + "</type>\n\t\t<schema-info version=\"0\">"
1194 + remoteKeySerializerDef.getCurrentSchemaInfo()
1195 + "</schema-info>\n\t";
1198 schemaObj.keySchema = keySerializerStr;
1199 String valueSerializerStr = "\n\t\t<type>"
1200 + remoteValueSerializerDef.getName()
1201 + "</type>";
1203 if(remoteValueSerializerDef.hasVersion()) {
1205 Map<Integer, String> versions = new HashMap<Integer, String>();
1206 for(Map.Entry<Integer, String> entry: remoteValueSerializerDef.getAllSchemaInfoVersions()
1207 .entrySet()) {
1208 valueSerializerStr += "\n\t\t <schema-info version=\""
1209 + entry.getKey() + "\">"
1210 + entry.getValue()
1211 + "</schema-info>\n\t";
1213 valueSerializerStr += compressionPolicy + "\n\t";
1215 } else {
1217 valueSerializerStr = "\n\t\t<type>"
1218 + serializerName
1219 + "</type>\n\t\t<schema-info version=\"0\">"
1220 + remoteValueSerializerDef.getCurrentSchemaInfo()
1221 + "</schema-info>" + compressionPolicy
1222 + "\n\t";
1225 schemaObj.valSchema = valueSerializerStr;
1227 newStoreDefXml = VoldemortUtils.getStoreDefXml(storeName,
1228 replicationFactor,
1229 requiredReads,
1230 requiredWrites,
1231 props.containsKey(BUILD_PREFERRED_READS) ? props.getInt(BUILD_PREFERRED_READS)
1232 : null,
1233 props.containsKey(BUILD_PREFERRED_WRITES) ? props.getInt(BUILD_PREFERRED_WRITES)
1234 : null,
1235 keySerializerStr,
1236 valueSerializerStr);
1238 newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);
1240 if(!remoteStoreDef.equals(newStoreDef)) {
1241 // if we still get a fail, then we know that the store defs don't match for reasons
1242 // OTHER than the key/value serializer
1243 String errorMessage = "Your store schema is identical, " +
1244 "but the store definition does not match on cluster URL: " + url;
1245 log.error(errorMessage + diffMessage(newStoreDef, remoteStoreDef));
1246 throw new VoldemortException(errorMessage);
1250 } else {
1251 // if the key/value serializers are not equal (even in java, not just json strings),
1252 // then fail
1253 String errorMessage = "Your data schema does not match the schema which is already " +
1254 "defined on cluster URL " + url;
1255 log.error(errorMessage + diffMessage(newStoreDef, remoteStoreDef));
1256 throw new VoldemortException(errorMessage);
1259 } else {
1260 String errorMessage = "Your store definition does not match the store definition that is " +
1261 "already defined on cluster URL: " + url;
1262 log.error(errorMessage + diffMessage(newStoreDef, remoteStoreDef));
1263 throw new VoldemortException(errorMessage);
1267 foundStore = true;
1268 break;
1271 return foundStore;
1274 private class HeartBeatHookRunnable implements Runnable {
1275 final int sleepTimeMs;
1276 boolean keepRunning = true;
1278 HeartBeatHookRunnable(int sleepTimeMs) {
1279 this.sleepTimeMs = sleepTimeMs;
1282 public void stop() {
1283 keepRunning = false;
1286 public void run() {
1287 while (keepRunning) {
1288 try {
1289 Thread.sleep(sleepTimeMs);
1290 invokeHooks(BuildAndPushStatus.HEARTBEAT);
1291 } catch (InterruptedException e) {
1292 keepRunning = false;