Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / AsyncDatastoreServiceImpl.java
blob443647786dafb5cbe5c8c9fa7d0c55142b108339
1 // Copyright 2010 Google Inc. All rights reserved.
3 package com.google.appengine.api.datastore;
5 import static com.google.appengine.api.datastore.DatastoreApiHelper.makeAsyncCall;
6 import static com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType.MASTER_SLAVE;
7 import static com.google.appengine.api.datastore.FetchOptions.Builder.withLimit;
8 import static com.google.appengine.api.datastore.FutureHelper.quietGet;
9 import static com.google.appengine.api.datastore.ReadPolicy.Consistency.EVENTUAL;
11 import com.google.appengine.api.datastore.Batcher.IndexedItem;
12 import com.google.appengine.api.datastore.Batcher.ReorderingMultiFuture;
13 import com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType;
14 import com.google.appengine.api.datastore.DatastoreService.KeyRangeState;
15 import com.google.appengine.api.datastore.EntityCachingStrategy.PreGetCachingResult;
16 import com.google.appengine.api.datastore.EntityCachingStrategy.PreMutationCachingResult;
17 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
18 import com.google.appengine.api.datastore.Index.IndexState;
19 import com.google.appengine.api.datastore.Query.FilterOperator;
20 import com.google.appengine.api.utils.FutureWrapper;
21 import com.google.apphosting.api.ApiBasePb.StringProto;
22 import com.google.apphosting.datastore.DatastoreV3Pb.AllocateIdsRequest;
23 import com.google.apphosting.datastore.DatastoreV3Pb.AllocateIdsResponse;
24 import com.google.apphosting.datastore.DatastoreV3Pb.CompositeIndices;
25 import com.google.apphosting.datastore.DatastoreV3Pb.DeleteRequest;
26 import com.google.apphosting.datastore.DatastoreV3Pb.DeleteResponse;
27 import com.google.apphosting.datastore.DatastoreV3Pb.GetRequest;
28 import com.google.apphosting.datastore.DatastoreV3Pb.GetResponse;
29 import com.google.apphosting.datastore.DatastoreV3Pb.PutRequest;
30 import com.google.apphosting.datastore.DatastoreV3Pb.PutResponse;
31 import com.google.common.collect.ImmutableList;
32 import com.google.common.collect.Lists;
33 import com.google.common.collect.Maps;
34 import com.google.common.collect.Sets;
35 import com.google.io.protocol.ProtocolMessage;
36 import com.google.storage.onestore.v3.OnestoreEntity.CompositeIndex;
37 import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
38 import com.google.storage.onestore.v3.OnestoreEntity.Reference;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.Iterator;
46 import java.util.LinkedHashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.Future;
52 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.TimeoutException;
54 import java.util.logging.Level;
56 /**
57 * Implements AsyncDatastoreService by making calls to ApiProxy.
60 class AsyncDatastoreServiceImpl extends BaseDatastoreServiceImpl
61 implements AsyncDatastoreService, CurrentTransactionProvider {
63 /**
64 * A base batcher for DatastoreV3 operations executed in the context of an {@link
65 * AsyncDatastoreServiceImpl}.
66 * @param <S> the response message type
67 * @param <R> the request message type
68 * @param <F> the Java specific representation of a value
69 * @param <T> the proto representation of value
71 private abstract class V3Batcher<S extends ProtocolMessage<S>, R extends ProtocolMessage<R>,
72 F, T extends ProtocolMessage<T>> extends Batcher<R, F, T> {
73 protected abstract Future<S> makeCall(R batch);
75 @Override
76 final R newBatch(R baseBatch) {
77 return baseBatch.clone();
80 @Override
81 final int getMaxSize() {
82 return datastoreServiceConfig.maxRpcSizeBytes;
85 @Override
86 final int getMaxGroups() {
87 return datastoreServiceConfig.maxEntityGroupsPerRpc;
90 final List<Future<S>> makeCalls(Iterator<R> batches) {
91 List<Future<S>> futures = new ArrayList<Future<S>>();
92 while (batches.hasNext()) {
93 futures.add(makeCall(batches.next()));
95 return futures;
99 /**
100 * A base batcher for operations that operate on {@link Key}s.
101 * @param <S> the response message type
102 * @param <R> the request message type
104 private abstract class V3KeyBatcher<S extends ProtocolMessage<S>, R extends ProtocolMessage<R>>
105 extends V3Batcher<S, R, Key, Reference> {
106 @Override
107 final Object getGroup(Key value) {
108 return value.getRootKey();
111 @Override
112 final Reference toPb(Key value) {
113 return KeyTranslator.convertToPb(value);
117 private final V3KeyBatcher<DeleteResponse, DeleteRequest> deleteBatcher =
118 new V3KeyBatcher<DeleteResponse, DeleteRequest>() {
119 @Override
120 void addToBatch(Reference value, DeleteRequest batch) {
121 batch.addKey(value);
124 @Override
125 int getMaxCount() {
126 return datastoreServiceConfig.maxBatchWriteEntities;
129 @Override
130 protected Future<DeleteResponse> makeCall(DeleteRequest batch) {
131 return makeAsyncCall(apiConfig, "Delete", batch, new DeleteResponse());
135 private final V3KeyBatcher<GetResponse, GetRequest> getByKeyBatcher =
136 new V3KeyBatcher<GetResponse, GetRequest>() {
137 @Override
138 void addToBatch(Reference value, GetRequest batch) {
139 batch.addKey(value);
142 @Override
143 int getMaxCount() {
144 return datastoreServiceConfig.maxBatchReadEntities;
147 @Override
148 protected Future<GetResponse> makeCall(GetRequest batch) {
149 return makeAsyncCall(apiConfig, "Get", batch, new GetResponse());
153 private final V3Batcher<GetResponse, GetRequest, Reference, Reference> getByReferenceBatcher =
154 new V3Batcher<GetResponse, GetRequest, Reference, Reference>() {
155 @Override
156 final Object getGroup(Reference value) {
157 return value.getPath().getElement(0);
160 @Override
161 final Reference toPb(Reference value) {
162 return value;
165 @Override
166 void addToBatch(Reference value, GetRequest batch) {
167 batch.addKey(value);
170 @Override
171 int getMaxCount() {
172 return datastoreServiceConfig.maxBatchReadEntities;
175 @Override
176 protected Future<GetResponse> makeCall(GetRequest batch) {
177 return makeAsyncCall(apiConfig, "Get", batch, new GetResponse());
181 private final V3Batcher<PutResponse, PutRequest, Entity, EntityProto> putBatcher =
182 new V3Batcher<PutResponse, PutRequest, Entity, EntityProto>() {
183 @Override
184 Object getGroup(Entity value) {
185 return value.getKey().getRootKey();
188 @Override
189 void addToBatch(EntityProto value, PutRequest batch) {
190 batch.addEntity(value);
193 @Override
194 int getMaxCount() {
195 return datastoreServiceConfig.maxBatchWriteEntities;
198 @Override
199 protected Future<PutResponse> makeCall(PutRequest batch) {
200 return makeAsyncCall(apiConfig, "Put", batch, new PutResponse());
203 @Override
204 EntityProto toPb(Entity value) {
205 return EntityTranslator.convertToPb(value);
209 private DatastoreType datastoreType;
211 public AsyncDatastoreServiceImpl(
212 DatastoreServiceConfig datastoreServiceConfig, TransactionStack defaultTxnProvider) {
213 super(datastoreServiceConfig, defaultTxnProvider);
216 @Override
217 public Future<Entity> get(Key key) {
218 if (key == null) {
219 throw new NullPointerException("key cannot be null");
221 return wrapSingleGet(key, get(Arrays.asList(key)));
224 @Override
225 public Future<Entity> get( Transaction txn, final Key key) {
226 if (key == null) {
227 throw new NullPointerException("key cannot be null");
229 return wrapSingleGet(key, get(txn, Arrays.asList(key)));
232 private Future<Entity> wrapSingleGet(final Key key, Future<Map<Key, Entity>> futureEntities) {
233 return new FutureWrapper<Map<Key, Entity>, Entity>(futureEntities) {
234 @Override
235 protected Entity wrap(Map<Key, Entity> entities) throws Exception {
236 Entity entity = entities.get(key);
237 if (entity == null) {
238 throw new EntityNotFoundException(key);
240 return entity;
243 @Override
244 protected Throwable convertException(Throwable cause) {
245 return cause;
250 @Override
251 public Future<Map<Key, Entity>> get(final Iterable<Key> keys) {
252 return new TransactionRunner<Map<Key, Entity>>(getOrCreateTransaction()) {
253 @Override
254 protected Future<Map<Key, Entity>> runInternal(Transaction txn) {
255 return get(txn, keys);
257 }.runReadInTransaction();
260 @Override
261 public Future<Map<Key, Entity>> get(Transaction txn, Iterable<Key> keys) {
262 if (keys == null) {
263 throw new NullPointerException("keys cannot be null");
266 List<Key> keyList = Lists.newArrayList(keys);
268 Map<Key, Entity> resultMap = new HashMap<Key, Entity>();
269 PreGetContext preGetContext = new PreGetContext(this, keyList, resultMap);
270 datastoreServiceConfig.getDatastoreCallbacks().executePreGetCallbacks(preGetContext);
272 keyList.removeAll(resultMap.keySet());
274 PreGetCachingResult preGetCachingResult =
275 entityCachingStrategy.preGet(this, keyList, resultMap);
276 keyList.removeAll(preGetCachingResult.getKeysToSkipLoading());
278 Future<Map<Key, Entity>> result = doBatchGet(txn, Sets.newLinkedHashSet(keyList), resultMap);
280 result = entityCachingStrategy.createPostGetFuture(result, preGetCachingResult);
281 return new PostLoadFuture(result, datastoreServiceConfig.getDatastoreCallbacks(), this);
284 private Future<Map<Key, Entity>> doBatchGet( Transaction txn, final Set<Key> keysToGet, final Map<Key, Entity> resultMap) {
285 final GetRequest baseReq = new GetRequest();
286 baseReq.setAllowDeferred(true);
287 if (txn != null) {
288 TransactionImpl.ensureTxnActive(txn);
289 baseReq.setTransaction(localTxnToRemoteTxn(txn));
291 if (datastoreServiceConfig.getReadPolicy().getConsistency() == EVENTUAL) {
292 baseReq.setFailoverMs(ARBITRARY_FAILOVER_READ_MS);
293 baseReq.setStrong(false);
296 final boolean shouldUseMultipleBatches = getDatastoreType() != MASTER_SLAVE && txn == null
297 && datastoreServiceConfig.getReadPolicy().getConsistency() != EVENTUAL;
299 Iterator<GetRequest> batches =
300 getByKeyBatcher.getBatches(keysToGet, baseReq, shouldUseMultipleBatches);
301 List<Future<GetResponse>> futures = getByKeyBatcher.makeCalls(batches);
303 return registerInTransaction(txn, new MultiFuture<GetResponse, Map<Key, Entity>>(futures) {
305 * A Map from a Reference without an App Id specified to the corresponding Key that the user
306 * requested. This is a workaround for the Remote API to support matching requested Keys to
307 * Entities that may be from a different App Id .
309 private Map<Reference, Key> keyMapIgnoringAppId;
311 @Override
312 public Map<Key, Entity> get() throws InterruptedException, ExecutionException {
313 try {
314 aggregate(futures, null, null);
315 } catch (TimeoutException e) {
316 throw new RuntimeException(e);
318 return resultMap;
321 @Override
322 public Map<Key, Entity> get(long timeout, TimeUnit unit)
323 throws InterruptedException, ExecutionException, TimeoutException {
324 aggregate(futures, timeout, unit);
325 return resultMap;
329 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
330 * any results were deferred.
332 * @param currentFutures the Futures corresponding to the batches of the initial GetRequests.
333 * @param timeout the timeout to use while waiting on the Future, or null for none.
334 * @param timeoutUnit the unit of the timeout, or null for none.
336 private void aggregate(
337 Iterable<Future<GetResponse>> currentFutures, Long timeout, TimeUnit timeoutUnit)
338 throws ExecutionException, InterruptedException, TimeoutException {
339 while (true) {
340 List<Reference> deferredRefs = Lists.newLinkedList();
342 for (Future<GetResponse> currentFuture : currentFutures) {
343 GetResponse resp = getFutureWithOptionalTimeout(currentFuture, timeout, timeoutUnit);
344 addEntitiesToResultMap(resp);
345 deferredRefs.addAll(resp.deferreds());
348 if (deferredRefs.isEmpty()) {
349 break;
352 Iterator<GetRequest> followupBatches =
353 getByReferenceBatcher.getBatches(deferredRefs, baseReq, shouldUseMultipleBatches);
354 currentFutures = getByReferenceBatcher.makeCalls(followupBatches);
359 * Convenience method to get the result of a Future and optionally specify a timeout.
361 * @param future the Future to get.
362 * @param timeout the timeout to use while waiting on the Future, or null for none.
363 * @param timeoutUnit the unit of the timeout, or null for none.
364 * @return the result of the Future.
365 * @throws TimeoutException will only ever be thrown if a timeout is provided.
367 private GetResponse getFutureWithOptionalTimeout(
368 Future<GetResponse> future, Long timeout, TimeUnit timeoutUnit)
369 throws ExecutionException, InterruptedException, TimeoutException {
370 if (timeout == null) {
371 return future.get();
372 } else {
373 return future.get(timeout, timeoutUnit);
378 * Adds the Entities from the GetResponse to the resultMap. Will omit Keys that were missing.
379 * Handles Keys with different App Ids from the Entity.Key. See
380 * {@link #findKeyFromRequestIgnoringAppId(Reference)}
382 private void addEntitiesToResultMap(GetResponse response) {
383 for (GetResponse.Entity entityResult : response.entitys()) {
384 if (entityResult.hasEntity()) {
385 Entity responseEntity = EntityTranslator.createFromPb(entityResult.getEntity());
386 Key responseKey = responseEntity.getKey();
388 if (!keysToGet.contains(responseKey)) {
389 responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey());
391 resultMap.put(responseKey, responseEntity);
397 * This is a hack to support calls going through the Remote API. The problem is:
399 * The requested Key may have a local app id.
400 * The returned Entity may have a remote app id.
402 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
403 * user can always do map.get(keyFromRequest).
405 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
406 * AppId field.
408 * Note that we used to be able to rely on the order of the Response Entities matching the
409 * order of Request Keys. We can no longer do so with the addition of Deferred results.
411 * @param referenceFromResponse the reference from the Response that did not match any of the
412 * requested Keys. (May be mutated.)
413 * @return the Key from the request that corresponds to the given Reference from the Response
414 * (ignoring AppId.)
416 private Key findKeyFromRequestIgnoringAppId(Reference referenceFromResponse) {
417 if (keyMapIgnoringAppId == null) {
418 keyMapIgnoringAppId = Maps.newHashMap();
419 for (Key requestKey : keysToGet) {
420 Reference requestKeyAsRefWithoutApp = KeyTranslator.convertToPb(requestKey).clearApp();
421 keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp, requestKey);
425 Key result = keyMapIgnoringAppId.get(referenceFromResponse.clearApp());
426 if (result == null) {
427 throw new DatastoreFailureException("Internal error");
429 return result;
434 @Override
435 public Future<Key> put(Entity entity) {
436 return wrapSinglePut(put(Arrays.asList(entity)));
439 @Override
440 public Future<Key> put(Transaction txn, Entity entity) {
441 return wrapSinglePut(put(txn, Arrays.asList(entity)));
444 private Future<Key> wrapSinglePut(Future<List<Key>> futureKeys) {
445 return new FutureWrapper<List<Key>, Key>(futureKeys) {
446 @Override
447 protected Key wrap(List<Key> keys) throws Exception {
448 return keys.get(0);
451 @Override
452 protected Throwable convertException(Throwable cause) {
453 return cause;
458 @Override
459 public Future<List<Key>> put(final Iterable<Entity> entities) {
460 return new TransactionRunner<List<Key>>(getOrCreateTransaction()) {
461 @Override
462 protected Future<List<Key>> runInternal(Transaction txn) {
463 return put(txn, entities);
465 }.runWriteInTransaction();
468 @Override
469 public Future<List<Key>> put( Transaction txn, Iterable<Entity> entities) {
470 List<Entity> entityList = entities instanceof List ?
471 (List<Entity>) entities : Lists.newArrayList(entities);
472 PutContext prePutContext = new PutContext(this, entityList);
473 datastoreServiceConfig.getDatastoreCallbacks().executePrePutCallbacks(prePutContext);
474 PreMutationCachingResult preMutationCachingResult =
475 entityCachingStrategy.prePut(this, entityList);
477 List<IndexedItem<Key>> indexedKeysToSkip = Lists.newArrayList();
478 Set<Key> mutationKeysToSkip = preMutationCachingResult.getMutationKeysToSkip();
479 List<Entity> entitiesToPut;
480 if (!mutationKeysToSkip.isEmpty()) {
481 entitiesToPut = Lists.newArrayList();
482 int index = 0;
483 for (Entity entity : entityList) {
484 if (mutationKeysToSkip.contains(entity.getKey())) {
485 indexedKeysToSkip.add(new IndexedItem<Key>(index++, entity.getKey()));
486 } else {
487 entitiesToPut.add(entity);
488 ++index;
491 } else {
492 entitiesToPut = ImmutableList.copyOf(entities);
495 Future<List<Key>> result = combinePutResult(doBatchPut(txn, entitiesToPut), indexedKeysToSkip);
497 if (txn == null) {
498 result =
499 entityCachingStrategy.createPostMutationFuture(result, preMutationCachingResult);
500 PutContext postPutContext = new PutContext(this, entityList);
501 result = new PostPutFuture(result, datastoreServiceConfig.getDatastoreCallbacks(),
502 postPutContext);
503 } else {
504 defaultTxnProvider.addPutEntities(txn, entityList);
506 return result;
509 private Future<List<Key>> combinePutResult(Future<List<Key>> rpcResult,
510 final List<IndexedItem<Key>> skippedKeys) {
511 if (skippedKeys.isEmpty()) {
512 return rpcResult;
515 return new FutureWrapper<List<Key>, List<Key>>(rpcResult) {
516 @Override
517 protected List<Key> wrap(List<Key> result) throws Exception {
518 List<Key> combined = Lists.newLinkedList(result);
519 for (IndexedItem<Key> indexedKey : skippedKeys) {
520 combined.add(indexedKey.index, indexedKey.item);
522 return combined;
525 @Override
526 protected Throwable convertException(Throwable cause) {
527 return cause;
532 private Future<List<Key>> doBatchPut( Transaction txn,
533 final List<Entity> entities) {
534 PutRequest baseReq = new PutRequest();
535 if (txn != null) {
536 TransactionImpl.ensureTxnActive(txn);
537 baseReq.setTransaction(localTxnToRemoteTxn(txn));
539 boolean group = !baseReq.hasTransaction();
540 List<Integer> order = Lists.newArrayListWithCapacity(entities.size());
541 Iterator<PutRequest> batches = putBatcher.getBatches(entities, baseReq, group, order);
542 List<Future<PutResponse>> futures = putBatcher.makeCalls(batches);
544 return registerInTransaction(txn,
545 new ReorderingMultiFuture<PutResponse, List<Key>>(futures, order) {
546 @Override
547 protected List<Key> aggregate(
548 PutResponse intermediateResult, Iterator<Integer> indexItr, List<Key> result) {
549 for (Reference reference : intermediateResult.keys()) {
550 int index = indexItr.next();
551 Key key = entities.get(index).getKey();
552 KeyTranslator.updateKey(reference, key);
553 result.set(index, key);
555 return result;
558 @Override
559 protected List<Key> initResult(int size) {
560 List<Key> result = new ArrayList<Key>(Collections.<Key>nCopies(size, null));
561 return result;
566 @Override
567 public Future<Void> delete(Key... keys) {
568 return delete(Arrays.asList(keys));
571 @Override
572 public Future<Void> delete(Transaction txn, Key... keys) {
573 return delete(txn, Arrays.asList(keys));
576 @Override
577 public Future<Void> delete(final Iterable<Key> keys) {
578 return new TransactionRunner<Void>(getOrCreateTransaction()) {
579 @Override
580 protected Future<Void> runInternal(Transaction txn) {
581 return delete(txn, keys);
583 }.runWriteInTransaction();
586 @Override
587 public Future<Void> delete(Transaction txn, Iterable<Key> keys) {
588 List<Key> allKeys = keys instanceof List ?
589 (List<Key>) keys : ImmutableList.copyOf(keys);
590 DeleteContext preDeleteContext = new DeleteContext(this, allKeys);
591 datastoreServiceConfig.getDatastoreCallbacks().executePreDeleteCallbacks(preDeleteContext);
592 PreMutationCachingResult preMutationCachingResult =
593 entityCachingStrategy.preDelete(this, allKeys);
594 Future<Void> result = null;
595 Collection<Key> keysToDelete;
596 Set<Key> keysToSkip = preMutationCachingResult.getMutationKeysToSkip();
597 if (keysToSkip.isEmpty()) {
598 keysToDelete = allKeys;
599 } else {
600 Set<Key> keySet = Sets.newHashSet(allKeys);
601 keySet.removeAll(keysToSkip);
602 keysToDelete = keySet;
604 result = doBatchDelete(txn, keysToDelete);
606 if (txn == null) {
607 result = entityCachingStrategy.createPostMutationFuture(result, preMutationCachingResult);
608 result = new PostDeleteFuture(
609 result, datastoreServiceConfig.getDatastoreCallbacks(),
610 new DeleteContext(this, allKeys));
611 } else {
612 defaultTxnProvider.addDeletedKeys(txn, allKeys);
614 return result;
617 private Future<Void> doBatchDelete( Transaction txn, Collection<Key> keys) {
618 DeleteRequest baseReq = new DeleteRequest();
619 if (txn != null) {
620 TransactionImpl.ensureTxnActive(txn);
621 baseReq.setTransaction(localTxnToRemoteTxn(txn));
623 boolean group = !baseReq.hasTransaction();
624 Iterator<DeleteRequest> batches = deleteBatcher.getBatches(keys, baseReq, group);
625 List<Future<DeleteResponse>> futures = deleteBatcher.makeCalls(batches);
626 return registerInTransaction(txn, new MultiFuture<DeleteResponse, Void>(futures) {
627 @Override
628 public Void get() throws InterruptedException, ExecutionException {
629 for (Future<DeleteResponse> future : futures) {
630 future.get();
632 return null;
635 @Override
636 public Void get(long timeout, TimeUnit unit)
637 throws InterruptedException, ExecutionException, TimeoutException {
638 for (Future<DeleteResponse> future : futures) {
639 future.get(timeout, unit);
641 return null;
646 @Override
647 public Collection<Transaction> getActiveTransactions() {
648 return defaultTxnProvider.getAll();
652 * Register the provided future with the provided txn so that we know to
653 * perform a {@link java.util.concurrent.Future#get()} before the txn is
654 * committed.
656 * @param txn The txn with which the future must be associated.
657 * @param future The future to associate with the txn.
658 * @param <T> The type of the Future
659 * @return The same future that was passed in, for caller convenience.
661 private <T> Future<T> registerInTransaction( Transaction txn, Future<T> future) {
662 if (txn != null) {
663 defaultTxnProvider.addFuture(txn, future);
664 return new FutureHelper.TxnAwareFuture<T>(future, txn, defaultTxnProvider);
666 return future;
669 @Override
670 public Future<Transaction> beginTransaction() {
671 return beginTransaction(TransactionOptions.Builder.withDefaults());
674 @Override
675 public Future<Transaction> beginTransaction(TransactionOptions options) {
676 return new FutureHelper.FakeFuture<Transaction>(beginTransactionInternal(options, true));
679 @Override
680 public PreparedQuery prepare(Query query) {
681 return prepare(null, query);
684 @SuppressWarnings("deprecation")
685 @Override
686 public PreparedQuery prepare(Transaction txn, Query query) {
687 PreQueryContext context = new PreQueryContext(this, query);
688 datastoreServiceConfig.getDatastoreCallbacks().executePreQueryCallbacks(context);
690 query = context.getElements().get(0);
691 validateQuery(query);
692 List<MultiQueryBuilder> queriesToRun = QuerySplitHelper.splitQuery(query);
693 query.setFilter(null);
694 query.getFilterPredicates().clear();
695 if (queriesToRun.size() == 1 && queriesToRun.get(0).isSingleton()) {
696 query.getFilterPredicates().addAll(queriesToRun.get(0).getBaseFilters());
697 return new PreparedQueryImpl(apiConfig, datastoreServiceConfig, query, txn);
699 return new PreparedMultiQuery(apiConfig, datastoreServiceConfig, query, queriesToRun, txn);
702 @Override
703 public Future<KeyRange> allocateIds(String kind, long num) {
704 return allocateIds(null, kind, num);
707 static Reference buildAllocateIdsRef(
708 Key parent, String kind, AppIdNamespace appIdNamespace) {
709 if (parent != null && !parent.isComplete()) {
710 throw new IllegalArgumentException("parent key must be complete");
712 Key key = new Key(kind, parent, Key.NOT_ASSIGNED, "ignored", appIdNamespace);
713 return KeyTranslator.convertToPb(key);
716 @Override
717 public Future<KeyRange> allocateIds(final Key parent, final String kind, long num) {
718 if (num <= 0) {
719 throw new IllegalArgumentException("num must be > 0");
722 if (num > 1000000000) {
723 throw new IllegalArgumentException("num must be < 1 billion");
726 final AppIdNamespace appIdNamespace = datastoreServiceConfig.getAppIdNamespace();
727 Reference allocateIdsRef = buildAllocateIdsRef(parent, kind, appIdNamespace);
728 AllocateIdsRequest req =
729 new AllocateIdsRequest().setSize(num).setModelKey(allocateIdsRef);
730 AllocateIdsResponse resp = new AllocateIdsResponse();
731 Future<AllocateIdsResponse> future = makeAsyncCall(apiConfig, "AllocateIds", req, resp);
732 return new FutureWrapper<AllocateIdsResponse, KeyRange>(future) {
733 @Override
734 protected KeyRange wrap(AllocateIdsResponse resp) throws Exception {
735 return new KeyRange(parent, kind, resp.getStart(), resp.getEnd(), appIdNamespace);
738 @Override
739 protected Throwable convertException(Throwable cause) {
740 return cause;
745 Future<KeyRangeState> allocateIdRange(final KeyRange range) {
746 Key parent = range.getParent();
747 final String kind = range.getKind();
748 final long start = range.getStart().getId();
749 long end = range.getEnd().getId();
751 AllocateIdsRequest req = new AllocateIdsRequest()
752 .setModelKey(AsyncDatastoreServiceImpl.buildAllocateIdsRef(parent, kind, null))
753 .setMax(end);
754 AllocateIdsResponse resp = new AllocateIdsResponse();
755 Future<AllocateIdsResponse> future = makeAsyncCall(apiConfig, "AllocateIds", req, resp);
756 return new FutureWrapper<AllocateIdsResponse, KeyRangeState>(future) {
757 @Override
758 protected KeyRangeState wrap(AllocateIdsResponse resp) throws Exception {
759 Query query = new Query(kind).setKeysOnly();
760 query.addFilter(
761 Entity.KEY_RESERVED_PROPERTY, FilterOperator.GREATER_THAN_OR_EQUAL, range.getStart());
762 query.addFilter(
763 Entity.KEY_RESERVED_PROPERTY, FilterOperator.LESS_THAN_OR_EQUAL, range.getEnd());
764 List<Entity> collision = prepare(query).asList(withLimit(1));
766 if (!collision.isEmpty()) {
767 return KeyRangeState.COLLISION;
770 boolean raceCondition = start < resp.getStart();
771 return raceCondition ? KeyRangeState.CONTENTION : KeyRangeState.EMPTY;
774 @Override
775 protected Throwable convertException(Throwable cause) {
776 return cause;
781 protected DatastoreType getDatastoreType() {
782 if (datastoreType == null) {
783 datastoreType = quietGet(getDatastoreAttributes()).getDatastoreType();
785 return datastoreType;
788 @Override
789 public Future<DatastoreAttributes> getDatastoreAttributes() {
790 String appId = datastoreServiceConfig.getAppIdNamespace().getAppId();
791 DatastoreAttributes attributes = new DatastoreAttributes(appId);
792 return new FutureHelper.FakeFuture<DatastoreAttributes>(attributes);
795 @Override
796 public Future<Map<Index, IndexState>> getIndexes() {
797 StringProto req = new StringProto();
798 req.setValue(datastoreServiceConfig.getAppIdNamespace().getAppId());
799 return new FutureWrapper<CompositeIndices, Map<Index, IndexState>>(
800 makeAsyncCall(apiConfig, "GetIndices", req, new CompositeIndices())) {
801 @Override
802 protected Map<Index, IndexState> wrap(CompositeIndices indices) throws Exception {
803 Map<Index, IndexState> answer = new LinkedHashMap<Index, IndexState>();
804 for (CompositeIndex ci : indices.indexs()) {
805 Index index = IndexTranslator.convertFromPb(ci);
806 switch (ci.getStateEnum()) {
807 case DELETED:
808 answer.put(index, IndexState.DELETING);
809 break;
810 case ERROR:
811 answer.put(index, IndexState.ERROR);
812 break;
813 case READ_WRITE:
814 answer.put(index, IndexState.SERVING);
815 break;
816 case WRITE_ONLY:
817 answer.put(index, IndexState.BUILDING);
818 break;
819 default:
820 logger.log(Level.WARNING, "Unrecognized index state for " + index);
821 break;
824 return answer;
827 @Override
828 protected Throwable convertException(Throwable cause) {
829 return cause;