1 package com
.google
.appengine
.api
.datastore
;
3 import static com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
.MASTER_SLAVE
;
4 import static com
.google
.appengine
.api
.datastore
.ReadPolicy
.Consistency
.EVENTUAL
;
6 import com
.google
.appengine
.api
.datastore
.Batcher
.ReorderingMultiFuture
;
7 import com
.google
.appengine
.api
.datastore
.DatastoreService
.KeyRangeState
;
8 import com
.google
.appengine
.api
.datastore
.DatastoreServiceConfig
.ApiVersion
;
9 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
10 import com
.google
.appengine
.api
.datastore
.Index
.IndexState
;
11 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
12 import com
.google
.common
.collect
.ImmutableList
;
13 import com
.google
.common
.collect
.Lists
;
14 import com
.google
.common
.collect
.Maps
;
15 import com
.google
.datastore
.v1beta3
.AllocateIdsRequest
;
16 import com
.google
.datastore
.v1beta3
.AllocateIdsResponse
;
17 import com
.google
.datastore
.v1beta3
.BeginTransactionRequest
;
18 import com
.google
.datastore
.v1beta3
.BeginTransactionResponse
;
19 import com
.google
.datastore
.v1beta3
.CommitRequest
;
20 import com
.google
.datastore
.v1beta3
.CommitResponse
;
21 import com
.google
.datastore
.v1beta3
.EntityResult
;
22 import com
.google
.datastore
.v1beta3
.Key
.PathElement
;
23 import com
.google
.datastore
.v1beta3
.Key
.PathElement
.IdTypeCase
;
24 import com
.google
.datastore
.v1beta3
.LookupRequest
;
25 import com
.google
.datastore
.v1beta3
.LookupResponse
;
26 import com
.google
.datastore
.v1beta3
.Mutation
;
27 import com
.google
.datastore
.v1beta3
.MutationResult
;
28 import com
.google
.datastore
.v1beta3
.ReadOptions
;
29 import com
.google
.protobuf
.Message
;
31 import java
.util
.Arrays
;
32 import java
.util
.Collection
;
33 import java
.util
.Collections
;
34 import java
.util
.Iterator
;
35 import java
.util
.List
;
38 import java
.util
.concurrent
.ExecutionException
;
39 import java
.util
.concurrent
.Future
;
40 import java
.util
.concurrent
.TimeUnit
;
41 import java
.util
.concurrent
.TimeoutException
;
44 * An implementation of {@link AsyncDatastoreService} using the Cloud Datastore v1 API.
46 class AsyncCloudDatastoreV1ServiceImpl
extends BaseAsyncDatastoreServiceImpl
{
49 * A base batcher for Cloud Datastore v1 operations executed in the context of an {@link
50 * AsyncCloudDatastoreV1ServiceImpl}.
52 * @param <S> the response message type
53 * @param <R> the request message builder type
54 * @param <F> the Java specific representation of a value
55 * @param <T> the proto representation of a value
// Shared base for the request batchers declared in this class. It only fixes how a fresh batch
// is started; the remaining batching hooks are supplied by each concrete (anonymous) subclass.
57 private abstract class V1Batcher
<S
extends Message
, R
extends Message
.Builder
, F
,
58 T
extends Message
> extends BaseRpcBatcher
<S
, R
, F
, T
> {
// Unchecked: the cast from the clone's static type to R below cannot be verified at runtime.
60 @SuppressWarnings("unchecked")
// Starts a new batch from a copy of the supplied base request builder.
61 final R
newBatch(R baseBatch
) {
// Working on a clone means items added to the batch are not added to baseBatch itself.
62 return (R
) baseBatch
.clone();
// Batcher for deletes: each Key becomes a DELETE Mutation, mutations are appended to a
// CommitRequest builder, and each finished batch is sent with datastoreProxy.commit.
66 private final V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Key
, Mutation
> deleteBatcher
=
67 new V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Key
, Mutation
>() {
// Appends one already-converted mutation to the batch under construction.
69 void addToBatch(Mutation mutation
, CommitRequest
.Builder batch
) {
70 batch
.addMutations(mutation
);
// NOTE(review): the header of the method this statement belongs to is not visible here.
// It returns the configured maxBatchWriteEntities, presumably the per-batch item cap; confirm.
75 return datastoreServiceConfig
.maxBatchWriteEntities
;
// Builds the request and issues the commit RPC, returning the proxy's Future unchanged.
79 protected Future
<CommitResponse
> makeCall(CommitRequest
.Builder batch
) {
80 return datastoreProxy
.commit(batch
.build());
// Keys are grouped by their root key.
84 final Object
getGroup(Key key
) {
85 return key
.getRootKey();
// Converts a Key into a DELETE mutation carrying the key's v1 representation.
// NOTE(review): the end of this builder chain is not visible here.
89 final Mutation
toPb(Key value
) {
90 return Mutation
.newBuilder()
91 .setOp(Mutation
.Operation
.DELETE
)
92 .setKey(DataTypeTranslator
.toV1Key(value
))
// Batcher for lookups that start from App Engine Keys: each Key is converted to a v1 key proto
// and each finished batch is sent with datastoreProxy.lookup.
97 private final V1Batcher
<LookupResponse
, LookupRequest
.Builder
, Key
,
98 com
.google
.datastore
.v1beta3
.Key
> lookupByKeyBatcher
=
99 new V1Batcher
<LookupResponse
, LookupRequest
.Builder
, Key
,
100 com
.google
.datastore
.v1beta3
.Key
>() {
// Receives one converted key and the LookupRequest builder being filled.
// NOTE(review): the body of this method is not visible here.
102 void addToBatch(com
.google
.datastore
.v1beta3
.Key key
, LookupRequest
.Builder batch
) {
// NOTE(review): the header of the method this statement belongs to is not visible here.
// It returns the configured maxBatchReadEntities, presumably the per-batch item cap; confirm.
108 return datastoreServiceConfig
.maxBatchReadEntities
;
// Builds the request and issues the lookup RPC, returning the proxy's Future unchanged.
112 protected Future
<LookupResponse
> makeCall(LookupRequest
.Builder batch
) {
113 return datastoreProxy
.lookup(batch
.build());
// Keys are grouped by their root key.
117 final Object
getGroup(Key key
) {
118 return key
.getRootKey();
// Converts an App Engine Key into a built v1 key proto.
122 final com
.google
.datastore
.v1beta3
.Key
toPb(Key value
) {
123 return DataTypeTranslator
.toV1Key(value
).build();
// Batcher for lookups whose inputs are already v1 key protos; each finished batch is sent with
// datastoreProxy.lookup.
// NOTE(review): the line carrying this field's name is not visible here; the name is presumably
// lookupByPbBatcher, the identifier used for follow-up lookups of deferred keys; confirm.
127 private final V1Batcher
<LookupResponse
, LookupRequest
.Builder
, com
.google
.datastore
.v1beta3
.Key
,
128 com
.google
.datastore
.v1beta3
.Key
>
130 new V1Batcher
<LookupResponse
, LookupRequest
.Builder
, com
.google
.datastore
.v1beta3
.Key
,
131 com
.google
.datastore
.v1beta3
.Key
>() {
// Receives one key proto and the LookupRequest builder being filled.
// NOTE(review): the body of this method is not visible here.
133 void addToBatch(com
.google
.datastore
.v1beta3
.Key key
, LookupRequest
.Builder batch
) {
// NOTE(review): the header of the method this statement belongs to is not visible here.
// It returns the configured maxBatchReadEntities, presumably the per-batch item cap; confirm.
139 return datastoreServiceConfig
.maxBatchReadEntities
;
// Builds the request and issues the lookup RPC, returning the proxy's Future unchanged.
143 protected Future
<LookupResponse
> makeCall(LookupRequest
.Builder batch
) {
144 return datastoreProxy
.lookup(batch
.build());
// Key protos are grouped by the first element of their path.
148 final Object
getGroup(com
.google
.datastore
.v1beta3
.Key key
) {
149 return key
.getPath(0);
// Input and proto types are the same here.
// NOTE(review): the body of this method is not visible; presumably it returns value as is.
153 final com
.google
.datastore
.v1beta3
.Key
toPb(com
.google
.datastore
.v1beta3
.Key value
) {
// Batcher for puts: each Entity becomes an UPSERT Mutation, mutations are appended to a
// CommitRequest builder, and each finished batch is sent with datastoreProxy.commit.
158 private final V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Entity
, Mutation
>
159 putBatcher
= new V1Batcher
<CommitResponse
, CommitRequest
.Builder
, Entity
, Mutation
>() {
// Appends one already-converted mutation to the batch under construction.
161 void addToBatch(Mutation mutation
, CommitRequest
.Builder batch
) {
162 batch
.addMutations(mutation
);
// NOTE(review): the header of the method this statement belongs to is not visible here.
// It returns the configured maxBatchWriteEntities, presumably the per-batch item cap; confirm.
167 return datastoreServiceConfig
.maxBatchWriteEntities
;
// Builds the request and issues the commit RPC, returning the proxy's Future unchanged.
171 protected Future
<CommitResponse
> makeCall(CommitRequest
.Builder batch
) {
172 return datastoreProxy
.commit(batch
.build());
// Entities are grouped by the root key of their own key.
176 final Object
getGroup(Entity value
) {
177 return value
.getKey().getRootKey();
// Converts an Entity into an UPSERT mutation carrying the entity's v1 representation.
// NOTE(review): the end of this builder chain is not visible here.
181 final Mutation
toPb(Entity value
) {
182 return Mutation
.newBuilder()
183 .setOp(Mutation
.Operation
.UPSERT
)
184 .setEntity(DataTypeTranslator
.toV1Entity(value
))
// Batcher for ID allocation: each Key is converted to a v1 key proto and each finished batch
// is sent with datastoreProxy.allocateIds.
// NOTE(review): the line carrying this field's name is not visible here; the name is presumably
// allocateIdsBatcher, the identifier used by allocateIds(List); confirm.
189 private final V1Batcher
<AllocateIdsResponse
, AllocateIdsRequest
.Builder
, Key
,
190 com
.google
.datastore
.v1beta3
.Key
>
192 new V1Batcher
<AllocateIdsResponse
, AllocateIdsRequest
.Builder
, Key
,
193 com
.google
.datastore
.v1beta3
.Key
>() {
// Receives one converted key and the AllocateIdsRequest builder being filled.
// NOTE(review): the body of this method is not visible here.
195 void addToBatch(com
.google
.datastore
.v1beta3
.Key key
, AllocateIdsRequest
.Builder batch
) {
// NOTE(review): the header of the method this statement belongs to is not visible here.
// It returns the configured maxBatchAllocateIdKeys, presumably the per-batch item cap; confirm.
201 return datastoreServiceConfig
.maxBatchAllocateIdKeys
;
// Builds the request and issues the allocateIds RPC, returning the proxy's Future unchanged.
205 protected Future
<AllocateIdsResponse
> makeCall(AllocateIdsRequest
.Builder batch
) {
206 return datastoreProxy
.allocateIds(batch
.build());
// Groups keys by the first path element of their parent's v1 key.
210 final Object
getGroup(Key key
) {
211 Key parent
= key
.getParent();
// Keys without a parent all share the default (empty) PathElement as their group.
212 if (parent
== null) {
213 return PathElement
.getDefaultInstance();
215 return DataTypeTranslator
.toV1Key(parent
).getPath(0);
// Converts an App Engine Key into a built v1 key proto.
220 final com
.google
.datastore
.v1beta3
.Key
toPb(Key value
) {
221 return DataTypeTranslator
.toV1Key(value
).build();
// Proxy used for the commit, lookup, allocateIds and beginTransaction calls made by this class.
225 private final CloudDatastoreV1Proxy datastoreProxy
;
// Creates the service from its configuration, the proxy that carries the RPCs, and the
// transaction stack handed to the superclass.
227 public AsyncCloudDatastoreV1ServiceImpl(
228 DatastoreServiceConfig datastoreServiceConfig
, CloudDatastoreV1Proxy datastoreProxy
,
229 TransactionStack defaultTxnProvider
) {
// The superclass also receives a v1 query runner built on the same config and proxy.
230 super(datastoreServiceConfig
, defaultTxnProvider
,
231 new QueryRunnerCloudDatastoreV1(datastoreServiceConfig
, datastoreProxy
));
232 this.datastoreProxy
= datastoreProxy
;
236 protected TransactionImpl
.InternalTransaction
doBeginTransaction(TransactionOptions options
) {
237 BeginTransactionRequest
.Builder request
= BeginTransactionRequest
.newBuilder();
239 Future
<BeginTransactionResponse
> future
= datastoreProxy
.beginTransaction(request
.build());
241 ApiVersion apiVersion
= datastoreServiceConfig
.getApiVersion();
242 switch (apiVersion
) {
243 case CLOUD_DATASTORE_V1_VIA_API_PROXY
:
244 case CLOUD_DATASTORE_V1_REMOTE
:
245 return InternalTransactionCloudDatastoreV1
.create(datastoreProxy
, future
);
247 throw new IllegalStateException("Unsupported api version: " + apiVersion
);
252 protected Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
,
253 final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
) {
254 final LookupRequest
.Builder baseReq
= LookupRequest
.newBuilder();
255 ReadOptions
.Builder readOptionsBuilder
= baseReq
.getReadOptionsBuilder();
257 TransactionImpl
.ensureTxnActive(txn
);
258 readOptionsBuilder
.setTransaction(
259 InternalTransactionCloudDatastoreV1
.getById(txn
.getId()).getHandle());
260 } else if (datastoreServiceConfig
.getReadPolicy().getConsistency() == EVENTUAL
) {
261 readOptionsBuilder
.setReadConsistency(ReadOptions
.ReadConsistency
.EVENTUAL
);
263 baseReq
.clearReadOptions();
266 final boolean shouldUseMultipleBatches
= getDatastoreType() != MASTER_SLAVE
&& txn
== null
267 && datastoreServiceConfig
.getReadPolicy().getConsistency() != EVENTUAL
;
269 Iterator
<LookupRequest
.Builder
> batches
= lookupByKeyBatcher
.getBatches(keysToGet
, baseReq
,
270 baseReq
.build().getSerializedSize(), shouldUseMultipleBatches
);
271 List
<Future
<LookupResponse
>> futures
= lookupByKeyBatcher
.makeCalls(batches
);
273 return registerInTransaction(txn
, new MultiFuture
<LookupResponse
, Map
<Key
, Entity
>>(futures
) {
275 * A Map from a Key without an app id specified to the corresponding Key that the
276 * user requested. This is a workaround for the Remote API to support matching requested
277 * Keys to Entities that may be from a different app id.
279 private Map
<com
.google
.datastore
.v1beta3
.Key
, Key
> keyMapIgnoringAppId
;
282 public Map
<Key
, Entity
> get() throws InterruptedException
, ExecutionException
{
284 aggregate(futures
, null, null);
285 } catch (TimeoutException e
) {
286 throw new RuntimeException(e
);
292 public Map
<Key
, Entity
> get(long timeout
, TimeUnit unit
)
293 throws InterruptedException
, ExecutionException
, TimeoutException
{
294 aggregate(futures
, timeout
, unit
);
299 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
300 * any results were deferred.
302 * @param currentFutures the Futures corresponding to the batches of the initial
304 * @param timeout the timeout to use while waiting on the Future, or null for none.
305 * @param timeoutUnit the unit of the timeout, or null for none.
307 private void aggregate(
308 Iterable
<Future
<LookupResponse
>> currentFutures
, Long timeout
, TimeUnit timeoutUnit
)
309 throws ExecutionException
, InterruptedException
, TimeoutException
{
311 List
<com
.google
.datastore
.v1beta3
.Key
> deferredKeys
= Lists
.newArrayList();
313 for (Future
<LookupResponse
> currentFuture
: currentFutures
) {
314 LookupResponse resp
= getFutureWithOptionalTimeout(currentFuture
, timeout
, timeoutUnit
);
315 addEntitiesToResultMap(resp
);
316 deferredKeys
.addAll(resp
.getDeferredList());
319 if (deferredKeys
.isEmpty()) {
323 Iterator
<LookupRequest
.Builder
> followupBatches
= lookupByPbBatcher
.getBatches(
324 deferredKeys
, baseReq
, baseReq
.build().getSerializedSize(), shouldUseMultipleBatches
);
325 currentFutures
= lookupByPbBatcher
.makeCalls(followupBatches
);
330 * Convenience method to get the result of a Future and optionally specify a timeout.
332 * @param future the Future to get.
333 * @param timeout the timeout to use while waiting on the Future, or null for none.
334 * @param timeoutUnit the unit of the timeout, or null for none.
335 * @return the result of the Future.
336 * @throws TimeoutException will only ever be thrown if a timeout is provided.
338 private LookupResponse
getFutureWithOptionalTimeout(
339 Future
<LookupResponse
> future
, Long timeout
, TimeUnit timeoutUnit
)
340 throws ExecutionException
, InterruptedException
, TimeoutException
{
341 if (timeout
== null) {
344 return future
.get(timeout
, timeoutUnit
);
349 * Adds the Entities from the LookupResponse to the resultMap. Will omit Keys that were
350 * missing. Handles Keys with different App Ids from the Entity.Key. See
351 * {@link #findKeyFromRequestIgnoringAppId}.
353 private void addEntitiesToResultMap(LookupResponse response
) {
354 for (EntityResult entityResult
: response
.getFoundList()) {
355 Entity responseEntity
= DataTypeTranslator
.toEntity(entityResult
.getEntity());
356 Key responseKey
= responseEntity
.getKey();
358 if (!keysToGet
.contains(responseKey
)) {
359 responseKey
= findKeyFromRequestIgnoringAppId(entityResult
.getEntity().getKey());
361 resultMap
.put(responseKey
, responseEntity
);
366 * This is a hack to support calls going through the Remote API. The problem is:
368 * The requested Key may have a local app id.
369 * The returned Entity may have a remote app id.
371 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
372 * user can always do map.get(keyFromRequest).
374 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
377 * Note that we used to be able to rely on the order of the Response Entities matching the
378 * order of Request Keys. We can no longer do so with the addition of Deferred results.
380 * @param keyFromResponse the key from the Response that did not match any of the requested
382 * @return the Key from the request that corresponds to the given Key from the Response
385 private Key
findKeyFromRequestIgnoringAppId(
386 com
.google
.datastore
.v1beta3
.Key keyFromResponse
) {
387 if (keyMapIgnoringAppId
== null) {
388 keyMapIgnoringAppId
= Maps
.newHashMap();
389 for (Key requestKey
: keysToGet
) {
390 com
.google
.datastore
.v1beta3
.Key
.Builder requestKeyAsRefWithoutApp
=
391 DataTypeTranslator
.toV1Key(requestKey
);
392 requestKeyAsRefWithoutApp
.getPartitionIdBuilder().clearProjectId();
393 keyMapIgnoringAppId
.put(requestKeyAsRefWithoutApp
.build(), requestKey
);
397 com
.google
.datastore
.v1beta3
.Key
.Builder keyBuilder
= keyFromResponse
.toBuilder();
398 keyBuilder
.getPartitionIdBuilder().clearProjectId();
399 Key result
= keyMapIgnoringAppId
.get(keyBuilder
.build());
400 if (result
== null) {
401 throw new DatastoreFailureException("Internal error");
// Stores the given entities and yields their keys.
// Two paths are visible below: the statements up to the ReorderingMultiFuture send
// NON_TRANSACTIONAL commit batches through putBatcher; the statements from ensureTxnActive
// onward defer the puts on the transaction instead.
// NOTE(review): the condition selecting between the two paths is not visible here; presumably
// the first path runs when txn is null. Confirm.
409 protected Future
<List
<Key
>> doBatchPut( final Transaction txn
,
410 final List
<Entity
> entities
) {
412 CommitRequest
.Builder baseReq
= CommitRequest
.newBuilder();
413 baseReq
.setMode(CommitRequest
.Mode
.NON_TRANSACTIONAL
);
// 'order' is handed to the batcher and then to the ReorderingMultiFuture; presumably it records
// the original index of each batched entity so results can be put back in request order.
414 List
<Integer
> order
= Lists
.newArrayListWithCapacity(entities
.size());
415 Iterator
<CommitRequest
.Builder
> batches
= putBatcher
.getBatches(entities
, baseReq
,
416 baseReq
.build().getSerializedSize(), true, order
);
417 List
<Future
<CommitResponse
>> futures
= putBatcher
.makeCalls(batches
);
419 return new ReorderingMultiFuture
<CommitResponse
, List
<Key
>>(futures
, order
) {
// Folds one CommitResponse into the result list, one MutationResult per index from indexItr.
421 protected List
<Key
> aggregate(CommitResponse intermediateResult
, Iterator
<Integer
> indexItr
,
423 for (MutationResult mutationResult
: intermediateResult
.getMutationResultsList()) {
424 int index
= indexItr
.next();
425 Key key
= entities
.get(index
).getKey();
// A key in the mutation result is written back as the id of the caller's own Key object.
// NOTE(review): unlike updateKey, this does not check the id type of the last path element.
426 if (mutationResult
.hasKey()) {
427 List
<PathElement
> pathElements
=
428 mutationResult
.getKey().getPathList();
429 key
.setId(pathElements
.get(pathElements
.size() - 1).getId());
431 result
.set(index
, key
);
// Creates the result list pre-filled with 'size' nulls so aggregate can set() by index.
437 protected List
<Key
> initResult(int size
) {
438 List
<Key
> keyList
= Lists
.newArrayListWithCapacity(size
);
439 keyList
.addAll(Collections
.<Key
>nCopies(size
, null));
// Transactional path: nothing is committed here, puts are deferred on the transaction.
445 TransactionImpl
.ensureTxnActive(txn
);
446 final InternalTransactionCloudDatastoreV1 txnV1
=
447 InternalTransactionCloudDatastoreV1
.getById(txn
.getId());
449 ImmutableList
.Builder
<Key
> keyListBuilder
= ImmutableList
.builder();
// incompleteKeys and incompleteEntityBldrs are filled in lockstep and iterated in lockstep later.
450 final List
<Key
> incompleteKeys
= Lists
.newArrayList();
451 final List
<com
.google
.datastore
.v1beta3
.Entity
.Builder
> incompleteEntityBldrs
=
452 Lists
.newArrayList();
453 for (Entity entity
: entities
) {
454 Key key
= entity
.getKey();
455 keyListBuilder
.add(key
);
// Entities whose key is already complete can be deferred immediately.
456 if (key
.isComplete()) {
457 txnV1
.deferPut(entity
);
// Otherwise only the properties are converted now; the key is attached once an id is allocated.
459 com
.google
.datastore
.v1beta3
.Entity
.Builder entityV1
=
460 com
.google
.datastore
.v1beta3
.Entity
.newBuilder();
461 DataTypeTranslator
.addPropertiesToPb(entity
.getPropertyMap(), entityV1
);
462 incompleteEntityBldrs
.add(entityV1
);
463 incompleteKeys
.add(key
);
466 final List
<Key
> allKeys
= keyListBuilder
.build();
// With no incomplete keys there is nothing to wait for: return an already-completed future.
468 if (incompleteKeys
.isEmpty()) {
469 return new FutureHelper
.FakeFuture
<List
<Key
>>(allKeys
);
// Otherwise allocate ids for the incomplete keys and finish the deferred puts when they arrive.
471 return registerInTransaction(txn
,
472 new FutureWrapper
<List
<com
.google
.datastore
.v1beta3
.Key
>,
473 List
<Key
>>(allocateIds(incompleteKeys
)) {
// Pairs each allocated key proto with the matching incomplete Key and entity builder, in order.
// NOTE(review): the return statement of this method is not visible here.
475 protected List
<Key
> wrap(List
<com
.google
.datastore
.v1beta3
.Key
> completedKeyPbs
) {
476 Iterator
<com
.google
.datastore
.v1beta3
.Entity
.Builder
> entityPbBldrIt
=
477 incompleteEntityBldrs
.iterator();
478 Iterator
<Key
> incompleteKeysIt
= incompleteKeys
.iterator();
479 for (com
.google
.datastore
.v1beta3
.Key keyV1
: completedKeyPbs
) {
// updateKey copies the allocated id into the caller's Key; the entity proto gets the full key.
480 updateKey(keyV1
, incompleteKeysIt
.next());
481 txnV1
.deferPut(entityPbBldrIt
.next().setKey(keyV1
));
// NOTE(review): the body of convertException is not visible here.
487 protected Throwable
convertException(Throwable cause
) {
494 protected Future
<Void
> doBatchDelete( Transaction txn
, Collection
<Key
> keys
) {
496 TransactionImpl
.ensureTxnActive(txn
);
497 InternalTransactionCloudDatastoreV1 txnV1
=
498 InternalTransactionCloudDatastoreV1
.getById(txn
.getId());
499 for (Key key
: keys
) {
500 txnV1
.deferDelete(key
);
502 return new FutureHelper
.FakeFuture
<Void
>(null);
505 CommitRequest
.Builder baseReq
= CommitRequest
.newBuilder();
506 baseReq
.setMode(CommitRequest
.Mode
.NON_TRANSACTIONAL
);
507 Iterator
<CommitRequest
.Builder
> batches
= deleteBatcher
.getBatches(keys
, baseReq
,
508 baseReq
.build().getSerializedSize(), true);
509 List
<Future
<CommitResponse
>> futures
= deleteBatcher
.makeCalls(batches
);
510 return new MultiFuture
<CommitResponse
, Void
>(futures
) {
512 public Void
get() throws InterruptedException
, ExecutionException
{
513 for (Future
<CommitResponse
> future
: futures
) {
520 public Void
get(long timeout
, TimeUnit unit
) throws InterruptedException
, ExecutionException
,
522 for (Future
<CommitResponse
> future
: futures
) {
523 future
.get(timeout
, unit
);
531 * This API is specific to sequential IDs, which Cloud Datastore v1 does not support.
// Always throws UnsupportedOperationException: this overload allocates sequential IDs, which
// this implementation does not offer. None of the arguments are inspected.
534 public Future
<KeyRange
> allocateIds(final Key parent
, final String kind
, long num
) {
535 throw new UnsupportedOperationException();
539 * This API is specific to sequential IDs, which Cloud Datastore v1 does not support.
// Always throws UnsupportedOperationException: reserving a range of sequential IDs is not
// offered by this implementation. The range argument is not inspected.
542 public Future
<KeyRangeState
> allocateIdRange(final KeyRange range
) {
543 throw new UnsupportedOperationException();
547 * Allocates scattered IDs for a list of incomplete keys.
549 protected Future
<List
<com
.google
.datastore
.v1beta3
.Key
>> allocateIds(List
<Key
> keyList
) {
550 List
<Integer
> order
= Lists
.newArrayListWithCapacity(keyList
.size());
551 Iterator
<AllocateIdsRequest
.Builder
> batches
= allocateIdsBatcher
.getBatches(keyList
,
552 AllocateIdsRequest
.newBuilder(), 0, true, order
);
553 List
<Future
<AllocateIdsResponse
>> futures
= allocateIdsBatcher
.makeCalls(batches
);
555 return new ReorderingMultiFuture
<AllocateIdsResponse
,
556 List
<com
.google
.datastore
.v1beta3
.Key
>>(futures
, order
) {
558 protected List
<com
.google
.datastore
.v1beta3
.Key
> aggregate(AllocateIdsResponse batch
,
559 Iterator
<Integer
> indexItr
,
560 List
<com
.google
.datastore
.v1beta3
.Key
> result
) {
561 for (com
.google
.datastore
.v1beta3
.Key key
: batch
.getKeysList()) {
562 result
.set(indexItr
.next(), key
);
568 protected List
<com
.google
.datastore
.v1beta3
.Key
> initResult(int size
) {
569 return Arrays
.asList(new com
.google
.datastore
.v1beta3
.Key
[size
]);
// Always throws UnsupportedOperationException: this implementation provides no way to list
// indexes and their states.
575 public Future
<Map
<Index
, IndexState
>> getIndexes() {
576 throw new UnsupportedOperationException();
580 * Update a {@link Key} with the id from a key proto, if it is populated.
// Copies the id of the last path element of keyV1 into key, mutating key in place.
// Nothing happens when keyV1 has an empty path or its last element does not carry a numeric id.
582 private static void updateKey(com
.google
.datastore
.v1beta3
.Key keyV1
, Key key
) {
583 List
<PathElement
> pathElements
= keyV1
.getPathList();
584 if (!pathElements
.isEmpty()) {
// Only the last path element identifies the entity itself.
585 PathElement lastElement
= pathElements
.get(pathElements
.size() - 1);
// Only the ID case is handled; any other id type leaves key untouched.
586 if (lastElement
.getIdTypeCase() == IdTypeCase
.ID
) {
587 key
.setId(lastElement
.getId());