1 // Copyright 2010 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import static com
.google
.appengine
.api
.datastore
.DatastoreApiHelper
.makeAsyncCall
;
6 import static com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
.MASTER_SLAVE
;
7 import static com
.google
.appengine
.api
.datastore
.FetchOptions
.Builder
.withLimit
;
8 import static com
.google
.appengine
.api
.datastore
.FutureHelper
.quietGet
;
9 import static com
.google
.appengine
.api
.datastore
.ReadPolicy
.Consistency
.EVENTUAL
;
11 import com
.google
.appengine
.api
.datastore
.Batcher
.IndexedItem
;
12 import com
.google
.appengine
.api
.datastore
.Batcher
.ReorderingMultiFuture
;
13 import com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
;
14 import com
.google
.appengine
.api
.datastore
.DatastoreService
.KeyRangeState
;
15 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.PreGetCachingResult
;
16 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.PreMutationCachingResult
;
17 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
18 import com
.google
.appengine
.api
.datastore
.Index
.IndexState
;
19 import com
.google
.appengine
.api
.datastore
.Query
.FilterOperator
;
20 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
21 import com
.google
.apphosting
.api
.ApiBasePb
.StringProto
;
22 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.AllocateIdsRequest
;
23 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.AllocateIdsResponse
;
24 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.CompositeIndices
;
25 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.DeleteRequest
;
26 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.DeleteResponse
;
27 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.GetRequest
;
28 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.GetResponse
;
29 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.PutRequest
;
30 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.PutResponse
;
31 import com
.google
.common
.collect
.ImmutableList
;
32 import com
.google
.common
.collect
.Lists
;
33 import com
.google
.common
.collect
.Maps
;
34 import com
.google
.common
.collect
.Sets
;
35 import com
.google
.io
.protocol
.ProtocolMessage
;
36 import com
.google
.storage
.onestore
.v3
.OnestoreEntity
.CompositeIndex
;
37 import com
.google
.storage
.onestore
.v3
.OnestoreEntity
.EntityProto
;
38 import com
.google
.storage
.onestore
.v3
.OnestoreEntity
.Reference
;
40 import java
.util
.ArrayList
;
41 import java
.util
.Arrays
;
42 import java
.util
.Collection
;
43 import java
.util
.Collections
;
44 import java
.util
.HashMap
;
45 import java
.util
.Iterator
;
46 import java
.util
.LinkedHashMap
;
47 import java
.util
.List
;
50 import java
.util
.concurrent
.ExecutionException
;
51 import java
.util
.concurrent
.Future
;
52 import java
.util
.concurrent
.TimeUnit
;
53 import java
.util
.concurrent
.TimeoutException
;
54 import java
.util
.logging
.Level
;
57 * Implements AsyncDatastoreService by making calls to ApiProxy.
60 class AsyncDatastoreServiceImpl
extends BaseDatastoreServiceImpl
61 implements AsyncDatastoreService
, CurrentTransactionProvider
{
64 * A base batcher for DatastoreV3 operations executed in the context of an {@link
65 * AsyncDatastoreServiceImpl}.
66 * @param <S> the response message type
67 * @param <R> the request message type
68 * @param <F> the Java specific representation of a value
69 * @param <T> the proto representation of value
71 private abstract class V3Batcher
<S
extends ProtocolMessage
<S
>, R
extends ProtocolMessage
<R
>,
72 F
, T
extends ProtocolMessage
<T
>> extends Batcher
<R
, F
, T
> {
73 protected abstract Future
<S
> makeCall(R batch
);
76 final R
newBatch(R baseBatch
) {
77 return baseBatch
.clone();
81 final int getMaxSize() {
82 return datastoreServiceConfig
.maxRpcSizeBytes
;
86 final int getMaxGroups() {
87 return datastoreServiceConfig
.maxEntityGroupsPerRpc
;
90 final List
<Future
<S
>> makeCalls(Iterator
<R
> batches
) {
91 List
<Future
<S
>> futures
= new ArrayList
<Future
<S
>>();
92 while (batches
.hasNext()) {
93 futures
.add(makeCall(batches
.next()));
100 * A base batcher for operations that operate on {@link Key}s.
101 * @param <S> the response message type
102 * @param <R> the request message type
104 private abstract class V3KeyBatcher
<S
extends ProtocolMessage
<S
>, R
extends ProtocolMessage
<R
>>
105 extends V3Batcher
<S
, R
, Key
, Reference
> {
107 final Object
getGroup(Key value
) {
108 return value
.getRootKey();
112 final Reference
toPb(Key value
) {
113 return KeyTranslator
.convertToPb(value
);
117 private final V3KeyBatcher
<DeleteResponse
, DeleteRequest
> deleteBatcher
=
118 new V3KeyBatcher
<DeleteResponse
, DeleteRequest
>() {
120 void addToBatch(Reference value
, DeleteRequest batch
) {
126 return datastoreServiceConfig
.maxBatchWriteEntities
;
130 protected Future
<DeleteResponse
> makeCall(DeleteRequest batch
) {
131 return makeAsyncCall(apiConfig
, "Delete", batch
, new DeleteResponse());
135 private final V3KeyBatcher
<GetResponse
, GetRequest
> getByKeyBatcher
=
136 new V3KeyBatcher
<GetResponse
, GetRequest
>() {
138 void addToBatch(Reference value
, GetRequest batch
) {
144 return datastoreServiceConfig
.maxBatchReadEntities
;
148 protected Future
<GetResponse
> makeCall(GetRequest batch
) {
149 return makeAsyncCall(apiConfig
, "Get", batch
, new GetResponse());
153 private final V3Batcher
<GetResponse
, GetRequest
, Reference
, Reference
> getByReferenceBatcher
=
154 new V3Batcher
<GetResponse
, GetRequest
, Reference
, Reference
>() {
156 final Object
getGroup(Reference value
) {
157 return value
.getPath().getElement(0);
161 final Reference
toPb(Reference value
) {
166 void addToBatch(Reference value
, GetRequest batch
) {
172 return datastoreServiceConfig
.maxBatchReadEntities
;
176 protected Future
<GetResponse
> makeCall(GetRequest batch
) {
177 return makeAsyncCall(apiConfig
, "Get", batch
, new GetResponse());
181 private final V3Batcher
<PutResponse
, PutRequest
, Entity
, EntityProto
> putBatcher
=
182 new V3Batcher
<PutResponse
, PutRequest
, Entity
, EntityProto
>() {
184 Object
getGroup(Entity value
) {
185 return value
.getKey().getRootKey();
189 void addToBatch(EntityProto value
, PutRequest batch
) {
190 batch
.addEntity(value
);
195 return datastoreServiceConfig
.maxBatchWriteEntities
;
199 protected Future
<PutResponse
> makeCall(PutRequest batch
) {
200 return makeAsyncCall(apiConfig
, "Put", batch
, new PutResponse());
204 EntityProto
toPb(Entity value
) {
205 return EntityTranslator
.convertToPb(value
);
209 private DatastoreType datastoreType
;
211 public AsyncDatastoreServiceImpl(
212 DatastoreServiceConfig datastoreServiceConfig
, TransactionStack defaultTxnProvider
) {
213 super(datastoreServiceConfig
, defaultTxnProvider
);
217 public Future
<Entity
> get(Key key
) {
219 throw new NullPointerException("key cannot be null");
221 return wrapSingleGet(key
, get(Arrays
.asList(key
)));
225 public Future
<Entity
> get( Transaction txn
, final Key key
) {
227 throw new NullPointerException("key cannot be null");
229 return wrapSingleGet(key
, get(txn
, Arrays
.asList(key
)));
232 private Future
<Entity
> wrapSingleGet(final Key key
, Future
<Map
<Key
, Entity
>> futureEntities
) {
233 return new FutureWrapper
<Map
<Key
, Entity
>, Entity
>(futureEntities
) {
235 protected Entity
wrap(Map
<Key
, Entity
> entities
) throws Exception
{
236 Entity entity
= entities
.get(key
);
237 if (entity
== null) {
238 throw new EntityNotFoundException(key
);
244 protected Throwable
convertException(Throwable cause
) {
251 public Future
<Map
<Key
, Entity
>> get(final Iterable
<Key
> keys
) {
252 return new TransactionRunner
<Map
<Key
, Entity
>>(getOrCreateTransaction()) {
254 protected Future
<Map
<Key
, Entity
>> runInternal(Transaction txn
) {
255 return get(txn
, keys
);
257 }.runReadInTransaction();
261 public Future
<Map
<Key
, Entity
>> get(Transaction txn
, Iterable
<Key
> keys
) {
263 throw new NullPointerException("keys cannot be null");
266 List
<Key
> keyList
= Lists
.newArrayList(keys
);
268 Map
<Key
, Entity
> resultMap
= new HashMap
<Key
, Entity
>();
269 PreGetContext preGetContext
= new PreGetContext(this, keyList
, resultMap
);
270 datastoreServiceConfig
.getDatastoreCallbacks().executePreGetCallbacks(preGetContext
);
272 keyList
.removeAll(resultMap
.keySet());
274 PreGetCachingResult preGetCachingResult
=
275 entityCachingStrategy
.preGet(this, keyList
, resultMap
);
276 keyList
.removeAll(preGetCachingResult
.getKeysToSkipLoading());
278 Future
<Map
<Key
, Entity
>> result
= doBatchGet(txn
, Sets
.newLinkedHashSet(keyList
), resultMap
);
280 result
= entityCachingStrategy
.createPostGetFuture(result
, preGetCachingResult
);
281 return new PostLoadFuture(result
, datastoreServiceConfig
.getDatastoreCallbacks(), this);
284 private Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
, final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
) {
285 final GetRequest baseReq
= new GetRequest();
286 baseReq
.setAllowDeferred(true);
288 TransactionImpl
.ensureTxnActive(txn
);
289 baseReq
.setTransaction(localTxnToRemoteTxn(txn
));
291 if (datastoreServiceConfig
.getReadPolicy().getConsistency() == EVENTUAL
) {
292 baseReq
.setFailoverMs(ARBITRARY_FAILOVER_READ_MS
);
293 baseReq
.setStrong(false);
296 final boolean shouldUseMultipleBatches
= getDatastoreType() != MASTER_SLAVE
&& txn
== null
297 && datastoreServiceConfig
.getReadPolicy().getConsistency() != EVENTUAL
;
299 Iterator
<GetRequest
> batches
=
300 getByKeyBatcher
.getBatches(keysToGet
, baseReq
, shouldUseMultipleBatches
);
301 List
<Future
<GetResponse
>> futures
= getByKeyBatcher
.makeCalls(batches
);
303 return registerInTransaction(txn
, new MultiFuture
<GetResponse
, Map
<Key
, Entity
>>(futures
) {
305 * A Map from a Reference without an App Id specified to the corresponding Key that the user
306 * requested. This is a workaround for the Remote API to support matching requested Keys to
307 * Entities that may be from a different App Id .
309 private Map
<Reference
, Key
> keyMapIgnoringAppId
;
312 public Map
<Key
, Entity
> get() throws InterruptedException
, ExecutionException
{
314 aggregate(futures
, null, null);
315 } catch (TimeoutException e
) {
316 throw new RuntimeException(e
);
322 public Map
<Key
, Entity
> get(long timeout
, TimeUnit unit
)
323 throws InterruptedException
, ExecutionException
, TimeoutException
{
324 aggregate(futures
, timeout
, unit
);
329 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
330 * any results were deferred.
332 * @param currentFutures the Futures corresponding to the batches of the initial GetRequests.
333 * @param timeout the timeout to use while waiting on the Future, or null for none.
334 * @param timeoutUnit the unit of the timeout, or null for none.
336 private void aggregate(
337 Iterable
<Future
<GetResponse
>> currentFutures
, Long timeout
, TimeUnit timeoutUnit
)
338 throws ExecutionException
, InterruptedException
, TimeoutException
{
340 List
<Reference
> deferredRefs
= Lists
.newLinkedList();
342 for (Future
<GetResponse
> currentFuture
: currentFutures
) {
343 GetResponse resp
= getFutureWithOptionalTimeout(currentFuture
, timeout
, timeoutUnit
);
344 addEntitiesToResultMap(resp
);
345 deferredRefs
.addAll(resp
.deferreds());
348 if (deferredRefs
.isEmpty()) {
352 Iterator
<GetRequest
> followupBatches
=
353 getByReferenceBatcher
.getBatches(deferredRefs
, baseReq
, shouldUseMultipleBatches
);
354 currentFutures
= getByReferenceBatcher
.makeCalls(followupBatches
);
359 * Convenience method to get the result of a Future and optionally specify a timeout.
361 * @param future the Future to get.
362 * @param timeout the timeout to use while waiting on the Future, or null for none.
363 * @param timeoutUnit the unit of the timeout, or null for none.
364 * @return the result of the Future.
365 * @throws TimeoutException will only ever be thrown if a timeout is provided.
367 private GetResponse
getFutureWithOptionalTimeout(
368 Future
<GetResponse
> future
, Long timeout
, TimeUnit timeoutUnit
)
369 throws ExecutionException
, InterruptedException
, TimeoutException
{
370 if (timeout
== null) {
373 return future
.get(timeout
, timeoutUnit
);
378 * Adds the Entities from the GetResponse to the resultMap. Will omit Keys that were missing.
379 * Handles Keys with different App Ids from the Entity.Key. See
380 * {@link #findKeyFromRequestIgnoringAppId(Reference)}
382 private void addEntitiesToResultMap(GetResponse response
) {
383 for (GetResponse
.Entity entityResult
: response
.entitys()) {
384 if (entityResult
.hasEntity()) {
385 Entity responseEntity
= EntityTranslator
.createFromPb(entityResult
.getEntity());
386 Key responseKey
= responseEntity
.getKey();
388 if (!keysToGet
.contains(responseKey
)) {
389 responseKey
= findKeyFromRequestIgnoringAppId(entityResult
.getEntity().getKey());
391 resultMap
.put(responseKey
, responseEntity
);
397 * This is a hack to support calls going through the Remote API. The problem is:
399 * The requested Key may have a local app id.
400 * The returned Entity may have a remote app id.
402 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
403 * user can always do map.get(keyFromRequest).
405 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
408 * Note that we used to be able to rely on the order of the Response Entities matching the
409 * order of Request Keys. We can no longer do so with the addition of Deferred results.
411 * @param referenceFromResponse the reference from the Response that did not match any of the
412 * requested Keys. (May be mutated.)
413 * @return the Key from the request that corresponds to the given Reference from the Response
416 private Key
findKeyFromRequestIgnoringAppId(Reference referenceFromResponse
) {
417 if (keyMapIgnoringAppId
== null) {
418 keyMapIgnoringAppId
= Maps
.newHashMap();
419 for (Key requestKey
: keysToGet
) {
420 Reference requestKeyAsRefWithoutApp
= KeyTranslator
.convertToPb(requestKey
).clearApp();
421 keyMapIgnoringAppId
.put(requestKeyAsRefWithoutApp
, requestKey
);
425 Key result
= keyMapIgnoringAppId
.get(referenceFromResponse
.clearApp());
426 if (result
== null) {
427 throw new DatastoreFailureException("Internal error");
435 public Future
<Key
> put(Entity entity
) {
436 return wrapSinglePut(put(Arrays
.asList(entity
)));
440 public Future
<Key
> put(Transaction txn
, Entity entity
) {
441 return wrapSinglePut(put(txn
, Arrays
.asList(entity
)));
444 private Future
<Key
> wrapSinglePut(Future
<List
<Key
>> futureKeys
) {
445 return new FutureWrapper
<List
<Key
>, Key
>(futureKeys
) {
447 protected Key
wrap(List
<Key
> keys
) throws Exception
{
452 protected Throwable
convertException(Throwable cause
) {
459 public Future
<List
<Key
>> put(final Iterable
<Entity
> entities
) {
460 return new TransactionRunner
<List
<Key
>>(getOrCreateTransaction()) {
462 protected Future
<List
<Key
>> runInternal(Transaction txn
) {
463 return put(txn
, entities
);
465 }.runWriteInTransaction();
469 public Future
<List
<Key
>> put( Transaction txn
, Iterable
<Entity
> entities
) {
470 List
<Entity
> entityList
= entities
instanceof List ?
471 (List
<Entity
>) entities
: Lists
.newArrayList(entities
);
472 PutContext prePutContext
= new PutContext(this, entityList
);
473 datastoreServiceConfig
.getDatastoreCallbacks().executePrePutCallbacks(prePutContext
);
474 PreMutationCachingResult preMutationCachingResult
=
475 entityCachingStrategy
.prePut(this, entityList
);
477 List
<IndexedItem
<Key
>> indexedKeysToSkip
= Lists
.newArrayList();
478 Set
<Key
> mutationKeysToSkip
= preMutationCachingResult
.getMutationKeysToSkip();
479 List
<Entity
> entitiesToPut
;
480 if (!mutationKeysToSkip
.isEmpty()) {
481 entitiesToPut
= Lists
.newArrayList();
483 for (Entity entity
: entityList
) {
484 if (mutationKeysToSkip
.contains(entity
.getKey())) {
485 indexedKeysToSkip
.add(new IndexedItem
<Key
>(index
++, entity
.getKey()));
487 entitiesToPut
.add(entity
);
492 entitiesToPut
= ImmutableList
.copyOf(entities
);
495 Future
<List
<Key
>> result
= combinePutResult(doBatchPut(txn
, entitiesToPut
), indexedKeysToSkip
);
499 entityCachingStrategy
.createPostMutationFuture(result
, preMutationCachingResult
);
500 PutContext postPutContext
= new PutContext(this, entityList
);
501 result
= new PostPutFuture(result
, datastoreServiceConfig
.getDatastoreCallbacks(),
504 defaultTxnProvider
.addPutEntities(txn
, entityList
);
509 private Future
<List
<Key
>> combinePutResult(Future
<List
<Key
>> rpcResult
,
510 final List
<IndexedItem
<Key
>> skippedKeys
) {
511 if (skippedKeys
.isEmpty()) {
515 return new FutureWrapper
<List
<Key
>, List
<Key
>>(rpcResult
) {
517 protected List
<Key
> wrap(List
<Key
> result
) throws Exception
{
518 List
<Key
> combined
= Lists
.newLinkedList(result
);
519 for (IndexedItem
<Key
> indexedKey
: skippedKeys
) {
520 combined
.add(indexedKey
.index
, indexedKey
.item
);
526 protected Throwable
convertException(Throwable cause
) {
532 private Future
<List
<Key
>> doBatchPut( Transaction txn
,
533 final List
<Entity
> entities
) {
534 PutRequest baseReq
= new PutRequest();
536 TransactionImpl
.ensureTxnActive(txn
);
537 baseReq
.setTransaction(localTxnToRemoteTxn(txn
));
539 boolean group
= !baseReq
.hasTransaction();
540 List
<Integer
> order
= Lists
.newArrayListWithCapacity(entities
.size());
541 Iterator
<PutRequest
> batches
= putBatcher
.getBatches(entities
, baseReq
, group
, order
);
542 List
<Future
<PutResponse
>> futures
= putBatcher
.makeCalls(batches
);
544 return registerInTransaction(txn
,
545 new ReorderingMultiFuture
<PutResponse
, List
<Key
>>(futures
, order
) {
547 protected List
<Key
> aggregate(
548 PutResponse intermediateResult
, Iterator
<Integer
> indexItr
, List
<Key
> result
) {
549 for (Reference reference
: intermediateResult
.keys()) {
550 int index
= indexItr
.next();
551 Key key
= entities
.get(index
).getKey();
552 KeyTranslator
.updateKey(reference
, key
);
553 result
.set(index
, key
);
559 protected List
<Key
> initResult(int size
) {
560 List
<Key
> result
= new ArrayList
<Key
>(Collections
.<Key
>nCopies(size
, null));
567 public Future
<Void
> delete(Key
... keys
) {
568 return delete(Arrays
.asList(keys
));
572 public Future
<Void
> delete(Transaction txn
, Key
... keys
) {
573 return delete(txn
, Arrays
.asList(keys
));
577 public Future
<Void
> delete(final Iterable
<Key
> keys
) {
578 return new TransactionRunner
<Void
>(getOrCreateTransaction()) {
580 protected Future
<Void
> runInternal(Transaction txn
) {
581 return delete(txn
, keys
);
583 }.runWriteInTransaction();
587 public Future
<Void
> delete(Transaction txn
, Iterable
<Key
> keys
) {
588 List
<Key
> allKeys
= keys
instanceof List ?
589 (List
<Key
>) keys
: ImmutableList
.copyOf(keys
);
590 DeleteContext preDeleteContext
= new DeleteContext(this, allKeys
);
591 datastoreServiceConfig
.getDatastoreCallbacks().executePreDeleteCallbacks(preDeleteContext
);
592 PreMutationCachingResult preMutationCachingResult
=
593 entityCachingStrategy
.preDelete(this, allKeys
);
594 Future
<Void
> result
= null;
595 Collection
<Key
> keysToDelete
;
596 Set
<Key
> keysToSkip
= preMutationCachingResult
.getMutationKeysToSkip();
597 if (keysToSkip
.isEmpty()) {
598 keysToDelete
= allKeys
;
600 Set
<Key
> keySet
= Sets
.newHashSet(allKeys
);
601 keySet
.removeAll(keysToSkip
);
602 keysToDelete
= keySet
;
604 result
= doBatchDelete(txn
, keysToDelete
);
607 result
= entityCachingStrategy
.createPostMutationFuture(result
, preMutationCachingResult
);
608 result
= new PostDeleteFuture(
609 result
, datastoreServiceConfig
.getDatastoreCallbacks(),
610 new DeleteContext(this, allKeys
));
612 defaultTxnProvider
.addDeletedKeys(txn
, allKeys
);
617 private Future
<Void
> doBatchDelete( Transaction txn
, Collection
<Key
> keys
) {
618 DeleteRequest baseReq
= new DeleteRequest();
620 TransactionImpl
.ensureTxnActive(txn
);
621 baseReq
.setTransaction(localTxnToRemoteTxn(txn
));
623 boolean group
= !baseReq
.hasTransaction();
624 Iterator
<DeleteRequest
> batches
= deleteBatcher
.getBatches(keys
, baseReq
, group
);
625 List
<Future
<DeleteResponse
>> futures
= deleteBatcher
.makeCalls(batches
);
626 return registerInTransaction(txn
, new MultiFuture
<DeleteResponse
, Void
>(futures
) {
628 public Void
get() throws InterruptedException
, ExecutionException
{
629 for (Future
<DeleteResponse
> future
: futures
) {
636 public Void
get(long timeout
, TimeUnit unit
)
637 throws InterruptedException
, ExecutionException
, TimeoutException
{
638 for (Future
<DeleteResponse
> future
: futures
) {
639 future
.get(timeout
, unit
);
647 public Collection
<Transaction
> getActiveTransactions() {
648 return defaultTxnProvider
.getAll();
652 * Register the provided future with the provided txn so that we know to
653 * perform a {@link java.util.concurrent.Future#get()} before the txn is
656 * @param txn The txn with which the future must be associated.
657 * @param future The future to associate with the txn.
658 * @param <T> The type of the Future
659 * @return The same future that was passed in, for caller convenience.
661 private <T
> Future
<T
> registerInTransaction( Transaction txn
, Future
<T
> future
) {
663 defaultTxnProvider
.addFuture(txn
, future
);
664 return new FutureHelper
.TxnAwareFuture
<T
>(future
, txn
, defaultTxnProvider
);
670 public Future
<Transaction
> beginTransaction() {
671 return beginTransaction(TransactionOptions
.Builder
.withDefaults());
675 public Future
<Transaction
> beginTransaction(TransactionOptions options
) {
676 return new FutureHelper
.FakeFuture
<Transaction
>(beginTransactionInternal(options
, true));
680 public PreparedQuery
prepare(Query query
) {
681 return prepare(null, query
);
684 @SuppressWarnings("deprecation")
686 public PreparedQuery
prepare(Transaction txn
, Query query
) {
687 PreQueryContext context
= new PreQueryContext(this, query
);
688 datastoreServiceConfig
.getDatastoreCallbacks().executePreQueryCallbacks(context
);
690 query
= context
.getElements().get(0);
691 validateQuery(query
);
692 List
<MultiQueryBuilder
> queriesToRun
= QuerySplitHelper
.splitQuery(query
);
693 query
.setFilter(null);
694 query
.getFilterPredicates().clear();
695 if (queriesToRun
.size() == 1 && queriesToRun
.get(0).isSingleton()) {
696 query
.getFilterPredicates().addAll(queriesToRun
.get(0).getBaseFilters());
697 return new PreparedQueryImpl(apiConfig
, datastoreServiceConfig
, query
, txn
);
699 return new PreparedMultiQuery(apiConfig
, datastoreServiceConfig
, query
, queriesToRun
, txn
);
703 public Future
<KeyRange
> allocateIds(String kind
, long num
) {
704 return allocateIds(null, kind
, num
);
707 static Reference
buildAllocateIdsRef(
708 Key parent
, String kind
, AppIdNamespace appIdNamespace
) {
709 if (parent
!= null && !parent
.isComplete()) {
710 throw new IllegalArgumentException("parent key must be complete");
712 Key key
= new Key(kind
, parent
, Key
.NOT_ASSIGNED
, "ignored", appIdNamespace
);
713 return KeyTranslator
.convertToPb(key
);
717 public Future
<KeyRange
> allocateIds(final Key parent
, final String kind
, long num
) {
719 throw new IllegalArgumentException("num must be > 0");
722 if (num
> 1000000000) {
723 throw new IllegalArgumentException("num must be < 1 billion");
726 final AppIdNamespace appIdNamespace
= datastoreServiceConfig
.getAppIdNamespace();
727 Reference allocateIdsRef
= buildAllocateIdsRef(parent
, kind
, appIdNamespace
);
728 AllocateIdsRequest req
=
729 new AllocateIdsRequest().setSize(num
).setModelKey(allocateIdsRef
);
730 AllocateIdsResponse resp
= new AllocateIdsResponse();
731 Future
<AllocateIdsResponse
> future
= makeAsyncCall(apiConfig
, "AllocateIds", req
, resp
);
732 return new FutureWrapper
<AllocateIdsResponse
, KeyRange
>(future
) {
734 protected KeyRange
wrap(AllocateIdsResponse resp
) throws Exception
{
735 return new KeyRange(parent
, kind
, resp
.getStart(), resp
.getEnd(), appIdNamespace
);
739 protected Throwable
convertException(Throwable cause
) {
745 Future
<KeyRangeState
> allocateIdRange(final KeyRange range
) {
746 Key parent
= range
.getParent();
747 final String kind
= range
.getKind();
748 final long start
= range
.getStart().getId();
749 long end
= range
.getEnd().getId();
751 AllocateIdsRequest req
= new AllocateIdsRequest()
752 .setModelKey(AsyncDatastoreServiceImpl
.buildAllocateIdsRef(parent
, kind
, null))
754 AllocateIdsResponse resp
= new AllocateIdsResponse();
755 Future
<AllocateIdsResponse
> future
= makeAsyncCall(apiConfig
, "AllocateIds", req
, resp
);
756 return new FutureWrapper
<AllocateIdsResponse
, KeyRangeState
>(future
) {
758 protected KeyRangeState
wrap(AllocateIdsResponse resp
) throws Exception
{
759 Query query
= new Query(kind
).setKeysOnly();
761 Entity
.KEY_RESERVED_PROPERTY
, FilterOperator
.GREATER_THAN_OR_EQUAL
, range
.getStart());
763 Entity
.KEY_RESERVED_PROPERTY
, FilterOperator
.LESS_THAN_OR_EQUAL
, range
.getEnd());
764 List
<Entity
> collision
= prepare(query
).asList(withLimit(1));
766 if (!collision
.isEmpty()) {
767 return KeyRangeState
.COLLISION
;
770 boolean raceCondition
= start
< resp
.getStart();
771 return raceCondition ? KeyRangeState
.CONTENTION
: KeyRangeState
.EMPTY
;
775 protected Throwable
convertException(Throwable cause
) {
781 protected DatastoreType
getDatastoreType() {
782 if (datastoreType
== null) {
783 datastoreType
= quietGet(getDatastoreAttributes()).getDatastoreType();
785 return datastoreType
;
789 public Future
<DatastoreAttributes
> getDatastoreAttributes() {
790 String appId
= datastoreServiceConfig
.getAppIdNamespace().getAppId();
791 DatastoreAttributes attributes
= new DatastoreAttributes(appId
);
792 return new FutureHelper
.FakeFuture
<DatastoreAttributes
>(attributes
);
796 public Future
<Map
<Index
, IndexState
>> getIndexes() {
797 StringProto req
= new StringProto();
798 req
.setValue(datastoreServiceConfig
.getAppIdNamespace().getAppId());
799 return new FutureWrapper
<CompositeIndices
, Map
<Index
, IndexState
>>(
800 makeAsyncCall(apiConfig
, "GetIndices", req
, new CompositeIndices())) {
802 protected Map
<Index
, IndexState
> wrap(CompositeIndices indices
) throws Exception
{
803 Map
<Index
, IndexState
> answer
= new LinkedHashMap
<Index
, IndexState
>();
804 for (CompositeIndex ci
: indices
.indexs()) {
805 Index index
= IndexTranslator
.convertFromPb(ci
);
806 switch (ci
.getStateEnum()) {
808 answer
.put(index
, IndexState
.DELETING
);
811 answer
.put(index
, IndexState
.ERROR
);
814 answer
.put(index
, IndexState
.SERVING
);
817 answer
.put(index
, IndexState
.BUILDING
);
820 logger
.log(Level
.WARNING
, "Unrecognized index state for " + index
);
828 protected Throwable
convertException(Throwable cause
) {