1 package com
.google
.appengine
.api
.datastore
;
3 import static com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
.MASTER_SLAVE
;
4 import static com
.google
.appengine
.api
.datastore
.ReadPolicy
.Consistency
.EVENTUAL
;
6 import com
.google
.appengine
.api
.datastore
.Batcher
.ReorderingMultiFuture
;
7 import com
.google
.appengine
.api
.datastore
.DatastoreService
.KeyRangeState
;
8 import com
.google
.appengine
.api
.datastore
.DatastoreServiceConfig
.ApiVersion
;
9 import com
.google
.appengine
.api
.datastore
.FutureHelper
.MultiFuture
;
10 import com
.google
.appengine
.api
.datastore
.Index
.IndexState
;
11 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
12 import com
.google
.apphosting
.datastore
.DatastoreV4
.AllocateIdsRequest
;
13 import com
.google
.apphosting
.datastore
.DatastoreV4
.AllocateIdsResponse
;
14 import com
.google
.apphosting
.datastore
.DatastoreV4
.BeginTransactionRequest
;
15 import com
.google
.apphosting
.datastore
.DatastoreV4
.BeginTransactionResponse
;
16 import com
.google
.apphosting
.datastore
.DatastoreV4
.CommitRequest
;
17 import com
.google
.apphosting
.datastore
.DatastoreV4
.CommitResponse
;
18 import com
.google
.apphosting
.datastore
.DatastoreV4
.EntityResult
;
19 import com
.google
.apphosting
.datastore
.DatastoreV4
.LookupRequest
;
20 import com
.google
.apphosting
.datastore
.DatastoreV4
.LookupResponse
;
21 import com
.google
.apphosting
.datastore
.DatastoreV4
.Mutation
;
22 import com
.google
.apphosting
.datastore
.DatastoreV4
.MutationResult
;
23 import com
.google
.apphosting
.datastore
.DatastoreV4
.ReadOptions
;
24 import com
.google
.apphosting
.datastore
.EntityV4
;
25 import com
.google
.apphosting
.datastore
.EntityV4
.Key
.PathElement
;
26 import com
.google
.common
.collect
.ImmutableList
;
27 import com
.google
.common
.collect
.Lists
;
28 import com
.google
.common
.collect
.Maps
;
29 import com
.google
.protobuf
.Message
;
31 import java
.util
.Arrays
;
32 import java
.util
.Collection
;
33 import java
.util
.Collections
;
34 import java
.util
.Iterator
;
35 import java
.util
.List
;
38 import java
.util
.concurrent
.ExecutionException
;
39 import java
.util
.concurrent
.Future
;
40 import java
.util
.concurrent
.TimeUnit
;
41 import java
.util
.concurrent
.TimeoutException
;
/**
 * An implementation of AsyncDatastoreService using the DatastoreV4 API.
 */
