1 package com
.google
.appengine
.api
.datastore
;
3 import static com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
.MASTER_SLAVE
;
4 import static com
.google
.appengine
.api
.datastore
.ReadPolicy
.Consistency
.EVENTUAL
;
6 import com
.google
.appengine
.api
.datastore
.Batcher
.ReorderingMultiFuture
;
7 import com
.google
.appengine
.api
.datastore
.DatastoreService
.KeyRangeState
;
8 import com
.google
.appengine
.api
.datastore
.DatastoreServiceConfig
.ApiVersion
;
9 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
10 import com
.google
.appengine
.api
.datastore
.Index
.IndexState
;
11 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
12 import com
.google
.common
.collect
.ImmutableList
;
13 import com
.google
.common
.collect
.Lists
;
14 import com
.google
.common
.collect
.Maps
;
15 import com
.google
.datastore
.v1beta3
.AllocateIdsRequest
;
16 import com
.google
.datastore
.v1beta3
.AllocateIdsResponse
;
17 import com
.google
.datastore
.v1beta3
.BeginTransactionRequest
;
18 import com
.google
.datastore
.v1beta3
.BeginTransactionResponse
;
19 import com
.google
.datastore
.v1beta3
.CommitRequest
;
20 import com
.google
.datastore
.v1beta3
.CommitResponse
;
21 import com
.google
.datastore
.v1beta3
.EntityResult
;
22 import com
.google
.datastore
.v1beta3
.Key
.PathElement
;
23 import com
.google
.datastore
.v1beta3
.Key
.PathElement
.IdTypeCase
;
24 import com
.google
.datastore
.v1beta3
.LookupRequest
;
25 import com
.google
.datastore
.v1beta3
.LookupResponse
;
26 import com
.google
.datastore
.v1beta3
.Mutation
;
27 import com
.google
.datastore
.v1beta3
.MutationResult
;
28 import com
.google
.datastore
.v1beta3
.ReadOptions
;
29 import com
.google
.protobuf
.Message
;
31 import java
.util
.Arrays
;
32 import java
.util
.Collection
;
33 import java
.util
.Collections
;
34 import java
.util
.Iterator
;
35 import java
.util
.List
;
38 import java
.util
.concurrent
.ExecutionException
;
39 import java
.util
.concurrent
.Future
;
40 import java
.util
.concurrent
.TimeUnit
;
41 import java
.util
.concurrent
.TimeoutException
;
44 * An implementation of {@link AsyncDatastoreService} using the Cloud Datastore v1 API.
46 class AsyncCloudDatastoreV1ServiceImpl
extends BaseAsyncDatastoreServiceImpl
{
49 * A base batcher for Cloud Datastore v1 operations executed in the context of an {@link
50 * AsyncCloudDatastoreV1ServiceImpl}.
52 * @param <S> the response message type
53 * @param <R> the request message builder type
54 * @param <F> the Java specific representation of a value
55 * @param <T> the proto representation of value
57 private abstract class V1Batcher
<S
extends Message
, R
extends Message
.Builder
, F
,
58 T
extends Message
> extends BaseRpcBatcher
<S
, R
, F
, T
> {
60 @SuppressWarnings("unchecked")
61 final R
newBatch(R baseBatch
) {
62 return (R
) baseBatch
.clone();
66 private final V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Key
, Mutation
> deleteBatcher
=
67 new V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Key
, Mutation
>() {
69 void addToBatch(Mutation mutation
, CommitRequest
.Builder batch
) {
70 batch
.addMutations(mutation
);
75 return datastoreServiceConfig
.maxBatchWriteEntities
;
79 protected Future
<CommitResponse
> makeCall(CommitRequest
.Builder batch
) {
80 return datastoreProxy
.commit(batch
.build());
84 final Object
getGroup(Key key
) {
85 return key
.getRootKey();
89 final Mutation
toPb(Key value
) {
90 return Mutation
.newBuilder()
91 .setDelete(DataTypeTranslator
.toV1Key(value
))
96 private final V1Batcher
<LookupResponse
, LookupRequest
.Builder
, Key
,
97 com
.google
.datastore
.v1beta3
.Key
> lookupByKeyBatcher
=
98 new V1Batcher
<LookupResponse
, LookupRequest
.Builder
, Key
,
99 com
.google
.datastore
.v1beta3
.Key
>() {
101 void addToBatch(com
.google
.datastore
.v1beta3
.Key key
, LookupRequest
.Builder batch
) {
107 return datastoreServiceConfig
.maxBatchReadEntities
;
111 protected Future
<LookupResponse
> makeCall(LookupRequest
.Builder batch
) {
112 return datastoreProxy
.lookup(batch
.build());
116 final Object
getGroup(Key key
) {
117 return key
.getRootKey();
121 final com
.google
.datastore
.v1beta3
.Key
toPb(Key value
) {
122 return DataTypeTranslator
.toV1Key(value
).build();
126 private final V1Batcher
<LookupResponse
, LookupRequest
.Builder
, com
.google
.datastore
.v1beta3
.Key
,
127 com
.google
.datastore
.v1beta3
.Key
>
129 new V1Batcher
<LookupResponse
, LookupRequest
.Builder
, com
.google
.datastore
.v1beta3
.Key
,
130 com
.google
.datastore
.v1beta3
.Key
>() {
132 void addToBatch(com
.google
.datastore
.v1beta3
.Key key
, LookupRequest
.Builder batch
) {
138 return datastoreServiceConfig
.maxBatchReadEntities
;
142 protected Future
<LookupResponse
> makeCall(LookupRequest
.Builder batch
) {
143 return datastoreProxy
.lookup(batch
.build());
147 final Object
getGroup(com
.google
.datastore
.v1beta3
.Key key
) {
148 return key
.getPath(0);
152 final com
.google
.datastore
.v1beta3
.Key
toPb(com
.google
.datastore
.v1beta3
.Key value
) {
157 private final V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Entity
, Mutation
>
158 putBatcher
= new V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Entity
, Mutation
>() {
160 void addToBatch(Mutation mutation
, CommitRequest
.Builder batch
) {
161 batch
.addMutations(mutation
);
166 return datastoreServiceConfig
.maxBatchWriteEntities
;
170 protected Future
<CommitResponse
> makeCall(CommitRequest
.Builder batch
) {
171 return datastoreProxy
.commit(batch
.build());
175 final Object
getGroup(Entity value
) {
176 return value
.getKey().getRootKey();
180 final Mutation
toPb(Entity value
) {
181 return Mutation
.newBuilder()
182 .setUpsert(DataTypeTranslator
.toV1Entity(value
))
187 private final V1Batcher
<AllocateIdsResponse
, AllocateIdsRequest
.Builder
, Key
,
188 com
.google
.datastore
.v1beta3
.Key
>
190 new V1Batcher
<AllocateIdsResponse
, AllocateIdsRequest
.Builder
, Key
,
191 com
.google
.datastore
.v1beta3
.Key
>() {
193 void addToBatch(com
.google
.datastore
.v1beta3
.Key key
, AllocateIdsRequest
.Builder batch
) {
199 return datastoreServiceConfig
.maxBatchAllocateIdKeys
;
203 protected Future
<AllocateIdsResponse
> makeCall(AllocateIdsRequest
.Builder batch
) {
204 return datastoreProxy
.allocateIds(batch
.build());
208 final Object
getGroup(Key key
) {
209 Key parent
= key
.getParent();
210 if (parent
== null) {
211 return PathElement
.getDefaultInstance();
213 return DataTypeTranslator
.toV1Key(parent
).getPath(0);
218 final com
.google
.datastore
.v1beta3
.Key
toPb(Key value
) {
219 return DataTypeTranslator
.toV1Key(value
).build();
/**
 * Proxy used by this class to issue its beginTransaction, lookup, commit and allocateIds
 * calls.
 */
private final CloudDatastoreV1Proxy datastoreProxy;

/**
 * Creates the service.
 *
 * @param datastoreServiceConfig the configuration, passed to the superclass and to the query
 *     runner
 * @param datastoreProxy the proxy the query runner and this service send their requests through
 * @param defaultTxnProvider the transaction stack, passed to the superclass
 */
public AsyncCloudDatastoreV1ServiceImpl(
    DatastoreServiceConfig datastoreServiceConfig, CloudDatastoreV1Proxy datastoreProxy,
    TransactionStack defaultTxnProvider) {
  super(datastoreServiceConfig, defaultTxnProvider,
      new QueryRunnerCloudDatastoreV1(datastoreServiceConfig, datastoreProxy));
  this.datastoreProxy = datastoreProxy;
}
234 protected TransactionImpl
.InternalTransaction
doBeginTransaction(TransactionOptions options
) {
235 BeginTransactionRequest
.Builder request
= BeginTransactionRequest
.newBuilder();
237 Future
<BeginTransactionResponse
> future
= datastoreProxy
.beginTransaction(request
.build());
239 ApiVersion apiVersion
= datastoreServiceConfig
.getApiVersion();
240 switch (apiVersion
) {
241 case CLOUD_DATASTORE_V1_VIA_API_PROXY
:
242 case CLOUD_DATASTORE_V1_REMOTE
:
243 return InternalTransactionCloudDatastoreV1
.create(datastoreProxy
, future
);
245 throw new IllegalStateException("Unsupported api version: " + apiVersion
);
250 protected Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
,
251 final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
) {
252 final LookupRequest
.Builder baseReq
= LookupRequest
.newBuilder();
253 ReadOptions
.Builder readOptionsBuilder
= baseReq
.getReadOptionsBuilder();
255 TransactionImpl
.ensureTxnActive(txn
);
256 readOptionsBuilder
.setTransaction(
257 InternalTransactionCloudDatastoreV1
.get(txn
).getTransactionBytes());
258 } else if (datastoreServiceConfig
.getReadPolicy().getConsistency() == EVENTUAL
) {
259 readOptionsBuilder
.setReadConsistency(ReadOptions
.ReadConsistency
.EVENTUAL
);
261 baseReq
.clearReadOptions();
264 final boolean shouldUseMultipleBatches
= getDatastoreType() != MASTER_SLAVE
&& txn
== null
265 && datastoreServiceConfig
.getReadPolicy().getConsistency() != EVENTUAL
;
267 Iterator
<LookupRequest
.Builder
> batches
= lookupByKeyBatcher
.getBatches(keysToGet
, baseReq
,
268 baseReq
.build().getSerializedSize(), shouldUseMultipleBatches
);
269 List
<Future
<LookupResponse
>> futures
= lookupByKeyBatcher
.makeCalls(batches
);
271 return registerInTransaction(txn
, new MultiFuture
<LookupResponse
, Map
<Key
, Entity
>>(futures
) {
273 * A Map from a Key without an app id specified to the corresponding Key that the
274 * user requested. This is a workaround for the Remote API to support matching requested
275 * Keys to Entities that may be from a different app id.
277 private Map
<com
.google
.datastore
.v1beta3
.Key
, Key
> keyMapIgnoringAppId
;
280 public Map
<Key
, Entity
> get() throws InterruptedException
, ExecutionException
{
282 aggregate(futures
, null, null);
283 } catch (TimeoutException e
) {
284 throw new RuntimeException(e
);
290 public Map
<Key
, Entity
> get(long timeout
, TimeUnit unit
)
291 throws InterruptedException
, ExecutionException
, TimeoutException
{
292 aggregate(futures
, timeout
, unit
);
297 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
298 * any results were deferred.
300 * @param currentFutures the Futures corresponding to the batches of the initial
302 * @param timeout the timeout to use while waiting on the Future, or null for none.
303 * @param timeoutUnit the unit of the timeout, or null for none.
305 private void aggregate(
306 Iterable
<Future
<LookupResponse
>> currentFutures
, Long timeout
, TimeUnit timeoutUnit
)
307 throws ExecutionException
, InterruptedException
, TimeoutException
{
309 List
<com
.google
.datastore
.v1beta3
.Key
> deferredKeys
= Lists
.newArrayList();
311 for (Future
<LookupResponse
> currentFuture
: currentFutures
) {
312 LookupResponse resp
= getFutureWithOptionalTimeout(currentFuture
, timeout
, timeoutUnit
);
313 addEntitiesToResultMap(resp
);
314 deferredKeys
.addAll(resp
.getDeferredList());
317 if (deferredKeys
.isEmpty()) {
321 Iterator
<LookupRequest
.Builder
> followupBatches
= lookupByPbBatcher
.getBatches(
322 deferredKeys
, baseReq
, baseReq
.build().getSerializedSize(), shouldUseMultipleBatches
);
323 currentFutures
= lookupByPbBatcher
.makeCalls(followupBatches
);
328 * Convenience method to get the result of a Future and optionally specify a timeout.
330 * @param future the Future to get.
331 * @param timeout the timeout to use while waiting on the Future, or null for none.
332 * @param timeoutUnit the unit of the timeout, or null for none.
333 * @return the result of the Future.
334 * @throws TimeoutException will only ever be thrown if a timeout is provided.
336 private LookupResponse
getFutureWithOptionalTimeout(
337 Future
<LookupResponse
> future
, Long timeout
, TimeUnit timeoutUnit
)
338 throws ExecutionException
, InterruptedException
, TimeoutException
{
339 if (timeout
== null) {
342 return future
.get(timeout
, timeoutUnit
);
347 * Adds the Entities from the LookupResponse to the resultMap. Will omit Keys that were
348 * missing. Handles Keys with different App Ids from the Entity.Key. See
349 * {@link #findKeyFromRequestIgnoringAppId}.
351 private void addEntitiesToResultMap(LookupResponse response
) {
352 for (EntityResult entityResult
: response
.getFoundList()) {
353 Entity responseEntity
= DataTypeTranslator
.toEntity(entityResult
.getEntity());
354 Key responseKey
= responseEntity
.getKey();
356 if (!keysToGet
.contains(responseKey
)) {
357 responseKey
= findKeyFromRequestIgnoringAppId(entityResult
.getEntity().getKey());
359 resultMap
.put(responseKey
, responseEntity
);
364 * This is a hack to support calls going through the Remote API. The problem is:
366 * The requested Key may have a local app id.
367 * The returned Entity may have a remote app id.
369 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
370 * user can always do map.get(keyFromRequest).
372 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
375 * Note that we used to be able to rely on the order of the Response Entities matching the
376 * order of Request Keys. We can no longer do so with the addition of Deferred results.
378 * @param keyFromResponse the key from the Response that did not match any of the requested
380 * @return the Key from the request that corresponds to the given Key from the Response
383 private Key
findKeyFromRequestIgnoringAppId(
384 com
.google
.datastore
.v1beta3
.Key keyFromResponse
) {
385 if (keyMapIgnoringAppId
== null) {
386 keyMapIgnoringAppId
= Maps
.newHashMap();
387 for (Key requestKey
: keysToGet
) {
388 com
.google
.datastore
.v1beta3
.Key
.Builder requestKeyAsRefWithoutApp
=
389 DataTypeTranslator
.toV1Key(requestKey
);
390 requestKeyAsRefWithoutApp
.getPartitionIdBuilder().clearProjectId();
391 keyMapIgnoringAppId
.put(requestKeyAsRefWithoutApp
.build(), requestKey
);
395 com
.google
.datastore
.v1beta3
.Key
.Builder keyBuilder
= keyFromResponse
.toBuilder();
396 keyBuilder
.getPartitionIdBuilder().clearProjectId();
397 Key result
= keyMapIgnoringAppId
.get(keyBuilder
.build());
398 if (result
== null) {
399 throw new DatastoreFailureException("Internal error");
407 protected Future
<List
<Key
>> doBatchPut( final Transaction txn
,
408 final List
<Entity
> entities
) {
410 CommitRequest
.Builder baseReq
= CommitRequest
.newBuilder();
411 baseReq
.setMode(CommitRequest
.Mode
.NON_TRANSACTIONAL
);
412 List
<Integer
> order
= Lists
.newArrayListWithCapacity(entities
.size());
413 Iterator
<CommitRequest
.Builder
> batches
= putBatcher
.getBatches(entities
, baseReq
,
414 baseReq
.build().getSerializedSize(), true, order
);
415 List
<Future
<CommitResponse
>> futures
= putBatcher
.makeCalls(batches
);
417 return new ReorderingMultiFuture
<CommitResponse
, List
<Key
>>(futures
, order
) {
419 protected List
<Key
> aggregate(CommitResponse intermediateResult
, Iterator
<Integer
> indexItr
,
421 for (MutationResult mutationResult
: intermediateResult
.getMutationResultsList()) {
422 int index
= indexItr
.next();
423 Key key
= entities
.get(index
).getKey();
424 if (mutationResult
.hasKey()) {
425 List
<PathElement
> pathElements
=
426 mutationResult
.getKey().getPathList();
427 key
.setId(pathElements
.get(pathElements
.size() - 1).getId());
429 result
.set(index
, key
);
435 protected List
<Key
> initResult(int size
) {
436 List
<Key
> keyList
= Lists
.newArrayListWithCapacity(size
);
437 keyList
.addAll(Collections
.<Key
>nCopies(size
, null));
443 TransactionImpl
.ensureTxnActive(txn
);
444 final InternalTransactionCloudDatastoreV1 txnV1
= InternalTransactionCloudDatastoreV1
.get(txn
);
446 ImmutableList
.Builder
<Key
> keyListBuilder
= ImmutableList
.builder();
447 final List
<Key
> incompleteKeys
= Lists
.newArrayList();
448 final List
<com
.google
.datastore
.v1beta3
.Entity
.Builder
> incompleteEntityBldrs
=
449 Lists
.newArrayList();
450 for (Entity entity
: entities
) {
451 Key key
= entity
.getKey();
452 keyListBuilder
.add(key
);
453 if (key
.isComplete()) {
454 txnV1
.deferPut(entity
);
456 com
.google
.datastore
.v1beta3
.Entity
.Builder entityV1
=
457 com
.google
.datastore
.v1beta3
.Entity
.newBuilder();
458 DataTypeTranslator
.addPropertiesToPb(entity
.getPropertyMap(), entityV1
);
459 incompleteEntityBldrs
.add(entityV1
);
460 incompleteKeys
.add(key
);
463 final List
<Key
> allKeys
= keyListBuilder
.build();
465 if (incompleteKeys
.isEmpty()) {
466 return new FutureHelper
.FakeFuture
<List
<Key
>>(allKeys
);
468 return registerInTransaction(txn
,
469 new FutureWrapper
<List
<com
.google
.datastore
.v1beta3
.Key
>,
470 List
<Key
>>(allocateIds(incompleteKeys
)) {
472 protected List
<Key
> wrap(List
<com
.google
.datastore
.v1beta3
.Key
> completedKeyPbs
) {
473 Iterator
<com
.google
.datastore
.v1beta3
.Entity
.Builder
> entityPbBldrIt
=
474 incompleteEntityBldrs
.iterator();
475 Iterator
<Key
> incompleteKeysIt
= incompleteKeys
.iterator();
476 for (com
.google
.datastore
.v1beta3
.Key keyV1
: completedKeyPbs
) {
477 updateKey(keyV1
, incompleteKeysIt
.next());
478 txnV1
.deferPut(entityPbBldrIt
.next().setKey(keyV1
));
484 protected Throwable
convertException(Throwable cause
) {
491 protected Future
<Void
> doBatchDelete( Transaction txn
, Collection
<Key
> keys
) {
493 TransactionImpl
.ensureTxnActive(txn
);
494 InternalTransactionCloudDatastoreV1 txnV1
= InternalTransactionCloudDatastoreV1
.get(txn
);
495 for (Key key
: keys
) {
496 txnV1
.deferDelete(key
);
498 return new FutureHelper
.FakeFuture
<Void
>(null);
501 CommitRequest
.Builder baseReq
= CommitRequest
.newBuilder();
502 baseReq
.setMode(CommitRequest
.Mode
.NON_TRANSACTIONAL
);
503 Iterator
<CommitRequest
.Builder
> batches
= deleteBatcher
.getBatches(keys
, baseReq
,
504 baseReq
.build().getSerializedSize(), true);
505 List
<Future
<CommitResponse
>> futures
= deleteBatcher
.makeCalls(batches
);
506 return new MultiFuture
<CommitResponse
, Void
>(futures
) {
508 public Void
get() throws InterruptedException
, ExecutionException
{
509 for (Future
<CommitResponse
> future
: futures
) {
516 public Void
get(long timeout
, TimeUnit unit
) throws InterruptedException
, ExecutionException
,
518 for (Future
<CommitResponse
> future
: futures
) {
519 future
.get(timeout
, unit
);
527 * This API is specific to sequential IDs, which Cloud Datastore v1 does not support.
530 public Future
<KeyRange
> allocateIds(final Key parent
, final String kind
, long num
) {
531 throw new UnsupportedOperationException();
535 * This API is specific to sequential IDs, which Cloud Datastore v1 does not support.
538 public Future
<KeyRangeState
> allocateIdRange(final KeyRange range
) {
539 throw new UnsupportedOperationException();
543 * Allocates scattered IDs for a list of incomplete keys.
545 protected Future
<List
<com
.google
.datastore
.v1beta3
.Key
>> allocateIds(List
<Key
> keyList
) {
546 List
<Integer
> order
= Lists
.newArrayListWithCapacity(keyList
.size());
547 Iterator
<AllocateIdsRequest
.Builder
> batches
= allocateIdsBatcher
.getBatches(keyList
,
548 AllocateIdsRequest
.newBuilder(), 0, true, order
);
549 List
<Future
<AllocateIdsResponse
>> futures
= allocateIdsBatcher
.makeCalls(batches
);
551 return new ReorderingMultiFuture
<AllocateIdsResponse
,
552 List
<com
.google
.datastore
.v1beta3
.Key
>>(futures
, order
) {
554 protected List
<com
.google
.datastore
.v1beta3
.Key
> aggregate(AllocateIdsResponse batch
,
555 Iterator
<Integer
> indexItr
,
556 List
<com
.google
.datastore
.v1beta3
.Key
> result
) {
557 for (com
.google
.datastore
.v1beta3
.Key key
: batch
.getKeysList()) {
558 result
.set(indexItr
.next(), key
);
564 protected List
<com
.google
.datastore
.v1beta3
.Key
> initResult(int size
) {
565 return Arrays
.asList(new com
.google
.datastore
.v1beta3
.Key
[size
]);
571 public Future
<Map
<Index
, IndexState
>> getIndexes() {
572 throw new UnsupportedOperationException();
576 * Update a {@link Key} with the id from a key proto, if it is populated.
578 private static void updateKey(com
.google
.datastore
.v1beta3
.Key keyV1
, Key key
) {
579 List
<PathElement
> pathElements
= keyV1
.getPathList();
580 if (!pathElements
.isEmpty()) {
581 PathElement lastElement
= pathElements
.get(pathElements
.size() - 1);
582 if (lastElement
.getIdTypeCase() == IdTypeCase
.ID
) {
583 key
.setId(lastElement
.getId());