46 class AsyncDatastoreV4ServiceImpl
extends BaseAsyncDatastoreServiceImpl
{
49 * A base batcher for DatastoreV4 operations executed in the context of an {@link
50 * AsyncDatastoreV4ServiceImpl}.
51 * @param <S> the response message type
52 * @param <R> the request message builder type
53 * @param <F> the Java specific representation of a value
54 * @param <T> the proto representation of value
56 private abstract class V4Batcher
<S
extends Message
, R
extends Message
.Builder
, F
,
57 T
extends Message
> extends BaseRpcBatcher
<S
, R
, F
, T
> {
59 @SuppressWarnings("unchecked")
60 final R
newBatch(R baseBatch
) {
61 return (R
) baseBatch
.clone();
65 private final V4Batcher
<CommitResponse
, CommitRequest
.Builder
, Key
, Mutation
> deleteBatcher
=
66 new V4Batcher
<CommitResponse
, CommitRequest
.Builder
, Key
, Mutation
>() {
68 void addToBatch(Mutation mutation
, CommitRequest
.Builder batch
) {
69 batch
.addMutation(mutation
);
74 return datastoreServiceConfig
.maxBatchWriteEntities
;
78 protected Future
<CommitResponse
> makeCall(CommitRequest
.Builder batch
) {
79 return datastoreProxy
.commit(batch
.build());
83 final Object
getGroup(Key key
) {
84 return key
.getRootKey();
88 final Mutation
toPb(Key value
) {
89 return Mutation
.newBuilder()
90 .setOp(Mutation
.Operation
.DELETE
)
91 .setKey(DataTypeTranslator
.toV4Key(value
))
96 private final V4Batcher
<LookupResponse
, LookupRequest
.Builder
, Key
, EntityV4
.Key
>
98 new V4Batcher
<LookupResponse
, LookupRequest
.Builder
, Key
, EntityV4
.Key
>() {
100 void addToBatch(EntityV4
.Key key
, LookupRequest
.Builder batch
) {
106 return datastoreServiceConfig
.maxBatchReadEntities
;
110 protected Future
<LookupResponse
> makeCall(LookupRequest
.Builder batch
) {
111 return datastoreProxy
.lookup(batch
.build());
115 final Object
getGroup(Key key
) {
116 return key
.getRootKey();
120 final EntityV4
.Key
toPb(Key value
) {
121 return DataTypeTranslator
.toV4Key(value
).build();
125 private final V4Batcher
<LookupResponse
, LookupRequest
.Builder
, EntityV4
.Key
, EntityV4
.Key
>
127 new V4Batcher
<LookupResponse
, LookupRequest
.Builder
, EntityV4
.Key
, EntityV4
.Key
>() {
129 void addToBatch(EntityV4
.Key key
, LookupRequest
.Builder batch
) {
135 return datastoreServiceConfig
.maxBatchReadEntities
;
139 protected Future
<LookupResponse
> makeCall(LookupRequest
.Builder batch
) {
140 return datastoreProxy
.lookup(batch
.build());
144 final Object
getGroup(EntityV4
.Key key
) {
145 return key
.getPathElement(0);
149 final EntityV4
.Key
toPb(EntityV4
.Key value
) {
154 private final V4Batcher
<CommitResponse
, CommitRequest
.Builder
, Entity
, Mutation
>
155 putBatcher
= new V4Batcher
<CommitResponse
, CommitRequest
.Builder
, Entity
, Mutation
>() {
157 void addToBatch(Mutation mutation
, CommitRequest
.Builder batch
) {
158 batch
.addMutation(mutation
);
163 return datastoreServiceConfig
.maxBatchWriteEntities
;
167 protected Future
<CommitResponse
> makeCall(CommitRequest
.Builder batch
) {
168 return datastoreProxy
.commit(batch
.build());
172 final Object
getGroup(Entity value
) {
173 return value
.getKey().getRootKey();
177 final Mutation
toPb(Entity value
) {
178 return Mutation
.newBuilder()
179 .setOp(Mutation
.Operation
.UPSERT
)
180 .setEntity(DataTypeTranslator
.toV4Entity(value
))
185 private final V4Batcher
<AllocateIdsResponse
, AllocateIdsRequest
.Builder
, Key
, EntityV4
.Key
>
187 new V4Batcher
<AllocateIdsResponse
, AllocateIdsRequest
.Builder
, Key
, EntityV4
.Key
>() {
189 void addToBatch(EntityV4
.Key key
, AllocateIdsRequest
.Builder batch
) {
190 batch
.addAllocate(key
);
195 return datastoreServiceConfig
.maxBatchAllocateIdKeys
;
199 protected Future
<AllocateIdsResponse
> makeCall(AllocateIdsRequest
.Builder batch
) {
200 return datastoreProxy
.allocateIds(batch
.build());
204 final Object
getGroup(Key key
) {
205 Key parent
= key
.getParent();
206 if (parent
== null) {
207 return EntityV4
.Key
.PathElement
.getDefaultInstance();
209 return DataTypeTranslator
.toV4Key(parent
).getPathElement(0);
214 final EntityV4
.Key
toPb(Key value
) {
215 return DataTypeTranslator
.toV4Key(value
).build();
/** Proxy used to issue the DatastoreV4 calls (commit, lookup, allocateIds, beginTransaction). */
private final DatastoreV4Proxy datastoreProxy;

/**
 * Creates a service that talks to the datastore through the given proxy.
 *
 * <p>The config and transaction provider are handed to the superclass together with a
 * {@code QueryRunnerV4} built from the same config and proxy.
 *
 * @param datastoreServiceConfig the service configuration
 * @param datastoreProxy the proxy used for DatastoreV4 calls
 * @param defaultTxnProvider the transaction stack passed to the superclass
 */
public AsyncDatastoreV4ServiceImpl(
    DatastoreServiceConfig datastoreServiceConfig,
    DatastoreV4Proxy datastoreProxy,
    TransactionStack defaultTxnProvider) {
  super(
      datastoreServiceConfig,
      defaultTxnProvider,
      new QueryRunnerV4(datastoreServiceConfig, datastoreProxy));
  this.datastoreProxy = datastoreProxy;
}
230 protected TransactionImpl
.InternalTransaction
doBeginTransaction(TransactionOptions options
) {
231 BeginTransactionRequest
.Builder request
= BeginTransactionRequest
.newBuilder();
232 request
.setCrossGroup(options
.isXG());
234 Future
<BeginTransactionResponse
> future
= datastoreProxy
.beginTransaction(request
.build());
236 ApiVersion apiVersion
= datastoreServiceConfig
.getApiVersion();
237 switch (apiVersion
) {
238 case CLOUD_DATASTORE
:
239 return InternalTransactionCloudDatastore
.create(datastoreProxy
, future
);
241 return InternalTransactionV4
.create(datastoreProxy
, future
);
243 throw new IllegalStateException("Unsupported api version: " + apiVersion
);
248 protected Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
,
249 final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
) {
250 final LookupRequest
.Builder baseReq
= LookupRequest
.newBuilder();
251 ReadOptions
.Builder readOptionsBuilder
= baseReq
.getReadOptionsBuilder();
253 TransactionImpl
.ensureTxnActive(txn
);
254 readOptionsBuilder
.setTransaction(InternalTransactionV4
.getById(txn
.getId()).getHandle());
255 } else if (datastoreServiceConfig
.getReadPolicy().getConsistency() == EVENTUAL
) {
256 readOptionsBuilder
.setReadConsistency(ReadOptions
.ReadConsistency
.EVENTUAL
);
258 baseReq
.clearReadOptions();
261 final boolean shouldUseMultipleBatches
= getDatastoreType() != MASTER_SLAVE
&& txn
== null
262 && datastoreServiceConfig
.getReadPolicy().getConsistency() != EVENTUAL
;
264 Iterator
<LookupRequest
.Builder
> batches
= lookupByKeyBatcher
.getBatches(keysToGet
, baseReq
,
265 baseReq
.build().getSerializedSize(), shouldUseMultipleBatches
);
266 List
<Future
<LookupResponse
>> futures
= lookupByKeyBatcher
.makeCalls(batches
);
268 return registerInTransaction(txn
, new MultiFuture
<LookupResponse
, Map
<Key
, Entity
>>(futures
) {
/**
 * A Map from an EntityV4.Key without an App Id specified to the corresponding Key that the
 * user requested. This is a workaround for the Remote API to support matching requested
 * Keys to Entities that may be from a different App Id.
 */
274 private Map
<EntityV4
.Key
, Key
> keyMapIgnoringAppId
;
277 public Map
<Key
, Entity
> get() throws InterruptedException
, ExecutionException
{
279 aggregate(futures
, null, null);
280 } catch (TimeoutException e
) {
281 throw new RuntimeException(e
);
287 public Map
<Key
, Entity
> get(long timeout
, TimeUnit unit
)
288 throws InterruptedException
, ExecutionException
, TimeoutException
{
289 aggregate(futures
, timeout
, unit
);
/**
 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
 * any results were deferred.
 *
 * @param currentFutures the Futures corresponding to the batches of the initial
 *     LookupRequests.
 * @param timeout the timeout to use while waiting on the Future, or null for none.
 * @param timeoutUnit the unit of the timeout, or null for none.
 */
302 private void aggregate(
303 Iterable
<Future
<LookupResponse
>> currentFutures
, Long timeout
, TimeUnit timeoutUnit
)
304 throws ExecutionException
, InterruptedException
, TimeoutException
{
306 List
<EntityV4
.Key
> deferredKeys
= Lists
.newArrayList();
308 for (Future
<LookupResponse
> currentFuture
: currentFutures
) {
309 LookupResponse resp
= getFutureWithOptionalTimeout(currentFuture
, timeout
, timeoutUnit
);
310 addEntitiesToResultMap(resp
);
311 deferredKeys
.addAll(resp
.getDeferredList());
314 if (deferredKeys
.isEmpty()) {
318 Iterator
<LookupRequest
.Builder
> followupBatches
= lookupByPbBatcher
.getBatches(
319 deferredKeys
, baseReq
, baseReq
.build().getSerializedSize(), shouldUseMultipleBatches
);
320 currentFutures
= lookupByPbBatcher
.makeCalls(followupBatches
);
/**
 * Convenience method to get the result of a Future and optionally specify a timeout.
 *
 * @param future the Future to get.
 * @param timeout the timeout to use while waiting on the Future, or null for none.
 * @param timeoutUnit the unit of the timeout, or null for none.
 * @return the result of the Future.
 * @throws TimeoutException will only ever be thrown if a timeout is provided.
 */
333 private LookupResponse
getFutureWithOptionalTimeout(
334 Future
<LookupResponse
> future
, Long timeout
, TimeUnit timeoutUnit
)
335 throws ExecutionException
, InterruptedException
, TimeoutException
{
336 if (timeout
== null) {
339 return future
.get(timeout
, timeoutUnit
);
/**
 * Adds the Entities from the LookupResponse to the resultMap. Will omit Keys that were
 * missing. Handles Keys with different App Ids from the Entity.Key. See
 * {@link #findKeyFromRequestIgnoringAppId(EntityV4.Key)}.
 */
348 private void addEntitiesToResultMap(LookupResponse response
) {
349 for (EntityResult entityResult
: response
.getFoundList()) {
350 Entity responseEntity
= DataTypeTranslator
.toEntity(entityResult
.getEntity());
351 Key responseKey
= responseEntity
.getKey();
353 if (!keysToGet
.contains(responseKey
)) {
354 responseKey
= findKeyFromRequestIgnoringAppId(entityResult
.getEntity().getKey());
356 resultMap
.put(responseKey
, responseEntity
);
/**
 * This is a hack to support calls going through the Remote API. The problem is:
 *
 * The requested Key may have a local app id.
 * The returned Entity may have a remote app id.
 *
 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
 * user can always do map.get(keyFromRequest).
 *
 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
 * app id: the dataset id of each key's partition id is cleared before the keys are compared.
 *
 * Note that we used to be able to rely on the order of the Response Entities matching the
 * order of Request Keys. We can no longer do so with the addition of Deferred results.
 *
 * @param keyFromResponse the key from the Response that did not match any of the requested
 *     Keys.
 * @return the Key from the request that corresponds to the given Key from the Response
 *     (ignoring the app id).
 * @throws DatastoreFailureException if no requested Key matches the given Key.
 */
380 private Key
findKeyFromRequestIgnoringAppId(EntityV4
.Key keyFromResponse
) {
381 if (keyMapIgnoringAppId
== null) {
382 keyMapIgnoringAppId
= Maps
.newHashMap();
383 for (Key requestKey
: keysToGet
) {
384 EntityV4
.Key
.Builder requestKeyAsRefWithoutApp
= DataTypeTranslator
.toV4Key(requestKey
);
385 requestKeyAsRefWithoutApp
.getPartitionIdBuilder().clearDatasetId();
386 keyMapIgnoringAppId
.put(requestKeyAsRefWithoutApp
.build(), requestKey
);
390 EntityV4
.Key
.Builder keyBuilder
= keyFromResponse
.toBuilder();
391 keyBuilder
.getPartitionIdBuilder().clearDatasetId();
392 Key result
= keyMapIgnoringAppId
.get(keyBuilder
.build());
393 if (result
== null) {
394 throw new DatastoreFailureException("Internal error");
402 protected Future
<List
<Key
>> doBatchPut( final Transaction txn
,
403 final List
<Entity
> entities
) {
405 CommitRequest
.Builder baseReq
= CommitRequest
.newBuilder();
406 baseReq
.setMode(CommitRequest
.Mode
.NON_TRANSACTIONAL
);
407 List
<Integer
> order
= Lists
.newArrayListWithCapacity(entities
.size());
408 Iterator
<CommitRequest
.Builder
> batches
= putBatcher
.getBatches(entities
, baseReq
,
409 baseReq
.build().getSerializedSize(), true, order
);
410 List
<Future
<CommitResponse
>> futures
= putBatcher
.makeCalls(batches
);
412 return new ReorderingMultiFuture
<CommitResponse
, List
<Key
>>(futures
, order
) {
414 protected List
<Key
> aggregate(CommitResponse intermediateResult
, Iterator
<Integer
> indexItr
,
416 for (MutationResult mutationResult
: intermediateResult
.getMutationResultList()) {
417 int index
= indexItr
.next();
418 Key key
= entities
.get(index
).getKey();
419 if (mutationResult
.hasKey()) {
420 List
<EntityV4
.Key
.PathElement
> pathElements
=
421 mutationResult
.getKey().getPathElementList();
422 key
.setId(pathElements
.get(pathElements
.size() - 1).getId());
424 result
.set(index
, key
);
430 protected List
<Key
> initResult(int size
) {
431 List
<Key
> keyList
= Lists
.newArrayListWithCapacity(size
);
432 keyList
.addAll(Collections
.<Key
>nCopies(size
, null));
438 TransactionImpl
.ensureTxnActive(txn
);
439 final BaseInternalTransactionV4
<?
> v4txn
= InternalTransactionV4
.getById(txn
.getId());
441 ImmutableList
.Builder
<Key
> keyListBuilder
= ImmutableList
.builder();
442 final List
<Key
> incompleteKeys
= Lists
.newArrayList();
443 final List
<EntityV4
.Entity
.Builder
> incompleteEntityBldrs
= Lists
.newArrayList();
444 for (Entity entity
: entities
) {
445 Key key
= entity
.getKey();
446 keyListBuilder
.add(key
);
447 if (key
.isComplete()) {
448 v4txn
.deferPut(entity
);
450 EntityV4
.Entity
.Builder v4Entity
= EntityV4
.Entity
.newBuilder();
451 DataTypeTranslator
.addPropertiesToPb(entity
.getPropertyMap(), v4Entity
);
452 incompleteEntityBldrs
.add(v4Entity
);
453 incompleteKeys
.add(key
);
456 final List
<Key
> allKeys
= keyListBuilder
.build();
458 if (incompleteKeys
.isEmpty()) {
459 return new FutureHelper
.FakeFuture
<List
<Key
>>(allKeys
);
461 return registerInTransaction(txn
,
462 new FutureWrapper
<List
<EntityV4
.Key
>, List
<Key
>>(allocateIds(incompleteKeys
)) {
464 protected List
<Key
> wrap(List
<EntityV4
.Key
> completedKeyPbs
) {
465 Iterator
<EntityV4
.Entity
.Builder
> entityPbBldrIt
= incompleteEntityBldrs
.iterator();
466 Iterator
<Key
> incompleteKeysIt
= incompleteKeys
.iterator();
467 for (EntityV4
.Key v4Key
: completedKeyPbs
) {
468 updateKey(v4Key
, incompleteKeysIt
.next());
469 v4txn
.deferPut(entityPbBldrIt
.next().setKey(v4Key
));
475 protected Throwable
convertException(Throwable cause
) {
482 protected Future
<Void
> doBatchDelete( Transaction txn
, Collection
<Key
> keys
) {
484 TransactionImpl
.ensureTxnActive(txn
);
485 BaseInternalTransactionV4
<?
> v4txn
= InternalTransactionV4
.getById(txn
.getId());
486 for (Key key
: keys
) {
487 v4txn
.deferDelete(key
);
489 return new FutureHelper
.FakeFuture
<Void
>(null);
492 CommitRequest
.Builder baseReq
= CommitRequest
.newBuilder();
493 baseReq
.setMode(CommitRequest
.Mode
.NON_TRANSACTIONAL
);
494 Iterator
<CommitRequest
.Builder
> batches
= deleteBatcher
.getBatches(keys
, baseReq
,
495 baseReq
.build().getSerializedSize(), true);
496 List
<Future
<CommitResponse
>> futures
= deleteBatcher
.makeCalls(batches
);
497 return new MultiFuture
<CommitResponse
, Void
>(futures
) {
499 public Void
get() throws InterruptedException
, ExecutionException
{
500 for (Future
<CommitResponse
> future
: futures
) {
507 public Void
get(long timeout
, TimeUnit unit
) throws InterruptedException
, ExecutionException
,
509 for (Future
<CommitResponse
> future
: futures
) {
510 future
.get(timeout
, unit
);
518 * This API is specific to sequential IDs, which V4 does not support.
521 public Future
<KeyRange
> allocateIds(final Key parent
, final String kind
, long num
) {
522 throw new UnsupportedOperationException();
526 * This API is specific to sequential IDs, which V4 does not support.
529 public Future
<KeyRangeState
> allocateIdRange(final KeyRange range
) {
530 throw new UnsupportedOperationException();
/**
 * Allocates scattered IDs for a list of incomplete keys.
 */
536 protected Future
<List
<EntityV4
.Key
>> allocateIds(List
<Key
> keyList
) {
537 List
<Integer
> order
= Lists
.newArrayListWithCapacity(keyList
.size());
538 Iterator
<AllocateIdsRequest
.Builder
> batches
= allocateIdsBatcher
.getBatches(keyList
,
539 AllocateIdsRequest
.newBuilder(), 0, true, order
);
540 List
<Future
<AllocateIdsResponse
>> futures
= allocateIdsBatcher
.makeCalls(batches
);
542 return new ReorderingMultiFuture
<AllocateIdsResponse
, List
<EntityV4
.Key
>>(futures
, order
) {
544 protected List
<EntityV4
.Key
> aggregate(AllocateIdsResponse batch
, Iterator
<Integer
> indexItr
,
545 List
<EntityV4
.Key
> result
) {
546 for (EntityV4
.Key key
: batch
.getAllocatedList()) {
547 result
.set(indexItr
.next(), key
);
553 protected List
<com
.google
.apphosting
.datastore
.EntityV4
.Key
> initResult(int size
) {
554 return Arrays
.asList(new EntityV4
.Key
[size
]);
560 public Future
<Map
<Index
, IndexState
>> getIndexes() {
561 throw new UnsupportedOperationException();
565 * Update a key object with the id in the proto, if one exists.
567 static void updateKey(EntityV4
.Key v4Key
, Key key
) {
568 List
<EntityV4
.Key
.PathElement
> pathElements
= v4Key
.getPathElementList();
569 if (!pathElements
.isEmpty()) {
570 PathElement lastElement
= pathElements
.get(pathElements
.size() - 1);
571 if (lastElement
.hasId()) {
572 key
.setId(lastElement
.getId());